001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.action; 020 021import org.apache.commons.lang.StringUtils; 022import org.apache.hadoop.fs.FileSystem; 023import org.apache.hadoop.fs.Path; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.oozie.client.WorkflowAction; 026import org.apache.oozie.client.WorkflowJob; 027import org.apache.oozie.service.ConfigurationService; 028import org.apache.oozie.util.ELEvaluator; 029import org.apache.oozie.util.ParamChecker; 030import org.apache.oozie.util.XLog; 031import org.apache.oozie.service.HadoopAccessorException; 032import org.apache.oozie.service.Services; 033 034import java.io.ByteArrayOutputStream; 035import java.io.IOException; 036import java.io.PrintStream; 037import java.net.URISyntaxException; 038import java.util.HashMap; 039import java.util.Map; 040import java.util.Properties; 041import java.util.LinkedHashMap; 042 043/** 044 * Base action executor class. <p> All the action executors should extend this class. 045 */ 046public abstract class ActionExecutor { 047 048 /** 049 * Configuration prefix for action executor (sub-classes) properties. 050 */ 051 public static final String CONF_PREFIX = "oozie.action."; 052 053 public static final String MAX_RETRIES = CONF_PREFIX + "retries.max"; 054 055 public static final String ACTION_RETRY_INTERVAL = CONF_PREFIX + "retry.interval"; 056 057 public static final String ACTION_RETRY_POLICY = CONF_PREFIX + "retry.policy"; 058 059 public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag"; 060 061 062 /** 063 * Error code used by {@link #convertException} when there is not register error information for an exception. 064 */ 065 public static final String ERROR_OTHER = "OTHER"; 066 067 public static enum RETRYPOLICY { 068 EXPONENTIAL, PERIODIC 069 } 070 071 private static class ErrorInfo { 072 ActionExecutorException.ErrorType errorType; 073 String errorCode; 074 Class<?> errorClass; 075 076 private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode, Class<?> errorClass) { 077 this.errorType = errorType; 078 this.errorCode = errorCode; 079 this.errorClass = errorClass; 080 } 081 } 082 083 private static boolean initMode = false; 084 private static Map<String, Map<String, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<String, ErrorInfo>>(); 085 086 /** 087 * Context information passed to the ActionExecutor methods. 088 */ 089 public interface Context { 090 091 /** 092 * Create the callback URL for the action. 093 * 094 * @param externalStatusVar variable for the caller to inject the external status. 095 * @return the callback URL. 096 */ 097 String getCallbackUrl(String externalStatusVar); 098 099 /** 100 * Return a proto configuration for actions with auth properties already set. 101 * 102 * @return a proto configuration for actions with auth properties already set. 103 */ 104 Configuration getProtoActionConf(); 105 106 /** 107 * Return the workflow job. 108 * 109 * @return the workflow job. 110 */ 111 WorkflowJob getWorkflow(); 112 113 /** 114 * Return an ELEvaluator with the context injected. 115 * 116 * @return configured ELEvaluator. 117 */ 118 ELEvaluator getELEvaluator(); 119 120 /** 121 * Set a workflow action variable. <p> Convenience method that prefixes the variable name with the action name 122 * plus a '.'. 123 * 124 * @param name variable name. 125 * @param value variable value, <code>null</code> removes the variable. 126 */ 127 void setVar(String name, String value); 128 129 /** 130 * Get a workflow action variable. <p> Convenience method that prefixes the variable name with the action name 131 * plus a '.'. 132 * 133 * @param name variable name. 134 * @return the variable value, <code>null</code> if not set. 135 */ 136 String getVar(String name); 137 138 /** 139 * Set the action tracking information for an successfully started action. 140 * 141 * @param externalId the action external ID. 142 * @param trackerUri the action tracker URI. 143 * @param consoleUrl the action console URL. 144 */ 145 void setStartData(String externalId, String trackerUri, String consoleUrl); 146 147 /** 148 * Set the action execution completion information for an action. The action status is set to {@link 149 * org.apache.oozie.client.WorkflowAction.Status#DONE} 150 * 151 * @param externalStatus the action external end status. 152 * @param actionData the action data on completion, <code>null</code> if none. 153 */ 154 void setExecutionData(String externalStatus, Properties actionData); 155 156 /** 157 * Set execution statistics information for a particular action. The action status is set to {@link 158 * org.apache.oozie.client.WorkflowAction.Status#DONE} 159 * 160 * @param jsonStats the JSON string representation of the stats. 161 */ 162 void setExecutionStats(String jsonStats); 163 164 /** 165 * Set external child IDs for a particular action (Eg: pig). The action status is set to {@link 166 * org.apache.oozie.client.WorkflowAction.Status#DONE} 167 * 168 * @param externalChildIDs the external child IDs as a comma-delimited string. 169 */ 170 void setExternalChildIDs(String externalChildIDs); 171 172 /** 173 * Set the action end completion information for a completed action. 174 * 175 * @param status the action end status, it can be {@link org.apache.oozie.client.WorkflowAction.Status#OK} or 176 * {@link org.apache.oozie.client.WorkflowAction.Status#ERROR}. 177 * @param signalValue the action external end status. 178 */ 179 void setEndData(WorkflowAction.Status status, String signalValue); 180 181 /** 182 * Return if the executor invocation is a retry or not. 183 * 184 * @return if the executor invocation is a retry or not. 185 */ 186 boolean isRetry(); 187 188 /** 189 * Sets the external status for the action in context. 190 * 191 * @param externalStatus the external status. 192 */ 193 void setExternalStatus(String externalStatus); 194 195 /** 196 * Get the Action Recovery ID. 197 * 198 * @return recovery ID. 199 */ 200 String getRecoveryId(); 201 202 /* 203 * @return the path that will be used to store action specific data 204 * @throws IOException @throws URISyntaxException @throws HadoopAccessorException 205 */ 206 Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException; 207 208 /** 209 * @return filesystem handle for the application deployment fs. 210 * @throws IOException 211 * @throws URISyntaxException 212 * @throws HadoopAccessorException 213 */ 214 FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException; 215 216 void setErrorInfo(String str, String exMsg); 217 } 218 219 220 /** 221 * Define the default inteval in seconds between retries. 222 */ 223 public static final long RETRY_INTERVAL = 60; 224 225 private String type; 226 private int maxRetries; 227 private long retryInterval; 228 private RETRYPOLICY retryPolicy; 229 230 /** 231 * Create an action executor with default retry parameters. 232 * 233 * @param type action executor type. 234 */ 235 protected ActionExecutor(String type) { 236 this(type, RETRY_INTERVAL); 237 } 238 239 /** 240 * Create an action executor. 241 * 242 * @param type action executor type. 243 * @param defaultRetryInterval retry interval, in seconds. 244 */ 245 protected ActionExecutor(String type, long defaultRetryInterval) { 246 this.type = ParamChecker.notEmpty(type, "type"); 247 this.maxRetries = ConfigurationService.getInt(MAX_RETRIES); 248 int retryInterval = ConfigurationService.getInt(ACTION_RETRY_INTERVAL); 249 this.retryInterval = retryInterval > 0 ? retryInterval : defaultRetryInterval; 250 this.retryPolicy = getRetryPolicyFromConf(); 251 } 252 253 private RETRYPOLICY getRetryPolicyFromConf() { 254 String retryPolicy = ConfigurationService.get(ACTION_RETRY_POLICY); 255 if (StringUtils.isBlank(retryPolicy)) { 256 return RETRYPOLICY.PERIODIC; 257 } else { 258 try { 259 return RETRYPOLICY.valueOf(retryPolicy.toUpperCase().trim()); 260 } catch (IllegalArgumentException e) { 261 return RETRYPOLICY.PERIODIC; 262 } 263 } 264 } 265 266 /** 267 * Clear all init settings for all action types. 268 */ 269 public static void resetInitInfo() { 270 if (!initMode) { 271 throw new IllegalStateException("Error, action type info locked"); 272 } 273 ERROR_INFOS.clear(); 274 } 275 276 /** 277 * Enable action type initialization. 278 */ 279 public static void enableInit() { 280 initMode = true; 281 } 282 283 /** 284 * Disable action type initialization. 285 */ 286 public static void disableInit() { 287 initMode = false; 288 } 289 290 /** 291 * Invoked once at system initialization time. <p> It can be used to register error information for the expected 292 * exceptions. Exceptions should be register from subclasses to superclasses to ensure proper detection, same thing 293 * that it is done in a normal catch. <p> This method should invoke the {@link #registerError} method to register 294 * all its possible errors. <p> Subclasses overriding must invoke super. 295 */ 296 public void initActionType() { 297 XLog.getLog(getClass()).trace(" Init Action Type : [{0}]", getType()); 298 ERROR_INFOS.put(getType(), new LinkedHashMap<String, ErrorInfo>()); 299 } 300 301 /** 302 * Return the system ID, this ID is defined in Oozie configuration. 303 * 304 * @return the system ID. 305 */ 306 public String getOozieSystemId() { 307 return Services.get().getSystemId(); 308 } 309 310 /** 311 * Return the runtime directory of the Oozie instance. <p> The directory is created under TMP and it is always a 312 * new directory per system initialization. 313 * 314 * @return the runtime directory of the Oozie instance. 315 */ 316 public String getOozieRuntimeDir() { 317 return Services.get().getRuntimeDir(); 318 } 319 320 /** 321 * Return Oozie configuration. <p> This is useful for actions that need access to configuration properties. 322 * 323 * @return Oozie configuration. 324 */ 325 public Configuration getOozieConf() { 326 return Services.get().getConf(); 327 } 328 329 /** 330 * Register error handling information for an exception. 331 * 332 * @param exClass exception class name (to work in case of a particular exception not being in the classpath, needed 333 * to be able to handle multiple version of Hadoop or other JARs used by executors with the same codebase). 334 * @param errorType error type for the exception. 335 * @param errorCode error code for the exception. 336 */ 337 protected void registerError(String exClass, ActionExecutorException.ErrorType errorType, String errorCode) { 338 if (!initMode) { 339 throw new IllegalStateException("Error, action type info locked"); 340 } 341 try { 342 Class errorClass = Thread.currentThread().getContextClassLoader().loadClass(exClass); 343 Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType()); 344 executorErrorInfo.put(exClass, new ErrorInfo(errorType, errorCode, errorClass)); 345 } 346 catch (ClassNotFoundException cnfe) { 347 XLog.getLog(getClass()).warn( 348 "Exception [{0}] not in classpath, ActionExecutor [{1}] will handle it as ERROR", exClass, 349 getType()); 350 } 351 catch (java.lang.NoClassDefFoundError err) { 352 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 353 err.printStackTrace(new PrintStream(baos)); 354 XLog.getLog(getClass()).warn(baos.toString()); 355 } 356 } 357 358 /** 359 * Return the action executor type. 360 * 361 * @return the action executor type. 362 */ 363 public String getType() { 364 return type; 365 } 366 367 /** 368 * Return the maximum number of retries for the action executor. 369 * 370 * @return the maximum number of retries for the action executor. 371 */ 372 public int getMaxRetries() { 373 return maxRetries; 374 } 375 376 /** 377 * Set the maximum number of retries for the action executor. 378 * 379 * @param maxRetries the maximum number of retries. 380 */ 381 public void setMaxRetries(int maxRetries) { 382 this.maxRetries = maxRetries; 383 } 384 385 /** 386 * Return the retry policy for the action executor. 387 * 388 * @return the retry policy for the action executor. 389 */ 390 public RETRYPOLICY getRetryPolicy() { 391 return retryPolicy; 392 } 393 394 /** 395 * Sets the retry policy for the action executor. 396 * 397 * @param retryPolicy retry policy for the action executor. 398 */ 399 public void setRetryPolicy(RETRYPOLICY retryPolicy) { 400 this.retryPolicy = retryPolicy; 401 } 402 403 /** 404 * Return the retry interval for the action executor in seconds. 405 * 406 * @return the retry interval for the action executor in seconds. 407 */ 408 public long getRetryInterval() { 409 return retryInterval; 410 } 411 412 /** 413 * Sets the retry interval for the action executor. 414 * 415 * @param retryInterval retry interval in seconds. 416 */ 417 public void setRetryInterval(long retryInterval) { 418 this.retryInterval = retryInterval; 419 } 420 421 /** 422 * Utility method to handle exceptions in the {@link #start}, {@link #end}, {@link #kill} and {@link #check} methods 423 * <p> It uses the error registry to convert exceptions to {@link ActionExecutorException}s. 424 * 425 * @param ex exception to convert. 426 * @return ActionExecutorException converted exception. 427 */ 428 @SuppressWarnings({"ThrowableInstanceNeverThrown"}) 429 protected ActionExecutorException convertException(Exception ex) { 430 if (ex instanceof ActionExecutorException) { 431 return (ActionExecutorException) ex; 432 } 433 434 ActionExecutorException aee = null; 435 // Check the cause of the exception first 436 if (ex.getCause() != null) { 437 aee = convertExceptionHelper(ex.getCause()); 438 } 439 // If the cause isn't registered or doesn't exist, check the exception itself 440 if (aee == null) { 441 aee = convertExceptionHelper(ex); 442 // If the cause isn't registered either, then just create a new ActionExecutorException 443 if (aee == null) { 444 String exClass = ex.getClass().getName(); 445 String errorCode = exClass.substring(exClass.lastIndexOf(".") + 1); 446 aee = new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(), ex); 447 } 448 } 449 return aee; 450 } 451 452 private ActionExecutorException convertExceptionHelper(Throwable ex) { 453 Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType()); 454 // Check if we have registered ex 455 ErrorInfo classErrorInfo = executorErrorInfo.get(ex.getClass().getName()); 456 if (classErrorInfo != null) { 457 return new ActionExecutorException(classErrorInfo.errorType, classErrorInfo.errorCode, "{0}", ex.getMessage(), ex); 458 } 459 // Else, check if a parent class of ex is registered 460 else { 461 for (ErrorInfo errorInfo : executorErrorInfo.values()) { 462 if (errorInfo.errorClass.isInstance(ex)) { 463 return new ActionExecutorException(errorInfo.errorType, errorInfo.errorCode, "{0}", ex.getMessage(), ex); 464 } 465 } 466 } 467 return null; 468 } 469 470 /** 471 * Convenience method that return the signal for an Action based on the action status. 472 * 473 * @param status action status. 474 * @return the action signal. 475 */ 476 protected String getActionSignal(WorkflowAction.Status status) { 477 switch (status) { 478 case OK: 479 return "OK"; 480 case ERROR: 481 case KILLED: 482 return "ERROR"; 483 default: 484 throw new IllegalArgumentException("Action status for signal can only be OK or ERROR"); 485 } 486 } 487 488 /** 489 * Return the path that will be used to store action specific data 490 * 491 * @param jobId Worfklow ID 492 * @param action Action 493 * @param key An Identifier 494 * @param temp temp directory flag 495 * @return A string that has the path 496 */ 497 protected String getActionDirPath(String jobId, WorkflowAction action, String key, boolean temp) { 498 String name = jobId + "/" + action.getName() + "--" + key; 499 if (temp) { 500 name += ".temp"; 501 } 502 return getOozieSystemId() + "/" + name; 503 } 504 505 /** 506 * Return the path that will be used to store action specific data. 507 * 508 * @param jobId Workflow ID 509 * @param action Action 510 * @param key An identifier 511 * @param temp Temp directory flag 512 * @return Path to the directory 513 */ 514 public Path getActionDir(String jobId, WorkflowAction action, String key, boolean temp) { 515 return new Path(getActionDirPath(jobId, action, key, temp)); 516 } 517 518 /** 519 * Start an action. <p> The {@link Context#setStartData} method must be called within this method. <p> If the 520 * action has completed, the {@link Context#setExecutionData} method must be called within this method. 521 * 522 * @param context executor context. 523 * @param action the action to start. 524 * @throws ActionExecutorException thrown if the action could not start. 525 */ 526 public abstract void start(Context context, WorkflowAction action) throws ActionExecutorException; 527 528 /** 529 * End an action after it has executed. <p> The {@link Context#setEndData} method must be called within this 530 * method. 531 * 532 * @param context executor context. 533 * @param action the action to end. 534 * @throws ActionExecutorException thrown if the action could not end. 535 */ 536 public abstract void end(Context context, WorkflowAction action) throws ActionExecutorException; 537 538 /** 539 * Check if an action has completed. This method must be implemented by Async Action Executors. <p> If the action 540 * has completed, the {@link Context#setExecutionData} method must be called within this method. <p> If the action 541 * has not completed, the {@link Context#setExternalStatus} method must be called within this method. 542 * 543 * @param context executor context. 544 * @param action the action to end. 545 * @throws ActionExecutorException thrown if the action could not be checked. 546 */ 547 public abstract void check(Context context, WorkflowAction action) throws ActionExecutorException; 548 549 /** 550 * Kill an action. <p> The {@link Context#setEndData} method must be called within this method. 551 * 552 * @param context executor context. 553 * @param action the action to kill. 554 * @throws ActionExecutorException thrown if the action could not be killed. 555 */ 556 public abstract void kill(Context context, WorkflowAction action) throws ActionExecutorException; 557 558 /** 559 * Return if the external status indicates that the action has completed. 560 * 561 * @param externalStatus external status to check. 562 * @return if the external status indicates that the action has completed. 563 */ 564 public abstract boolean isCompleted(String externalStatus); 565 566 /** 567 * Returns true if this action type requires a NameNode and JobTracker. These can either be specified directly in the action 568 * via <name-node> and <job-tracker>, from the fields in the global section, or from their default values. If 569 * false, Oozie won't ensure (i.e. won't throw an Exception if non-existant) that this action type has these values. 570 * 571 * @return true if a NameNode and JobTracker are required; false if not 572 */ 573 public boolean requiresNameNodeJobTracker() { 574 return false; 575 } 576 577 /** 578 * Returns true if this action type supports a Configuration and JobXML. In this case, Oozie will include the 579 * <configuration> and <job-xml> elements from the global section (if provided) with the action. If false, Oozie 580 * won't add these. 581 * 582 * @return true if the global section's Configuration and JobXML should be given; false if not 583 */ 584 public boolean supportsConfigurationJobXML() { 585 return false; 586 } 587 588 /** 589 * Creating and forwarding the tag, It will be useful during repeat attempts of Launcher, to ensure only 590 * one child job is running. Tag is formed as follows: 591 * For workflow job, tag = action-id 592 * For Coord job, tag = coord-action-id@action-name (if not part of sub flow), else 593 * coord-action-id@subflow-action-name@action-name. 594 * @param conf the conf 595 * @param wfJob the wf job 596 * @param action the action 597 * @return the action yarn tag 598 */ 599 public static String getActionYarnTag(Configuration conf, WorkflowJob wfJob, WorkflowAction action) { 600 if (conf.get(OOZIE_ACTION_YARN_TAG) != null) { 601 return conf.get(OOZIE_ACTION_YARN_TAG) + "@" + action.getName(); 602 } 603 else if (wfJob.getParentId() != null) { 604 return wfJob.getParentId() + "@" + action.getName(); 605 } 606 else { 607 return action.getId(); 608 } 609 } 610}