/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.command.wf;

import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.DagELFunctions;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.service.CallbackService;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.LiteWorkflowApp;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
import org.apache.oozie.workflow.lite.NodeDef;

/**
 * Base class for Action execution commands. Provides common functionality to handle different types of errors while
 * attempting to start or end an action.
 */
public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
    private static final String INSTRUMENTATION_GROUP = "action.executors";

    protected static final String RECOVERY_ID_SEPARATOR = "@";

    public ActionXCommand(String name, String type, int priority) {
        super(name, type, priority);
    }

    /**
     * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough
     * attempts have been made. Otherwise returns false.
     *
     * @param context the execution context.
     * @param executor the executor instance being used.
     * @param status the status to be set for the action.
     * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the
     *         maximum number of configured retries.
     * @throws CommandException thrown if unable to handle transient
     */
    protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor,
            WorkflowAction.Status status) throws CommandException {
        LOG.debug("Attempting to retry");
        ActionExecutorContext aContext = (ActionExecutorContext) context;
        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
        incrActionErrorCounter(action.getType(), "transient", 1);

        // Snapshot the retry count before incrementing: the delay is computed from the number of retries
        // already performed, while the log line reports the upcoming attempt number.
        int actionRetryCount = action.getRetries();
        if (actionRetryCount >= executor.getMaxRetries()) {
            LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries());
            return false;
        }
        else {
            action.setStatus(status);
            action.setPending();
            action.incRetries();
            long retryDelayMillis = getRetryDelay(actionRetryCount, executor.getRetryInterval(),
                    executor.getRetryPolicy());
            action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis));
            LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis);
            this.resetUsed();
            queueCommandForTransientFailure(retryDelayMillis);
            return true;
        }
    }

    /**
     * Re-queues this command so that it runs again after the given delay. Kept as a separate protected method so
     * that subclasses can override how the retry is scheduled.
     *
     * @param retryDelayMillis delay, in milliseconds, before this command is executed again.
     */
    protected void queueCommandForTransientFailure(long retryDelayMillis) {
        queue(this, retryDelayMillis);
    }

    /**
     * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL and
     * set pending flag of action to false
     *
     * @param context the execution context.
     * @param executor the executor instance being used.
     * @param status the status to be set for the action.
     * @throws CommandException thrown if unable to suspend job
     */
    protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor,
            WorkflowAction.Status status) throws CommandException {
        ActionExecutorContext aContext = (ActionExecutorContext) context;
        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
        incrActionErrorCounter(action.getType(), "nontransient", 1);
        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
        String id = workflow.getId();
        action.setStatus(status);
        action.resetPendingOnly();
        LOG.warn("Suspending Workflow Job id=" + id);
        try {
            SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId(), null);
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E0727, id, e.getMessage());
        }
        finally {
            // The parent is notified whether or not the suspend succeeded.
            // NOTE(review): the meaning of the literal 3 is defined by updateParentIfNecessary, which is not
            // visible in this file - confirm before changing it.
            updateParentIfNecessary(workflow, 3);
        }
    }

    /**
     * Takes care of errors. <p> For errors while attempting to start the action, the job state is updated and an
     * {@link ActionEndXCommand} is queued. <p> For errors while attempting to end the action, the job state is updated.
     * <p> Nothing of the above happens if the action is eligible for a user-retry; in that case
     * {@link #handleUserRetry(WorkflowActionBean, WorkflowJobBean)} has already scheduled the retry.
     *
     * @param context the execution context.
     * @param executor the executor instance being used.
     * @param message the value stored as the action's execution data when the error happened during start.
     * @param isStart whether the error was generated while starting or ending an action.
     * @param status the status to be set for the action.
     * @throws CommandException thrown if unable to handle action error
     */
    protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message,
            boolean isStart, WorkflowAction.Status status) throws CommandException {
        LOG.warn("Setting Action Status to [{0}]", status);
        ActionExecutorContext aContext = (ActionExecutorContext) context;
        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
        WorkflowJobBean wfJob = (WorkflowJobBean) context.getWorkflow();
        if (!handleUserRetry(action, wfJob)) {
            incrActionErrorCounter(action.getType(), "error", 1);
            action.setPending();
            if (isStart) {
                action.setExecutionData(message, null);
                queue(new ActionEndXCommand(action.getId(), action.getType()));
            }
            else {
                action.setEndData(status, WorkflowAction.Status.ERROR.toString());
            }
        }
    }

    /**
     * Fail the job due to failed action
     *
     * @param context the execution context.
     * @throws CommandException thrown if unable to fail job
     */
    public void failJob(ActionExecutor.Context context) throws CommandException {
        ActionExecutorContext aContext = (ActionExecutorContext) context;
        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
        failJob(context, action);
    }

    /**
     * Fail the job due to failed action. If the action is eligible for a user-retry the retry is scheduled instead
     * and the job is left untouched.
     *
     * @param context the execution context.
     * @param action the action that caused the workflow to fail
     * @throws CommandException thrown if unable to fail job
     */
    public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
        if (!handleUserRetry(action, workflow)) {
            incrActionErrorCounter(action.getType(), "failed", 1);
            LOG.warn("Failing Job due to failed action [{0}]", action.getName());
            try {
                workflow.getWorkflowInstance().fail(action.getName());
                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
                ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
                // Write the modified instance back onto the job bean.
                workflow.setWorkflowInstance(wfInstance);
                workflow.setStatus(WorkflowJob.Status.FAILED);
                action.setStatus(WorkflowAction.Status.FAILED);
                action.resetPending();
                queue(new WorkflowNotificationXCommand(workflow, action));
                queue(new KillXCommand(workflow.getId()));
                InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1, getInstrumentation());
            }
            catch (WorkflowException ex) {
                throw new CommandException(ex);
            }
        }
    }

    /**
     * Execute retry for action if this action is eligible for user-retry.
     * <p>
     * The action is eligible when its error code is one of the configured user-retry error codes (or the
     * configuration allows all error codes) and its user-retry count is still below its user-retry maximum. In that
     * case the action is set to {@code USER_RETRY}, its user-retry count is incremented, it is marked pending and an
     * {@link ActionStartXCommand} is queued with the computed delay.
     *
     * @param action the Workflow action bean
     * @param wfJob the workflow job the action belongs to; used to look up the retry policy of the action node
     * @return true if user-retry has to be handled for this action
     * @throws CommandException thrown if unable to fail job
     */
    public boolean handleUserRetry(WorkflowActionBean action, WorkflowJobBean wfJob) throws CommandException {
        String errorCode = action.getErrorCode();
        Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();

        if ((allowedRetryCode.contains(LiteWorkflowStoreService.USER_ERROR_CODE_ALL)
                || allowedRetryCode.contains(errorCode))
                && action.getUserRetryCount() < action.getUserRetryMax()) {
            LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
                    + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
                    .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
            ActionExecutor.RETRYPOLICY retryPolicy = getUserRetryPolicy(action, wfJob);
            // getRetryDelay multiplies its interval by 1000; the user-retry interval is scaled by 60 first
            // (presumably it is expressed in minutes - confirm against WorkflowActionBean).
            long interval = getRetryDelay(action.getUserRetryCount(), action.getUserRetryInterval() * 60, retryPolicy);
            action.setStatus(WorkflowAction.Status.USER_RETRY);
            action.incrmentUserRetryCount();
            action.setPending();
            queue(new ActionStartXCommand(action.getId(), action.getType()), interval);
            return true;
        }
        return false;
    }

    /*
     * In case of action error increment the error count for instrumentation
     */
    private void incrActionErrorCounter(String type, String error, int count) {
        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count);
    }

    /**
     * Increment the action counter in the instrumentation log, indicating how
     * many times the action was executed since the start of the Oozie server.
     *
     * @param type the action type; combined with this command's name to form the counter name.
     * @param count the amount to add to the counter.
     */
    protected void incrActionCounter(String type, int count) {
        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count);
    }

    /**
     * Adding a cron for the instrumentation time for the given Instrumentation
     * group
     *
     * @param type the action type; combined with this command's name to form the cron name.
     * @param cron the cron to register.
     */
    protected void addActionCron(String type, Instrumentation.Cron cron) {
        getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron);
    }

    /*
     * Returns the next retry delay in milliseconds, based on retry policy algorithm:
     * EXPONENTIAL -> 2^retryCount * retryInterval * 1000, PERIODIC -> retryInterval * 1000.
     * Any other policy is rejected with UnsupportedOperationException.
     */
    private long getRetryDelay(int retryCount, long retryInterval, ActionExecutor.RETRYPOLICY retryPolicy) {
        switch (retryPolicy) {
            case EXPONENTIAL:
                long retryTime = ((long) Math.pow(2, retryCount) * retryInterval * 1000L);
                return retryTime;
            case PERIODIC:
                return retryInterval * 1000L;
            default:
                throw new UnsupportedOperationException("Retry policy not supported");
        }
    }

    /**
     * Workflow action executor context
     *
     */
    public static class ActionExecutorContext implements ActionExecutor.Context {
        protected final WorkflowJobBean workflow;
        private Configuration protoConf;
        protected final WorkflowActionBean action;
        private final boolean isRetry;
        private final boolean isUserRetry;
        private boolean started;
        private boolean ended;
        private boolean executed;
        private boolean shouldEndWF;
        private Job.Status jobStatus;

        /**
         * Constructing the ActionExecutorContext, setting the private members
         * and constructing the proto configuration
         *
         * @param workflow the workflow job the action belongs to.
         * @param action the action being executed.
         * @param isRetry value reported by {@link #isRetry()}.
         * @param isUserRetry value reported by {@link #isUserRetry()}.
         */
        public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry,
                boolean isUserRetry) {
            this.workflow = workflow;
            this.action = action;
            this.isRetry = isRetry;
            this.isUserRetry = isUserRetry;
            try {
                protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
            }
            catch (IOException ex) {
                throw new RuntimeException("It should not happen", ex);
            }
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getCallbackUrl(java.lang.String)
         */
        public String getCallbackUrl(String externalStatusVar) {
            return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getProtoActionConf()
         */
        public Configuration getProtoActionConf() {
            return protoConf;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getWorkflow()
         */
        public WorkflowJob getWorkflow() {
            return workflow;
        }

        /**
         * Returns the workflow action of the given action context
         *
         * @return the workflow action of the given action context
         */
        public WorkflowAction getAction() {
            return action;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getELEvaluator()
         */
        public ELEvaluator getELEvaluator() {
            ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow");
            DagELFunctions.configureEvaluator(evaluator, workflow, action);
            return evaluator;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String)
         */
        public void setVar(String name, String value) {
            setVarToWorkflow(name, value);
        }

        /**
         * Stores the variable directly on the workflow instance, prefixing the name with the action name and
         * {@link WorkflowInstance#NODE_VAR_SEPARATOR}.
         * <p>
         * This is not thread safe, don't use if workflowjob is shared among multiple actions command
         *
         * @param name the variable name, without the action-name prefix.
         * @param value the variable value.
         */
        public void setVarToWorkflow(String name, String value) {
            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
            WorkflowInstance wfInstance = workflow.getWorkflowInstance();
            wfInstance.setVar(name, value);
            workflow.setWorkflowInstance(wfInstance);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getVar(java.lang.String)
         */
        public String getVar(String name) {
            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
            return workflow.getWorkflowInstance().getVar(name);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setStartData(java.lang.String, java.lang.String,
         * java.lang.String)
         */
        public void setStartData(String externalId, String trackerUri, String consoleUrl) {
            action.setStartData(externalId, trackerUri, consoleUrl);
            started = true;
        }

        /**
         * Setting the start time of the action
         */
        public void setStartTime() {
            Date now = new Date();
            action.setStartTime(now);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionData(java.lang.String,
         * java.util.Properties)
         */
        public void setExecutionData(String externalStatus, Properties actionData) {
            action.setExecutionData(externalStatus, actionData);
            executed = true;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionStats(java.lang.String)
         */
        public void setExecutionStats(String jsonStats) {
            action.setExecutionStats(jsonStats);
            executed = true;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setExternalChildIDs(java.lang.String)
         */
        public void setExternalChildIDs(String externalChildIDs) {
            action.setExternalChildIDs(externalChildIDs);
            executed = true;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setEndData(org.apache.oozie.client.WorkflowAction.Status,
         * java.lang.String)
         */
        public void setEndData(WorkflowAction.Status status, String signalValue) {
            action.setEndData(status, signalValue);
            ended = true;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#isRetry()
         */
        public boolean isRetry() {
            return isRetry;
        }

        /**
         * Return if the executor invocation is a user retry or not.
         *
         * @return if the executor invocation is a user retry or not.
         */
        public boolean isUserRetry() {
            return isUserRetry;
        }

        /**
         * Returns whether setStartData has been called or not.
         *
         * @return true if start completion info has been set.
         */
        public boolean isStarted() {
            return started;
        }

        /**
         * Returns whether setExecutionData, setExecutionStats or setExternalChildIDs has been called or not.
         *
         * @return true if execution completion info has been set, otherwise false.
         */
        public boolean isExecuted() {
            return executed;
        }

        /**
         * Returns whether setEndData has been called or not.
         *
         * @return true if end completion info has been set.
         */
        public boolean isEnded() {
            return ended;
        }

        /**
         * Sets the external status on the underlying action. Unlike {@link #setExecutionData}, this does not mark
         * the context as executed.
         *
         * @param externalStatus the external status to store on the action.
         */
        public void setExternalStatus(String externalStatus) {
            action.setExternalStatus(externalStatus);
        }

        /**
         * Returns the recovery id: the action id and the workflow run number joined by
         * {@link ActionXCommand#RECOVERY_ID_SEPARATOR}.
         */
        @Override
        public String getRecoveryId() {
            return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun();
        }

        /* (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir()
         */
        public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException {
            String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType();
            FileSystem fs = getAppFileSystem();
            String actionDirPath = Services.get().getSystemId() + "/" + name;
            // Resolved relative to the home directory reported by the application file system.
            Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath);
            return fqActionDir;
        }

        /* (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem()
         */
        public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException {
            WorkflowJob workflow = getWorkflow();
            URI uri = new URI(getWorkflow().getAppPath());
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            Configuration fsConf = has.createJobConf(uri.getAuthority());
            return has.createFileSystem(workflow.getUser(), uri, fsConf);

        }

        /* (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String)
         */
        @Override
        public void setErrorInfo(String str, String exMsg) {
            action.setErrorInfo(str, exMsg);
        }

        /**
         * @return the flag previously stored with {@link #setShouldEndWF(boolean)}; false if never set.
         */
        public boolean isShouldEndWF() {
            return shouldEndWF;
        }

        /**
         * @param shouldEndWF the flag to store; returned by {@link #isShouldEndWF()}.
         */
        public void setShouldEndWF(boolean shouldEndWF) {
            this.shouldEndWF = shouldEndWF;
        }

        /**
         * @return the job status previously stored with {@link #setJobStatus(Job.Status)}; null if never set.
         */
        public Job.Status getJobStatus() {
            return jobStatus;
        }

        /**
         * @param jobStatus the job status to store; returned by {@link #getJobStatus()}.
         */
        public void setJobStatus(Job.Status jobStatus) {
            this.jobStatus = jobStatus;
        }
    }

    /**
     * Executor context that keeps action variables in a private map instead of writing them to the workflow
     * instance. The collected variables are exposed through {@link #getContextMap()};
     * {@link #setVarToWorkflow(String, String)} remains available for writing to the workflow instance directly.
     */
    public static class ForkedActionExecutorContext extends ActionExecutorContext {
        private final Map<String, String> contextVariableMap = new HashMap<String, String>();

        public ForkedActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry,
                boolean isUserRetry) {
            super(workflow, action, isRetry, isUserRetry);
        }

        /**
         * Stores the variable in the local map. A null value removes the variable.
         *
         * @param name the variable name (stored as given, without an action-name prefix).
         * @param value the variable value, or null to remove the variable.
         */
        @Override
        public void setVar(String name, String value) {
            // The check used to be inverted: real values were discarded and only nulls were stored,
            // so getVar()/getContextMap() could never return anything that had been set.
            if (value == null) {
                contextVariableMap.remove(name);
            }
            else {
                contextVariableMap.put(name, value);
            }
        }

        /**
         * @param name the variable name as passed to {@link #setVar(String, String)}.
         * @return the locally stored value, or null if the variable is not set.
         */
        @Override
        public String getVar(String name) {
            return contextVariableMap.get(name);
        }

        /**
         * @return the live map of locally stored variables (not a copy).
         */
        public Map<String, String> getContextMap() {
            return contextVariableMap;
        }
    }

    /*
     * Returns user retry policy. Precedence: the policy declared on the action's node definition, then the
     * system-wide configured policy, then LiteWorkflowStoreService.DEFAULT_USER_RETRY_POLICY. A candidate is
     * skipped when it does not name a RETRYPOLICY constant.
     */
    private ActionExecutor.RETRYPOLICY getUserRetryPolicy(WorkflowActionBean wfAction, WorkflowJobBean wfJob) {
        WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
        LiteWorkflowApp wfApp = (LiteWorkflowApp) wfInstance.getApp();
        NodeDef nodeDef = wfApp.getNode(wfAction.getName());
        if (nodeDef == null) {
            return ActionExecutor.RETRYPOLICY.valueOf(LiteWorkflowStoreService.DEFAULT_USER_RETRY_POLICY);
        }
        // Normalise exactly the way isValidRetryPolicy does (upper-case + trim) so that a value accepted by the
        // validator can never make the subsequent valueOf throw because of surrounding whitespace.
        String userRetryPolicy = nodeDef.getUserRetryPolicy().toUpperCase().trim();
        String userRetryPolicyInSysConfig = ConfigurationService.get(LiteWorkflowStoreService.CONF_USER_RETRY_POLICY)
                .toUpperCase().trim();
        if (isValidRetryPolicy(userRetryPolicy)) {
            return ActionExecutor.RETRYPOLICY.valueOf(userRetryPolicy);
        }
        else if (isValidRetryPolicy(userRetryPolicyInSysConfig)) {
            return ActionExecutor.RETRYPOLICY.valueOf(userRetryPolicyInSysConfig);
        }
        else {
            return ActionExecutor.RETRYPOLICY.valueOf(LiteWorkflowStoreService.DEFAULT_USER_RETRY_POLICY);
        }
    }

    /*
     * Returns true if policy is valid, otherwise false. The comparison ignores case and surrounding whitespace.
     */
    private static boolean isValidRetryPolicy(String policy) {
        try {
            ActionExecutor.RETRYPOLICY.valueOf(policy.toUpperCase().trim());
        }
        catch (IllegalArgumentException e) {
            // Invalid Policy
            return false;
        }
        return true;
    }

}