001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.util.ArrayList; 021 import java.util.Date; 022 import java.util.List; 023 024 import javax.servlet.jsp.el.ELException; 025 import org.apache.hadoop.conf.Configuration; 026 import org.apache.oozie.ErrorCode; 027 import org.apache.oozie.FaultInjection; 028 import org.apache.oozie.SLAEventBean; 029 import org.apache.oozie.WorkflowActionBean; 030 import org.apache.oozie.WorkflowJobBean; 031 import org.apache.oozie.XException; 032 import org.apache.oozie.action.ActionExecutor; 033 import org.apache.oozie.action.ActionExecutorException; 034 import org.apache.oozie.action.control.ControlNodeActionExecutor; 035 import org.apache.oozie.client.OozieClient; 036 import org.apache.oozie.client.WorkflowAction; 037 import org.apache.oozie.client.WorkflowJob; 038 import org.apache.oozie.client.SLAEvent.SlaAppType; 039 import org.apache.oozie.client.SLAEvent.Status; 040 import org.apache.oozie.client.rest.JsonBean; 041 import org.apache.oozie.command.CommandException; 042 import org.apache.oozie.command.PreconditionException; 043 import 
org.apache.oozie.command.coord.CoordActionUpdateXCommand;
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.util.ELEvaluationException;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.db.SLADbXOperations;

/**
 * Command that starts a single workflow action.
 * <p>
 * The command loads the workflow job and the action through the {@link JPAService}, verifies that the
 * action is pending in one of the startable statuses and that the job is RUNNING, resolves the action
 * configuration with the context's EL evaluator (skipped for {@link ControlNodeActionExecutor}s), calls
 * {@link ActionExecutor#start} and finally persists every accumulated bean with one
 * {@link BulkUpdateInsertJPAExecutor} call.
 */
@SuppressWarnings("deprecation")
public class ActionStartXCommand extends ActionXCommand<Void> {
    /** Error code set on the context when EL resolution of the action configuration fails. */
    public static final String EL_ERROR = "EL_ERROR";
    /** Error code of the TRANSIENT executor exception raised on an {@link ELEvaluationException}. */
    public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
    /** Not referenced inside this class; kept because it is part of the public API. */
    public static final String COULD_NOT_START = "COULD_NOT_START";
    /** Error code set on the action when the executor returned without registering start data. */
    public static final String START_DATA_MISSING = "START_DATA_MISSING";
    /** Error code set on the action when execution completed without registering execution data. */
    public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";

    private final String jobId;
    private final String actionId;
    private WorkflowJobBean wfJob = null;
    private WorkflowActionBean wfAction = null;
    private JPAService jpaService = null;
    private ActionExecutor executor = null;
    /** Beans to be updated by the bulk JPA call issued at the end of {@link #execute()}. */
    private final List<JsonBean> updateList = new ArrayList<JsonBean>();
    /** Beans (SLA events) to be inserted by the bulk JPA call issued at the end of {@link #execute()}. */
    private final List<JsonBean> insertList = new ArrayList<JsonBean>();

    /**
     * Creates a start command for the given action.
     *
     * @param actionId id of the workflow action to start; the owning job id is derived from it through
     *        the {@link UUIDService}
     * @param type command type handed to the parent constructor
     */
    public ActionStartXCommand(String actionId, String type) {
        super("action.start", type, 0);
        this.actionId = actionId;
        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
    }

    /**
     * @return always {@code true}
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * @return the id of the workflow job that owns the action
     */
    @Override
    public String getEntityKey() {
        return this.jobId;
    }

    /**
     * Loads the workflow job and action beans and registers both with the log info.
     *
     * @throws CommandException with E0610 when no {@link JPAService} is registered, or wrapping any
     *         {@link XException} raised while loading the beans
     */
    @Override
    protected void loadState() throws CommandException {
        try {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService != null) {
                this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
                this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
                LogUtils.setLogInfo(wfJob, logInfo);
                LogUtils.setLogInfo(wfAction, logInfo);
            }
            else {
                throw new CommandException(ErrorCode.E0610);
            }
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Checks that the loaded state allows the action to be started and resolves its executor.
     *
     * @throws PreconditionException E0604/E0605 when the job/action was not loaded, E0810 when the job
     *         is not RUNNING, E0816 when the action is not pending in PREP, START_RETRY, START_MANUAL
     *         or USER_RETRY
     * @throws CommandException E0802 when no executor is registered for the action type
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        if (wfJob == null) {
            throw new PreconditionException(ErrorCode.E0604, jobId);
        }
        if (wfAction == null) {
            throw new PreconditionException(ErrorCode.E0605, actionId);
        }
        if (wfAction.isPending()
                && (wfAction.getStatus() == WorkflowActionBean.Status.PREP
                        || wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
                        || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL
                        || wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY
                        )) {
            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
                throw new PreconditionException(ErrorCode.E0810, WorkflowJob.Status.RUNNING.toString());
            }
        }
        else {
            throw new PreconditionException(ErrorCode.E0816, wfAction.getPending(), wfAction.getStatusStr());
        }

        executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
        if (executor == null) {
            throw new CommandException(ErrorCode.E0802, wfAction.getType());
        }
    }

    /**
     * Starts the action and persists the resulting job/action state.
     * <p>
     * Failures while resolving the action configuration fail the job through
     * {@link #handleError(ActionExecutorContext, WorkflowJobBean, WorkflowActionBean)}, except for
     * {@link ELEvaluationException}, which is rethrown as a TRANSIENT {@link ActionExecutorException}.
     * Any {@link ActionExecutorException} is dispatched on its error type. Whatever happens, the beans
     * collected in the update/insert lists are written in the {@code finally} block.
     *
     * @return always {@code null}
     * @throws CommandException when failing the job or ending the workflow fails, or wrapping a
     *         {@link JPAExecutorException} from the final bulk write
     */
    @Override
    protected Void execute() throws CommandException {

        LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId);
        Configuration conf = wfJob.getWorkflowInstance().getConf();

        int maxRetries = 0;
        long retryInterval = 0;

        // Control nodes keep 0/0; every other executor may be overridden from the workflow configuration.
        if (!(executor instanceof ControlNodeActionExecutor)) {
            maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
            retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
        }

        executor.setMaxRetries(maxRetries);
        executor.setRetryInterval(retryInterval);

        ActionExecutorContext context = null;
        try {
            boolean isRetry = false;
            if (wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
                    || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
                isRetry = true;
                prepareForRetry(wfAction);
            }
            boolean isUserRetry = false;
            if (wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
                isUserRetry = true;
                prepareForRetry(wfAction);
            }
            context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
            // Set by every handler below that has already failed the job, so the start is skipped.
            boolean caught = false;
            try {
                if (!(executor instanceof ControlNodeActionExecutor)) {
                    String tmpActionConf = XmlUtils.removeComments(wfAction.getConf());
                    String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class);
                    wfAction.setConf(actionConf);
                    LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction
                            .getType(), actionConf);
                }
            }
            catch (ELEvaluationException ex) {
                caught = true;
                // Handed to the outer catch, which applies the TRANSIENT handling.
                throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex
                        .getMessage(), ex);
            }
            catch (ELException ex) {
                caught = true;
                context.setErrorInfo(EL_ERROR, ex.getMessage());
                LOG.warn("ELException in ActionStartXCommand {0}", ex.getMessage(), ex);
                handleError(context, wfJob, wfAction);
            }
            catch (org.jdom.JDOMException je) {
                caught = true;
                context.setErrorInfo("ParsingError", je.getMessage());
                LOG.warn("JDOMException in ActionStartXCommand {0}", je.getMessage(), je);
                handleError(context, wfJob, wfAction);
            }
            catch (Exception ex) {
                caught = true;
                context.setErrorInfo(EL_ERROR, ex.getMessage());
                LOG.warn("Exception in ActionStartXCommand {0}", ex.getMessage(), ex);
                handleError(context, wfJob, wfAction);
            }
            if (!caught) {
                wfAction.setErrorInfo(null, null);
                incrActionCounter(wfAction.getType(), 1);

                LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]",
                                wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction
                                        .getUserRetryInterval());

                Instrumentation.Cron cron = new Instrumentation.Cron();
                cron.start();
                context.setStartTime();
                executor.start(context, wfAction);
                cron.stop();
                // NOTE(review): looks like a test-only hook; confirm against FaultInjection.
                FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
                addActionCron(wfAction.getType(), cron);

                wfAction.setRetries(0);
                if (wfAction.isExecutionComplete()) {
                    if (!context.isExecuted()) {
                        LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
                                .getType());
                        wfAction.setErrorInfo(EXEC_DATA_MISSING,
                                "Execution Complete, but Execution Data Missing from Action");
                        failJob(context);
                    } else {
                        wfAction.setPending();
                        queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
                    }
                }
                else {
                    if (!context.isStarted()) {
                        LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor
                                .getType());
                        wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action");
                        failJob(context);
                    } else {
                        queue(new NotificationXCommand(wfJob, wfAction));
                    }
                }

                LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());

                updateList.add(wfAction);
                wfJob.setLastModifiedTime(new Date());
                updateList.add(wfJob);
                // Add SLA status event (STARTED) for WF_ACTION
                SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
                        SlaAppType.WORKFLOW_ACTION);
                if (slaEvent != null) {
                    insertList.add(slaEvent);
                }
                LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!");
            }
        }
        catch (ActionExecutorException ex) {
            LOG.warn("Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
                    wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage(), ex);
            wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
            switch (ex.getErrorType()) {
                case TRANSIENT:
                    // handleTransient is defined outside this file; a false return is treated here as
                    // "no automatic retry", so the action is parked in START_MANUAL with reset counters.
                    if (!handleTransient(context, executor, WorkflowAction.Status.START_RETRY)) {
                        handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
                        wfAction.setPendingAge(new Date());
                        wfAction.setRetries(0);
                        wfAction.setStartTime(null);
                    }
                    break;
                case NON_TRANSIENT:
                    handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
                    break;
                case ERROR:
                    handleError(context, executor, WorkflowAction.Status.ERROR.toString(), true,
                            WorkflowAction.Status.DONE);
                    break;
                case FAILED:
                    try {
                        failJob(context);
                        // update coordinator action
                        // NOTE(review): meaning of the literal 3 is not visible here; confirm against
                        // CoordActionUpdateXCommand.
                        new CoordActionUpdateXCommand(wfJob, 3).call();
                        new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
                        SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
                                SlaAppType.WORKFLOW_ACTION);
                        if (slaEvent1 != null) {
                            insertList.add(slaEvent1);
                        }
                        SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
                                SlaAppType.WORKFLOW_JOB);
                        if (slaEvent2 != null) {
                            insertList.add(slaEvent2);
                        }
                    }
                    catch (XException x) {
                        // Deliberately best-effort: keep going so the state is still persisted below,
                        // but log the message and the stack trace instead of dropping them.
                        LOG.warn("ActionStartXCommand - case:FAILED {0}", x.getMessage(), x);
                    }
                    break;
            }
            updateList.add(wfAction);
            wfJob.setLastModifiedTime(new Date());
            updateList.add(wfJob);
        }
        finally {
            try {
                // Single write for everything collected above, on success and on failure paths alike.
                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
                if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
                    generateEvent(wfAction, wfJob.getUser());
                }
            }
            catch (JPAExecutorException e) {
                throw new CommandException(e);
            }
        }

        LOG.debug("ENDED ActionStartXCommand for wf actionId=" + actionId + ", jobId=" + jobId);

        return null;
    }

    /**
     * Fails the job after an error that happened before the executor was started: queues the action and
     * the workflow for update, records FAILED SLA events for both (when available) and runs
     * {@link WfEndXCommand} for the workflow.
     *
     * @param context executor context carrying the error info
     * @param workflow workflow job that owns the action
     * @param action action that could not be started
     * @throws CommandException when failing the job or ending the workflow fails
     */
    private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
            throws CommandException {
        failJob(context);
        updateList.add(action);
        workflow.setLastModifiedTime(new Date());
        updateList.add(workflow);
        SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
                Status.FAILED, SlaAppType.WORKFLOW_ACTION);
        if (slaEvent1 != null) {
            insertList.add(slaEvent1);
        }
        SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(workflow.getSlaXml(), workflow.getId(),
                Status.FAILED, SlaAppType.WORKFLOW_JOB);
        if (slaEvent2 != null) {
            insertList.add(slaEvent2);
        }

        new WfEndXCommand(workflow).call(); //To delete the WF temp dir
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#getKey()
     */
    @Override
    public String getKey(){
        return getName() + "_" + actionId;
    }

    /**
     * Clears state left over from a previous run before the action is started again.
     *
     * @param wfAction action about to be retried
     */
    private void prepareForRetry(WorkflowActionBean wfAction) {
        // Constant-first comparison so an action without a type cannot raise a NullPointerException.
        if ("map-reduce".equals(wfAction.getType())) {
            // need to delete child job id of original run
            wfAction.setExternalChildIDs("");
        }
    }

}