001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.net.URI; 023 import java.net.URISyntaxException; 024 import java.util.Date; 025 import java.util.Properties; 026 027 import org.apache.hadoop.conf.Configuration; 028 import org.apache.hadoop.fs.FileSystem; 029 import org.apache.hadoop.fs.Path; 030 import org.apache.oozie.DagELFunctions; 031 import org.apache.oozie.WorkflowActionBean; 032 import org.apache.oozie.WorkflowJobBean; 033 import org.apache.oozie.action.ActionExecutor; 034 import org.apache.oozie.client.WorkflowAction; 035 import org.apache.oozie.client.WorkflowJob; 036 import org.apache.oozie.command.CommandException; 037 import org.apache.oozie.service.CallbackService; 038 import org.apache.oozie.service.ELService; 039 import org.apache.oozie.service.HadoopAccessorException; 040 import org.apache.oozie.service.HadoopAccessorService; 041 import org.apache.oozie.service.Services; 042 import org.apache.oozie.store.StoreException; 043 import org.apache.oozie.store.WorkflowStore; 044 import org.apache.oozie.util.ELEvaluator; 045 import org.apache.oozie.util.Instrumentation; 046 import org.apache.oozie.util.XConfiguration; 047 import org.apache.oozie.util.XLog; 048 import org.apache.oozie.workflow.WorkflowException; 049 import org.apache.oozie.workflow.WorkflowInstance; 050 import org.apache.oozie.workflow.lite.LiteWorkflowInstance; 051 052 /** 053 * Base class for Action execution commands. Provides common functionality to handle different types of errors while 054 * attempting to start or end an action. 055 */ 056 public abstract class ActionCommand<T> extends WorkflowCommand<Void> { 057 private static final String INSTRUMENTATION_GROUP = "action.executors"; 058 059 protected static final String INSTR_FAILED_JOBS_COUNTER = "failed"; 060 061 protected static final String RECOVERY_ID_SEPARATOR = "@"; 062 063 public ActionCommand(String name, String type, int priority) { 064 super(name, type, priority, XLog.STD); 065 } 066 067 /** 068 * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough 069 * attempts have been made. Otherwise returns false. 070 * 071 * @param context the execution context. 072 * @param executor the executor instance being used. 073 * @param status the status to be set for the action. 074 * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the 075 * maximum number of configured retries. 076 * @throws StoreException 077 * @throws org.apache.oozie.command.CommandException 078 */ 079 protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor, WorkflowAction.Status status) 080 throws StoreException, CommandException { 081 XLog.getLog(getClass()).debug("Attempting to retry"); 082 ActionExecutorContext aContext = (ActionExecutorContext) context; 083 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); 084 incrActionErrorCounter(action.getType(), "transient", 1); 085 086 int actionRetryCount = action.getRetries(); 087 if (actionRetryCount >= executor.getMaxRetries()) { 088 XLog.getLog(getClass()).warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries()); 089 return false; 090 } 091 else { 092 action.setStatus(status); 093 action.setPending(); 094 action.incRetries(); 095 long retryDelayMillis = executor.getRetryInterval() * 1000; 096 action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis)); 097 XLog.getLog(getClass()).info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", 098 actionRetryCount + 1, retryDelayMillis); 099 queueCallable(this, retryDelayMillis); 100 return true; 101 } 102 } 103 104 /** 105 * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL 106 * and set pending flag of action to false 107 * 108 * @param store WorkflowStore 109 * @param context the execution context. 110 * @param executor the executor instance being used. 111 * @param status the status to be set for the action. 112 * @throws StoreException 113 * @throws CommandException 114 */ 115 protected void handleNonTransient(WorkflowStore store, ActionExecutor.Context context, ActionExecutor executor, 116 WorkflowAction.Status status) 117 throws StoreException, CommandException { 118 ActionExecutorContext aContext = (ActionExecutorContext) context; 119 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); 120 incrActionErrorCounter(action.getType(), "nontransient", 1); 121 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow(); 122 String id = workflow.getId(); 123 action.setStatus(status); 124 action.resetPendingOnly(); 125 XLog.getLog(getClass()).warn("Suspending Workflow Job id=" + id); 126 try { 127 SuspendCommand.suspendJob(store, workflow, id, action.getId()); 128 } 129 catch (WorkflowException e) { 130 throw new CommandException(e); 131 } 132 } 133 134 /** 135 * Takes care of errors. </p> For errors while attempting to start the action, the job state is updated and an 136 * {@link ActionEndCommand} is queued. </p> For errors while attempting to end the action, the job state is updated. 137 * </p> 138 * 139 * @param context the execution context. 140 * @param executor the executor instance being used. 141 * @param message 142 * @param isStart whether the error was generated while starting or ending an action. 143 * @param status the status to be set for the action. 144 * @throws org.apache.oozie.command.CommandException 145 */ 146 protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message, 147 boolean isStart, WorkflowAction.Status status) throws CommandException { 148 XLog.getLog(getClass()).warn("Setting Action Status to [{0}]", status); 149 ActionExecutorContext aContext = (ActionExecutorContext) context; 150 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); 151 incrActionErrorCounter(action.getType(), "error", 1); 152 action.setPending(); 153 if (isStart) { 154 action.setExecutionData(message, null); 155 queueCallable(new ActionEndCommand(action.getId(), action.getType())); 156 } 157 else { 158 action.setEndData(status, WorkflowAction.Status.ERROR.toString()); 159 } 160 } 161 162 public void failJob(ActionExecutor.Context context) throws CommandException { 163 ActionExecutorContext aContext = (ActionExecutorContext) context; 164 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); 165 incrActionErrorCounter(action.getType(), "failed", 1); 166 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow(); 167 XLog.getLog(getClass()).warn("Failing Job due to failed action [{0}]", action.getName()); 168 try { 169 workflow.getWorkflowInstance().fail(action.getName()); 170 WorkflowInstance wfInstance = workflow.getWorkflowInstance(); 171 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED); 172 workflow.setWorkflowInstance(wfInstance); 173 workflow.setStatus(WorkflowJob.Status.FAILED); 174 action.setStatus(WorkflowAction.Status.FAILED); 175 action.resetPending(); 176 queueCallable(new NotificationCommand(workflow, action)); 177 queueCallable(new KillCommand(workflow.getId())); 178 incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1); 179 } 180 catch (WorkflowException ex) { 181 throw new CommandException(ex); 182 } 183 } 184 185 private void incrActionErrorCounter(String type, String error, int count) { 186 getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count); 187 } 188 189 protected void incrActionCounter(String type, int count) { 190 getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count); 191 } 192 193 protected void addActionCron(String type, Instrumentation.Cron cron) { 194 getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron); 195 } 196 197 public static class ActionExecutorContext implements ActionExecutor.Context { 198 private WorkflowJobBean workflow; 199 private Configuration protoConf; 200 private WorkflowActionBean action; 201 private boolean isRetry; 202 private boolean started; 203 private boolean ended; 204 private boolean executed; 205 206 public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry) { 207 this.workflow = workflow; 208 this.action = action; 209 this.isRetry = isRetry; 210 try { 211 protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf())); 212 } 213 catch (IOException ex) { 214 throw new RuntimeException("It should not happen", ex); 215 } 216 } 217 218 public String getCallbackUrl(String externalStatusVar) { 219 return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar); 220 } 221 222 public Configuration getProtoActionConf() { 223 return protoConf; 224 } 225 226 public WorkflowJob getWorkflow() { 227 return workflow; 228 } 229 230 public WorkflowAction getAction() { 231 return action; 232 } 233 234 public ELEvaluator getELEvaluator() { 235 ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow"); 236 DagELFunctions.configureEvaluator(evaluator, workflow, action); 237 return evaluator; 238 } 239 240 public void setVar(String name, String value) { 241 name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name; 242 WorkflowInstance wfInstance = workflow.getWorkflowInstance(); 243 wfInstance.setVar(name, value); 244 //workflow.getWorkflowInstance().setVar(name, value); 245 workflow.setWorkflowInstance(wfInstance); 246 } 247 248 public String getVar(String name) { 249 name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name; 250 return workflow.getWorkflowInstance().getVar(name); 251 } 252 253 public void setStartData(String externalId, String trackerUri, String consoleUrl) { 254 action.setStartData(externalId, trackerUri, consoleUrl); 255 started = true; 256 } 257 258 public void setExecutionData(String externalStatus, Properties actionData) { 259 action.setExecutionData(externalStatus, actionData); 260 executed = true; 261 } 262 263 public void setEndData(WorkflowAction.Status status, String signalValue) { 264 action.setEndData(status, signalValue); 265 ended = true; 266 } 267 268 public boolean isRetry() { 269 return isRetry; 270 } 271 272 /** 273 * Returns whether setStartData has been called or not. 274 * 275 * @return true if start completion info has been set. 276 */ 277 public boolean isStarted() { 278 return started; 279 } 280 281 /** 282 * Returns whether setExecutionData has been called or not. 283 * 284 * @return true if execution completion info has been set, otherwise false. 285 */ 286 public boolean isExecuted() { 287 return executed; 288 } 289 290 291 /** 292 * Returns whether setEndData has been called or not. 293 * 294 * @return true if end completion info has been set. 295 */ 296 public boolean isEnded() { 297 return ended; 298 } 299 300 public void setExternalStatus(String externalStatus) { 301 action.setExternalStatus(externalStatus); 302 } 303 304 @Override 305 public String getRecoveryId() { 306 return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun(); 307 } 308 309 /* (non-Javadoc) 310 * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir() 311 */ 312 public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException { 313 String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType(); 314 FileSystem fs = getAppFileSystem(); 315 String actionDirPath = Services.get().getSystemId() + "/" + name; 316 Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath); 317 return fqActionDir; 318 } 319 320 /* (non-Javadoc) 321 * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem() 322 */ 323 public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException { 324 WorkflowJob workflow = getWorkflow(); 325 XConfiguration jobConf = new XConfiguration(new StringReader(workflow.getConf())); 326 Configuration fsConf = new Configuration(); 327 XConfiguration.copy(jobConf, fsConf); 328 return Services.get().get(HadoopAccessorService.class).createFileSystem(workflow.getUser(), 329 workflow.getGroup(), new URI(getWorkflow().getAppPath()), fsConf); 330 331 } 332 333 @Override 334 public void setErrorInfo(String str, String exMsg) { 335 action.setErrorInfo(str, exMsg); 336 } 337 } 338 339 }