/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.DagELFunctions;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.service.CallbackService;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;

/**
 * Base class for Action execution commands. Provides common functionality to handle different types of errors while
 * attempting to start or end an action.
 *
 * @param <T> not referenced anywhere in this class; the command result type is fixed to {@link Void} by the
 *        superclass declaration. NOTE(review): presumably kept for source compatibility with subclasses - confirm.
 */
public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> {
    private static final String INSTRUMENTATION_GROUP = "action.executors";

    protected static final String RECOVERY_ID_SEPARATOR = "@";

    public ActionXCommand(String name, String type, int priority) {
        super(name, type, priority);
    }

    /**
     * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough
     * attempts have been made. Otherwise returns false.
     * <p>
     * When a retry is scheduled, this command re-queues itself with a delay of
     * {@code executor.getRetryInterval() * 1000} milliseconds.
     *
     * @param context the execution context; must be an {@link ActionExecutorContext}.
     * @param executor the executor instance being used.
     * @param status the status to be set for the action.
     * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the
     *         maximum number of configured retries.
     * @throws CommandException thrown if unable to handle transient
     */
    protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor,
            WorkflowAction.Status status) throws CommandException {
        LOG.debug("Attempting to retry");
        ActionExecutorContext aContext = (ActionExecutorContext) context;
        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
        incrActionErrorCounter(action.getType(), "transient", 1);

        int actionRetryCount = action.getRetries();
        if (actionRetryCount >= executor.getMaxRetries()) {
            LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries());
            return false;
        }
        else {
            action.setStatus(status);
            action.setPending();
            action.incRetries();
            long retryDelayMillis = executor.getRetryInterval() * 1000;
            // The pending age is set to the moment the retry becomes due.
            action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis));
            LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis);
            // Reset this command before putting the same instance back on the queue.
            this.resetUsed();
            queue(this, retryDelayMillis);
            return true;
        }
    }

    /**
     * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL and
     * set pending flag of action to false
     *
     * @param context the execution context; must be an {@link ActionExecutorContext}.
     * @param executor the executor instance being used (not read by this method).
     * @param status the status to be set for the action.
     * @throws CommandException thrown if unable to suspend job
     */
    protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor,
            WorkflowAction.Status status) throws CommandException {
        ActionExecutorContext aContext = (ActionExecutorContext) context;
        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
        incrActionErrorCounter(action.getType(), "nontransient", 1);
        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
        String id = workflow.getId();
        action.setStatus(status);
        action.resetPendingOnly();
        LOG.warn("Suspending Workflow Job id=" + id);
        try {
            SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId(), null);
        }
        catch (Exception e) {
            // NOTE(review): only e.getMessage() is propagated, so the original stack trace is lost to callers.
            // Consider passing the cause along if CommandException supports it.
            throw new CommandException(ErrorCode.E0727, id, e.getMessage());
        }
        finally {
            // Runs whether or not the suspend succeeded.
            // NOTE(review): the meaning of the literal 3 is defined by updateParentIfNecessary - confirm there.
            updateParentIfNecessary(workflow, 3);
        }
    }

    /**
     * Takes care of errors.
     * <p>
     * For errors while attempting to start the action, the execution data of the action is set to the given message
     * and an {@link ActionEndXCommand} is queued.
     * <p>
     * For errors while attempting to end the action, the end data of the action is set to the given status with
     * {@link WorkflowAction.Status#ERROR} as signal value.
     * <p>
     * Nothing of the above happens if the action is eligible for user-retry (see
     * {@link #handleUserRetry(WorkflowActionBean)}); in that case only the user-retry is scheduled.
     *
     * @param context the execution context; must be an {@link ActionExecutorContext}.
     * @param executor the executor instance being used (not read by this method).
     * @param message the value stored as the external status of the action when <code>isStart</code> is true.
     * @param isStart whether the error was generated while starting or ending an action.
     * @param status the status to be set for the action.
     * @throws CommandException thrown if unable to handle action error
     */
    protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message,
            boolean isStart, WorkflowAction.Status status) throws CommandException {
        LOG.warn("Setting Action Status to [{0}]", status);
        ActionExecutorContext aContext = (ActionExecutorContext) context;
        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();

        if (!handleUserRetry(action)) {
            incrActionErrorCounter(action.getType(), "error", 1);
            action.setPending();
            if (isStart) {
                action.setExecutionData(message, null);
                queue(new ActionEndXCommand(action.getId(), action.getType()));
            }
            else {
                action.setEndData(status, WorkflowAction.Status.ERROR.toString());
            }
        }
    }

    /**
     * Fail the job due to failed action
     *
     * @param context the execution context; must be an {@link ActionExecutorContext}.
     * @throws CommandException thrown if unable to fail job
     */
    public void failJob(ActionExecutor.Context context) throws CommandException {
        ActionExecutorContext aContext = (ActionExecutorContext) context;
        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
        failJob(context, action);
    }

    /**
     * Fail the job due to failed action, unless the action is eligible for user-retry.
     *
     * @param context the execution context.
     * @param action the action that caused the workflow to fail
     * @throws CommandException thrown if unable to fail job
     */
    public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
        if (!handleUserRetry(action)) {
            incrActionErrorCounter(action.getType(), "failed", 1);
            LOG.warn("Failing Job due to failed action [{0}]", action.getName());
            try {
                // NOTE(review): the statement order and the repeated getWorkflowInstance() calls are kept exactly
                // as they were; whether the getter returns a shared object or a copy is not visible here.
                workflow.getWorkflowInstance().fail(action.getName());
                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
                ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
                workflow.setWorkflowInstance(wfInstance);
                workflow.setStatus(WorkflowJob.Status.FAILED);
                action.setStatus(WorkflowAction.Status.FAILED);
                action.resetPending();
                queue(new NotificationXCommand(workflow, action));
                queue(new KillXCommand(workflow.getId()));
                InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1, getInstrumentation());
            }
            catch (WorkflowException ex) {
                throw new CommandException(ex);
            }
        }
    }

    /**
     * Execute retry for action if this action is eligible for user-retry.
     * <p>
     * The action is eligible when its error code is contained in
     * {@link LiteWorkflowStoreService#getUserRetryErrorCode()} and its user-retry count is below its user-retry
     * maximum. In that case the action is set to {@link WorkflowAction.Status#USER_RETRY}, marked pending, its
     * user-retry count is incremented and an {@link ActionStartXCommand} is queued with a delay derived from the
     * user-retry interval of the action.
     *
     * @param action the action to check and, if eligible, to schedule for user-retry.
     * @return true if user-retry has to be handled for this action
     * @throws CommandException declared for callers; not thrown by the code visible in this method.
     */
    public boolean handleUserRetry(WorkflowActionBean action) throws CommandException {
        String errorCode = action.getErrorCode();
        Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();

        if (allowedRetryCode.contains(errorCode) && action.getUserRetryCount() < action.getUserRetryMax()) {
            LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
                    + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
                    .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
            // The interval is scaled by 60 * 1000 (i.e. treated as minutes) to obtain milliseconds. The
            // multiplication is done in long arithmetic: in int arithmetic it silently overflows for intervals
            // above ~35791 and would produce a wrong (possibly negative) delay.
            long interval = action.getUserRetryInterval() * 60L * 1000L;
            action.setStatus(WorkflowAction.Status.USER_RETRY);
            action.incrmentUserRetryCount();
            action.setPending();
            queue(new ActionStartXCommand(action.getId(), action.getType()), interval);
            return true;
        }
        return false;
    }

    /*
     * In case of action error increment the error count for instrumentation
     */
    private void incrActionErrorCounter(String type, String error, int count) {
        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count);
    }

    /**
     * Increment the action counter in the instrumentation log, indicating how
     * many times the action was executed since the start of the Oozie server.
     *
     * @param type the action type, used as prefix of the counter name.
     * @param count the amount to add to the counter.
     */
    protected void incrActionCounter(String type, int count) {
        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count);
    }

    /**
     * Adding a cron for the instrumentation time for the given Instrumentation
     * group
     *
     * @param type the action type, used as prefix of the cron name.
     * @param cron the cron to add.
     */
    protected void addActionCron(String type, Instrumentation.Cron cron) {
        getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron);
    }

    /**
     * Workflow action executor context
     *
     */
    public static class ActionExecutorContext implements ActionExecutor.Context {
        private final WorkflowJobBean workflow;
        private Configuration protoConf;
        private final WorkflowActionBean action;
        private final boolean isRetry;
        private final boolean isUserRetry;
        private boolean started;
        private boolean ended;
        private boolean executed;

        /**
         * Constructing the ActionExecutorContext, setting the private members
         * and constructing the proto configuration
         *
         * @param workflow the workflow job the action belongs to; its proto action configuration is parsed here.
         * @param action the action being executed.
         * @param isRetry value reported by {@link #isRetry()}.
         * @param isUserRetry value reported by {@link #isUserRetry()}.
         * @throws RuntimeException if the proto action configuration of the workflow cannot be read.
         */
        public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry,
                boolean isUserRetry) {
            this.workflow = workflow;
            this.action = action;
            this.isRetry = isRetry;
            this.isUserRetry = isUserRetry;
            try {
                protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
            }
            catch (IOException ex) {
                throw new RuntimeException("It should not happen", ex);
            }
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getCallbackUrl(java.lang.String)
         */
        public String getCallbackUrl(String externalStatusVar) {
            return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getProtoActionConf()
         */
        public Configuration getProtoActionConf() {
            return protoConf;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getWorkflow()
         */
        public WorkflowJob getWorkflow() {
            return workflow;
        }

        /**
         * Returns the workflow action of the given action context
         *
         * @return the workflow action of the given action context
         */
        public WorkflowAction getAction() {
            return action;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getELEvaluator()
         */
        public ELEvaluator getELEvaluator() {
            ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow");
            DagELFunctions.configureEvaluator(evaluator, workflow, action);
            return evaluator;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String)
         */
        public void setVar(String name, String value) {
            // Variables are stored under "<action name><separator><name>".
            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
            WorkflowInstance wfInstance = workflow.getWorkflowInstance();
            wfInstance.setVar(name, value);
            // The modified instance is explicitly written back to the workflow bean.
            workflow.setWorkflowInstance(wfInstance);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getVar(java.lang.String)
         */
        public String getVar(String name) {
            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
            return workflow.getWorkflowInstance().getVar(name);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setStartData(java.lang.String, java.lang.String, java.lang.String)
         */
        public void setStartData(String externalId, String trackerUri, String consoleUrl) {
            action.setStartData(externalId, trackerUri, consoleUrl);
            started = true;
        }

        /**
         * Setting the start time of the action to the current time
         */
        public void setStartTime() {
            Date now = new Date();
            action.setStartTime(now);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionData(java.lang.String, java.util.Properties)
         */
        public void setExecutionData(String externalStatus, Properties actionData) {
            action.setExecutionData(externalStatus, actionData);
            executed = true;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionStats(java.lang.String)
         */
        public void setExecutionStats(String jsonStats) {
            action.setExecutionStats(jsonStats);
            executed = true;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setExternalChildIDs(java.lang.String)
         */
        public void setExternalChildIDs(String externalChildIDs) {
            action.setExternalChildIDs(externalChildIDs);
            executed = true;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setEndData(org.apache.oozie.client.WorkflowAction.Status, java.lang.String)
         */
        public void setEndData(WorkflowAction.Status status, String signalValue) {
            action.setEndData(status, signalValue);
            ended = true;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#isRetry()
         */
        public boolean isRetry() {
            return isRetry;
        }

        /**
         * Return if the executor invocation is a user retry or not.
         *
         * @return if the executor invocation is a user retry or not.
         */
        public boolean isUserRetry() {
            return isUserRetry;
        }

        /**
         * Returns whether setStartData has been called or not.
         *
         * @return true if start completion info has been set.
         */
        public boolean isStarted() {
            return started;
        }

        /**
         * Returns whether setExecutionData, setExecutionStats or setExternalChildIDs has been called or not.
         *
         * @return true if execution completion info has been set, otherwise false.
         */
        public boolean isExecuted() {
            return executed;
        }

        /**
         * Returns whether setEndData has been called or not.
         *
         * @return true if end completion info has been set.
         */
        public boolean isEnded() {
            return ended;
        }

        /**
         * Sets the external status of the action. Unlike {@link #setExecutionData(String, Properties)} this does
         * not mark the context as executed.
         *
         * @param externalStatus the external status to store on the action.
         */
        public void setExternalStatus(String externalStatus) {
            action.setExternalStatus(externalStatus);
        }

        /**
         * Returns the recovery id, built as {@code <action id>@<workflow run>}.
         *
         * @return the recovery id of the action for the current workflow run.
         */
        @Override
        public String getRecoveryId() {
            return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun();
        }

        /* (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir()
         */
        public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException {
            // Resulting layout: <fs home dir>/<system id>/<workflow id>/<action name>--<action type>
            String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType();
            FileSystem fs = getAppFileSystem();
            String actionDirPath = Services.get().getSystemId() + "/" + name;
            Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath);
            return fqActionDir;
        }

        /* (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem()
         */
        public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException {
            // Look the workflow up once; the local is named so that it does not shadow the 'workflow' field.
            WorkflowJob job = getWorkflow();
            URI uri = new URI(job.getAppPath());
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            Configuration fsConf = has.createJobConf(uri.getAuthority());
            return has.createFileSystem(job.getUser(), uri, fsConf);
        }

        /* (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String)
         */
        @Override
        public void setErrorInfo(String str, String exMsg) {
            action.setErrorInfo(str, exMsg);
        }
    }

}