001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.util.Date; 021 022 import javax.servlet.jsp.el.ELException; 023 024 import org.apache.hadoop.conf.Configuration; 025 import org.apache.oozie.ErrorCode; 026 import org.apache.oozie.FaultInjection; 027 import org.apache.oozie.WorkflowActionBean; 028 import org.apache.oozie.WorkflowJobBean; 029 import org.apache.oozie.XException; 030 import org.apache.oozie.action.ActionExecutor; 031 import org.apache.oozie.action.ActionExecutorException; 032 import org.apache.oozie.client.OozieClient; 033 import org.apache.oozie.client.WorkflowAction; 034 import org.apache.oozie.client.WorkflowJob; 035 import org.apache.oozie.client.SLAEvent.SlaAppType; 036 import org.apache.oozie.client.SLAEvent.Status; 037 import org.apache.oozie.command.CommandException; 038 import org.apache.oozie.command.coord.CoordActionUpdateCommand; 039 import org.apache.oozie.service.ActionService; 040 import org.apache.oozie.service.Services; 041 import org.apache.oozie.service.UUIDService; 042 import org.apache.oozie.store.StoreException; 043 import org.apache.oozie.store.WorkflowStore; 044 import org.apache.oozie.util.ELEvaluationException; 045 import org.apache.oozie.util.Instrumentation; 046 import org.apache.oozie.util.XLog; 047 import org.apache.oozie.util.XmlUtils; 048 import org.apache.oozie.util.db.SLADbOperations; 049 050 public class ActionStartCommand extends ActionCommand<Void> { 051 public static final String EL_ERROR = "EL_ERROR"; 052 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR"; 053 public static final String COULD_NOT_START = "COULD_NOT_START"; 054 public static final String START_DATA_MISSING = "START_DATA_MISSING"; 055 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING"; 056 057 private String id; 058 private String jobId; 059 060 public ActionStartCommand(String id, String type) { 061 super("action.start", type, 0); 062 this.id = id; 063 } 064 065 @Override 066 protected Void call(WorkflowStore store) throws StoreException, CommandException { 067 WorkflowJobBean workflow = store.getWorkflow(jobId, false); 068 setLogInfo(workflow); 069 WorkflowActionBean action = store.getAction(id, false); 070 XLog.getLog(getClass()).warn(XLog.STD, 071 "[***" + action.getId() + "***]" + "In call()....status=" + action.getStatusStr()); 072 setLogInfo(action); 073 if (action.isPending() 074 && (action.getStatus() == WorkflowActionBean.Status.PREP 075 || action.getStatus() == WorkflowActionBean.Status.START_RETRY || action.getStatus() == WorkflowActionBean.Status.START_MANUAL)) { 076 if (workflow.getStatus() == WorkflowJob.Status.RUNNING) { 077 078 ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(action.getType()); 079 Configuration conf = workflow.getWorkflowInstance().getConf(); 080 081 int maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries()); 082 long retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval()); 083 executor.setMaxRetries(maxRetries); 084 executor.setRetryInterval(retryInterval); 085 086 if (executor != null) { 087 ActionExecutorContext context = null; 088 try { 089 boolean isRetry = false; 090 if (action.getStatus() == WorkflowActionBean.Status.START_RETRY 091 || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) { 092 isRetry = true; 093 } 094 context = new ActionCommand.ActionExecutorContext(workflow, action, isRetry); 095 try { 096 String tmpActionConf = XmlUtils.removeComments(action.getConf()); 097 String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class); 098 action.setConf(actionConf); 099 100 XLog.getLog(getClass()).debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", 101 action.getName(), action.getType(), actionConf); 102 103 } 104 catch (ELEvaluationException ex) { 105 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, 106 EL_EVAL_ERROR, ex.getMessage(), ex); 107 } 108 catch (ELException ex) { 109 context.setErrorInfo(EL_ERROR, ex.getMessage()); 110 XLog.getLog(getClass()).warn("ELException in ActionStartCommand ", ex.getMessage(), ex); 111 handleError(context, store, workflow, action); 112 return null; 113 } 114 catch (org.jdom.JDOMException je) { 115 context.setErrorInfo("ParsingError", je.getMessage()); 116 XLog.getLog(getClass()).warn("JDOMException in ActionStartCommand ", je.getMessage(), je); 117 handleError(context, store, workflow, action); 118 return null; 119 } 120 catch (Exception ex) { 121 context.setErrorInfo(EL_ERROR, ex.getMessage()); 122 XLog.getLog(getClass()).warn("Exception in ActionStartCommand ", ex.getMessage(), ex); 123 handleError(context, store, workflow, action); 124 return null; 125 } 126 action.setErrorInfo(null, null); 127 incrActionCounter(action.getType(), 1); 128 129 Instrumentation.Cron cron = new Instrumentation.Cron(); 130 cron.start(); 131 executor.start(context, action); 132 cron.stop(); 133 FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection"); 134 addActionCron(action.getType(), cron); 135 136 action.setRetries(0); 137 if (action.isExecutionComplete()) { 138 if (!context.isExecuted()) { 139 XLog.getLog(getClass()).warn(XLog.OPS, 140 "Action Completed, ActionExecutor [{0}] must call setExecutionData()", 141 executor.getType()); 142 action.setErrorInfo(EXEC_DATA_MISSING, 143 "Execution Complete, but Execution Data Missing from Action"); 144 failJob(context); 145 store.updateAction(action); 146 store.updateWorkflow(workflow); 147 return null; 148 } 149 action.setPending(); 150 queueCallable(new ActionEndCommand(action.getId(), action.getType())); 151 } 152 else { 153 if (!context.isStarted()) { 154 XLog.getLog(getClass()).warn(XLog.OPS, 155 "Action Started, ActionExecutor [{0}] must call setStartData()", 156 executor.getType()); 157 action.setErrorInfo(START_DATA_MISSING, 158 "Execution Started, but Start Data Missing from Action"); 159 failJob(context); 160 store.updateAction(action); 161 store.updateWorkflow(workflow); 162 return null; 163 } 164 queueCallable(new NotificationCommand(workflow, action)); 165 } 166 167 XLog.getLog(getClass()).warn(XLog.STD, 168 "[***" + action.getId() + "***]" + "Action status=" + action.getStatusStr()); 169 170 store.updateAction(action); 171 store.updateWorkflow(workflow); 172 // Add SLA status event (STARTED) for WF_ACTION 173 // SLADbOperations.writeSlaStatusEvent(eSla, 174 // action.getId(), Status.STARTED, store); 175 SLADbOperations.writeStausEvent(action.getSlaXml(), action.getId(), store, Status.STARTED, 176 SlaAppType.WORKFLOW_ACTION); 177 XLog.getLog(getClass()).warn(XLog.STD, 178 "[***" + action.getId() + "***]" + "Action updated in DB!"); 179 180 } 181 catch (ActionExecutorException ex) { 182 XLog.getLog(getClass()).warn( 183 "Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]", 184 action.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage(), ex); 185 action.setErrorInfo(ex.getErrorCode(), ex.getMessage()); 186 switch (ex.getErrorType()) { 187 case TRANSIENT: 188 if (!handleTransient(context, executor, WorkflowAction.Status.START_RETRY)) { 189 handleNonTransient(store, context, executor, WorkflowAction.Status.START_MANUAL); 190 action.setPendingAge(new Date()); 191 action.setRetries(0); 192 action.setStartTime(null); 193 } 194 break; 195 case NON_TRANSIENT: 196 handleNonTransient(store, context, executor, WorkflowAction.Status.START_MANUAL); 197 break; 198 case ERROR: 199 handleError(context, executor, WorkflowAction.Status.ERROR.toString(), true, 200 WorkflowAction.Status.DONE); 201 break; 202 case FAILED: 203 try { 204 failJob(context); 205 queueCallable(new CoordActionUpdateCommand(workflow)); 206 SLADbOperations.writeStausEvent(action.getSlaXml(), action.getId(), store, 207 Status.FAILED, SlaAppType.WORKFLOW_ACTION); 208 SLADbOperations.writeStausEvent(workflow.getSlaXml(), workflow.getId(), store, 209 Status.FAILED, SlaAppType.WORKFLOW_JOB); 210 } 211 catch (XException x) { 212 XLog.getLog(getClass()).warn("ActionStartCommand - case:FAILED ", x.getMessage()); 213 } 214 break; 215 } 216 store.updateAction(action); 217 store.updateWorkflow(workflow); 218 } 219 } 220 else { 221 throw new CommandException(ErrorCode.E0802, action.getType()); 222 } 223 224 } 225 else { 226 XLog.getLog(getClass()).warn("Job state is not {0}. Skipping Action Execution", 227 WorkflowJob.Status.RUNNING.toString()); 228 } 229 } 230 return null; 231 } 232 233 private void handleError(ActionExecutorContext context, WorkflowStore store, WorkflowJobBean workflow, 234 WorkflowActionBean action) throws CommandException, StoreException { 235 failJob(context); 236 store.updateAction(action); 237 store.updateWorkflow(workflow); 238 SLADbOperations.writeStausEvent(action.getSlaXml(), action.getId(), store, Status.FAILED, 239 SlaAppType.WORKFLOW_ACTION); 240 SLADbOperations.writeStausEvent(workflow.getSlaXml(), workflow.getId(), store, Status.FAILED, 241 SlaAppType.WORKFLOW_JOB); 242 queueCallable(new CoordActionUpdateCommand(workflow)); 243 return; 244 } 245 246 @Override 247 protected Void execute(WorkflowStore store) throws CommandException, StoreException { 248 try { 249 XLog.getLog(getClass()).debug("STARTED ActionStartCommand for wf actionId=" + id); 250 jobId = Services.get().get(UUIDService.class).getId(id); 251 if (lock(jobId)) { 252 call(store); 253 } 254 else { 255 queueCallable(new ActionStartCommand(id, getType()), LOCK_FAILURE_REQUEUE_INTERVAL); 256 XLog.getLog(getClass()).warn("ActionStartCommand lock was not acquired - failed {0}", id); 257 } 258 } 259 catch (InterruptedException e) { 260 queueCallable(new ActionStartCommand(id, getType()), LOCK_FAILURE_REQUEUE_INTERVAL); 261 XLog.getLog(getClass()).warn("ActionStartCommand lock was not acquired - interrupted exception failed {0}", 262 id); 263 } 264 XLog.getLog(getClass()).debug("ENDED ActionStartCommand for wf actionId=" + id + ", jobId=" + jobId); 265 return null; 266 } 267 268 }