001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.util.ArrayList; 021 import java.util.Date; 022 import java.util.List; 023 024 import javax.servlet.jsp.el.ELException; 025 import org.apache.hadoop.conf.Configuration; 026 import org.apache.oozie.ErrorCode; 027 import org.apache.oozie.FaultInjection; 028 import org.apache.oozie.SLAEventBean; 029 import org.apache.oozie.WorkflowActionBean; 030 import org.apache.oozie.WorkflowJobBean; 031 import org.apache.oozie.XException; 032 import org.apache.oozie.action.ActionExecutor; 033 import org.apache.oozie.action.ActionExecutorException; 034 import org.apache.oozie.action.control.ControlNodeActionExecutor; 035 import org.apache.oozie.client.OozieClient; 036 import org.apache.oozie.client.WorkflowAction; 037 import org.apache.oozie.client.WorkflowJob; 038 import org.apache.oozie.client.SLAEvent.SlaAppType; 039 import org.apache.oozie.client.SLAEvent.Status; 040 import org.apache.oozie.client.rest.JsonBean; 041 import org.apache.oozie.command.CommandException; 042 import org.apache.oozie.command.PreconditionException; 043 import org.apache.oozie.command.coord.CoordActionUpdateXCommand; 044 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 045 import org.apache.oozie.executor.jpa.JPAExecutorException; 046 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; 047 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 048 import org.apache.oozie.service.ActionService; 049 import org.apache.oozie.service.JPAService; 050 import org.apache.oozie.service.Services; 051 import org.apache.oozie.service.UUIDService; 052 import org.apache.oozie.util.ELEvaluationException; 053 import org.apache.oozie.util.Instrumentation; 054 import org.apache.oozie.util.LogUtils; 055 import org.apache.oozie.util.XLog; 056 import org.apache.oozie.util.XmlUtils; 057 import org.apache.oozie.util.db.SLADbXOperations; 058 059 public class ActionStartXCommand extends ActionXCommand<Void> { 060 public static final String EL_ERROR = "EL_ERROR"; 061 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR"; 062 public static final String COULD_NOT_START = "COULD_NOT_START"; 063 public static final String START_DATA_MISSING = "START_DATA_MISSING"; 064 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING"; 065 066 private String jobId = null; 067 private String actionId = null; 068 private WorkflowJobBean wfJob = null; 069 private WorkflowActionBean wfAction = null; 070 private JPAService jpaService = null; 071 private ActionExecutor executor = null; 072 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 073 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 074 075 public ActionStartXCommand(String actionId, String type) { 076 super("action.start", type, 0); 077 this.actionId = actionId; 078 this.jobId = Services.get().get(UUIDService.class).getId(actionId); 079 } 080 081 @Override 082 protected boolean isLockRequired() { 083 return true; 084 } 085 086 @Override 087 public String getEntityKey() { 088 return this.jobId; 089 } 090 091 @Override 092 protected void loadState() throws CommandException { 093 try { 094 jpaService = Services.get().get(JPAService.class); 095 if (jpaService != null) { 096 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId)); 097 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId)); 098 LogUtils.setLogInfo(wfJob, logInfo); 099 LogUtils.setLogInfo(wfAction, logInfo); 100 } 101 else { 102 throw new CommandException(ErrorCode.E0610); 103 } 104 } 105 catch (XException ex) { 106 throw new CommandException(ex); 107 } 108 } 109 110 @Override 111 protected void verifyPrecondition() throws CommandException, PreconditionException { 112 if (wfJob == null) { 113 throw new PreconditionException(ErrorCode.E0604, jobId); 114 } 115 if (wfAction == null) { 116 throw new PreconditionException(ErrorCode.E0605, actionId); 117 } 118 if (wfAction.isPending() 119 && (wfAction.getStatus() == WorkflowActionBean.Status.PREP 120 || wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY 121 || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL 122 || wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY 123 )) { 124 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) { 125 throw new PreconditionException(ErrorCode.E0810, WorkflowJob.Status.RUNNING.toString()); 126 } 127 } 128 else { 129 throw new PreconditionException(ErrorCode.E0816, wfAction.getPending(), wfAction.getStatusStr()); 130 } 131 132 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType()); 133 if (executor == null) { 134 throw new CommandException(ErrorCode.E0802, wfAction.getType()); 135 } 136 } 137 138 @Override 139 protected Void execute() throws CommandException { 140 141 LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId); 142 Configuration conf = wfJob.getWorkflowInstance().getConf(); 143 144 int maxRetries = 0; 145 long retryInterval = 0; 146 147 if (!(executor instanceof ControlNodeActionExecutor)) { 148 maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries()); 149 retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval()); 150 } 151 152 executor.setMaxRetries(maxRetries); 153 executor.setRetryInterval(retryInterval); 154 155 ActionExecutorContext context = null; 156 try { 157 boolean isRetry = false; 158 if (wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY 159 || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL) { 160 isRetry = true; 161 } 162 boolean isUserRetry = false; 163 if (wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY) { 164 isUserRetry = true; 165 } 166 context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry); 167 boolean caught = false; 168 try { 169 if (!(executor instanceof ControlNodeActionExecutor)) { 170 String tmpActionConf = XmlUtils.removeComments(wfAction.getConf()); 171 String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class); 172 wfAction.setConf(actionConf); 173 LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction 174 .getType(), actionConf); 175 } 176 } 177 catch (ELEvaluationException ex) { 178 caught = true; 179 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex 180 .getMessage(), ex); 181 } 182 catch (ELException ex) { 183 caught = true; 184 context.setErrorInfo(EL_ERROR, ex.getMessage()); 185 LOG.warn("ELException in ActionStartXCommand ", ex.getMessage(), ex); 186 handleError(context, wfJob, wfAction); 187 } 188 catch (org.jdom.JDOMException je) { 189 caught = true; 190 context.setErrorInfo("ParsingError", je.getMessage()); 191 LOG.warn("JDOMException in ActionStartXCommand ", je.getMessage(), je); 192 handleError(context, wfJob, wfAction); 193 } 194 catch (Exception ex) { 195 caught = true; 196 context.setErrorInfo(EL_ERROR, ex.getMessage()); 197 LOG.warn("Exception in ActionStartXCommand ", ex.getMessage(), ex); 198 handleError(context, wfJob, wfAction); 199 } 200 if(!caught) { 201 wfAction.setErrorInfo(null, null); 202 incrActionCounter(wfAction.getType(), 1); 203 204 LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]", 205 wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction 206 .getUserRetryInterval()); 207 208 Instrumentation.Cron cron = new Instrumentation.Cron(); 209 cron.start(); 210 context.setStartTime(); 211 executor.start(context, wfAction); 212 cron.stop(); 213 FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection"); 214 addActionCron(wfAction.getType(), cron); 215 216 wfAction.setRetries(0); 217 if (wfAction.isExecutionComplete()) { 218 if (!context.isExecuted()) { 219 LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor 220 .getType()); 221 wfAction.setErrorInfo(EXEC_DATA_MISSING, 222 "Execution Complete, but Execution Data Missing from Action"); 223 failJob(context); 224 } else { 225 wfAction.setPending(); 226 queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType())); 227 } 228 } 229 else { 230 if (!context.isStarted()) { 231 LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor 232 .getType()); 233 wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action"); 234 failJob(context); 235 } else { 236 queue(new NotificationXCommand(wfJob, wfAction)); 237 } 238 } 239 240 LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr()); 241 242 updateList.add(wfAction); 243 wfJob.setLastModifiedTime(new Date()); 244 updateList.add(wfJob); 245 // Add SLA status event (STARTED) for WF_ACTION 246 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED, 247 SlaAppType.WORKFLOW_ACTION); 248 if(slaEvent != null) { 249 insertList.add(slaEvent); 250 } 251 LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!"); 252 } 253 } 254 catch (ActionExecutorException ex) { 255 LOG.warn("Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]", 256 wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage(), ex); 257 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage()); 258 switch (ex.getErrorType()) { 259 case TRANSIENT: 260 if (!handleTransient(context, executor, WorkflowAction.Status.START_RETRY)) { 261 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL); 262 wfAction.setPendingAge(new Date()); 263 wfAction.setRetries(0); 264 wfAction.setStartTime(null); 265 } 266 break; 267 case NON_TRANSIENT: 268 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL); 269 break; 270 case ERROR: 271 handleError(context, executor, WorkflowAction.Status.ERROR.toString(), true, 272 WorkflowAction.Status.DONE); 273 break; 274 case FAILED: 275 try { 276 failJob(context); 277 // update coordinator action 278 new CoordActionUpdateXCommand(wfJob, 3).call(); 279 new WfEndXCommand(wfJob).call(); // To delete the WF temp dir 280 SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED, 281 SlaAppType.WORKFLOW_ACTION); 282 if(slaEvent1 != null) { 283 insertList.add(slaEvent1); 284 } 285 SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED, 286 SlaAppType.WORKFLOW_JOB); 287 if(slaEvent2 != null) { 288 insertList.add(slaEvent2); 289 } 290 } 291 catch (XException x) { 292 LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage()); 293 } 294 break; 295 } 296 updateList.add(wfAction); 297 wfJob.setLastModifiedTime(new Date()); 298 updateList.add(wfJob); 299 } 300 finally { 301 try { 302 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList)); 303 } 304 catch (JPAExecutorException e) { 305 throw new CommandException(e); 306 } 307 } 308 309 LOG.debug("ENDED ActionStartXCommand for wf actionId=" + actionId + ", jobId=" + jobId); 310 311 return null; 312 } 313 314 private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action) 315 throws CommandException { 316 failJob(context); 317 updateList.add(wfAction); 318 wfJob.setLastModifiedTime(new Date()); 319 updateList.add(wfJob); 320 SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(), 321 Status.FAILED, SlaAppType.WORKFLOW_ACTION); 322 if(slaEvent1 != null) { 323 insertList.add(slaEvent1); 324 } 325 SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(workflow.getSlaXml(), workflow.getId(), 326 Status.FAILED, SlaAppType.WORKFLOW_JOB); 327 if(slaEvent2 != null) { 328 insertList.add(slaEvent2); 329 } 330 // update coordinator action 331 new CoordActionUpdateXCommand(workflow, 3).call(); 332 new WfEndXCommand(wfJob).call(); //To delete the WF temp dir 333 return; 334 } 335 336 /* (non-Javadoc) 337 * @see org.apache.oozie.command.XCommand#getKey() 338 */ 339 @Override 340 public String getKey(){ 341 return getName() + "_" + actionId; 342 } 343 344 }