001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action; 019 020 import org.apache.hadoop.fs.FileSystem; 021 import org.apache.hadoop.fs.Path; 022 import org.apache.hadoop.conf.Configuration; 023 import org.apache.oozie.client.WorkflowAction; 024 import org.apache.oozie.client.WorkflowJob; 025 import org.apache.oozie.util.ELEvaluator; 026 import org.apache.oozie.util.ParamChecker; 027 import org.apache.oozie.util.XLog; 028 import org.apache.oozie.service.HadoopAccessorException; 029 import org.apache.oozie.service.Services; 030 031 import java.io.ByteArrayOutputStream; 032 import java.io.IOException; 033 import java.io.PrintStream; 034 import java.net.URISyntaxException; 035 import java.util.HashMap; 036 import java.util.Map; 037 import java.util.Properties; 038 import java.util.LinkedHashMap; 039 040 /** 041 * Base action executor class. <p/> All the action executors should extend this class. 042 */ 043 public abstract class ActionExecutor { 044 045 /** 046 * Configuration prefix for action executor (sub-classes) properties. 047 */ 048 public static final String CONF_PREFIX = "oozie.action."; 049 050 public static final String MAX_RETRIES = CONF_PREFIX + "retries.max"; 051 052 /** 053 * Error code used by {@link #convertException} when there is not register error information for an exception. 054 */ 055 public static final String ERROR_OTHER = "OTHER"; 056 057 public boolean requiresNNJT = false; 058 059 private static class ErrorInfo { 060 ActionExecutorException.ErrorType errorType; 061 String errorCode; 062 Class<?> errorClass; 063 064 private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode, Class<?> errorClass) { 065 this.errorType = errorType; 066 this.errorCode = errorCode; 067 this.errorClass = errorClass; 068 } 069 } 070 071 private static boolean initMode = false; 072 private static Map<String, Map<String, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<String, ErrorInfo>>(); 073 074 /** 075 * Context information passed to the ActionExecutor methods. 076 */ 077 public interface Context { 078 079 /** 080 * Create the callback URL for the action. 081 * 082 * @param externalStatusVar variable for the caller to inject the external status. 083 * @return the callback URL. 084 */ 085 public String getCallbackUrl(String externalStatusVar); 086 087 /** 088 * Return a proto configuration for actions with auth properties already set. 089 * 090 * @return a proto configuration for actions with auth properties already set. 091 */ 092 public Configuration getProtoActionConf(); 093 094 /** 095 * Return the workflow job. 096 * 097 * @return the workflow job. 098 */ 099 public WorkflowJob getWorkflow(); 100 101 /** 102 * Return an ELEvaluator with the context injected. 103 * 104 * @return configured ELEvaluator. 105 */ 106 public ELEvaluator getELEvaluator(); 107 108 /** 109 * Set a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name 110 * plus a '.'. 111 * 112 * @param name variable name. 113 * @param value variable value, <code>null</code> removes the variable. 114 */ 115 public void setVar(String name, String value); 116 117 /** 118 * Get a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name 119 * plus a '.'. 120 * 121 * @param name variable name. 122 * @return the variable value, <code>null</code> if not set. 123 */ 124 public String getVar(String name); 125 126 /** 127 * Set the action tracking information for an successfully started action. 128 * 129 * @param externalId the action external ID. 130 * @param trackerUri the action tracker URI. 131 * @param consoleUrl the action console URL. 132 */ 133 void setStartData(String externalId, String trackerUri, String consoleUrl); 134 135 /** 136 * Set the action execution completion information for an action. The action status is set to {@link 137 * org.apache.oozie.client.WorkflowAction.Status#DONE} 138 * 139 * @param externalStatus the action external end status. 140 * @param actionData the action data on completion, <code>null</code> if none. 141 */ 142 void setExecutionData(String externalStatus, Properties actionData); 143 144 /** 145 * Set execution statistics information for a particular action. The action status is set to {@link 146 * org.apache.oozie.client.WorkflowAction.Status#DONE} 147 * 148 * @param jsonStats the JSON string representation of the stats. 149 */ 150 void setExecutionStats(String jsonStats); 151 152 /** 153 * Set external child IDs for a particular action (Eg: pig). The action status is set to {@link 154 * org.apache.oozie.client.WorkflowAction.Status#DONE} 155 * 156 * @param externalChildIDs the external child IDs as a comma-delimited string. 157 */ 158 void setExternalChildIDs(String externalChildIDs); 159 160 /** 161 * Set the action end completion information for a completed action. 162 * 163 * @param status the action end status, it can be {@link org.apache.oozie.client.WorkflowAction.Status#OK} or 164 * {@link org.apache.oozie.client.WorkflowAction.Status#ERROR}. 165 * @param signalValue the action external end status. 166 */ 167 void setEndData(WorkflowAction.Status status, String signalValue); 168 169 /** 170 * Return if the executor invocation is a retry or not. 171 * 172 * @return if the executor invocation is a retry or not. 173 */ 174 boolean isRetry(); 175 176 /** 177 * Sets the external status for the action in context. 178 * 179 * @param externalStatus the external status. 180 */ 181 void setExternalStatus(String externalStatus); 182 183 /** 184 * Get the Action Recovery ID. 185 * 186 * @return recovery ID. 187 */ 188 String getRecoveryId(); 189 190 /* 191 * @return the path that will be used to store action specific data 192 * @throws IOException @throws URISyntaxException @throws HadoopAccessorException 193 */ 194 public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException; 195 196 /** 197 * @return filesystem handle for the application deployment fs. 198 * @throws IOException 199 * @throws URISyntaxException 200 * @throws HadoopAccessorException 201 */ 202 public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException; 203 204 public void setErrorInfo(String str, String exMsg); 205 } 206 207 208 /** 209 * Define the default inteval in seconds between retries. 210 */ 211 public static final long RETRY_INTERVAL = 60; 212 213 private String type; 214 private int maxRetries; 215 private long retryInterval; 216 217 /** 218 * Create an action executor with default retry parameters. 219 * 220 * @param type action executor type. 221 */ 222 protected ActionExecutor(String type) { 223 this(type, RETRY_INTERVAL); 224 } 225 226 /** 227 * Create an action executor. 228 * 229 * @param type action executor type. 230 * @param retryAttempts retry attempts. 231 * @param retryInterval retry interval, in seconds. 232 */ 233 protected ActionExecutor(String type, long retryInterval) { 234 this.type = ParamChecker.notEmpty(type, "type"); 235 this.maxRetries = getOozieConf().getInt(MAX_RETRIES, 3); 236 this.retryInterval = retryInterval; 237 } 238 239 /** 240 * Clear all init settings for all action types. 241 */ 242 public static void resetInitInfo() { 243 if (!initMode) { 244 throw new IllegalStateException("Error, action type info locked"); 245 } 246 ERROR_INFOS.clear(); 247 } 248 249 /** 250 * Enable action type initialization. 251 */ 252 public static void enableInit() { 253 initMode = true; 254 } 255 256 /** 257 * Disable action type initialization. 258 */ 259 public static void disableInit() { 260 initMode = false; 261 } 262 263 /** 264 * Invoked once at system initialization time. <p/> It can be used to register error information for the expected 265 * exceptions. Exceptions should be register from subclasses to superclasses to ensure proper detection, same thing 266 * that it is done in a normal catch. <p/> This method should invoke the {@link #registerError} method to register 267 * all its possible errors. <p/> Subclasses overriding must invoke super. 268 */ 269 public void initActionType() { 270 XLog.getLog(getClass()).trace(" Init Action Type : [{0}]", getType()); 271 ERROR_INFOS.put(getType(), new LinkedHashMap<String, ErrorInfo>()); 272 } 273 274 /** 275 * Return the system ID, this ID is defined in Oozie configuration. 276 * 277 * @return the system ID. 278 */ 279 public String getOozieSystemId() { 280 return Services.get().getSystemId(); 281 } 282 283 /** 284 * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a 285 * new directory per system initialization. 286 * 287 * @return the runtime directory of the Oozie instance. 288 */ 289 public String getOozieRuntimeDir() { 290 return Services.get().getRuntimeDir(); 291 } 292 293 /** 294 * Return Oozie configuration. <p/> This is useful for actions that need access to configuration properties. 295 * 296 * @return Oozie configuration. 297 */ 298 public Configuration getOozieConf() { 299 return Services.get().getConf(); 300 } 301 302 /** 303 * Register error handling information for an exception. 304 * 305 * @param exClass excpetion class name (to work in case of a particular exception not being in the classpath, needed 306 * to be able to handle multiple version of Hadoop or other JARs used by executors with the same codebase). 307 * @param errorType error type for the exception. 308 * @param errorCode error code for the exception. 309 */ 310 protected void registerError(String exClass, ActionExecutorException.ErrorType errorType, String errorCode) { 311 if (!initMode) { 312 throw new IllegalStateException("Error, action type info locked"); 313 } 314 try { 315 Class errorClass = Thread.currentThread().getContextClassLoader().loadClass(exClass); 316 Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType()); 317 executorErrorInfo.put(exClass, new ErrorInfo(errorType, errorCode, errorClass)); 318 } 319 catch (ClassNotFoundException cnfe) { 320 XLog.getLog(getClass()).warn( 321 "Exception [{0}] not in classpath, ActionExecutor [{1}] will handle it as ERROR", exClass, 322 getType()); 323 } 324 catch (java.lang.NoClassDefFoundError err) { 325 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 326 err.printStackTrace(new PrintStream(baos)); 327 XLog.getLog(getClass()).warn(baos.toString()); 328 } 329 } 330 331 /** 332 * Return the action executor type. 333 * 334 * @return the action executor type. 335 */ 336 public String getType() { 337 return type; 338 } 339 340 /** 341 * Return the maximum number of retries for the action executor. 342 * 343 * @return the maximum number of retries for the action executor. 344 */ 345 public int getMaxRetries() { 346 return maxRetries; 347 } 348 349 /** 350 * Set the maximum number of retries for the action executor. 351 * 352 * @param maxRetries the maximum number of retries. 353 */ 354 public void setMaxRetries(int maxRetries) { 355 this.maxRetries = maxRetries; 356 } 357 358 /** 359 * Return the retry interval for the action executor in seconds. 360 * 361 * @return the retry interval for the action executor in seconds. 362 */ 363 public long getRetryInterval() { 364 return retryInterval; 365 } 366 367 /** 368 * Sets the retry interval for the action executor. 369 * 370 * @param retryInterval retry interval in seconds. 371 */ 372 public void setRetryInterval(long retryInterval) { 373 this.retryInterval = retryInterval; 374 } 375 376 /** 377 * Utility method to handle exceptions in the {@link #start}, {@link #end}, {@link #kill} and {@link #check} methods 378 * <p/> It uses the error registry to convert exceptions to {@link ActionExecutorException}s. 379 * 380 * @param ex exception to convert. 381 * @return ActionExecutorException converted exception. 382 */ 383 @SuppressWarnings({"ThrowableInstanceNeverThrown"}) 384 protected ActionExecutorException convertException(Exception ex) { 385 if (ex instanceof ActionExecutorException) { 386 return (ActionExecutorException) ex; 387 } 388 389 ActionExecutorException aee = null; 390 // Check the cause of the exception first 391 if (ex.getCause() != null) { 392 aee = convertExceptionHelper(ex.getCause()); 393 } 394 // If the cause isn't registered or doesn't exist, check the exception itself 395 if (aee == null) { 396 aee = convertExceptionHelper(ex); 397 // If the cause isn't registered either, then just create a new ActionExecutorException 398 if (aee == null) { 399 String exClass = ex.getClass().getName(); 400 String errorCode = exClass.substring(exClass.lastIndexOf(".") + 1); 401 aee = new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(), ex); 402 } 403 } 404 return aee; 405 } 406 407 private ActionExecutorException convertExceptionHelper(Throwable ex) { 408 Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType()); 409 // Check if we have registered ex 410 ErrorInfo classErrorInfo = executorErrorInfo.get(ex.getClass().getName()); 411 if (classErrorInfo != null) { 412 return new ActionExecutorException(classErrorInfo.errorType, classErrorInfo.errorCode, "{0}", ex.getMessage(), ex); 413 } 414 // Else, check if a parent class of ex is registered 415 else { 416 for (ErrorInfo errorInfo : executorErrorInfo.values()) { 417 if (errorInfo.errorClass.isInstance(ex)) { 418 return new ActionExecutorException(errorInfo.errorType, errorInfo.errorCode, "{0}", ex.getMessage(), ex); 419 } 420 } 421 } 422 return null; 423 } 424 425 /** 426 * Convenience method that return the signal for an Action based on the action status. 427 * 428 * @param status action status. 429 * @return the action signal. 430 */ 431 protected String getActionSignal(WorkflowAction.Status status) { 432 switch (status) { 433 case OK: 434 return "OK"; 435 case ERROR: 436 case KILLED: 437 return "ERROR"; 438 default: 439 throw new IllegalArgumentException("Action status for signal can only be OK or ERROR"); 440 } 441 } 442 443 /** 444 * Return the path that will be used to store action specific data 445 * 446 * @param jobId Worfklow ID 447 * @param action Action 448 * @param key An Identifier 449 * @param temp temp directory flag 450 * @return A string that has the path 451 */ 452 protected String getActionDirPath(String jobId, WorkflowAction action, String key, boolean temp) { 453 String name = jobId + "/" + action.getName() + "--" + key; 454 if (temp) { 455 name += ".temp"; 456 } 457 return getOozieSystemId() + "/" + name; 458 } 459 460 /** 461 * Return the path that will be used to store action specific data. 462 * 463 * @param jobId Workflow ID 464 * @param action Action 465 * @param key An identifier 466 * @param temp Temp directory flag 467 * @return Path to the directory 468 */ 469 public Path getActionDir(String jobId, WorkflowAction action, String key, boolean temp) { 470 return new Path(getActionDirPath(jobId, action, key, temp)); 471 } 472 473 /** 474 * Start an action. <p/> The {@link Context#setStartData} method must be called within this method. <p/> If the 475 * action has completed, the {@link Context#setExecutionData} method must be called within this method. 476 * 477 * @param context executor context. 478 * @param action the action to start. 479 * @throws ActionExecutorException thrown if the action could not start. 480 */ 481 public abstract void start(Context context, WorkflowAction action) throws ActionExecutorException; 482 483 /** 484 * End an action after it has executed. <p/> The {@link Context#setEndData} method must be called within this 485 * method. 486 * 487 * @param context executor context. 488 * @param action the action to end. 489 * @throws ActionExecutorException thrown if the action could not end. 490 */ 491 public abstract void end(Context context, WorkflowAction action) throws ActionExecutorException; 492 493 /** 494 * Check if an action has completed. This method must be implemented by Async Action Executors. <p/> If the action 495 * has completed, the {@link Context#setExecutionData} method must be called within this method. <p/> If the action 496 * has not completed, the {@link Context#setExternalStatus} method must be called within this method. 497 * 498 * @param context executor context. 499 * @param action the action to end. 500 * @throws ActionExecutorException thrown if the action could not be checked. 501 */ 502 public abstract void check(Context context, WorkflowAction action) throws ActionExecutorException; 503 504 /** 505 * Kill an action. <p/> The {@link Context#setEndData} method must be called within this method. 506 * 507 * @param context executor context. 508 * @param action the action to kill. 509 * @throws ActionExecutorException thrown if the action could not be killed. 510 */ 511 public abstract void kill(Context context, WorkflowAction action) throws ActionExecutorException; 512 513 /** 514 * Return if the external status indicates that the action has completed. 515 * 516 * @param externalStatus external status to check. 517 * @return if the external status indicates that the action has completed. 518 */ 519 public abstract boolean isCompleted(String externalStatus); 520 521 }