001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action; 019 020 import org.apache.hadoop.fs.FileSystem; 021 import org.apache.hadoop.fs.Path; 022 import org.apache.hadoop.conf.Configuration; 023 import org.apache.oozie.client.WorkflowAction; 024 import org.apache.oozie.client.WorkflowJob; 025 import org.apache.oozie.util.ELEvaluator; 026 import org.apache.oozie.util.ParamChecker; 027 import org.apache.oozie.util.XLog; 028 import org.apache.oozie.service.HadoopAccessorException; 029 import org.apache.oozie.service.Services; 030 031 import java.io.IOException; 032 import java.net.URISyntaxException; 033 import java.util.HashMap; 034 import java.util.Map; 035 import java.util.Properties; 036 import java.util.LinkedHashMap; 037 038 /** 039 * Base action executor class. <p/> All the action executors should extend this class. 040 */ 041 public abstract class ActionExecutor { 042 043 /** 044 * Configuration prefix for action executor (sub-classes) properties. 045 */ 046 public static final String CONF_PREFIX = "oozie.action."; 047 048 /** 049 * Error code used by {@link #convertException} when there is not register error information for an exception. 050 */ 051 public static final String ERROR_OTHER = "OTHER"; 052 053 private static class ErrorInfo { 054 ActionExecutorException.ErrorType errorType; 055 String errorCode; 056 057 private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode) { 058 this.errorType = errorType; 059 this.errorCode = errorCode; 060 } 061 } 062 063 private static boolean initMode = false; 064 private static Map<String, Map<Class, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<Class, ErrorInfo>>(); 065 066 /** 067 * Context information passed to the ActionExecutor methods. 068 */ 069 public interface Context { 070 071 /** 072 * Create the callback URL for the action. 073 * 074 * @param externalStatusVar variable for the caller to inject the external status. 075 * @return the callback URL. 076 */ 077 public String getCallbackUrl(String externalStatusVar); 078 079 /** 080 * Return a proto configuration for actions with auth properties already set. 081 * 082 * @return a proto configuration for actions with auth properties already set. 083 */ 084 public Configuration getProtoActionConf(); 085 086 /** 087 * Return the workflow job. 088 * 089 * @return the workflow job. 090 */ 091 public WorkflowJob getWorkflow(); 092 093 /** 094 * Return an ELEvaluator with the context injected. 095 * 096 * @return configured ELEvaluator. 097 */ 098 public ELEvaluator getELEvaluator(); 099 100 /** 101 * Set a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name 102 * plus a '.'. 103 * 104 * @param name variable name. 105 * @param value variable value, <code>null</code> removes the variable. 106 */ 107 public void setVar(String name, String value); 108 109 /** 110 * Get a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name 111 * plus a '.'. 112 * 113 * @param name variable name. 114 * @return the variable value, <code>null</code> if not set. 115 */ 116 public String getVar(String name); 117 118 /** 119 * Set the action tracking information for an successfully started action. 120 * 121 * @param externalId the action external ID. 122 * @param trackerUri the action tracker URI. 123 * @param consoleUrl the action console URL. 124 */ 125 void setStartData(String externalId, String trackerUri, String consoleUrl); 126 127 /** 128 * Set the action execution completion information for an action. The action status is set to {@link 129 * org.apache.oozie.client.WorkflowAction.Status#DONE} 130 * 131 * @param externalStatus the action external end status. 132 * @param actionData the action data on completion, <code>null</code> if none. 133 */ 134 void setExecutionData(String externalStatus, Properties actionData); 135 136 /** 137 * Set the action end completion information for a completed action. 138 * 139 * @param status the action end status, it can be {@link org.apache.oozie.client.WorkflowAction.Status#OK} or 140 * {@link org.apache.oozie.client.WorkflowAction.Status#ERROR}. 141 * @param signalValue the action external end status. 142 */ 143 void setEndData(WorkflowAction.Status status, String signalValue); 144 145 /** 146 * Return if the executor invocation is a retry or not. 147 * 148 * @return if the executor invocation is a retry or not. 149 */ 150 boolean isRetry(); 151 152 /** 153 * Sets the external status for the action in context. 154 * 155 * @param externalStatus the external status. 156 */ 157 void setExternalStatus(String externalStatus); 158 159 /** 160 * Get the Action Recovery ID. 161 * 162 * @return recovery ID. 163 */ 164 String getRecoveryId(); 165 166 /* 167 * @return the path that will be used to store action specific data 168 * @throws IOException @throws URISyntaxException @throws HadoopAccessorException 169 */ 170 public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException; 171 172 /** 173 * @return filesystem handle for the application deployment fs. 174 * @throws IOException 175 * @throws URISyntaxException 176 * @throws HadoopAccessorException 177 */ 178 public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException; 179 180 public void setErrorInfo(String str, String exMsg); 181 } 182 183 /** 184 * Define the default maximum number of retry attempts for transient errors (total attempts = 1 + MAX_RETRIES). 185 */ 186 public static final int MAX_RETRIES = 3; 187 188 /** 189 * Define the default inteval in seconds between retries. 190 */ 191 public static final long RETRY_INTERVAL = 60; 192 193 private String type; 194 private int maxRetries; 195 private long retryInterval; 196 197 /** 198 * Create an action executor with default retry parameters. 199 * 200 * @param type action executor type. 201 */ 202 protected ActionExecutor(String type) { 203 this(type, MAX_RETRIES, RETRY_INTERVAL); 204 } 205 206 /** 207 * Create an action executor. 208 * 209 * @param type action executor type. 210 * @param retryAttempts retry attempts. 211 * @param retryInterval retry interval, in seconds. 212 */ 213 protected ActionExecutor(String type, int retryAttempts, long retryInterval) { 214 this.type = ParamChecker.notEmpty(type, "type"); 215 this.maxRetries = retryAttempts; 216 this.retryInterval = retryInterval; 217 } 218 219 /** 220 * Clear all init settings for all action types. 221 */ 222 public static void resetInitInfo() { 223 if (!initMode) { 224 throw new IllegalStateException("Error, action type info locked"); 225 } 226 ERROR_INFOS.clear(); 227 } 228 229 /** 230 * Enable action type initialization. 231 */ 232 public static void enableInit() { 233 initMode = true; 234 } 235 236 /** 237 * Disable action type initialization. 238 */ 239 public static void disableInit() { 240 initMode = false; 241 } 242 243 /** 244 * Invoked once at system initialization time. <p/> It can be used to register error information for the expected 245 * exceptions. Exceptions should be register from subclasses to superclasses to ensure proper detection, same thing 246 * that it is done in a normal catch. <p/> This method should invoke the {@link #registerError} method to register 247 * all its possible errors. <p/> Subclasses overriding must invoke super. 248 */ 249 public void initActionType() { 250 ERROR_INFOS.put(getType(), new LinkedHashMap<Class, ErrorInfo>()); 251 } 252 253 /** 254 * Return the system ID, this ID is defined in Oozie configuration. 255 * 256 * @return the system ID. 257 */ 258 public String getOozieSystemId() { 259 return Services.get().getSystemId(); 260 } 261 262 /** 263 * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a 264 * new directory per system initialization. 265 * 266 * @return the runtime directory of the Oozie instance. 267 */ 268 public String getOozieRuntimeDir() { 269 return Services.get().getRuntimeDir(); 270 } 271 272 /** 273 * Return Oozie configuration. <p/> This is useful for actions that need access to configuration properties. 274 * 275 * @return Oozie configuration. 276 */ 277 public Configuration getOozieConf() { 278 return Services.get().getConf(); 279 } 280 281 /** 282 * Register error handling information for an exception. 283 * 284 * @param exClass excpetion class name (to work in case of a particular exception not being in the classpath, needed 285 * to be able to handle multiple version of Hadoop or other JARs used by executors with the same codebase). 286 * @param errorType error type for the exception. 287 * @param errorCode error code for the exception. 288 */ 289 protected void registerError(String exClass, ActionExecutorException.ErrorType errorType, String errorCode) { 290 if (!initMode) { 291 throw new IllegalStateException("Error, action type info locked"); 292 } 293 try { 294 Class klass = Thread.currentThread().getContextClassLoader().loadClass(exClass); 295 Map<Class, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType()); 296 executorErrorInfo.put(klass, new ErrorInfo(errorType, errorCode)); 297 } 298 catch (ClassNotFoundException ex) { 299 XLog.getLog(getClass()).warn( 300 "Exception [{0}] no in classpath, ActionExecutor [{1}] will handled it as ERROR", exClass, 301 getType()); 302 } 303 } 304 305 /** 306 * Return the action executor type. 307 * 308 * @return the action executor type. 309 */ 310 public String getType() { 311 return type; 312 } 313 314 /** 315 * Return the maximum number of retries for the action executor. 316 * 317 * @return the maximum number of retries for the action executor. 318 */ 319 public int getMaxRetries() { 320 return maxRetries; 321 } 322 323 /** 324 * Set the maximum number of retries for the action executor. 325 * 326 * @param maxRetries the maximum number of retries. 327 */ 328 public void setMaxRetries(int maxRetries) { 329 this.maxRetries = maxRetries; 330 } 331 332 /** 333 * Return the retry interval for the action executor in seconds. 334 * 335 * @return the retry interval for the action executor in seconds. 336 */ 337 public long getRetryInterval() { 338 return retryInterval; 339 } 340 341 /** 342 * Sets the retry interval for the action executor. 343 * 344 * @param retryInterval retry interval in seconds. 345 */ 346 public void setRetryInterval(long retryInterval) { 347 this.retryInterval = retryInterval; 348 } 349 350 /** 351 * Utility method to handle exceptions in the {@link #start}, {@link #end}, {@link #kill} and {@link #check} methods 352 * <p/> It uses the error registry to convert exceptions to {@link ActionExecutorException}s. 353 * 354 * @param ex exception to convert. 355 * @return ActionExecutorException converted exception. 356 */ 357 @SuppressWarnings({"ThrowableInstanceNeverThrown"}) 358 protected ActionExecutorException convertException(Exception ex) { 359 if (ex instanceof ActionExecutorException) { 360 return (ActionExecutorException) ex; 361 } 362 for (Map.Entry<Class, ErrorInfo> errorInfo : ERROR_INFOS.get(getType()).entrySet()) { 363 if (errorInfo.getKey().isInstance(ex)) { 364 return new ActionExecutorException(errorInfo.getValue().errorType, errorInfo.getValue().errorCode, 365 "{0}", ex.getMessage(), ex); 366 } 367 } 368 String errorCode = ex.getClass().getName(); 369 errorCode = errorCode.substring(errorCode.lastIndexOf(".") + 1); 370 return new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(), 371 ex); 372 } 373 374 /** 375 * Convenience method that return the signal for an Action based on the action status. 376 * 377 * @param status action status. 378 * @return the action signal. 379 */ 380 protected String getActionSignal(WorkflowAction.Status status) { 381 switch (status) { 382 case OK: 383 return "OK"; 384 case ERROR: 385 case KILLED: 386 return "ERROR"; 387 default: 388 throw new IllegalArgumentException("Action status for signal can only be OK or ERROR"); 389 } 390 } 391 392 /** 393 * Return the path that will be used to store action specific data 394 * 395 * @param jobId Worfklow ID 396 * @param action Action 397 * @param key An Identifier 398 * @param temp temp directory flag 399 * @return A string that has the path 400 */ 401 protected String getActionDirPath(String jobId, WorkflowAction action, String key, boolean temp) { 402 String name = jobId + "/" + action.getName() + "--" + key; 403 if (temp) { 404 name += ".temp"; 405 } 406 return getOozieSystemId() + "/" + name; 407 } 408 409 /** 410 * Return the path that will be used to store action specific data. 411 * 412 * @param jobId Workflow ID 413 * @param action Action 414 * @param key An identifier 415 * @param temp Temp directory flag 416 * @return Path to the directory 417 */ 418 public Path getActionDir(String jobId, WorkflowAction action, String key, boolean temp) { 419 return new Path(getActionDirPath(jobId, action, key, temp)); 420 } 421 422 /** 423 * Start an action. <p/> The {@link Context#setStartData} method must be called within this method. <p/> If the 424 * action has completed, the {@link Context#setExecutionData} method must be called within this method. 425 * 426 * @param context executor context. 427 * @param action the action to start. 428 * @throws ActionExecutorException thrown if the action could not start. 429 */ 430 public abstract void start(Context context, WorkflowAction action) throws ActionExecutorException; 431 432 /** 433 * End an action after it has executed. <p/> The {@link Context#setEndData} method must be called within this 434 * method. 435 * 436 * @param context executor context. 437 * @param action the action to end. 438 * @throws ActionExecutorException thrown if the action could not end. 439 */ 440 public abstract void end(Context context, WorkflowAction action) throws ActionExecutorException; 441 442 /** 443 * Check if an action has completed. This method must be implemented by Async Action Executors. <p/> If the action 444 * has completed, the {@link Context#setExecutionData} method must be called within this method. <p/> If the action 445 * has not completed, the {@link Context#setExternalStatus} method must be called within this method. 446 * 447 * @param context executor context. 448 * @param action the action to end. 449 * @throws ActionExecutorException thrown if the action could not be checked. 450 */ 451 public abstract void check(Context context, WorkflowAction action) throws ActionExecutorException; 452 453 /** 454 * Kill an action. <p/> The {@link Context#setEndData} method must be called within this method. 455 * 456 * @param context executor context. 457 * @param action the action to kill. 458 * @throws ActionExecutorException thrown if the action could not be killed. 459 */ 460 public abstract void kill(Context context, WorkflowAction action) throws ActionExecutorException; 461 462 /** 463 * Return if the external status indicates that the action has completed. 464 * 465 * @param externalStatus external status to check. 466 * @return if the external status indicates that the action has completed. 467 */ 468 public abstract boolean isCompleted(String externalStatus); 469 470 }