001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action; 019 020 import org.apache.hadoop.fs.FileSystem; 021 import org.apache.hadoop.fs.Path; 022 import org.apache.hadoop.conf.Configuration; 023 import org.apache.oozie.client.WorkflowAction; 024 import org.apache.oozie.client.WorkflowJob; 025 import org.apache.oozie.util.ELEvaluator; 026 import org.apache.oozie.util.ParamChecker; 027 import org.apache.oozie.util.XLog; 028 import org.apache.oozie.service.HadoopAccessorException; 029 import org.apache.oozie.service.Services; 030 import org.apache.oozie.servlet.CallbackServlet; 031 032 import java.io.IOException; 033 import java.net.URISyntaxException; 034 import java.util.HashMap; 035 import java.util.Map; 036 import java.util.Properties; 037 import java.util.LinkedHashMap; 038 039 /** 040 * Base action executor class. <p/> All the action executors should extend this class. 041 */ 042 public abstract class ActionExecutor { 043 044 /** 045 * Configuration prefix for action executor (sub-classes) properties. 046 */ 047 public static final String CONF_PREFIX = "oozie.action."; 048 049 public static final String MAX_RETRIES = CONF_PREFIX + "retries.max"; 050 051 /** 052 * Error code used by {@link #convertException} when there is not register error information for an exception. 053 */ 054 public static final String ERROR_OTHER = "OTHER"; 055 056 private static class ErrorInfo { 057 ActionExecutorException.ErrorType errorType; 058 String errorCode; 059 060 private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode) { 061 this.errorType = errorType; 062 this.errorCode = errorCode; 063 } 064 } 065 066 private static boolean initMode = false; 067 private static Map<String, Map<Class, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<Class, ErrorInfo>>(); 068 069 /** 070 * Context information passed to the ActionExecutor methods. 071 */ 072 public interface Context { 073 074 /** 075 * Create the callback URL for the action. 076 * 077 * @param externalStatusVar variable for the caller to inject the external status. 078 * @return the callback URL. 079 */ 080 public String getCallbackUrl(String externalStatusVar); 081 082 /** 083 * Return a proto configuration for actions with auth properties already set. 084 * 085 * @return a proto configuration for actions with auth properties already set. 086 */ 087 public Configuration getProtoActionConf(); 088 089 /** 090 * Return the workflow job. 091 * 092 * @return the workflow job. 093 */ 094 public WorkflowJob getWorkflow(); 095 096 /** 097 * Return an ELEvaluator with the context injected. 098 * 099 * @return configured ELEvaluator. 100 */ 101 public ELEvaluator getELEvaluator(); 102 103 /** 104 * Set a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name 105 * plus a '.'. 106 * 107 * @param name variable name. 108 * @param value variable value, <code>null</code> removes the variable. 109 */ 110 public void setVar(String name, String value); 111 112 /** 113 * Get a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name 114 * plus a '.'. 115 * 116 * @param name variable name. 117 * @return the variable value, <code>null</code> if not set. 118 */ 119 public String getVar(String name); 120 121 /** 122 * Set the action tracking information for an successfully started action. 123 * 124 * @param externalId the action external ID. 125 * @param trackerUri the action tracker URI. 126 * @param consoleUrl the action console URL. 127 */ 128 void setStartData(String externalId, String trackerUri, String consoleUrl); 129 130 /** 131 * Set the action execution completion information for an action. The action status is set to {@link 132 * org.apache.oozie.client.WorkflowAction.Status#DONE} 133 * 134 * @param externalStatus the action external end status. 135 * @param actionData the action data on completion, <code>null</code> if none. 136 */ 137 void setExecutionData(String externalStatus, Properties actionData); 138 139 /** 140 * Set execution statistics information for a particular action. The action status is set to {@link 141 * org.apache.oozie.client.WorkflowAction.Status#DONE} 142 * 143 * @param jsonStats the JSON string representation of the stats. 144 */ 145 void setExecutionStats(String jsonStats); 146 147 /** 148 * Set external child IDs for a particular action (Eg: pig). The action status is set to {@link 149 * org.apache.oozie.client.WorkflowAction.Status#DONE} 150 * 151 * @param externalChildIDs the external child IDs as a comma-delimited string. 152 */ 153 void setExternalChildIDs(String externalChildIDs); 154 155 /** 156 * Set the action end completion information for a completed action. 157 * 158 * @param status the action end status, it can be {@link org.apache.oozie.client.WorkflowAction.Status#OK} or 159 * {@link org.apache.oozie.client.WorkflowAction.Status#ERROR}. 160 * @param signalValue the action external end status. 161 */ 162 void setEndData(WorkflowAction.Status status, String signalValue); 163 164 /** 165 * Return if the executor invocation is a retry or not. 166 * 167 * @return if the executor invocation is a retry or not. 168 */ 169 boolean isRetry(); 170 171 /** 172 * Sets the external status for the action in context. 173 * 174 * @param externalStatus the external status. 175 */ 176 void setExternalStatus(String externalStatus); 177 178 /** 179 * Get the Action Recovery ID. 180 * 181 * @return recovery ID. 182 */ 183 String getRecoveryId(); 184 185 /* 186 * @return the path that will be used to store action specific data 187 * @throws IOException @throws URISyntaxException @throws HadoopAccessorException 188 */ 189 public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException; 190 191 /** 192 * @return filesystem handle for the application deployment fs. 193 * @throws IOException 194 * @throws URISyntaxException 195 * @throws HadoopAccessorException 196 */ 197 public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException; 198 199 public void setErrorInfo(String str, String exMsg); 200 } 201 202 203 /** 204 * Define the default inteval in seconds between retries. 205 */ 206 public static final long RETRY_INTERVAL = 60; 207 208 private String type; 209 private int maxRetries; 210 private long retryInterval; 211 212 /** 213 * Create an action executor with default retry parameters. 214 * 215 * @param type action executor type. 216 */ 217 protected ActionExecutor(String type) { 218 this(type, RETRY_INTERVAL); 219 } 220 221 /** 222 * Create an action executor. 223 * 224 * @param type action executor type. 225 * @param retryAttempts retry attempts. 226 * @param retryInterval retry interval, in seconds. 227 */ 228 protected ActionExecutor(String type, long retryInterval) { 229 this.type = ParamChecker.notEmpty(type, "type"); 230 this.maxRetries = getOozieConf().getInt(MAX_RETRIES, 3); 231 this.retryInterval = retryInterval; 232 } 233 234 /** 235 * Clear all init settings for all action types. 236 */ 237 public static void resetInitInfo() { 238 if (!initMode) { 239 throw new IllegalStateException("Error, action type info locked"); 240 } 241 ERROR_INFOS.clear(); 242 } 243 244 /** 245 * Enable action type initialization. 246 */ 247 public static void enableInit() { 248 initMode = true; 249 } 250 251 /** 252 * Disable action type initialization. 253 */ 254 public static void disableInit() { 255 initMode = false; 256 } 257 258 /** 259 * Invoked once at system initialization time. <p/> It can be used to register error information for the expected 260 * exceptions. Exceptions should be register from subclasses to superclasses to ensure proper detection, same thing 261 * that it is done in a normal catch. <p/> This method should invoke the {@link #registerError} method to register 262 * all its possible errors. <p/> Subclasses overriding must invoke super. 263 */ 264 public void initActionType() { 265 ERROR_INFOS.put(getType(), new LinkedHashMap<Class, ErrorInfo>()); 266 } 267 268 /** 269 * Return the system ID, this ID is defined in Oozie configuration. 270 * 271 * @return the system ID. 272 */ 273 public String getOozieSystemId() { 274 return Services.get().getSystemId(); 275 } 276 277 /** 278 * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a 279 * new directory per system initialization. 280 * 281 * @return the runtime directory of the Oozie instance. 282 */ 283 public String getOozieRuntimeDir() { 284 return Services.get().getRuntimeDir(); 285 } 286 287 /** 288 * Return Oozie configuration. <p/> This is useful for actions that need access to configuration properties. 289 * 290 * @return Oozie configuration. 291 */ 292 public Configuration getOozieConf() { 293 return Services.get().getConf(); 294 } 295 296 /** 297 * Register error handling information for an exception. 298 * 299 * @param exClass excpetion class name (to work in case of a particular exception not being in the classpath, needed 300 * to be able to handle multiple version of Hadoop or other JARs used by executors with the same codebase). 301 * @param errorType error type for the exception. 302 * @param errorCode error code for the exception. 303 */ 304 protected void registerError(String exClass, ActionExecutorException.ErrorType errorType, String errorCode) { 305 if (!initMode) { 306 throw new IllegalStateException("Error, action type info locked"); 307 } 308 try { 309 Class klass = Thread.currentThread().getContextClassLoader().loadClass(exClass); 310 Map<Class, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType()); 311 executorErrorInfo.put(klass, new ErrorInfo(errorType, errorCode)); 312 } 313 catch (ClassNotFoundException ex) { 314 XLog.getLog(getClass()).warn( 315 "Exception [{0}] no in classpath, ActionExecutor [{1}] will handled it as ERROR", exClass, 316 getType()); 317 } 318 } 319 320 /** 321 * Return the action executor type. 322 * 323 * @return the action executor type. 324 */ 325 public String getType() { 326 return type; 327 } 328 329 /** 330 * Return the maximum number of retries for the action executor. 331 * 332 * @return the maximum number of retries for the action executor. 333 */ 334 public int getMaxRetries() { 335 return maxRetries; 336 } 337 338 /** 339 * Set the maximum number of retries for the action executor. 340 * 341 * @param maxRetries the maximum number of retries. 342 */ 343 public void setMaxRetries(int maxRetries) { 344 this.maxRetries = maxRetries; 345 } 346 347 /** 348 * Return the retry interval for the action executor in seconds. 349 * 350 * @return the retry interval for the action executor in seconds. 351 */ 352 public long getRetryInterval() { 353 return retryInterval; 354 } 355 356 /** 357 * Sets the retry interval for the action executor. 358 * 359 * @param retryInterval retry interval in seconds. 360 */ 361 public void setRetryInterval(long retryInterval) { 362 this.retryInterval = retryInterval; 363 } 364 365 /** 366 * Utility method to handle exceptions in the {@link #start}, {@link #end}, {@link #kill} and {@link #check} methods 367 * <p/> It uses the error registry to convert exceptions to {@link ActionExecutorException}s. 368 * 369 * @param ex exception to convert. 370 * @return ActionExecutorException converted exception. 371 */ 372 @SuppressWarnings({"ThrowableInstanceNeverThrown"}) 373 protected ActionExecutorException convertException(Exception ex) { 374 if (ex instanceof ActionExecutorException) { 375 return (ActionExecutorException) ex; 376 } 377 for (Map.Entry<Class, ErrorInfo> errorInfo : ERROR_INFOS.get(getType()).entrySet()) { 378 if (errorInfo.getKey().isInstance(ex)) { 379 return new ActionExecutorException(errorInfo.getValue().errorType, errorInfo.getValue().errorCode, 380 "{0}", ex.getMessage(), ex); 381 } 382 } 383 String errorCode = ex.getClass().getName(); 384 errorCode = errorCode.substring(errorCode.lastIndexOf(".") + 1); 385 return new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(), 386 ex); 387 } 388 389 /** 390 * Convenience method that return the signal for an Action based on the action status. 391 * 392 * @param status action status. 393 * @return the action signal. 394 */ 395 protected String getActionSignal(WorkflowAction.Status status) { 396 switch (status) { 397 case OK: 398 return "OK"; 399 case ERROR: 400 case KILLED: 401 return "ERROR"; 402 default: 403 throw new IllegalArgumentException("Action status for signal can only be OK or ERROR"); 404 } 405 } 406 407 /** 408 * Return the path that will be used to store action specific data 409 * 410 * @param jobId Worfklow ID 411 * @param action Action 412 * @param key An Identifier 413 * @param temp temp directory flag 414 * @return A string that has the path 415 */ 416 protected String getActionDirPath(String jobId, WorkflowAction action, String key, boolean temp) { 417 String name = jobId + "/" + action.getName() + "--" + key; 418 if (temp) { 419 name += ".temp"; 420 } 421 return getOozieSystemId() + "/" + name; 422 } 423 424 /** 425 * Return the path that will be used to store action specific data. 426 * 427 * @param jobId Workflow ID 428 * @param action Action 429 * @param key An identifier 430 * @param temp Temp directory flag 431 * @return Path to the directory 432 */ 433 public Path getActionDir(String jobId, WorkflowAction action, String key, boolean temp) { 434 return new Path(getActionDirPath(jobId, action, key, temp)); 435 } 436 437 /** 438 * Start an action. <p/> The {@link Context#setStartData} method must be called within this method. <p/> If the 439 * action has completed, the {@link Context#setExecutionData} method must be called within this method. 440 * 441 * @param context executor context. 442 * @param action the action to start. 443 * @throws ActionExecutorException thrown if the action could not start. 444 */ 445 public abstract void start(Context context, WorkflowAction action) throws ActionExecutorException; 446 447 /** 448 * End an action after it has executed. <p/> The {@link Context#setEndData} method must be called within this 449 * method. 450 * 451 * @param context executor context. 452 * @param action the action to end. 453 * @throws ActionExecutorException thrown if the action could not end. 454 */ 455 public abstract void end(Context context, WorkflowAction action) throws ActionExecutorException; 456 457 /** 458 * Check if an action has completed. This method must be implemented by Async Action Executors. <p/> If the action 459 * has completed, the {@link Context#setExecutionData} method must be called within this method. <p/> If the action 460 * has not completed, the {@link Context#setExternalStatus} method must be called within this method. 461 * 462 * @param context executor context. 463 * @param action the action to end. 464 * @throws ActionExecutorException thrown if the action could not be checked. 465 */ 466 public abstract void check(Context context, WorkflowAction action) throws ActionExecutorException; 467 468 /** 469 * Kill an action. <p/> The {@link Context#setEndData} method must be called within this method. 470 * 471 * @param context executor context. 472 * @param action the action to kill. 473 * @throws ActionExecutorException thrown if the action could not be killed. 474 */ 475 public abstract void kill(Context context, WorkflowAction action) throws ActionExecutorException; 476 477 /** 478 * Return if the external status indicates that the action has completed. 479 * 480 * @param externalStatus external status to check. 481 * @return if the external status indicates that the action has completed. 482 */ 483 public abstract boolean isCompleted(String externalStatus); 484 485 }