001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.net.URI; 023 import java.net.URISyntaxException; 024 import java.util.Date; 025 import java.util.Properties; 026 import java.util.Set; 027 028 import org.apache.hadoop.conf.Configuration; 029 import org.apache.hadoop.fs.FileSystem; 030 import org.apache.hadoop.fs.Path; 031 import org.apache.oozie.DagELFunctions; 032 import org.apache.oozie.ErrorCode; 033 import org.apache.oozie.WorkflowActionBean; 034 import org.apache.oozie.WorkflowJobBean; 035 import org.apache.oozie.action.ActionExecutor; 036 import org.apache.oozie.client.WorkflowAction; 037 import org.apache.oozie.client.WorkflowJob; 038 import org.apache.oozie.command.CommandException; 039 import org.apache.oozie.command.coord.CoordActionUpdateXCommand; 040 import org.apache.oozie.service.CallbackService; 041 import org.apache.oozie.service.ELService; 042 import org.apache.oozie.service.HadoopAccessorException; 043 import org.apache.oozie.service.HadoopAccessorService; 044 import org.apache.oozie.service.JPAService; 045 import org.apache.oozie.service.LiteWorkflowStoreService; 046 import org.apache.oozie.service.Services; 047 import org.apache.oozie.util.ELEvaluator; 048 import org.apache.oozie.util.InstrumentUtils; 049 import org.apache.oozie.util.Instrumentation; 050 import org.apache.oozie.util.XConfiguration; 051 import org.apache.oozie.workflow.WorkflowException; 052 import org.apache.oozie.workflow.WorkflowInstance; 053 import org.apache.oozie.workflow.lite.LiteWorkflowInstance; 054 055 /** 056 * Base class for Action execution commands. Provides common functionality to handle different types of errors while 057 * attempting to start or end an action. 058 */ 059 public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> { 060 private static final String INSTRUMENTATION_GROUP = "action.executors"; 061 062 protected static final String INSTR_FAILED_JOBS_COUNTER = "failed"; 063 064 protected static final String RECOVERY_ID_SEPARATOR = "@"; 065 066 public ActionXCommand(String name, String type, int priority) { 067 super(name, type, priority); 068 } 069 070 /** 071 * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough 072 * attempts have been made. Otherwise returns false. 073 * 074 * @param context the execution context. 075 * @param executor the executor instance being used. 076 * @param status the status to be set for the action. 077 * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the 078 * maximum number of configured retries. 079 * @throws CommandException thrown if unable to handle transient 080 */ 081 protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor, 082 WorkflowAction.Status status) throws CommandException { 083 LOG.debug("Attempting to retry"); 084 ActionExecutorContext aContext = (ActionExecutorContext) context; 085 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); 086 incrActionErrorCounter(action.getType(), "transient", 1); 087 088 int actionRetryCount = action.getRetries(); 089 if (actionRetryCount >= executor.getMaxRetries()) { 090 LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries()); 091 return false; 092 } 093 else { 094 action.setStatus(status); 095 action.setPending(); 096 action.incRetries(); 097 long retryDelayMillis = executor.getRetryInterval() * 1000; 098 action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis)); 099 LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis); 100 this.resetUsed(); 101 queue(this, retryDelayMillis); 102 return true; 103 } 104 } 105 106 /** 107 * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL and 108 * set pending flag of action to false 109 * 110 * @param context the execution context. 111 * @param executor the executor instance being used. 112 * @param status the status to be set for the action. 113 * @throws CommandException thrown if unable to suspend job 114 */ 115 protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor, 116 WorkflowAction.Status status) throws CommandException { 117 ActionExecutorContext aContext = (ActionExecutorContext) context; 118 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); 119 incrActionErrorCounter(action.getType(), "nontransient", 1); 120 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow(); 121 String id = workflow.getId(); 122 action.setStatus(status); 123 action.resetPendingOnly(); 124 LOG.warn("Suspending Workflow Job id=" + id); 125 try { 126 SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId()); 127 } 128 catch (Exception e) { 129 throw new CommandException(ErrorCode.E0727, e.getMessage()); 130 } 131 finally { 132 // update coordinator action 133 new CoordActionUpdateXCommand(workflow, 3).call(); 134 } 135 } 136 137 /** 138 * Takes care of errors. </p> For errors while attempting to start the action, the job state is updated and an 139 * {@link ActionEndCommand} is queued. </p> For errors while attempting to end the action, the job state is updated. 140 * </p> 141 * 142 * @param context the execution context. 143 * @param executor the executor instance being used. 144 * @param message 145 * @param isStart whether the error was generated while starting or ending an action. 146 * @param status the status to be set for the action. 147 * @throws CommandException thrown if unable to handle action error 148 */ 149 protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message, 150 boolean isStart, WorkflowAction.Status status) throws CommandException { 151 LOG.warn("Setting Action Status to [{0}]", status); 152 ActionExecutorContext aContext = (ActionExecutorContext) context; 153 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); 154 155 if (!handleUserRetry(action)) { 156 incrActionErrorCounter(action.getType(), "error", 1); 157 action.setPending(); 158 if (isStart) { 159 action.setExecutionData(message, null); 160 queue(new ActionEndXCommand(action.getId(), action.getType())); 161 } 162 else { 163 action.setEndData(status, WorkflowAction.Status.ERROR.toString()); 164 } 165 } 166 } 167 168 /** 169 * Fail the job due to failed action 170 * 171 * @param context the execution context. 172 * @throws CommandException thrown if unable to fail job 173 */ 174 public void failJob(ActionExecutor.Context context) throws CommandException { 175 ActionExecutorContext aContext = (ActionExecutorContext) context; 176 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); 177 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow(); 178 179 if (!handleUserRetry(action)) { 180 incrActionErrorCounter(action.getType(), "failed", 1); 181 LOG.warn("Failing Job due to failed action [{0}]", action.getName()); 182 try { 183 workflow.getWorkflowInstance().fail(action.getName()); 184 WorkflowInstance wfInstance = workflow.getWorkflowInstance(); 185 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED); 186 workflow.setWorkflowInstance(wfInstance); 187 workflow.setStatus(WorkflowJob.Status.FAILED); 188 action.setStatus(WorkflowAction.Status.FAILED); 189 action.resetPending(); 190 queue(new NotificationXCommand(workflow, action)); 191 queue(new KillXCommand(workflow.getId())); 192 InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation()); 193 } 194 catch (WorkflowException ex) { 195 throw new CommandException(ex); 196 } 197 } 198 } 199 200 /** 201 * Execute retry for action if this action is eligible for user-retry 202 * 203 * @param context the execution context. 204 * @return true if user-retry has to be handled for this action 205 * @throws CommandException thrown if unable to fail job 206 */ 207 public boolean handleUserRetry(WorkflowActionBean action) throws CommandException { 208 String errorCode = action.getErrorCode(); 209 Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode(); 210 211 if (allowedRetryCode.contains(errorCode) && action.getUserRetryCount() < action.getUserRetryMax()) { 212 LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], " 213 + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action 214 .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval()); 215 int interval = action.getUserRetryInterval() * 60 * 1000; 216 action.setStatus(WorkflowAction.Status.USER_RETRY); 217 action.incrmentUserRetryCount(); 218 action.setPending(); 219 queue(new ActionStartXCommand(action.getId(), action.getType()), interval); 220 return true; 221 } 222 return false; 223 } 224 225 private void incrActionErrorCounter(String type, String error, int count) { 226 getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count); 227 } 228 229 protected void incrActionCounter(String type, int count) { 230 getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count); 231 } 232 233 protected void addActionCron(String type, Instrumentation.Cron cron) { 234 getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron); 235 } 236 237 /** 238 * Workflow action executor context 239 * 240 */ 241 public static class ActionExecutorContext implements ActionExecutor.Context { 242 private final WorkflowJobBean workflow; 243 private Configuration protoConf; 244 private final WorkflowActionBean action; 245 private final boolean isRetry; 246 private final boolean isUserRetry; 247 private boolean started; 248 private boolean ended; 249 private boolean executed; 250 251 public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry, boolean isUserRetry) { 252 this.workflow = workflow; 253 this.action = action; 254 this.isRetry = isRetry; 255 this.isUserRetry = isUserRetry; 256 try { 257 protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf())); 258 } 259 catch (IOException ex) { 260 throw new RuntimeException("It should not happen", ex); 261 } 262 } 263 264 public String getCallbackUrl(String externalStatusVar) { 265 return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar); 266 } 267 268 public Configuration getProtoActionConf() { 269 return protoConf; 270 } 271 272 public WorkflowJob getWorkflow() { 273 return workflow; 274 } 275 276 public WorkflowAction getAction() { 277 return action; 278 } 279 280 public ELEvaluator getELEvaluator() { 281 ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow"); 282 DagELFunctions.configureEvaluator(evaluator, workflow, action); 283 return evaluator; 284 } 285 286 public void setVar(String name, String value) { 287 name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name; 288 WorkflowInstance wfInstance = workflow.getWorkflowInstance(); 289 wfInstance.setVar(name, value); 290 workflow.setWorkflowInstance(wfInstance); 291 } 292 293 public String getVar(String name) { 294 name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name; 295 return workflow.getWorkflowInstance().getVar(name); 296 } 297 298 public void setStartData(String externalId, String trackerUri, String consoleUrl) { 299 action.setStartData(externalId, trackerUri, consoleUrl); 300 started = true; 301 } 302 303 public void setExecutionData(String externalStatus, Properties actionData) { 304 action.setExecutionData(externalStatus, actionData); 305 executed = true; 306 } 307 308 public void setEndData(WorkflowAction.Status status, String signalValue) { 309 action.setEndData(status, signalValue); 310 ended = true; 311 } 312 313 public boolean isRetry() { 314 return isRetry; 315 } 316 317 public boolean isUserRetry() { 318 return isUserRetry; 319 } 320 321 /** 322 * Returns whether setStartData has been called or not. 323 * 324 * @return true if start completion info has been set. 325 */ 326 public boolean isStarted() { 327 return started; 328 } 329 330 /** 331 * Returns whether setExecutionData has been called or not. 332 * 333 * @return true if execution completion info has been set, otherwise false. 334 */ 335 public boolean isExecuted() { 336 return executed; 337 } 338 339 /** 340 * Returns whether setEndData has been called or not. 341 * 342 * @return true if end completion info has been set. 343 */ 344 public boolean isEnded() { 345 return ended; 346 } 347 348 public void setExternalStatus(String externalStatus) { 349 action.setExternalStatus(externalStatus); 350 } 351 352 @Override 353 public String getRecoveryId() { 354 return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun(); 355 } 356 357 /* (non-Javadoc) 358 * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir() 359 */ 360 public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException { 361 String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType(); 362 FileSystem fs = getAppFileSystem(); 363 String actionDirPath = Services.get().getSystemId() + "/" + name; 364 Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath); 365 return fqActionDir; 366 } 367 368 /* (non-Javadoc) 369 * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem() 370 */ 371 public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException { 372 WorkflowJob workflow = getWorkflow(); 373 XConfiguration jobConf = new XConfiguration(new StringReader(workflow.getConf())); 374 Configuration fsConf = new Configuration(); 375 XConfiguration.copy(jobConf, fsConf); 376 return Services.get().get(HadoopAccessorService.class).createFileSystem(workflow.getUser(), 377 workflow.getGroup(), new URI(getWorkflow().getAppPath()), fsConf); 378 379 } 380 381 /* (non-Javadoc) 382 * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String) 383 */ 384 @Override 385 public void setErrorInfo(String str, String exMsg) { 386 action.setErrorInfo(str, exMsg); 387 } 388 } 389 390 }