/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.DagELFunctions;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
import org.apache.oozie.service.CallbackService;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;

/**
 * Base class for Action execution commands. Provides common functionality to handle different types of errors while
 * attempting to start or end an action.
 */
public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> {
    private static final String INSTRUMENTATION_GROUP = "action.executors";

    protected static final String INSTR_FAILED_JOBS_COUNTER = "failed";

    protected static final String RECOVERY_ID_SEPARATOR = "@";

    public ActionXCommand(String name, String type, int priority) {
        super(name, type, priority);
    }

    /**
     * Takes care of transient failures. If the action has not yet used up the executor's maximum number of retries,
     * the action is given the supplied status, marked pending, its retry count is incremented and this command is
     * re-queued after the executor's retry interval. Otherwise nothing is changed and <code>false</code> is returned.
     *
     * @param context the execution context; must be an {@link ActionExecutorContext}.
     * @param executor the executor instance being used.
     * @param status the status to be set for the action when a retry is scheduled.
     * @return true if the action is scheduled for another retry. false if the number of retries has reached the
     *         maximum number of configured retries.
     * @throws CommandException thrown if unable to handle transient
     */
    protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor,
            WorkflowAction.Status status) throws CommandException {
        LOG.debug("Attempting to retry");
        ActionExecutorContext aContext = (ActionExecutorContext) context;
        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
        incrActionErrorCounter(action.getType(), "transient", 1);

        int actionRetryCount = action.getRetries();
        if (actionRetryCount >= executor.getMaxRetries()) {
            LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries());
            return false;
        }
        else {
            action.setStatus(status);
            action.setPending();
            action.incRetries();
            // The interval is multiplied by 1000 to obtain milliseconds; the long literal forces the
            // multiplication to be carried out in 64-bit arithmetic so it cannot wrap around.
            long retryDelayMillis = executor.getRetryInterval() * 1000L;
            action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis));
            LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis);
            this.resetUsed();
            queue(this, retryDelayMillis);
            return true;
        }
    }

    /**
     * Takes care of non transient failures. The action is given the supplied status, its pending flag is reset via
     * <code>resetPendingOnly()</code>, and the workflow job is suspended. The coordinator action update command is
     * always invoked afterwards, whether or not the suspend succeeded.
     *
     * @param context the execution context; must be an {@link ActionExecutorContext}.
     * @param executor the executor instance being used (not used by this method).
     * @param status the status to be set for the action.
     * @throws CommandException thrown if unable to suspend job
     */
    protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor,
            WorkflowAction.Status status) throws CommandException {
        ActionExecutorContext aContext = (ActionExecutorContext) context;
        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
        incrActionErrorCounter(action.getType(), "nontransient", 1);
        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
        String id = workflow.getId();
        action.setStatus(status);
        action.resetPendingOnly();
        LOG.warn("Suspending Workflow Job id=" + id);
        try {
            SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId(), null);
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E0727, id, e.getMessage());
        }
        finally {
            // update coordinator action
            // NOTE(review): the meaning of the literal 3 is not visible from this file - confirm against
            // CoordActionUpdateXCommand.
            new CoordActionUpdateXCommand(workflow, 3).call();
        }
    }

    /**
     * Takes care of errors.
     * <p>
     * If the action is eligible for a user retry (see {@link #handleUserRetry(WorkflowActionBean)}) the retry is
     * scheduled and nothing else is done. Otherwise the action is marked pending and:
     * <ul>
     * <li>for errors while attempting to start the action, the message is stored as the action's execution data and
     * an {@link ActionEndXCommand} is queued;</li>
     * <li>for errors while attempting to end the action, the action's end data is set with the given status and the
     * <code>ERROR</code> signal value.</li>
     * </ul>
     *
     * @param context the execution context; must be an {@link ActionExecutorContext}.
     * @param executor the executor instance being used (not used by this method).
     * @param message the value stored as the action's execution data (external status) when <code>isStart</code> is
     *        true.
     * @param isStart whether the error was generated while starting or ending an action.
     * @param status the status to be set for the action (applied only when <code>isStart</code> is false).
     * @throws CommandException thrown if unable to handle action error
     */
    protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message,
            boolean isStart, WorkflowAction.Status status) throws CommandException {
        LOG.warn("Setting Action Status to [{0}]", status);
        ActionExecutorContext aContext = (ActionExecutorContext) context;
        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();

        if (!handleUserRetry(action)) {
            incrActionErrorCounter(action.getType(), "error", 1);
            action.setPending();
            if (isStart) {
                action.setExecutionData(message, null);
                queue(new ActionEndXCommand(action.getId(), action.getType()));
            }
            else {
                action.setEndData(status, WorkflowAction.Status.ERROR.toString());
            }
        }
    }

    /**
     * Fail the job due to failed action.
     * <p>
     * If the action is eligible for a user retry the retry is scheduled instead. Otherwise the workflow instance,
     * the workflow job and the action are all set to <code>FAILED</code>, the action's pending flag is reset, and a
     * {@link NotificationXCommand} and a {@link KillXCommand} are queued.
     *
     * @param context the execution context; must be an {@link ActionExecutorContext}.
     * @throws CommandException thrown if unable to fail job
     */
    public void failJob(ActionExecutor.Context context) throws CommandException {
        ActionExecutorContext aContext = (ActionExecutorContext) context;
        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();

        if (!handleUserRetry(action)) {
            incrActionErrorCounter(action.getType(), "failed", 1);
            LOG.warn("Failing Job due to failed action [{0}]", action.getName());
            try {
                // NOTE(review): getWorkflowInstance() is deliberately still called twice, exactly as before;
                // whether both calls return the same object depends on WorkflowJobBean - confirm before merging them.
                workflow.getWorkflowInstance().fail(action.getName());
                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
                ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
                workflow.setWorkflowInstance(wfInstance);
                workflow.setStatus(WorkflowJob.Status.FAILED);
                action.setStatus(WorkflowAction.Status.FAILED);
                action.resetPending();
                queue(new NotificationXCommand(workflow, action));
                queue(new KillXCommand(workflow.getId()));
                InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation());
            }
            catch (WorkflowException ex) {
                throw new CommandException(ex);
            }
        }
    }

    /**
     * Execute retry for action if this action is eligible for user-retry.
     * <p>
     * The action is eligible when its error code is one of the codes returned by
     * {@link LiteWorkflowStoreService#getUserRetryErrorCode()} and its user retry count is still below its user
     * retry maximum. In that case the action is set to <code>USER_RETRY</code>, its user retry count is incremented,
     * it is marked pending and an {@link ActionStartXCommand} is queued after the user retry interval.
     *
     * @param action the action that failed.
     * @return true if user-retry has been scheduled for this action, false otherwise.
     * @throws CommandException thrown if unable to schedule the retry
     */
    public boolean handleUserRetry(WorkflowActionBean action) throws CommandException {
        String errorCode = action.getErrorCode();
        Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();

        if (allowedRetryCode.contains(errorCode) && action.getUserRetryCount() < action.getUserRetryMax()) {
            LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
                    + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
                    .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
            // The interval is multiplied by 60 * 1000 to obtain the queue delay. Computing it in long
            // arithmetic prevents the int overflow that a large configured interval would otherwise cause
            // (anything above roughly 35,791 would wrap to a negative delay).
            long interval = action.getUserRetryInterval() * 60L * 1000L;
            action.setStatus(WorkflowAction.Status.USER_RETRY);
            action.incrmentUserRetryCount();
            action.setPending();
            queue(new ActionStartXCommand(action.getId(), action.getType()), interval);
            return true;
        }
        return false;
    }

    /*
     * In case of action error increment the error count for instrumentation
     */
    private void incrActionErrorCounter(String type, String error, int count) {
        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count);
    }

    /**
     * Increments the instrumentation counter for the given action type and this command's name, in the
     * action executors instrumentation group.
     *
     * @param type the action type.
     * @param count the amount to add to the counter.
     */
    protected void incrActionCounter(String type, int count) {
        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count);
    }

    /**
     * Adds a cron (timing measurement) to the instrumentation for the given action type and this command's name, in
     * the action executors instrumentation group.
     *
     * @param type the action type.
     * @param cron the cron to add.
     */
    protected void addActionCron(String type, Instrumentation.Cron cron) {
        getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron);
    }

    /**
     * Workflow action executor context. Wraps a workflow job and one of its actions, and records whether start,
     * execution and end data have been set through it.
     */
    public static class ActionExecutorContext implements ActionExecutor.Context {
        private final WorkflowJobBean workflow;
        private final Configuration protoConf;
        private final WorkflowActionBean action;
        private final boolean isRetry;
        private final boolean isUserRetry;
        private boolean started;
        private boolean ended;
        private boolean executed;

        /**
         * Constructing the ActionExecutorContext, setting the private members
         * and constructing the proto configuration
         *
         * @param workflow the workflow job the action belongs to.
         * @param action the action being executed.
         * @param isRetry whether this invocation is a retry.
         * @param isUserRetry whether this invocation is a user retry.
         * @throws RuntimeException if the workflow's proto action configuration cannot be read.
         */
        public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry,
                boolean isUserRetry) {
            this.workflow = workflow;
            this.action = action;
            this.isRetry = isRetry;
            this.isUserRetry = isUserRetry;
            try {
                protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
            }
            catch (IOException ex) {
                throw new RuntimeException("It should not happen", ex);
            }
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getCallbackUrl(java.lang.String)
         */
        public String getCallbackUrl(String externalStatusVar) {
            return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getProtoActionConf()
         */
        public Configuration getProtoActionConf() {
            return protoConf;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getWorkflow()
         */
        public WorkflowJob getWorkflow() {
            return workflow;
        }

        /**
         * Returns the workflow action of the given action context
         *
         * @return the workflow action of the given action context
         */
        public WorkflowAction getAction() {
            return action;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getELEvaluator()
         */
        public ELEvaluator getELEvaluator() {
            ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow");
            DagELFunctions.configureEvaluator(evaluator, workflow, action);
            return evaluator;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String)
         */
        public void setVar(String name, String value) {
            // Variables are namespaced by the action name.
            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
            WorkflowInstance wfInstance = workflow.getWorkflowInstance();
            wfInstance.setVar(name, value);
            // The modified instance is explicitly written back to the job bean.
            workflow.setWorkflowInstance(wfInstance);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getVar(java.lang.String)
         */
        public String getVar(String name) {
            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
            return workflow.getWorkflowInstance().getVar(name);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setStartData(java.lang.String, java.lang.String, java.lang.String)
         */
        public void setStartData(String externalId, String trackerUri, String consoleUrl) {
            action.setStartData(externalId, trackerUri, consoleUrl);
            started = true;
        }

        /**
         * Setting the start time of the action to the current time
         */
        public void setStartTime() {
            Date now = new Date();
            action.setStartTime(now);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionData(java.lang.String, java.util.Properties)
         */
        public void setExecutionData(String externalStatus, Properties actionData) {
            action.setExecutionData(externalStatus, actionData);
            executed = true;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionStats(java.lang.String)
         */
        public void setExecutionStats(String jsonStats) {
            action.setExecutionStats(jsonStats);
            executed = true;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setExternalChildIDs(java.lang.String)
         */
        public void setExternalChildIDs(String externalChildIDs) {
            action.setExternalChildIDs(externalChildIDs);
            executed = true;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setEndData(org.apache.oozie.client.WorkflowAction.Status, java.lang.String)
         */
        public void setEndData(WorkflowAction.Status status, String signalValue) {
            action.setEndData(status, signalValue);
            ended = true;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#isRetry()
         */
        public boolean isRetry() {
            return isRetry;
        }

        /**
         * Return if the executor invocation is a user retry or not.
         *
         * @return if the executor invocation is a user retry or not.
         */
        public boolean isUserRetry() {
            return isUserRetry;
        }

        /**
         * Returns whether setStartData has been called or not.
         *
         * @return true if start completion info has been set.
         */
        public boolean isStarted() {
            return started;
        }

        /**
         * Returns whether any of setExecutionData, setExecutionStats or setExternalChildIDs has been called.
         *
         * @return true if execution completion info has been set, otherwise false.
         */
        public boolean isExecuted() {
            return executed;
        }

        /**
         * Returns whether setEndData has been called or not.
         *
         * @return true if end completion info has been set.
         */
        public boolean isEnded() {
            return ended;
        }

        /**
         * Sets the external status of the action. Unlike setExecutionData this does not mark the context as
         * executed.
         *
         * @param externalStatus the external status to store on the action.
         */
        public void setExternalStatus(String externalStatus) {
            action.setExternalStatus(externalStatus);
        }

        /**
         * Returns the recovery id: the action id and the workflow run number joined by
         * {@link ActionXCommand#RECOVERY_ID_SEPARATOR}.
         */
        @Override
        public String getRecoveryId() {
            return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun();
        }

        /* (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir()
         */
        public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException {
            // Layout: <fs home dir>/<system id>/<workflow id>/<action name>--<action type>
            String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType();
            FileSystem fs = getAppFileSystem();
            String actionDirPath = Services.get().getSystemId() + "/" + name;
            Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath);
            return fqActionDir;
        }

        /* (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem()
         */
        public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException {
            // Single lookup held in a local whose name does not shadow the 'workflow' field.
            WorkflowJob job = getWorkflow();
            URI uri = new URI(job.getAppPath());
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            Configuration fsConf = has.createJobConf(uri.getAuthority());
            return has.createFileSystem(job.getUser(), uri, fsConf);
        }

        /* (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String)
         */
        @Override
        public void setErrorInfo(String str, String exMsg) {
            action.setErrorInfo(str, exMsg);
        }
    }

}