001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.util.Date; 021 022 import javax.servlet.jsp.el.ELException; 023 import org.apache.hadoop.conf.Configuration; 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.FaultInjection; 026 import org.apache.oozie.WorkflowActionBean; 027 import org.apache.oozie.WorkflowJobBean; 028 import org.apache.oozie.XException; 029 import org.apache.oozie.action.ActionExecutor; 030 import org.apache.oozie.action.ActionExecutorException; 031 import org.apache.oozie.client.OozieClient; 032 import org.apache.oozie.client.WorkflowAction; 033 import org.apache.oozie.client.WorkflowJob; 034 import org.apache.oozie.client.SLAEvent.SlaAppType; 035 import org.apache.oozie.client.SLAEvent.Status; 036 import org.apache.oozie.command.CommandException; 037 import org.apache.oozie.command.PreconditionException; 038 import org.apache.oozie.command.coord.CoordActionUpdateXCommand; 039 import org.apache.oozie.executor.jpa.JPAExecutorException; 040 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; 041 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor; 042 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 043 import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor; 044 import org.apache.oozie.service.ActionService; 045 import org.apache.oozie.service.JPAService; 046 import org.apache.oozie.service.Services; 047 import org.apache.oozie.service.UUIDService; 048 import org.apache.oozie.util.ELEvaluationException; 049 import org.apache.oozie.util.Instrumentation; 050 import org.apache.oozie.util.LogUtils; 051 import org.apache.oozie.util.XLog; 052 import org.apache.oozie.util.XmlUtils; 053 import org.apache.oozie.util.db.SLADbXOperations; 054 055 public class ActionStartXCommand extends ActionXCommand<Void> { 056 public static final String EL_ERROR = "EL_ERROR"; 057 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR"; 058 public static final String COULD_NOT_START = "COULD_NOT_START"; 059 public static final String START_DATA_MISSING = "START_DATA_MISSING"; 060 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING"; 061 062 private String jobId = null; 063 private String actionId = null; 064 private WorkflowJobBean wfJob = null; 065 private WorkflowActionBean wfAction = null; 066 private JPAService jpaService = null; 067 private ActionExecutor executor = null; 068 069 public ActionStartXCommand(String actionId, String type) { 070 super("action.start", type, 0); 071 this.actionId = actionId; 072 this.jobId = Services.get().get(UUIDService.class).getId(actionId); 073 } 074 075 @Override 076 protected boolean isLockRequired() { 077 return true; 078 } 079 080 @Override 081 protected String getEntityKey() { 082 return this.jobId; 083 } 084 085 @Override 086 protected void loadState() throws CommandException { 087 try { 088 jpaService = Services.get().get(JPAService.class); 089 if (jpaService != null) { 090 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId)); 091 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId)); 092 LogUtils.setLogInfo(wfJob, logInfo); 093 LogUtils.setLogInfo(wfAction, logInfo); 094 } 095 else { 096 throw new CommandException(ErrorCode.E0610); 097 } 098 } 099 catch (XException ex) { 100 throw new CommandException(ex); 101 } 102 } 103 104 @Override 105 protected void verifyPrecondition() throws CommandException, PreconditionException { 106 if (wfJob == null) { 107 throw new PreconditionException(ErrorCode.E0604, jobId); 108 } 109 if (wfAction == null) { 110 throw new PreconditionException(ErrorCode.E0605, actionId); 111 } 112 if (wfAction.isPending() 113 && (wfAction.getStatus() == WorkflowActionBean.Status.PREP 114 || wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY 115 || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL 116 || wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY 117 )) { 118 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) { 119 throw new PreconditionException(ErrorCode.E0810, WorkflowJob.Status.RUNNING.toString()); 120 } 121 } 122 else { 123 throw new PreconditionException(ErrorCode.E0816, wfAction.getPending(), wfAction.getStatusStr()); 124 } 125 126 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType()); 127 if (executor == null) { 128 throw new CommandException(ErrorCode.E0802, wfAction.getType()); 129 } 130 } 131 132 @Override 133 protected Void execute() throws CommandException { 134 135 LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId); 136 Configuration conf = wfJob.getWorkflowInstance().getConf(); 137 138 int maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries()); 139 long retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval()); 140 executor.setMaxRetries(maxRetries); 141 executor.setRetryInterval(retryInterval); 142 143 ActionExecutorContext context = null; 144 try { 145 boolean isRetry = false; 146 if (wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY 147 || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL) { 148 isRetry = true; 149 } 150 boolean isUserRetry = false; 151 if (wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY) { 152 isUserRetry = true; 153 } 154 context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry); 155 try { 156 String tmpActionConf = XmlUtils.removeComments(wfAction.getConf()); 157 String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class); 158 wfAction.setConf(actionConf); 159 LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction 160 .getType(), actionConf); 161 } 162 catch (ELEvaluationException ex) { 163 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex 164 .getMessage(), ex); 165 } 166 catch (ELException ex) { 167 context.setErrorInfo(EL_ERROR, ex.getMessage()); 168 LOG.warn("ELException in ActionStartXCommand ", ex.getMessage(), ex); 169 handleError(context, wfJob, wfAction); 170 return null; 171 } 172 catch (org.jdom.JDOMException je) { 173 context.setErrorInfo("ParsingError", je.getMessage()); 174 LOG.warn("JDOMException in ActionStartXCommand ", je.getMessage(), je); 175 handleError(context, wfJob, wfAction); 176 return null; 177 } 178 catch (Exception ex) { 179 context.setErrorInfo(EL_ERROR, ex.getMessage()); 180 LOG.warn("Exception in ActionStartXCommand ", ex.getMessage(), ex); 181 handleError(context, wfJob, wfAction); 182 return null; 183 } 184 wfAction.setErrorInfo(null, null); 185 incrActionCounter(wfAction.getType(), 1); 186 187 LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]", 188 wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction 189 .getUserRetryInterval()); 190 191 Instrumentation.Cron cron = new Instrumentation.Cron(); 192 cron.start(); 193 executor.start(context, wfAction); 194 cron.stop(); 195 FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection"); 196 addActionCron(wfAction.getType(), cron); 197 198 wfAction.setRetries(0); 199 if (wfAction.isExecutionComplete()) { 200 if (!context.isExecuted()) { 201 LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor 202 .getType()); 203 wfAction.setErrorInfo(EXEC_DATA_MISSING, 204 "Execution Complete, but Execution Data Missing from Action"); 205 failJob(context); 206 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction)); 207 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob)); 208 return null; 209 } 210 wfAction.setPending(); 211 queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType())); 212 } 213 else { 214 if (!context.isStarted()) { 215 LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor 216 .getType()); 217 wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action"); 218 failJob(context); 219 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction)); 220 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob)); 221 return null; 222 } 223 queue(new NotificationXCommand(wfJob, wfAction)); 224 } 225 226 LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr()); 227 228 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction)); 229 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob)); 230 // Add SLA status event (STARTED) for WF_ACTION 231 SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED, 232 SlaAppType.WORKFLOW_ACTION); 233 LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!"); 234 235 } 236 catch (ActionExecutorException ex) { 237 LOG.warn("Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]", 238 wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage(), ex); 239 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage()); 240 switch (ex.getErrorType()) { 241 case TRANSIENT: 242 if (!handleTransient(context, executor, WorkflowAction.Status.START_RETRY)) { 243 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL); 244 wfAction.setPendingAge(new Date()); 245 wfAction.setRetries(0); 246 wfAction.setStartTime(null); 247 } 248 break; 249 case NON_TRANSIENT: 250 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL); 251 break; 252 case ERROR: 253 handleError(context, executor, WorkflowAction.Status.ERROR.toString(), true, 254 WorkflowAction.Status.DONE); 255 break; 256 case FAILED: 257 try { 258 failJob(context); 259 // update coordinator action 260 new CoordActionUpdateXCommand(wfJob, 3).call(); 261 new WfEndXCommand(wfJob).call(); // To delete the WF temp dir 262 SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED, 263 SlaAppType.WORKFLOW_ACTION); 264 SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED, 265 SlaAppType.WORKFLOW_JOB); 266 } 267 catch (XException x) { 268 LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage()); 269 } 270 break; 271 } 272 try { 273 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction)); 274 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob)); 275 } 276 catch (JPAExecutorException je) { 277 throw new CommandException(je); 278 } 279 } 280 catch (JPAExecutorException je) { 281 throw new CommandException(je); 282 } 283 284 LOG.debug("ENDED ActionStartXCommand for wf actionId=" + actionId + ", jobId=" + jobId); 285 286 return null; 287 } 288 289 private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action) 290 throws CommandException { 291 failJob(context); 292 try { 293 jpaService.execute(new WorkflowActionUpdateJPAExecutor(action)); 294 jpaService.execute(new WorkflowJobUpdateJPAExecutor(workflow)); 295 } 296 catch (JPAExecutorException je) { 297 throw new CommandException(je); 298 } 299 SLADbXOperations.writeStausEvent(action.getSlaXml(), action.getId(), Status.FAILED, SlaAppType.WORKFLOW_ACTION); 300 SLADbXOperations.writeStausEvent(workflow.getSlaXml(), workflow.getId(), Status.FAILED, SlaAppType.WORKFLOW_JOB); 301 // update coordinator action 302 new CoordActionUpdateXCommand(workflow, 3).call(); 303 new WfEndXCommand(wfJob).call(); //To delete the WF temp dir 304 return; 305 } 306 307 /* (non-Javadoc) 308 * @see org.apache.oozie.command.XCommand#getKey() 309 */ 310 @Override 311 public String getKey(){ 312 return getName() + "_" + actionId; 313 } 314 315 }