001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.wf; 019 020import java.util.ArrayList; 021import java.util.Date; 022import java.util.List; 023 024import javax.servlet.jsp.el.ELException; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.oozie.ErrorCode; 028import org.apache.oozie.FaultInjection; 029import org.apache.oozie.SLAEventBean; 030import org.apache.oozie.WorkflowActionBean; 031import org.apache.oozie.WorkflowJobBean; 032import org.apache.oozie.XException; 033import org.apache.oozie.action.ActionExecutor; 034import org.apache.oozie.action.ActionExecutorException; 035import org.apache.oozie.action.control.ControlNodeActionExecutor; 036import org.apache.oozie.client.OozieClient; 037import org.apache.oozie.client.WorkflowAction; 038import org.apache.oozie.client.WorkflowJob; 039import org.apache.oozie.client.SLAEvent.SlaAppType; 040import org.apache.oozie.client.SLAEvent.Status; 041import org.apache.oozie.client.rest.JsonBean; 042import org.apache.oozie.command.CommandException; 043import org.apache.oozie.command.PreconditionException; 044import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 045import org.apache.oozie.executor.jpa.BatchQueryExecutor; 046import org.apache.oozie.executor.jpa.JPAExecutorException; 047import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; 048import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; 049import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 050import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 051import org.apache.oozie.service.ActionService; 052import org.apache.oozie.service.EventHandlerService; 053import org.apache.oozie.service.JPAService; 054import org.apache.oozie.service.Services; 055import org.apache.oozie.service.UUIDService; 056import org.apache.oozie.util.ELEvaluationException; 057import org.apache.oozie.util.Instrumentation; 058import org.apache.oozie.util.LogUtils; 059import org.apache.oozie.util.XLog; 060import org.apache.oozie.util.XmlUtils; 061import org.apache.oozie.util.db.SLADbXOperations; 062 063@SuppressWarnings("deprecation") 064public class ActionStartXCommand extends ActionXCommand<Void> { 065 public static final String EL_ERROR = "EL_ERROR"; 066 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR"; 067 public static final String COULD_NOT_START = "COULD_NOT_START"; 068 public static final String START_DATA_MISSING = "START_DATA_MISSING"; 069 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING"; 070 071 private String jobId = null; 072 private String actionId = null; 073 private WorkflowJobBean wfJob = null; 074 private WorkflowActionBean wfAction = null; 075 private JPAService jpaService = null; 076 private ActionExecutor executor = null; 077 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 078 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 079 080 public ActionStartXCommand(String actionId, String type) { 081 super("action.start", type, 0); 082 this.actionId = actionId; 083 this.jobId = Services.get().get(UUIDService.class).getId(actionId); 084 } 085 086 public ActionStartXCommand(WorkflowJobBean job, String actionId, String type) { 087 super("action.start", type, 0); 088 this.actionId = actionId; 089 this.wfJob = job; 090 this.jobId = wfJob.getId(); 091 } 092 093 @Override 094 protected boolean isLockRequired() { 095 return true; 096 } 097 098 @Override 099 public String getEntityKey() { 100 return this.jobId; 101 } 102 103 @Override 104 protected void loadState() throws CommandException { 105 try { 106 jpaService = Services.get().get(JPAService.class); 107 if (jpaService != null) { 108 if (wfJob == null) { 109 this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId); 110 } 111 this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, actionId); 112 LogUtils.setLogInfo(wfJob, logInfo); 113 LogUtils.setLogInfo(wfAction, logInfo); 114 } 115 else { 116 throw new CommandException(ErrorCode.E0610); 117 } 118 } 119 catch (XException ex) { 120 throw new CommandException(ex); 121 } 122 } 123 124 @Override 125 protected void verifyPrecondition() throws CommandException, PreconditionException { 126 if (wfJob == null) { 127 throw new PreconditionException(ErrorCode.E0604, jobId); 128 } 129 if (wfAction == null) { 130 throw new PreconditionException(ErrorCode.E0605, actionId); 131 } 132 if (wfAction.isPending() 133 && (wfAction.getStatus() == WorkflowActionBean.Status.PREP 134 || wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY 135 || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL 136 || wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY 137 )) { 138 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) { 139 throw new PreconditionException(ErrorCode.E0810, WorkflowJob.Status.RUNNING.toString()); 140 } 141 } 142 else { 143 throw new PreconditionException(ErrorCode.E0816, wfAction.isPending(), wfAction.getStatusStr()); 144 } 145 146 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType()); 147 if (executor == null) { 148 throw new CommandException(ErrorCode.E0802, wfAction.getType()); 149 } 150 } 151 152 @Override 153 protected Void execute() throws CommandException { 154 155 LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId); 156 Configuration conf = wfJob.getWorkflowInstance().getConf(); 157 158 int maxRetries = 0; 159 long retryInterval = 0; 160 boolean execSynchronous = false; 161 162 if (!(executor instanceof ControlNodeActionExecutor)) { 163 maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries()); 164 retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval()); 165 } 166 167 executor.setMaxRetries(maxRetries); 168 executor.setRetryInterval(retryInterval); 169 170 ActionExecutorContext context = null; 171 try { 172 boolean isRetry = false; 173 if (wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY 174 || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL) { 175 isRetry = true; 176 prepareForRetry(wfAction); 177 } 178 boolean isUserRetry = false; 179 if (wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY) { 180 isUserRetry = true; 181 prepareForRetry(wfAction); 182 } 183 context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry); 184 boolean caught = false; 185 try { 186 if (!(executor instanceof ControlNodeActionExecutor)) { 187 String tmpActionConf = XmlUtils.removeComments(wfAction.getConf()); 188 String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class); 189 wfAction.setConf(actionConf); 190 LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction 191 .getType(), actionConf); 192 } 193 } 194 catch (ELEvaluationException ex) { 195 caught = true; 196 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex 197 .getMessage(), ex); 198 } 199 catch (ELException ex) { 200 caught = true; 201 context.setErrorInfo(EL_ERROR, ex.getMessage()); 202 LOG.warn("ELException in ActionStartXCommand ", ex.getMessage(), ex); 203 handleError(context, wfJob, wfAction); 204 } 205 catch (org.jdom.JDOMException je) { 206 caught = true; 207 context.setErrorInfo("ParsingError", je.getMessage()); 208 LOG.warn("JDOMException in ActionStartXCommand ", je.getMessage(), je); 209 handleError(context, wfJob, wfAction); 210 } 211 catch (Exception ex) { 212 caught = true; 213 context.setErrorInfo(EL_ERROR, ex.getMessage()); 214 LOG.warn("Exception in ActionStartXCommand ", ex.getMessage(), ex); 215 handleError(context, wfJob, wfAction); 216 } 217 if(!caught) { 218 wfAction.setErrorInfo(null, null); 219 incrActionCounter(wfAction.getType(), 1); 220 221 LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]", 222 wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction 223 .getUserRetryInterval()); 224 225 Instrumentation.Cron cron = new Instrumentation.Cron(); 226 cron.start(); 227 context.setStartTime(); 228 executor.start(context, wfAction); 229 cron.stop(); 230 FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection"); 231 addActionCron(wfAction.getType(), cron); 232 233 wfAction.setRetries(0); 234 if (wfAction.isExecutionComplete()) { 235 if (!context.isExecuted()) { 236 LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor 237 .getType()); 238 wfAction.setErrorInfo(EXEC_DATA_MISSING, 239 "Execution Complete, but Execution Data Missing from Action"); 240 failJob(context); 241 } else { 242 wfAction.setPending(); 243 if (!(executor instanceof ControlNodeActionExecutor)) { 244 queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType())); 245 } 246 else { 247 execSynchronous = true; 248 } 249 } 250 } 251 else { 252 if (!context.isStarted()) { 253 LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor 254 .getType()); 255 wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action"); 256 failJob(context); 257 } else { 258 queue(new NotificationXCommand(wfJob, wfAction)); 259 } 260 } 261 262 LOG.info(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr()); 263 264 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction)); 265 wfJob.setLastModifiedTime(new Date()); 266 updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob)); 267 // Add SLA status event (STARTED) for WF_ACTION 268 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED, 269 SlaAppType.WORKFLOW_ACTION); 270 if(slaEvent != null) { 271 insertList.add(slaEvent); 272 } 273 LOG.info(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!"); 274 } 275 } 276 catch (ActionExecutorException ex) { 277 LOG.warn("Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]", 278 wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage(), ex); 279 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage()); 280 switch (ex.getErrorType()) { 281 case TRANSIENT: 282 if (!handleTransient(context, executor, WorkflowAction.Status.START_RETRY)) { 283 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL); 284 wfAction.setPendingAge(new Date()); 285 wfAction.setRetries(0); 286 wfAction.setStartTime(null); 287 } 288 break; 289 case NON_TRANSIENT: 290 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL); 291 break; 292 case ERROR: 293 handleError(context, executor, WorkflowAction.Status.ERROR.toString(), true, 294 WorkflowAction.Status.DONE); 295 break; 296 case FAILED: 297 try { 298 failJob(context); 299 updateParentIfNecessary(wfJob, 3); 300 new WfEndXCommand(wfJob).call(); // To delete the WF temp dir 301 SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED, 302 SlaAppType.WORKFLOW_ACTION); 303 if(slaEvent1 != null) { 304 insertList.add(slaEvent1); 305 } 306 SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED, 307 SlaAppType.WORKFLOW_JOB); 308 if(slaEvent2 != null) { 309 insertList.add(slaEvent2); 310 } 311 } 312 catch (XException x) { 313 LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage()); 314 } 315 break; 316 } 317 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction)); 318 wfJob.setLastModifiedTime(new Date()); 319 updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob)); 320 } 321 finally { 322 try { 323 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 324 if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) { 325 generateEvent(wfAction, wfJob.getUser()); 326 } 327 if (execSynchronous) { 328 // Changing to synchronous call from asynchronous queuing to prevent 329 // undue delay from ::start:: to action due to queuing 330 new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey()); 331 } 332 } 333 catch (JPAExecutorException e) { 334 throw new CommandException(e); 335 } 336 } 337 338 LOG.debug("ENDED ActionStartXCommand for wf actionId=" + actionId + ", jobId=" + jobId); 339 340 return null; 341 } 342 343 private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action) 344 throws CommandException { 345 failJob(context); 346 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction)); 347 wfJob.setLastModifiedTime(new Date()); 348 updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob)); 349 SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(), 350 Status.FAILED, SlaAppType.WORKFLOW_ACTION); 351 if(slaEvent1 != null) { 352 insertList.add(slaEvent1); 353 } 354 SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(workflow.getSlaXml(), workflow.getId(), 355 Status.FAILED, SlaAppType.WORKFLOW_JOB); 356 if(slaEvent2 != null) { 357 insertList.add(slaEvent2); 358 } 359 360 new WfEndXCommand(wfJob).call(); //To delete the WF temp dir 361 return; 362 } 363 364 /* (non-Javadoc) 365 * @see org.apache.oozie.command.XCommand#getKey() 366 */ 367 @Override 368 public String getKey(){ 369 return getName() + "_" + actionId; 370 } 371 372 private void prepareForRetry(WorkflowActionBean wfAction) { 373 if (wfAction.getType().equals("map-reduce")) { 374 // need to delete child job id of original run 375 wfAction.setExternalChildIDs(""); 376 } 377 } 378 379}