001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.net.URI; 023 import java.net.URISyntaxException; 024 import java.util.Date; 025 import java.util.Properties; 026 import java.util.Set; 027 028 import org.apache.hadoop.conf.Configuration; 029 import org.apache.hadoop.fs.FileSystem; 030 import org.apache.hadoop.fs.Path; 031 import org.apache.oozie.DagELFunctions; 032 import org.apache.oozie.ErrorCode; 033 import org.apache.oozie.WorkflowActionBean; 034 import org.apache.oozie.WorkflowJobBean; 035 import org.apache.oozie.action.ActionExecutor; 036 import org.apache.oozie.client.WorkflowAction; 037 import org.apache.oozie.client.WorkflowJob; 038 import org.apache.oozie.command.CommandException; 039 import org.apache.oozie.command.coord.CoordActionUpdateXCommand; 040 import org.apache.oozie.service.CallbackService; 041 import org.apache.oozie.service.ELService; 042 import org.apache.oozie.service.HadoopAccessorException; 043 import org.apache.oozie.service.HadoopAccessorService; 044 import org.apache.oozie.service.JPAService; 045 import org.apache.oozie.service.LiteWorkflowStoreService; 046 import org.apache.oozie.service.Services; 047 import org.apache.oozie.util.ELEvaluator; 048 import org.apache.oozie.util.InstrumentUtils; 049 import org.apache.oozie.util.Instrumentation; 050 import org.apache.oozie.util.XConfiguration; 051 import org.apache.oozie.workflow.WorkflowException; 052 import org.apache.oozie.workflow.WorkflowInstance; 053 import org.apache.oozie.workflow.lite.LiteWorkflowInstance; 054 055 /** 056 * Base class for Action execution commands. Provides common functionality to handle different types of errors while 057 * attempting to start or end an action. 058 */ 059 public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> { 060 private static final String INSTRUMENTATION_GROUP = "action.executors"; 061 062 protected static final String INSTR_FAILED_JOBS_COUNTER = "failed"; 063 064 protected static final String RECOVERY_ID_SEPARATOR = "@"; 065 066 public ActionXCommand(String name, String type, int priority) { 067 super(name, type, priority); 068 } 069 070 /** 071 * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough 072 * attempts have been made. Otherwise returns false. 073 * 074 * @param context the execution context. 075 * @param executor the executor instance being used. 076 * @param status the status to be set for the action. 077 * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the 078 * maximum number of configured retries. 079 * @throws CommandException thrown if unable to handle transient 080 */ 081 protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor, 082 WorkflowAction.Status status) throws CommandException { 083 LOG.debug("Attempting to retry"); 084 ActionExecutorContext aContext = (ActionExecutorContext) context; 085 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); 086 incrActionErrorCounter(action.getType(), "transient", 1); 087 088 int actionRetryCount = action.getRetries(); 089 if (actionRetryCount >= executor.getMaxRetries()) { 090 LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries()); 091 return false; 092 } 093 else { 094 action.setStatus(status); 095 action.setPending(); 096 action.incRetries(); 097 long retryDelayMillis = executor.getRetryInterval() * 1000; 098 action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis)); 099 LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis); 100 this.resetUsed(); 101 queue(this, retryDelayMillis); 102 return true; 103 } 104 } 105 106 /** 107 * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL and 108 * set pending flag of action to false 109 * 110 * @param context the execution context. 111 * @param executor the executor instance being used. 112 * @param status the status to be set for the action. 113 * @throws CommandException thrown if unable to suspend job 114 */ 115 protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor, 116 WorkflowAction.Status status) throws CommandException { 117 ActionExecutorContext aContext = (ActionExecutorContext) context; 118 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); 119 incrActionErrorCounter(action.getType(), "nontransient", 1); 120 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow(); 121 String id = workflow.getId(); 122 action.setStatus(status); 123 action.resetPendingOnly(); 124 LOG.warn("Suspending Workflow Job id=" + id); 125 try { 126 SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId(), null); 127 } 128 catch (Exception e) { 129 throw new CommandException(ErrorCode.E0727, id, e.getMessage()); 130 } 131 finally { 132 // update coordinator action 133 new CoordActionUpdateXCommand(workflow, 3).call(); 134 } 135 } 136 137 /** 138 * Takes care of errors. </p> For errors while attempting to start the action, the job state is updated and an 139 * {@link ActionEndCommand} is queued. </p> For errors while attempting to end the action, the job state is updated. 140 * </p> 141 * 142 * @param context the execution context. 143 * @param executor the executor instance being used. 144 * @param message 145 * @param isStart whether the error was generated while starting or ending an action. 146 * @param status the status to be set for the action. 147 * @throws CommandException thrown if unable to handle action error 148 */ 149 protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message, 150 boolean isStart, WorkflowAction.Status status) throws CommandException { 151 LOG.warn("Setting Action Status to [{0}]", status); 152 ActionExecutorContext aContext = (ActionExecutorContext) context; 153 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); 154 155 if (!handleUserRetry(action)) { 156 incrActionErrorCounter(action.getType(), "error", 1); 157 action.setPending(); 158 if (isStart) { 159 action.setExecutionData(message, null); 160 queue(new ActionEndXCommand(action.getId(), action.getType())); 161 } 162 else { 163 action.setEndData(status, WorkflowAction.Status.ERROR.toString()); 164 } 165 } 166 } 167 168 /** 169 * Fail the job due to failed action 170 * 171 * @param context the execution context. 172 * @throws CommandException thrown if unable to fail job 173 */ 174 public void failJob(ActionExecutor.Context context) throws CommandException { 175 ActionExecutorContext aContext = (ActionExecutorContext) context; 176 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); 177 failJob(context, action); 178 } 179 180 /** 181 * Fail the job due to failed action 182 * 183 * @param context the execution context. 184 * @param action the action that caused the workflow to fail 185 * @throws CommandException thrown if unable to fail job 186 */ 187 public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException { 188 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow(); 189 if (!handleUserRetry(action)) { 190 incrActionErrorCounter(action.getType(), "failed", 1); 191 LOG.warn("Failing Job due to failed action [{0}]", action.getName()); 192 try { 193 workflow.getWorkflowInstance().fail(action.getName()); 194 WorkflowInstance wfInstance = workflow.getWorkflowInstance(); 195 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED); 196 workflow.setWorkflowInstance(wfInstance); 197 workflow.setStatus(WorkflowJob.Status.FAILED); 198 action.setStatus(WorkflowAction.Status.FAILED); 199 action.resetPending(); 200 queue(new NotificationXCommand(workflow, action)); 201 queue(new KillXCommand(workflow.getId())); 202 InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation()); 203 } 204 catch (WorkflowException ex) { 205 throw new CommandException(ex); 206 } 207 } 208 } 209 210 /** 211 * Execute retry for action if this action is eligible for user-retry 212 * 213 * @param context the execution context. 214 * @return true if user-retry has to be handled for this action 215 * @throws CommandException thrown if unable to fail job 216 */ 217 public boolean handleUserRetry(WorkflowActionBean action) throws CommandException { 218 String errorCode = action.getErrorCode(); 219 Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode(); 220 221 if (allowedRetryCode.contains(errorCode) && action.getUserRetryCount() < action.getUserRetryMax()) { 222 LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], " 223 + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action 224 .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval()); 225 int interval = action.getUserRetryInterval() * 60 * 1000; 226 action.setStatus(WorkflowAction.Status.USER_RETRY); 227 action.incrmentUserRetryCount(); 228 action.setPending(); 229 queue(new ActionStartXCommand(action.getId(), action.getType()), interval); 230 return true; 231 } 232 return false; 233 } 234 235 /* 236 * In case of action error increment the error count for instrumentation 237 */ 238 private void incrActionErrorCounter(String type, String error, int count) { 239 getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count); 240 } 241 242 /** 243 * Increment the action counter in the instrumentation log. indicating how 244 * many times the action was executed since the start Oozie server 245 */ 246 protected void incrActionCounter(String type, int count) { 247 getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count); 248 } 249 250 /** 251 * Adding a cron for the instrumentation time for the given Instrumentation 252 * group 253 */ 254 protected void addActionCron(String type, Instrumentation.Cron cron) { 255 getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron); 256 } 257 258 /** 259 * Workflow action executor context 260 * 261 */ 262 public static class ActionExecutorContext implements ActionExecutor.Context { 263 private final WorkflowJobBean workflow; 264 private Configuration protoConf; 265 private final WorkflowActionBean action; 266 private final boolean isRetry; 267 private final boolean isUserRetry; 268 private boolean started; 269 private boolean ended; 270 private boolean executed; 271 272 /** 273 * Constructing the ActionExecutorContext, setting the private members 274 * and constructing the proto configuration 275 */ 276 public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry, boolean isUserRetry) { 277 this.workflow = workflow; 278 this.action = action; 279 this.isRetry = isRetry; 280 this.isUserRetry = isUserRetry; 281 try { 282 protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf())); 283 } 284 catch (IOException ex) { 285 throw new RuntimeException("It should not happen", ex); 286 } 287 } 288 289 /* 290 * (non-Javadoc) 291 * @see org.apache.oozie.action.ActionExecutor.Context#getCallbackUrl(java.lang.String) 292 */ 293 public String getCallbackUrl(String externalStatusVar) { 294 return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar); 295 } 296 297 /* 298 * (non-Javadoc) 299 * @see org.apache.oozie.action.ActionExecutor.Context#getProtoActionConf() 300 */ 301 public Configuration getProtoActionConf() { 302 return protoConf; 303 } 304 305 /* 306 * (non-Javadoc) 307 * @see org.apache.oozie.action.ActionExecutor.Context#getWorkflow() 308 */ 309 public WorkflowJob getWorkflow() { 310 return workflow; 311 } 312 313 /** 314 * Returns the workflow action of the given action context 315 * 316 * @return the workflow action of the given action context 317 */ 318 public WorkflowAction getAction() { 319 return action; 320 } 321 322 /* 323 * (non-Javadoc) 324 * @see org.apache.oozie.action.ActionExecutor.Context#getELEvaluator() 325 */ 326 public ELEvaluator getELEvaluator() { 327 ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow"); 328 DagELFunctions.configureEvaluator(evaluator, workflow, action); 329 return evaluator; 330 } 331 332 /* 333 * (non-Javadoc) 334 * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String) 335 */ 336 public void setVar(String name, String value) { 337 name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name; 338 WorkflowInstance wfInstance = workflow.getWorkflowInstance(); 339 wfInstance.setVar(name, value); 340 workflow.setWorkflowInstance(wfInstance); 341 } 342 343 /* 344 * (non-Javadoc) 345 * @see org.apache.oozie.action.ActionExecutor.Context#getVar(java.lang.String) 346 */ 347 public String getVar(String name) { 348 name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name; 349 return workflow.getWorkflowInstance().getVar(name); 350 } 351 352 /* 353 * (non-Javadoc) 354 * @see org.apache.oozie.action.ActionExecutor.Context#setStartData(java.lang.String, java.lang.String, java.lang.String) 355 */ 356 public void setStartData(String externalId, String trackerUri, String consoleUrl) { 357 action.setStartData(externalId, trackerUri, consoleUrl); 358 started = true; 359 } 360 361 /** 362 * Setting the start time of the action 363 */ 364 public void setStartTime() { 365 Date now = new Date(); 366 action.setStartTime(now); 367 } 368 369 /* 370 * (non-Javadoc) 371 * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionData(java.lang.String, java.util.Properties) 372 */ 373 public void setExecutionData(String externalStatus, Properties actionData) { 374 action.setExecutionData(externalStatus, actionData); 375 executed = true; 376 } 377 378 /* 379 * (non-Javadoc) 380 * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionStats(java.lang.String) 381 */ 382 public void setExecutionStats(String jsonStats) { 383 action.setExecutionStats(jsonStats); 384 executed = true; 385 } 386 387 /* 388 * (non-Javadoc) 389 * @see org.apache.oozie.action.ActionExecutor.Context#setExternalChildIDs(java.lang.String) 390 */ 391 public void setExternalChildIDs(String externalChildIDs) { 392 action.setExternalChildIDs(externalChildIDs); 393 executed = true; 394 } 395 396 /* 397 * (non-Javadoc) 398 * @see org.apache.oozie.action.ActionExecutor.Context#setEndData(org.apache.oozie.client.WorkflowAction.Status, java.lang.String) 399 */ 400 public void setEndData(WorkflowAction.Status status, String signalValue) { 401 action.setEndData(status, signalValue); 402 ended = true; 403 } 404 405 /* 406 * (non-Javadoc) 407 * @see org.apache.oozie.action.ActionExecutor.Context#isRetry() 408 */ 409 public boolean isRetry() { 410 return isRetry; 411 } 412 413 /** 414 * Return if the executor invocation is a user retry or not. 415 * 416 * @return if the executor invocation is a user retry or not. 417 */ 418 public boolean isUserRetry() { 419 return isUserRetry; 420 } 421 422 /** 423 * Returns whether setStartData has been called or not. 424 * 425 * @return true if start completion info has been set. 426 */ 427 public boolean isStarted() { 428 return started; 429 } 430 431 /** 432 * Returns whether setExecutionData has been called or not. 433 * 434 * @return true if execution completion info has been set, otherwise false. 435 */ 436 public boolean isExecuted() { 437 return executed; 438 } 439 440 /** 441 * Returns whether setEndData has been called or not. 442 * 443 * @return true if end completion info has been set. 444 */ 445 public boolean isEnded() { 446 return ended; 447 } 448 449 public void setExternalStatus(String externalStatus) { 450 action.setExternalStatus(externalStatus); 451 } 452 453 @Override 454 public String getRecoveryId() { 455 return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun(); 456 } 457 458 /* (non-Javadoc) 459 * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir() 460 */ 461 public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException { 462 String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType(); 463 FileSystem fs = getAppFileSystem(); 464 String actionDirPath = Services.get().getSystemId() + "/" + name; 465 Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath); 466 return fqActionDir; 467 } 468 469 /* (non-Javadoc) 470 * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem() 471 */ 472 public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException { 473 WorkflowJob workflow = getWorkflow(); 474 URI uri = new URI(getWorkflow().getAppPath()); 475 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 476 Configuration fsConf = has.createJobConf(uri.getAuthority()); 477 return has.createFileSystem(workflow.getUser(), uri, fsConf); 478 479 } 480 481 /* (non-Javadoc) 482 * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String) 483 */ 484 @Override 485 public void setErrorInfo(String str, String exMsg) { 486 action.setErrorInfo(str, exMsg); 487 } 488 } 489 490 }