001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.wf; 019 020import java.util.ArrayList; 021import java.util.Date; 022import java.util.List; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.oozie.DagELFunctions; 026import org.apache.oozie.ErrorCode; 027import org.apache.oozie.SLAEventBean; 028import org.apache.oozie.WorkflowActionBean; 029import org.apache.oozie.WorkflowJobBean; 030import org.apache.oozie.XException; 031import org.apache.oozie.action.ActionExecutor; 032import org.apache.oozie.action.ActionExecutorException; 033import org.apache.oozie.action.control.ControlNodeActionExecutor; 034import org.apache.oozie.client.OozieClient; 035import org.apache.oozie.client.WorkflowAction; 036import org.apache.oozie.client.WorkflowJob; 037import org.apache.oozie.client.SLAEvent.SlaAppType; 038import org.apache.oozie.client.SLAEvent.Status; 039import org.apache.oozie.client.rest.JsonBean; 040import org.apache.oozie.command.CommandException; 041import org.apache.oozie.command.PreconditionException; 042import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 043import org.apache.oozie.executor.jpa.BatchQueryExecutor; 044import org.apache.oozie.executor.jpa.JPAExecutorException; 045import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; 046import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 047import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; 048import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 049import org.apache.oozie.service.ActionService; 050import org.apache.oozie.service.EventHandlerService; 051import org.apache.oozie.service.JPAService; 052import org.apache.oozie.service.Services; 053import org.apache.oozie.service.UUIDService; 054import org.apache.oozie.util.Instrumentation; 055import org.apache.oozie.util.LogUtils; 056import org.apache.oozie.util.XLog; 057import org.apache.oozie.util.db.SLADbXOperations; 058import org.apache.oozie.workflow.WorkflowInstance; 059 060@SuppressWarnings("deprecation") 061public class ActionEndXCommand extends ActionXCommand<Void> { 062 public static final String COULD_NOT_END = "COULD_NOT_END"; 063 public static final String END_DATA_MISSING = "END_DATA_MISSING"; 064 065 private String jobId = null; 066 private String actionId = null; 067 private WorkflowJobBean wfJob = null; 068 private WorkflowActionBean wfAction = null; 069 private JPAService jpaService = null; 070 private ActionExecutor executor = null; 071 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 072 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 073 074 public ActionEndXCommand(String actionId, String type) { 075 super("action.end", type, 0); 076 this.actionId = actionId; 077 this.jobId = Services.get().get(UUIDService.class).getId(actionId); 078 } 079 080 @Override 081 protected boolean isLockRequired() { 082 return true; 083 } 084 085 @Override 086 public String getEntityKey() { 087 return this.jobId; 088 } 089 090 @Override 091 public String getKey() { 092 return getName() + "_" + actionId; 093 } 094 095 @Override 096 protected void loadState() throws CommandException { 097 try { 098 jpaService = Services.get().get(JPAService.class); 099 if (jpaService != null) { 100 this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_ACTION_OP, 101 jobId); 102 this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_END, 103 actionId); 104 LogUtils.setLogInfo(wfJob, logInfo); 105 LogUtils.setLogInfo(wfAction, logInfo); 106 } 107 else { 108 throw new CommandException(ErrorCode.E0610); 109 } 110 } 111 catch (XException ex) { 112 throw new CommandException(ex); 113 } 114 } 115 116 @Override 117 protected void verifyPrecondition() throws CommandException, PreconditionException { 118 if (wfJob == null) { 119 throw new PreconditionException(ErrorCode.E0604, jobId); 120 } 121 if (wfAction == null) { 122 throw new PreconditionException(ErrorCode.E0605, actionId); 123 } 124 if (wfAction.isPending() 125 && (wfAction.getStatus() == WorkflowActionBean.Status.DONE 126 || wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL)) { 127 128 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) { 129 throw new PreconditionException(ErrorCode.E0811, WorkflowJob.Status.RUNNING.toString()); 130 } 131 } 132 else { 133 throw new PreconditionException(ErrorCode.E0812, wfAction.isPending(), wfAction.getStatusStr()); 134 } 135 136 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType()); 137 if (executor == null) { 138 throw new CommandException(ErrorCode.E0802, wfAction.getType()); 139 } 140 } 141 142 @Override 143 protected Void execute() throws CommandException { 144 LOG.debug("STARTED ActionEndXCommand for action " + actionId); 145 146 Configuration conf = wfJob.getWorkflowInstance().getConf(); 147 148 int maxRetries = 0; 149 long retryInterval = 0; 150 151 if (!(executor instanceof ControlNodeActionExecutor)) { 152 maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries()); 153 retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval()); 154 } 155 156 executor.setMaxRetries(maxRetries); 157 executor.setRetryInterval(retryInterval); 158 159 boolean isRetry = false; 160 if (wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY 161 || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) { 162 isRetry = true; 163 } 164 boolean isUserRetry = false; 165 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry); 166 try { 167 168 LOG.debug( 169 "End, name [{0}] type [{1}] status[{2}] external status [{3}] signal value [{4}]", 170 wfAction.getName(), wfAction.getType(), wfAction.getStatus(), wfAction.getExternalStatus(), 171 wfAction.getSignalValue()); 172 173 Instrumentation.Cron cron = new Instrumentation.Cron(); 174 cron.start(); 175 executor.end(context, wfAction); 176 cron.stop(); 177 addActionCron(wfAction.getType(), cron); 178 179 WorkflowInstance wfInstance = wfJob.getWorkflowInstance(); 180 DagELFunctions.setActionInfo(wfInstance, wfAction); 181 wfJob.setWorkflowInstance(wfInstance); 182 incrActionCounter(wfAction.getType(), 1); 183 184 if (!context.isEnded()) { 185 LOG.warn(XLog.OPS, "Action Ended, ActionExecutor [{0}] must call setEndData()", 186 executor.getType()); 187 wfAction.setErrorInfo(END_DATA_MISSING, "Execution Ended, but End Data Missing from Action"); 188 failJob(context); 189 } else { 190 wfAction.setRetries(0); 191 wfAction.setEndTime(new Date()); 192 193 boolean shouldHandleUserRetry = false; 194 Status slaStatus = null; 195 switch (wfAction.getStatus()) { 196 case OK: 197 slaStatus = Status.SUCCEEDED; 198 break; 199 case KILLED: 200 slaStatus = Status.KILLED; 201 break; 202 case FAILED: 203 slaStatus = Status.FAILED; 204 shouldHandleUserRetry = true; 205 break; 206 case ERROR: 207 LOG.info("ERROR is considered as FAILED for SLA"); 208 slaStatus = Status.KILLED; 209 shouldHandleUserRetry = true; 210 break; 211 default: 212 slaStatus = Status.FAILED; 213 shouldHandleUserRetry = true; 214 break; 215 } 216 if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) { 217 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION); 218 if(slaEvent != null) { 219 insertList.add(slaEvent); 220 } 221 } 222 } 223 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END,wfAction)); 224 wfJob.setLastModifiedTime(new Date()); 225 updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob)); 226 } 227 catch (ActionExecutorException ex) { 228 LOG.warn( 229 "Error ending action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]", 230 wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage()); 231 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage()); 232 wfAction.setEndTime(null); 233 234 switch (ex.getErrorType()) { 235 case TRANSIENT: 236 if (!handleTransient(context, executor, WorkflowAction.Status.END_RETRY)) { 237 handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL); 238 wfAction.setPendingAge(new Date()); 239 wfAction.setRetries(0); 240 } 241 wfAction.setEndTime(null); 242 break; 243 case NON_TRANSIENT: 244 handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL); 245 wfAction.setEndTime(null); 246 break; 247 case ERROR: 248 handleError(context, executor, COULD_NOT_END, false, WorkflowAction.Status.ERROR); 249 break; 250 case FAILED: 251 failJob(context); 252 break; 253 } 254 255 WorkflowInstance wfInstance = wfJob.getWorkflowInstance(); 256 DagELFunctions.setActionInfo(wfInstance, wfAction); 257 wfJob.setWorkflowInstance(wfInstance); 258 259 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END,wfAction)); 260 wfJob.setLastModifiedTime(new Date()); 261 updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob)); 262 } 263 finally { 264 try { 265 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 266 } 267 catch (JPAExecutorException e) { 268 throw new CommandException(e); 269 } 270 if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) { 271 generateEvent(wfAction, wfJob.getUser()); 272 } 273 new SignalXCommand(jobId, actionId).call(getEntityKey()); 274 } 275 276 LOG.debug("ENDED ActionEndXCommand for action " + actionId); 277 return null; 278 } 279 280}