001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import java.io.BufferedReader; 021 import java.io.ByteArrayOutputStream; 022 import java.io.File; 023 import java.io.FileNotFoundException; 024 import java.io.IOException; 025 import java.io.InputStream; 026 import java.io.InputStreamReader; 027 import java.io.PrintStream; 028 import java.io.StringReader; 029 import java.net.ConnectException; 030 import java.net.URI; 031 import java.net.URISyntaxException; 032 import java.net.UnknownHostException; 033 import java.util.ArrayList; 034 import java.util.HashMap; 035 import java.util.HashSet; 036 import java.util.Iterator; 037 import java.util.List; 038 import java.util.Map; 039 import java.util.Properties; 040 import java.util.Set; 041 import java.util.Map.Entry; 042 043 import org.apache.hadoop.conf.Configuration; 044 import org.apache.hadoop.filecache.DistributedCache; 045 import org.apache.hadoop.fs.FileStatus; 046 import org.apache.hadoop.fs.FileSystem; 047 import org.apache.hadoop.fs.Path; 048 import org.apache.hadoop.fs.permission.AccessControlException; 049 import org.apache.hadoop.mapred.JobClient; 050 import org.apache.hadoop.mapred.JobConf; 051 import org.apache.hadoop.mapred.JobID; 052 import org.apache.hadoop.mapred.RunningJob; 053 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; 054 import org.apache.hadoop.util.DiskChecker; 055 import org.apache.oozie.WorkflowActionBean; 056 import org.apache.oozie.WorkflowJobBean; 057 import org.apache.oozie.action.ActionExecutor; 058 import org.apache.oozie.action.ActionExecutorException; 059 import org.apache.oozie.client.OozieClient; 060 import org.apache.oozie.client.WorkflowAction; 061 import org.apache.oozie.service.HadoopAccessorException; 062 import org.apache.oozie.service.HadoopAccessorService; 063 import org.apache.oozie.service.Services; 064 import org.apache.oozie.service.WorkflowAppService; 065 import org.apache.oozie.servlet.CallbackServlet; 066 import org.apache.oozie.util.ELEvaluator; 067 import org.apache.oozie.util.IOUtils; 068 import org.apache.oozie.util.PropertiesUtils; 069 import org.apache.oozie.util.XConfiguration; 070 import org.apache.oozie.util.XLog; 071 import org.apache.oozie.util.XmlUtils; 072 import org.jdom.Element; 073 import org.jdom.JDOMException; 074 import org.jdom.Namespace; 075 import org.apache.hadoop.security.token.Token; 076 import org.apache.hadoop.security.token.TokenIdentifier; 077 078 public class JavaActionExecutor extends ActionExecutor { 079 080 private static final String HADOOP_USER = "user.name"; 081 public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker"; 082 public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address"; 083 public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address"; 084 public static final String HADOOP_NAME_NODE = "fs.default.name"; 085 private static final String HADOOP_JOB_NAME = "mapred.job.name"; 086 public static final String OOZIE_COMMON_LIBDIR = "oozie"; 087 public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE; 088 private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>(); 089 public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size"; 090 public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job"; 091 public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job"; 092 private static int maxActionOutputLen; 093 private static int maxExternalStatsSize; 094 095 private static final String SUCCEEDED = "SUCCEEDED"; 096 private static final String KILLED = "KILLED"; 097 private static final String FAILED = "FAILED"; 098 private static final String FAILED_KILLED = "FAILED/KILLED"; 099 private static final String RUNNING = "RUNNING"; 100 protected XLog log = XLog.getLog(getClass()); 101 102 static { 103 DISALLOWED_PROPERTIES.add(HADOOP_USER); 104 DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER); 105 DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE); 106 DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2); 107 DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM); 108 } 109 110 public JavaActionExecutor() { 111 this("java"); 112 requiresNNJT = true; 113 } 114 115 protected JavaActionExecutor(String type) { 116 super(type); 117 requiresNNJT = true; 118 } 119 120 protected String getLauncherJarName() { 121 return getType() + "-launcher.jar"; 122 } 123 124 protected List<Class> getLauncherClasses() { 125 List<Class> classes = new ArrayList<Class>(); 126 classes.add(LauncherMapper.class); 127 classes.add(LauncherSecurityManager.class); 128 classes.add(LauncherException.class); 129 classes.add(LauncherMainException.class); 130 classes.add(FileSystemActions.class); 131 classes.add(PrepareActionsDriver.class); 132 classes.add(ActionStats.class); 133 classes.add(ActionType.class); 134 return classes; 135 } 136 137 @Override 138 public void initActionType() { 139 XLog log = XLog.getLog(getClass()); 140 super.initActionType(); 141 maxActionOutputLen = getOozieConf() 142 .getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 143 // TODO: Remove the below config get in a subsequent release.. 144 // This other irrelevant property is only used to 145 // preserve backwards compatibility cause of a typo. 146 // See OOZIE-4. 147 getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 148 2 * 1024)); 149 //Get the limit for the maximum allowed size of action stats 150 maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT); 151 maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize; 152 try { 153 List<Class> classes = getLauncherClasses(); 154 Class[] launcherClasses = classes.toArray(new Class[classes.size()]); 155 IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses); 156 157 registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001"); 158 registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT, 159 "JA002"); 160 registerError(DiskChecker.DiskOutOfSpaceException.class.getName(), 161 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003"); 162 registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(), 163 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004"); 164 registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(), 165 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005"); 166 registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, " JA006"); 167 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007"); 168 registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008"); 169 registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009"); 170 } 171 catch (IOException ex) { 172 throw new RuntimeException(ex); 173 } 174 catch (java.lang.NoClassDefFoundError err) { 175 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 176 err.printStackTrace(new PrintStream(baos)); 177 log.warn(baos.toString()); 178 } 179 } 180 181 /** 182 * Get the maximum allowed size of stats 183 * 184 * @return maximum size of stats 185 */ 186 public static int getMaxExternalStatsSize() { 187 return maxExternalStatsSize; 188 } 189 190 static void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException { 191 for (String prop : DISALLOWED_PROPERTIES) { 192 if (conf.get(prop) != null) { 193 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010", 194 "Property [{0}] not allowed in action [{1}] configuration", prop, confName); 195 } 196 } 197 } 198 199 public JobConf createBaseHadoopConf(Context context, Element actionXml) { 200 Namespace ns = actionXml.getNamespace(); 201 String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim(); 202 String nameNode = actionXml.getChild("name-node", ns).getTextTrim(); 203 JobConf conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); 204 conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER)); 205 conf.set(HADOOP_JOB_TRACKER, jobTracker); 206 conf.set(HADOOP_JOB_TRACKER_2, jobTracker); 207 conf.set(HADOOP_YARN_RM, jobTracker); 208 conf.set(HADOOP_NAME_NODE, nameNode); 209 conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true"); 210 return conf; 211 } 212 213 private void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) { 214 for (Map.Entry<String, String> entry : srcConf) { 215 if (entry.getKey().startsWith("oozie.launcher.")) { 216 String name = entry.getKey().substring("oozie.launcher.".length()); 217 String value = entry.getValue(); 218 // setting original KEY 219 launcherConf.set(entry.getKey(), value); 220 // setting un-prefixed key (to allow Hadoop job config 221 // for the launcher job 222 launcherConf.set(name, value); 223 } 224 } 225 } 226 227 Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) 228 throws ActionExecutorException { 229 try { 230 Namespace ns = actionXml.getNamespace(); 231 Element e = actionXml.getChild("configuration", ns); 232 if (e != null) { 233 String strConf = XmlUtils.prettyPrint(e).toString(); 234 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); 235 236 XConfiguration launcherConf = new XConfiguration(); 237 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 238 XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType()); 239 injectLauncherProperties(actionDefaultConf, launcherConf); 240 injectLauncherProperties(inlineConf, launcherConf); 241 checkForDisallowedProps(launcherConf, "launcher configuration"); 242 XConfiguration.copy(launcherConf, conf); 243 } 244 return conf; 245 } 246 catch (IOException ex) { 247 throw convertException(ex); 248 } 249 } 250 251 public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf) 252 throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException { 253 Namespace ns = element.getNamespace(); 254 Iterator<Element> it = element.getChildren("job-xml", ns).iterator(); 255 while (it.hasNext()) { 256 Element e = it.next(); 257 String jobXml = e.getTextTrim(); 258 Path path = new Path(appPath, jobXml); 259 FileSystem fs = context.getAppFileSystem(); 260 Configuration jobXmlConf = new XConfiguration(fs.open(path)); 261 checkForDisallowedProps(jobXmlConf, "job-xml"); 262 XConfiguration.copy(jobXmlConf, conf); 263 } 264 Element e = element.getChild("configuration", ns); 265 if (e != null) { 266 String strConf = XmlUtils.prettyPrint(e).toString(); 267 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); 268 checkForDisallowedProps(inlineConf, "inline configuration"); 269 XConfiguration.copy(inlineConf, conf); 270 } 271 } 272 273 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) 274 throws ActionExecutorException { 275 try { 276 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 277 XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType()); 278 XConfiguration.injectDefaults(actionDefaults, actionConf); 279 280 has.checkSupportedFilesystem(appPath.toUri()); 281 282 parseJobXmlAndConfiguration(context, actionXml, appPath, actionConf); 283 return actionConf; 284 } 285 catch (IOException ex) { 286 throw convertException(ex); 287 } 288 catch (HadoopAccessorException ex) { 289 throw convertException(ex); 290 } 291 catch (URISyntaxException ex) { 292 throw convertException(ex); 293 } 294 } 295 296 Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive) 297 throws ActionExecutorException { 298 Path path = null; 299 try { 300 if (filePath.startsWith("/")) { 301 path = new Path(filePath); 302 } 303 else { 304 path = new Path(appPath, filePath); 305 } 306 URI uri = new URI(path.toUri().getPath()); 307 if (archive) { 308 DistributedCache.addCacheArchive(uri, conf); 309 } 310 else { 311 String fileName = filePath.substring(filePath.lastIndexOf("/") + 1); 312 if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files 313 uri = new Path(path.toString() + "#" + fileName).toUri(); 314 uri = new URI(uri.getPath()); 315 DistributedCache.addCacheFile(uri, conf); 316 } 317 else if (fileName.endsWith(".jar")) { // .jar files 318 if (!fileName.contains("#")) { 319 path = new Path(uri.toString()); 320 321 String user = conf.get("user.name"); 322 Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, path, conf); 323 } 324 else { 325 DistributedCache.addCacheFile(uri, conf); 326 } 327 } 328 else { // regular files 329 if (!fileName.contains("#")) { 330 uri = new Path(path.toString() + "#" + fileName).toUri(); 331 uri = new URI(uri.getPath()); 332 } 333 DistributedCache.addCacheFile(uri, conf); 334 } 335 } 336 DistributedCache.createSymlink(conf); 337 return conf; 338 } 339 catch (Exception ex) { 340 XLog.getLog(getClass()).debug( 341 "Errors when add to DistributedCache. Path=" + path + ", archive=" + archive + ", conf=" 342 + XmlUtils.prettyPrint(conf).toString()); 343 throw convertException(ex); 344 } 345 } 346 347 String getOozieLauncherJar(Context context) throws ActionExecutorException { 348 try { 349 return new Path(context.getActionDir(), getLauncherJarName()).toString(); 350 } 351 catch (Exception ex) { 352 throw convertException(ex); 353 } 354 } 355 356 public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { 357 try { 358 Path actionDir = context.getActionDir(); 359 Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp"); 360 if (!actionFs.exists(actionDir)) { 361 try { 362 actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path( 363 tempActionDir, getLauncherJarName())); 364 actionFs.rename(tempActionDir, actionDir); 365 } 366 catch (IOException ex) { 367 actionFs.delete(tempActionDir, true); 368 actionFs.delete(actionDir, true); 369 throw ex; 370 } 371 } 372 } 373 catch (Exception ex) { 374 throw convertException(ex); 375 } 376 } 377 378 void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { 379 try { 380 Path actionDir = context.getActionDir(); 381 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false) 382 && actionFs.exists(actionDir)) { 383 actionFs.delete(actionDir, true); 384 } 385 } 386 catch (Exception ex) { 387 throw convertException(ex); 388 } 389 } 390 391 protected void addShareLib(Path appPath, Configuration conf, String actionShareLibName) 392 throws ActionExecutorException { 393 if (actionShareLibName != null) { 394 try { 395 Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath(); 396 if (systemLibPath != null) { 397 Path actionLibPath = new Path(systemLibPath, actionShareLibName); 398 String user = conf.get("user.name"); 399 FileSystem fs; 400 // If the actionLibPath has a valid scheme and authority, then use them to determine the filesystem that the 401 // sharelib resides on; otherwise, assume it resides on the same filesystem as the appPath and use the appPath 402 // to determine the filesystem 403 if (actionLibPath.toUri().getScheme() != null && actionLibPath.toUri().getAuthority() != null) { 404 fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, actionLibPath.toUri(), conf); 405 } 406 else { 407 fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf); 408 } 409 if (fs.exists(actionLibPath)) { 410 FileStatus[] files = fs.listStatus(actionLibPath); 411 for (FileStatus file : files) { 412 addToCache(conf, actionLibPath, file.getPath().toUri().getPath(), false); 413 } 414 } 415 } 416 } 417 catch (HadoopAccessorException ex){ 418 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 419 ex.getErrorCode().toString(), ex.getMessage()); 420 } 421 catch (IOException ex){ 422 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 423 "It should never happen", ex.getMessage()); 424 } 425 } 426 } 427 428 protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException { 429 String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath"); 430 if (actionLibsStrArr != null) { 431 try { 432 for (String actionLibsStr : actionLibsStrArr) { 433 actionLibsStr = actionLibsStr.trim(); 434 if (actionLibsStr.length() > 0) 435 { 436 Path actionLibsPath = new Path(actionLibsStr); 437 String user = conf.get("user.name"); 438 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf); 439 if (fs.exists(actionLibsPath)) { 440 FileStatus[] files = fs.listStatus(actionLibsPath); 441 for (FileStatus file : files) { 442 addToCache(conf, appPath, file.getPath().toUri().getPath(), false); 443 } 444 } 445 } 446 } 447 } 448 catch (HadoopAccessorException ex){ 449 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 450 ex.getErrorCode().toString(), ex.getMessage()); 451 } 452 catch (IOException ex){ 453 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 454 "It should never happen", ex.getMessage()); 455 } 456 } 457 } 458 459 @SuppressWarnings("unchecked") 460 void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf) 461 throws ActionExecutorException { 462 Configuration proto = context.getProtoActionConf(); 463 464 // launcher JAR 465 addToCache(conf, appPath, getOozieLauncherJar(context), false); 466 467 // Workflow lib/ 468 String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST); 469 if (paths != null) { 470 for (String path : paths) { 471 addToCache(conf, appPath, path, false); 472 } 473 } 474 475 // Action libs 476 addActionLibs(appPath, conf); 477 478 // files and archives defined in the action 479 for (Element eProp : (List<Element>) actionXml.getChildren()) { 480 if (eProp.getName().equals("file")) { 481 String[] filePaths = eProp.getTextTrim().split(","); 482 for (String path : filePaths) { 483 addToCache(conf, appPath, path.trim(), false); 484 } 485 } 486 else if (eProp.getName().equals("archive")) { 487 String[] archivePaths = eProp.getTextTrim().split(","); 488 for (String path : archivePaths){ 489 addToCache(conf, appPath, path.trim(), true); 490 } 491 } 492 } 493 494 addAllShareLibs(appPath, conf, context, actionXml); 495 } 496 497 // Adds action specific share libs and common share libs 498 private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml) 499 throws ActionExecutorException { 500 // Add action specific share libs 501 addActionShareLib(appPath, conf, context, actionXml); 502 // Add common sharelibs for Oozie 503 addShareLib(appPath, conf, JavaActionExecutor.OOZIE_COMMON_LIBDIR); 504 } 505 506 private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml) 507 throws ActionExecutorException { 508 XConfiguration wfJobConf = null; 509 try { 510 wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); 511 } 512 catch (IOException ioe) { 513 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen", 514 ioe.getMessage()); 515 } 516 // Action sharelibs are only added if user has specified to use system libpath 517 if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) { 518 // add action specific sharelibs 519 addShareLib(appPath, conf, getShareLibName(context, actionXml, conf)); 520 } 521 } 522 523 524 protected String getLauncherMain(Configuration launcherConf, Element actionXml) { 525 Namespace ns = actionXml.getNamespace(); 526 Element e = actionXml.getChild("main-class", ns); 527 return e.getTextTrim(); 528 } 529 530 private static final String QUEUE_NAME = "mapred.job.queue.name"; 531 532 private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>(); 533 534 static { 535 SPECIAL_PROPERTIES.add(QUEUE_NAME); 536 SPECIAL_PROPERTIES.add(ACL_VIEW_JOB); 537 SPECIAL_PROPERTIES.add(ACL_MODIFY_JOB); 538 } 539 540 @SuppressWarnings("unchecked") 541 JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf) 542 throws ActionExecutorException { 543 try { 544 545 // app path could be a file 546 Path appPathRoot = new Path(context.getWorkflow().getAppPath()); 547 if (actionFs.isFile(appPathRoot)) { 548 appPathRoot = appPathRoot.getParent(); 549 } 550 551 // launcher job configuration 552 JobConf launcherJobConf = createBaseHadoopConf(context, actionXml); 553 setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context); 554 555 String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType()); 556 if (actionShareLibProperty != null) { 557 launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty); 558 } 559 setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf); 560 561 String jobName = launcherJobConf.get(HADOOP_JOB_NAME); 562 if (jobName == null || jobName.isEmpty()) { 563 jobName = XLog.format( 564 "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), 565 context.getWorkflow().getAppName(), action.getName(), 566 context.getWorkflow().getId()); 567 launcherJobConf.setJobName(jobName); 568 } 569 570 String jobId = context.getWorkflow().getId(); 571 String actionId = action.getId(); 572 Path actionDir = context.getActionDir(); 573 String recoveryId = context.getRecoveryId(); 574 575 // Getting the prepare XML from the action XML 576 Namespace ns = actionXml.getNamespace(); 577 Element prepareElement = actionXml.getChild("prepare", ns); 578 String prepareXML = ""; 579 if (prepareElement != null) { 580 if (prepareElement.getChildren().size() > 0) { 581 prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim(); 582 } 583 } 584 LauncherMapper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf, 585 prepareXML); 586 587 LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml)); 588 LauncherMapper.setupSupportedFileSystems( 589 launcherJobConf, Services.get().getConf().get(HadoopAccessorService.SUPPORTED_FILESYSTEMS)); 590 LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen); 591 LauncherMapper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize); 592 593 List<Element> list = actionXml.getChildren("arg", ns); 594 String[] args = new String[list.size()]; 595 for (int i = 0; i < list.size(); i++) { 596 args[i] = list.get(i).getTextTrim(); 597 } 598 LauncherMapper.setupMainArguments(launcherJobConf, args); 599 600 List<Element> javaopts = actionXml.getChildren("java-opt", ns); 601 for (Element opt: javaopts) { 602 String opts = launcherJobConf.get("mapred.child.java.opts", ""); 603 opts = opts + " " + opt.getTextTrim(); 604 opts = opts.trim(); 605 launcherJobConf.set("mapred.child.java.opts", opts); 606 } 607 608 Element opt = actionXml.getChild("java-opts", ns); 609 if (opt != null) { 610 String opts = launcherJobConf.get("mapred.child.java.opts", ""); 611 opts = opts + " " + opt.getTextTrim(); 612 opts = opts.trim(); 613 launcherJobConf.set("mapred.child.java.opts", opts); 614 } 615 616 // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs) 617 // maybe we should add queue to the WF schema, below job-tracker 618 actionConfToLauncherConf(actionConf, launcherJobConf); 619 620 // to disable cancelation of delegation token on launcher job end 621 launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false); 622 623 return launcherJobConf; 624 } 625 catch (Exception ex) { 626 throw convertException(ex); 627 } 628 } 629 630 private void injectCallback(Context context, Configuration conf) { 631 String callback = context.getCallbackUrl("$jobStatus"); 632 if (conf.get("job.end.notification.url") != null) { 633 XLog.getLog(getClass()).warn("Overriding the action job end notification URI"); 634 } 635 conf.set("job.end.notification.url", callback); 636 } 637 638 void injectActionCallback(Context context, Configuration actionConf) { 639 injectCallback(context, actionConf); 640 } 641 642 void injectLauncherCallback(Context context, Configuration launcherConf) { 643 injectCallback(context, launcherConf); 644 } 645 646 private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) { 647 for (String name : SPECIAL_PROPERTIES) { 648 if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) { 649 launcherConf.set(name, actionConf.get(name)); 650 } 651 } 652 } 653 654 public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException { 655 JobClient jobClient = null; 656 boolean exception = false; 657 try { 658 Path appPathRoot = new Path(context.getWorkflow().getAppPath()); 659 660 // app path could be a file 661 if (actionFs.isFile(appPathRoot)) { 662 appPathRoot = appPathRoot.getParent(); 663 } 664 665 Element actionXml = XmlUtils.parseXml(action.getConf()); 666 667 // action job configuration 668 Configuration actionConf = createBaseHadoopConf(context, actionXml); 669 setupActionConf(actionConf, context, actionXml, appPathRoot); 670 XLog.getLog(getClass()).debug("Setting LibFilesArchives "); 671 setLibFilesArchives(context, actionXml, appPathRoot, actionConf); 672 673 String jobName = actionConf.get(HADOOP_JOB_NAME); 674 if (jobName == null || jobName.isEmpty()) { 675 jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", 676 getType(), context.getWorkflow().getAppName(), 677 action.getName(), context.getWorkflow().getId()); 678 actionConf.set(HADOOP_JOB_NAME, jobName); 679 } 680 681 injectActionCallback(context, actionConf); 682 683 if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) { 684 // ONLY in the case where user has not given the 685 // modify-job ACL specifically 686 if (context.getWorkflow().getAcl() != null) { 687 // setting the group owning the Oozie job to allow anybody in that 688 // group to modify the jobs. 689 actionConf.set(ACL_MODIFY_JOB, context.getWorkflow().getAcl()); 690 } 691 } 692 693 // Setting the credential properties in launcher conf 694 HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context, 695 action, actionConf); 696 697 // Adding if action need to set more credential tokens 698 JobConf credentialsConf = new JobConf(false); 699 XConfiguration.copy(actionConf, credentialsConf); 700 setCredentialTokens(credentialsConf, context, action, credentialsProperties); 701 702 // insert conf to action conf from credentialsConf 703 for (Entry<String, String> entry : credentialsConf) { 704 if (actionConf.get(entry.getKey()) == null) { 705 actionConf.set(entry.getKey(), entry.getValue()); 706 } 707 } 708 709 JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); 710 injectLauncherCallback(context, launcherJobConf); 711 XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId()); 712 jobClient = createJobClient(context, launcherJobConf); 713 String launcherId = LauncherMapper.getRecoveryId(launcherJobConf, context.getActionDir(), context 714 .getRecoveryId()); 715 boolean alreadyRunning = launcherId != null; 716 RunningJob runningJob; 717 718 // if user-retry is on, always submit new launcher 719 boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry(); 720 721 if (alreadyRunning && !isUserRetry) { 722 runningJob = jobClient.getJob(JobID.forName(launcherId)); 723 if (runningJob == null) { 724 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER); 725 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", 726 "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker); 727 } 728 } 729 else { 730 XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId()); 731 732 // setting up propagation of the delegation token. 733 Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(HadoopAccessorService 734 .getMRDelegationTokenRenewer(launcherJobConf)); 735 launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt); 736 737 // insert credentials tokens to launcher job conf if needed 738 if (needInjectCredentials()) { 739 for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) { 740 log.debug("ADDING TOKEN: " + tk.getKind().toString()); 741 launcherJobConf.getCredentials().addToken(tk.getKind(), tk); 742 } 743 } 744 else { 745 log.info("No need to inject credentials."); 746 } 747 runningJob = jobClient.submitJob(launcherJobConf); 748 if (runningJob == null) { 749 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", 750 "Error submitting launcher for action [{0}]", action.getId()); 751 } 752 launcherId = runningJob.getID().toString(); 753 XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId); 754 } 755 756 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER); 757 String consoleUrl = runningJob.getTrackingURL(); 758 context.setStartData(launcherId, jobTracker, consoleUrl); 759 } 760 catch (Exception ex) { 761 exception = true; 762 throw convertException(ex); 763 } 764 finally { 765 if (jobClient != null) { 766 try { 767 jobClient.close(); 768 } 769 catch (Exception e) { 770 if (exception) { 771 log.error("JobClient error: ", e); 772 } 773 else { 774 throw convertException(e); 775 } 776 } 777 } 778 } 779 } 780 781 private boolean needInjectCredentials() { 782 boolean methodExists = true; 783 784 Class klass; 785 try { 786 klass = Class.forName("org.apache.hadoop.mapred.JobConf"); 787 klass.getMethod("getCredentials"); 788 } 789 catch (ClassNotFoundException ex) { 790 methodExists = false; 791 } 792 catch (NoSuchMethodException ex) { 793 methodExists = false; 794 } 795 796 return methodExists; 797 } 798 799 protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context, 800 WorkflowAction action, Configuration actionConf) throws Exception { 801 HashMap<String, CredentialsProperties> credPropertiesMap = null; 802 if (context != null && action != null) { 803 credPropertiesMap = getActionCredentialsProperties(context, action); 804 if (credPropertiesMap != null) { 805 for (String key : credPropertiesMap.keySet()) { 806 CredentialsProperties prop = credPropertiesMap.get(key); 807 if (prop != null) { 808 log.debug("Credential Properties set for action : " + action.getId()); 809 for (String property : prop.getProperties().keySet()) { 810 actionConf.set(property, prop.getProperties().get(property)); 811 log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'"); 812 } 813 } 814 } 815 } 816 else { 817 log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred()); 818 } 819 } 820 else { 821 log.warn("context or action is null"); 822 } 823 return credPropertiesMap; 824 } 825 826 protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action, 827 HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception { 828 829 if (context != null && action != null && credPropertiesMap != null) { 830 for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { 831 String credName = entry.getKey(); 832 CredentialsProperties credProps = entry.getValue(); 833 if (credProps != null) { 834 CredentialsProvider credProvider = new CredentialsProvider(credProps.getType()); 835 Credentials credentialObject = credProvider.createCredentialObject(); 836 if (credentialObject != null) { 837 credentialObject.addtoJobConf(jobconf, credProps, context); 838 log.debug("Retrieved Credential '" + credName + "' for action " + action.getId()); 839 } 840 else { 841 log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType()); 842 } 843 } 844 } 845 } 846 847 } 848 849 protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context, 850 WorkflowAction action) throws Exception { 851 HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>(); 852 if (context != null && action != null) { 853 String credsInAction = action.getCred(); 854 log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId()); 855 String[] credNames = credsInAction.split(","); 856 for (String credName : credNames) { 857 CredentialsProperties credProps = getCredProperties(context, credName); 858 props.put(credName, credProps); 859 } 860 } 861 else { 862 log.warn("context or action is null"); 863 } 864 return props; 865 } 866 867 @SuppressWarnings("unchecked") 868 protected CredentialsProperties getCredProperties(Context context, String credName) 869 throws Exception { 870 CredentialsProperties credProp = null; 871 String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition(); 872 XConfiguration wfjobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); 873 Element elementJob = XmlUtils.parseXml(workflowXml); 874 Element credentials = elementJob.getChild("credentials", elementJob.getNamespace()); 875 if (credentials != null) { 876 for (Element credential : (List<Element>) credentials.getChildren("credential", credentials.getNamespace())) { 877 String name = credential.getAttributeValue("name"); 878 String type = credential.getAttributeValue("type"); 879 log.debug("getCredProperties: Name: " + name + ", Type: " + type); 880 if (name.equalsIgnoreCase(credName)) { 881 credProp = new CredentialsProperties(name, type); 882 for (Element property : (List<Element>) credential.getChildren("property", 883 credential.getNamespace())) { 884 String propertyName = property.getChildText("name", property.getNamespace()); 885 String propertyValue = property.getChildText("value", property.getNamespace()); 886 ELEvaluator eval = new ELEvaluator(); 887 for (Map.Entry<String, String> entry : wfjobConf) { 888 eval.setVariable(entry.getKey(), entry.getValue().trim()); 889 } 890 propertyName = eval.evaluate(propertyName, String.class); 891 propertyValue = eval.evaluate(propertyValue, String.class); 892 893 credProp.getProperties().put(propertyName, propertyValue); 894 log.debug("getCredProperties: Properties name :'" + propertyName + "', Value : '" 895 + propertyValue + "'"); 896 } 897 } 898 } 899 } else { 900 log.warn("credentials is null for the action"); 901 } 902 return credProp; 903 } 904 905 @Override 906 public void start(Context context, WorkflowAction action) throws ActionExecutorException { 907 try { 908 XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System"); 909 FileSystem actionFs = context.getAppFileSystem(); 910 XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir()); 911 prepareActionDir(actionFs, context); 912 XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action "); 913 submitLauncher(actionFs, context, action); 914 XLog.getLog(getClass()).debug("Action submit completed. Performing check "); 915 check(context, action); 916 XLog.getLog(getClass()).debug("Action check is done after submission"); 917 } 918 catch (Exception ex) { 919 throw convertException(ex); 920 } 921 } 922 923 @Override 924 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 925 try { 926 String externalStatus = action.getExternalStatus(); 927 WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK 928 : WorkflowAction.Status.ERROR; 929 context.setEndData(status, getActionSignal(status)); 930 } 931 catch (Exception ex) { 932 throw convertException(ex); 933 } 934 finally { 935 try { 936 FileSystem actionFs = context.getAppFileSystem(); 937 cleanUpActionDir(actionFs, context); 938 } 939 catch (Exception ex) { 940 throw convertException(ex); 941 } 942 } 943 } 944 945 /** 946 * Create job client object 947 * 948 * @param context 949 * @param jobConf 950 * @return 951 * @throws HadoopAccessorException 952 */ 953 protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException { 954 String user = context.getWorkflow().getUser(); 955 String group = context.getWorkflow().getGroup(); 956 return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); 957 } 958 959 @Override 960 public void check(Context context, WorkflowAction action) throws ActionExecutorException { 961 JobClient jobClient = null; 962 boolean exception = false; 963 try { 964 Element actionXml = XmlUtils.parseXml(action.getConf()); 965 FileSystem actionFs = context.getAppFileSystem(); 966 JobConf jobConf = createBaseHadoopConf(context, actionXml); 967 jobClient = createJobClient(context, jobConf); 968 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId())); 969 if (runningJob == null) { 970 context.setExternalStatus(FAILED); 971 context.setExecutionData(FAILED, null); 972 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", 973 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", action 974 .getExternalId(), action.getId()); 975 } 976 if (runningJob.isComplete()) { 977 Path actionDir = context.getActionDir(); 978 979 String user = context.getWorkflow().getUser(); 980 String group = context.getWorkflow().getGroup(); 981 if (LauncherMapper.hasIdSwap(runningJob, user, group, actionDir)) { 982 String launcherId = action.getExternalId(); 983 Path idSwapPath = LauncherMapper.getIdSwapPath(context.getActionDir()); 984 InputStream is = actionFs.open(idSwapPath); 985 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 986 Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen); 987 reader.close(); 988 String newId = props.getProperty("id"); 989 runningJob = jobClient.getJob(JobID.forName(newId)); 990 if (runningJob == null) { 991 context.setExternalStatus(FAILED); 992 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", 993 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId, 994 action.getId()); 995 } 996 997 context.setStartData(newId, action.getTrackerUri(), runningJob.getTrackingURL()); 998 XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId, 999 newId); 1000 } 1001 if (runningJob.isComplete()) { 1002 XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]", 1003 action.getExternalId()); 1004 if (runningJob.isSuccessful() && LauncherMapper.isMainSuccessful(runningJob)) { 1005 getActionData(actionFs, runningJob, action, context); 1006 XLog.getLog(getClass()).info(XLog.STD, "action produced output"); 1007 } 1008 else { 1009 XLog log = XLog.getLog(getClass()); 1010 String errorReason; 1011 Path actionError = LauncherMapper.getErrorPath(context.getActionDir()); 1012 if (actionFs.exists(actionError)) { 1013 InputStream is = actionFs.open(actionError); 1014 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 1015 Properties props = PropertiesUtils.readProperties(reader, -1); 1016 reader.close(); 1017 String errorCode = props.getProperty("error.code"); 1018 if (errorCode.equals("0")) { 1019 errorCode = "JA018"; 1020 } 1021 if (errorCode.equals("-1")) { 1022 errorCode = "JA019"; 1023 } 1024 errorReason = props.getProperty("error.reason"); 1025 log.warn("Launcher ERROR, reason: {0}", errorReason); 1026 String exMsg = props.getProperty("exception.message"); 1027 String errorInfo = (exMsg != null) ? exMsg : errorReason; 1028 context.setErrorInfo(errorCode, errorInfo); 1029 String exStackTrace = props.getProperty("exception.stacktrace"); 1030 if (exMsg != null) { 1031 log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace); 1032 } 1033 } 1034 else { 1035 errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action 1036 .getTrackerUri(), action.getExternalId()); 1037 log.warn(errorReason); 1038 } 1039 context.setExecutionData(FAILED_KILLED, null); 1040 setActionCompletionData(context, actionFs); 1041 } 1042 } 1043 else { 1044 context.setExternalStatus(RUNNING); 1045 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]", 1046 action.getExternalId(), action.getExternalStatus()); 1047 } 1048 } 1049 else { 1050 context.setExternalStatus(RUNNING); 1051 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]", 1052 action.getExternalId(), action.getExternalStatus()); 1053 } 1054 } 1055 catch (Exception ex) { 1056 XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex); 1057 exception = true; 1058 throw convertException(ex); 1059 } 1060 finally { 1061 if (jobClient != null) { 1062 try { 1063 jobClient.close(); 1064 } 1065 catch (Exception e) { 1066 if (exception) { 1067 log.error("JobClient error: ", e); 1068 } 1069 else { 1070 throw convertException(e); 1071 } 1072 } 1073 } 1074 } 1075 } 1076 1077 /** 1078 * Get the output data of an action. Subclasses should override this method 1079 * to get action specific output data. 1080 * 1081 * @param actionFs the FileSystem object 1082 * @param runningJob the runningJob 1083 * @param action the Workflow action 1084 * @param context executor context 1085 * 1086 */ 1087 protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) 1088 throws HadoopAccessorException, JDOMException, IOException, URISyntaxException { 1089 Properties props = null; 1090 if (getCaptureOutput(action)) { 1091 props = new Properties(); 1092 if (LauncherMapper.hasOutputData(runningJob)) { 1093 Path actionOutput = LauncherMapper.getOutputDataPath(context.getActionDir()); 1094 InputStream is = actionFs.open(actionOutput); 1095 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 1096 props = PropertiesUtils.readProperties(reader, maxActionOutputLen); 1097 reader.close(); 1098 } 1099 } 1100 context.setExecutionData(SUCCEEDED, props); 1101 } 1102 1103 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException { 1104 Element eConf = XmlUtils.parseXml(action.getConf()); 1105 Namespace ns = eConf.getNamespace(); 1106 Element captureOutput = eConf.getChild("capture-output", ns); 1107 return captureOutput != null; 1108 } 1109 1110 @Override 1111 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 1112 JobClient jobClient = null; 1113 boolean exception = false; 1114 try { 1115 Element actionXml = XmlUtils.parseXml(action.getConf()); 1116 JobConf jobConf = createBaseHadoopConf(context, actionXml); 1117 jobClient = createJobClient(context, jobConf); 1118 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId())); 1119 if (runningJob != null) { 1120 runningJob.killJob(); 1121 } 1122 context.setExternalStatus(KILLED); 1123 context.setExecutionData(KILLED, null); 1124 } 1125 catch (Exception ex) { 1126 exception = true; 1127 throw convertException(ex); 1128 } 1129 finally { 1130 try { 1131 FileSystem actionFs = context.getAppFileSystem(); 1132 cleanUpActionDir(actionFs, context); 1133 if (jobClient != null) { 1134 jobClient.close(); 1135 } 1136 } 1137 catch (Exception ex) { 1138 if (exception) { 1139 log.error("Error: ", ex); 1140 } 1141 else { 1142 throw convertException(ex); 1143 } 1144 } 1145 } 1146 } 1147 1148 private static Set<String> FINAL_STATUS = new HashSet<String>(); 1149 1150 static { 1151 FINAL_STATUS.add(SUCCEEDED); 1152 FINAL_STATUS.add(KILLED); 1153 FINAL_STATUS.add(FAILED); 1154 FINAL_STATUS.add(FAILED_KILLED); 1155 } 1156 1157 @Override 1158 public boolean isCompleted(String externalStatus) { 1159 return FINAL_STATUS.contains(externalStatus); 1160 } 1161 1162 1163 /** 1164 * Return the sharelib name for the action. 1165 * <p/> 1166 * If <code>NULL</code> or emtpy, it means that the action does not use the action 1167 * sharelib. 1168 * <p/> 1169 * If a non-empty string, i.e. <code>foo</code>, it means the action uses the 1170 * action sharelib subdirectory <code>foo</code> and all JARs in the sharelib 1171 * <code>foo</code> directory will be in the action classpath. 1172 * <p/> 1173 * The resolution is done using the following precedence order: 1174 * <ul> 1175 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the action configuration</li> 1176 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the job configuration</li> 1177 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the oozie configuration</li> 1178 * <li>Action Executor <code>getDefaultShareLibName()</code> method</li> 1179 * </ul> 1180 * 1181 * 1182 * @param context executor context. 1183 * @param actionXml 1184 *@param conf action configuration. @return the action sharelib name. 1185 */ 1186 protected String getShareLibName(Context context, Element actionXml, Configuration conf) { 1187 String name = conf.get(ACTION_SHARELIB_FOR + getType()); 1188 if (name == null) { 1189 try { 1190 XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); 1191 name = jobConf.get(ACTION_SHARELIB_FOR + getType()); 1192 if (name == null) { 1193 name = Services.get().getConf().get(ACTION_SHARELIB_FOR + getType()); 1194 if (name == null) { 1195 name = getDefaultShareLibName(actionXml); 1196 } 1197 } 1198 } 1199 catch (IOException ex) { 1200 throw new RuntimeException("It cannot happen, " + ex.toString(), ex); 1201 } 1202 } 1203 return name; 1204 } 1205 1206 private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for."; 1207 1208 1209 /** 1210 * Returns the default sharelib name for the action if any. 1211 * 1212 * @param actionXml the action XML fragment. 1213 * @return the sharelib name for the action, <code>NULL</code> if none. 1214 */ 1215 protected String getDefaultShareLibName(Element actionXml) { 1216 return null; 1217 } 1218 1219 /** 1220 * Sets some data for the action on completion 1221 * 1222 * @param context executor context 1223 * @param actionFs the FileSystem object 1224 */ 1225 protected void setActionCompletionData(Context context, FileSystem actionFs) throws IOException, 1226 HadoopAccessorException, URISyntaxException { 1227 } 1228 }