001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.wf; 020 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.List; 024 025import javax.servlet.jsp.el.ELException; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.oozie.ErrorCode; 029import org.apache.oozie.FaultInjection; 030import org.apache.oozie.SLAEventBean; 031import org.apache.oozie.WorkflowActionBean; 032import org.apache.oozie.WorkflowJobBean; 033import org.apache.oozie.XException; 034import org.apache.oozie.action.ActionExecutor; 035import org.apache.oozie.action.ActionExecutorException; 036import org.apache.oozie.action.control.ControlNodeActionExecutor; 037import org.apache.oozie.client.OozieClient; 038import org.apache.oozie.client.WorkflowAction; 039import org.apache.oozie.client.WorkflowJob; 040import org.apache.oozie.client.SLAEvent.SlaAppType; 041import org.apache.oozie.client.SLAEvent.Status; 042import org.apache.oozie.client.rest.JsonBean; 043import org.apache.oozie.command.CommandException; 044import org.apache.oozie.command.PreconditionException; 045import org.apache.oozie.command.XCommand; 046import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 047import org.apache.oozie.executor.jpa.BatchQueryExecutor; 048import org.apache.oozie.executor.jpa.JPAExecutorException; 049import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; 050import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; 051import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 052import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 053import org.apache.oozie.service.ActionService; 054import org.apache.oozie.service.EventHandlerService; 055import org.apache.oozie.service.JPAService; 056import org.apache.oozie.service.Services; 057import org.apache.oozie.service.UUIDService; 058import org.apache.oozie.util.ELEvaluationException; 059import org.apache.oozie.util.Instrumentation; 060import org.apache.oozie.util.LogUtils; 061import org.apache.oozie.util.XLog; 062import org.apache.oozie.util.XmlUtils; 063import org.apache.oozie.util.db.SLADbXOperations; 064 065@SuppressWarnings("deprecation") 066public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext> { 067 public static final String EL_ERROR = "EL_ERROR"; 068 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR"; 069 public static final String COULD_NOT_START = "COULD_NOT_START"; 070 public static final String START_DATA_MISSING = "START_DATA_MISSING"; 071 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING"; 072 073 private String jobId = null; 074 protected String actionId = null; 075 protected WorkflowJobBean wfJob = null; 076 protected WorkflowActionBean wfAction = null; 077 private JPAService jpaService = null; 078 private ActionExecutor executor = null; 079 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 080 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 081 protected ActionExecutorContext context = null; 082 083 public ActionStartXCommand(String actionId, String type) { 084 super("action.start", type, 0); 085 this.actionId = actionId; 086 this.jobId = Services.get().get(UUIDService.class).getId(actionId); 087 } 088 089 public ActionStartXCommand(WorkflowJobBean job, String actionId, String type) { 090 super("action.start", type, 0); 091 this.actionId = actionId; 092 this.wfJob = job; 093 this.jobId = wfJob.getId(); 094 } 095 096 @Override 097 protected void setLogInfo() { 098 LogUtils.setLogInfo(actionId); 099 } 100 101 @Override 102 protected boolean isLockRequired() { 103 return true; 104 } 105 106 @Override 107 public String getEntityKey() { 108 return this.jobId; 109 } 110 111 @Override 112 protected void loadState() throws CommandException { 113 try { 114 jpaService = Services.get().get(JPAService.class); 115 if (jpaService != null) { 116 if (wfJob == null) { 117 this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId); 118 } 119 this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, actionId); 120 LogUtils.setLogInfo( wfJob); 121 LogUtils.setLogInfo(wfAction); 122 } 123 else { 124 throw new CommandException(ErrorCode.E0610); 125 } 126 } 127 catch (XException ex) { 128 throw new CommandException(ex); 129 } 130 } 131 132 @Override 133 protected void verifyPrecondition() throws CommandException, PreconditionException { 134 if (wfJob == null) { 135 throw new PreconditionException(ErrorCode.E0604, jobId); 136 } 137 if (wfAction == null) { 138 throw new PreconditionException(ErrorCode.E0605, actionId); 139 } 140 if (wfAction.isPending() 141 && (wfAction.getStatus() == WorkflowActionBean.Status.PREP 142 || wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY 143 || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL 144 || wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY 145 )) { 146 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) { 147 throw new PreconditionException(ErrorCode.E0810, WorkflowJob.Status.RUNNING.toString()); 148 } 149 } 150 else { 151 throw new PreconditionException(ErrorCode.E0816, wfAction.isPending(), wfAction.getStatusStr()); 152 } 153 154 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType()); 155 if (executor == null) { 156 throw new CommandException(ErrorCode.E0802, wfAction.getType()); 157 } 158 } 159 160 @Override 161 protected ActionExecutorContext execute() throws CommandException { 162 LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId); 163 Configuration conf = wfJob.getWorkflowInstance().getConf(); 164 165 int maxRetries = 0; 166 long retryInterval = 0; 167 boolean execSynchronous = false; 168 169 if (!(executor instanceof ControlNodeActionExecutor)) { 170 maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries()); 171 retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval()); 172 } 173 174 executor.setMaxRetries(maxRetries); 175 executor.setRetryInterval(retryInterval); 176 177 try { 178 boolean isRetry = false; 179 if (wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY 180 || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL) { 181 isRetry = true; 182 prepareForRetry(wfAction); 183 } 184 boolean isUserRetry = false; 185 if (wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY) { 186 isUserRetry = true; 187 prepareForRetry(wfAction); 188 } 189 context = getContext(isRetry, isUserRetry); 190 boolean caught = false; 191 try { 192 if (!(executor instanceof ControlNodeActionExecutor)) { 193 String tmpActionConf = XmlUtils.removeComments(wfAction.getConf()); 194 String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class); 195 wfAction.setConf(actionConf); 196 LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction 197 .getType(), actionConf); 198 } 199 } 200 catch (ELEvaluationException ex) { 201 caught = true; 202 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex 203 .getMessage(), ex); 204 } 205 catch (ELException ex) { 206 caught = true; 207 context.setErrorInfo(EL_ERROR, ex.getMessage()); 208 LOG.warn("ELException in ActionStartXCommand ", ex.getMessage(), ex); 209 handleError(context, wfJob, wfAction); 210 } 211 catch (org.jdom.JDOMException je) { 212 caught = true; 213 context.setErrorInfo("ParsingError", je.getMessage()); 214 LOG.warn("JDOMException in ActionStartXCommand ", je.getMessage(), je); 215 handleError(context, wfJob, wfAction); 216 } 217 catch (Exception ex) { 218 caught = true; 219 context.setErrorInfo(EL_ERROR, ex.getMessage()); 220 LOG.warn("Exception in ActionStartXCommand ", ex.getMessage(), ex); 221 handleError(context, wfJob, wfAction); 222 } 223 if(!caught) { 224 wfAction.setErrorInfo(null, null); 225 incrActionCounter(wfAction.getType(), 1); 226 227 LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]", 228 wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction 229 .getUserRetryInterval()); 230 231 Instrumentation.Cron cron = new Instrumentation.Cron(); 232 cron.start(); 233 context.setStartTime(); 234 executor.start(context, wfAction); 235 cron.stop(); 236 FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection"); 237 addActionCron(wfAction.getType(), cron); 238 239 wfAction.setRetries(0); 240 if (wfAction.isExecutionComplete()) { 241 if (!context.isExecuted()) { 242 LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor 243 .getType()); 244 wfAction.setErrorInfo(EXEC_DATA_MISSING, 245 "Execution Complete, but Execution Data Missing from Action"); 246 failJob(context); 247 } else { 248 wfAction.setPending(); 249 if (!(executor instanceof ControlNodeActionExecutor)) { 250 queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType())); 251 } 252 else { 253 execSynchronous = true; 254 } 255 } 256 } 257 else { 258 if (!context.isStarted()) { 259 LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor 260 .getType()); 261 wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action"); 262 failJob(context); 263 } else { 264 queue(new WorkflowNotificationXCommand(wfJob, wfAction)); 265 } 266 } 267 268 LOG.info(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr()); 269 270 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction)); 271 updateJobLastModified(); 272 // Add SLA status event (STARTED) for WF_ACTION 273 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED, 274 SlaAppType.WORKFLOW_ACTION); 275 if(slaEvent != null) { 276 insertList.add(slaEvent); 277 } 278 LOG.info(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!"); 279 } 280 } 281 catch (ActionExecutorException ex) { 282 LOG.warn("Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]", 283 wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage(), ex); 284 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage()); 285 switch (ex.getErrorType()) { 286 case TRANSIENT: 287 if (!handleTransient(context, executor, WorkflowAction.Status.START_RETRY)) { 288 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL); 289 wfAction.setPendingAge(new Date()); 290 wfAction.setRetries(0); 291 wfAction.setStartTime(null); 292 } 293 break; 294 case NON_TRANSIENT: 295 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL); 296 break; 297 case ERROR: 298 handleError(context, executor, WorkflowAction.Status.ERROR.toString(), true, 299 WorkflowAction.Status.DONE); 300 break; 301 case FAILED: 302 try { 303 failJob(context); 304 endWF(); 305 SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED, 306 SlaAppType.WORKFLOW_ACTION); 307 if(slaEvent1 != null) { 308 insertList.add(slaEvent1); 309 } 310 } 311 catch (XException x) { 312 LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage()); 313 } 314 break; 315 } 316 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction)); 317 updateJobLastModified(); 318 } 319 finally { 320 try { 321 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 322 if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) { 323 generateEvent(wfAction, wfJob.getUser()); 324 } 325 if (execSynchronous) { 326 // Changing to synchronous call from asynchronous queuing to prevent 327 // undue delay from ::start:: to action due to queuing 328 callActionEnd(); 329 } 330 } 331 catch (JPAExecutorException e) { 332 throw new CommandException(e); 333 } 334 } 335 336 LOG.debug("ENDED ActionStartXCommand for wf actionId=" + actionId + ", jobId=" + jobId); 337 338 return null; 339 } 340 341 protected void callActionEnd() throws CommandException { 342 new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(); 343 } 344 345 /** 346 * Get action executor context 347 * @param isRetry 348 * @param isUserRetry 349 * @return 350 */ 351 protected ActionExecutorContext getContext(boolean isRetry, boolean isUserRetry) { 352 return new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry); 353 } 354 355 protected void updateJobLastModified(){ 356 wfJob.setLastModifiedTime(new Date()); 357 updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob)); 358 } 359 360 protected void endWF() throws CommandException{ 361 updateParentIfNecessary(wfJob, 3); 362 new WfEndXCommand(wfJob).call(); // To delete the WF temp dir 363 SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED, 364 SlaAppType.WORKFLOW_JOB); 365 if(slaEvent2 != null) { 366 insertList.add(slaEvent2); 367 } 368 } 369 370 protected void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action) 371 throws CommandException { 372 failJob(context); 373 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction)); 374 updateJobLastModified(); 375 SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(), 376 Status.FAILED, SlaAppType.WORKFLOW_ACTION); 377 if(slaEvent1 != null) { 378 insertList.add(slaEvent1); 379 } 380 endWF(); 381 return; 382 } 383 384 /* (non-Javadoc) 385 * @see org.apache.oozie.command.XCommand#getKey() 386 */ 387 @Override 388 public String getKey(){ 389 return getName() + "_" + actionId; 390 } 391 392 private void prepareForRetry(WorkflowActionBean wfAction) { 393 if (wfAction.getType().equals("map-reduce")) { 394 // need to delete child job id of original run 395 wfAction.setExternalChildIDs(""); 396 } 397 } 398 399 @Override 400 protected void queueCommandForTransientFailure(long retryDelayMillis){ 401 queue(new ActionStartXCommand(wfAction.getId(), wfAction.getType()), retryDelayMillis); 402 } 403 404 protected void queue(XCommand<?> command, long msDelay) { 405 // ActionStartXCommand is synchronously called from SignalXCommand passing wfJob so that it doesn't have to 406 //reload wfJob again. We need set wfJob to null, so that it get reloaded when the requeued command executes. 407 if (command instanceof ActionStartXCommand) { 408 ((ActionStartXCommand)command).wfJob = null; 409 } 410 super.queue(command, msDelay); 411 } 412}