001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import java.io.BufferedReader; 021 import java.io.ByteArrayOutputStream; 022 import java.io.File; 023 import java.io.FileNotFoundException; 024 import java.io.IOException; 025 import java.io.InputStream; 026 import java.io.InputStreamReader; 027 import java.io.PrintStream; 028 import java.io.StringReader; 029 import java.net.ConnectException; 030 import java.net.URI; 031 import java.net.URISyntaxException; 032 import java.net.UnknownHostException; 033 import java.util.ArrayList; 034 import java.util.HashMap; 035 import java.util.HashSet; 036 import java.util.Iterator; 037 import java.util.List; 038 import java.util.Map; 039 import java.util.Properties; 040 import java.util.Set; 041 import java.util.Map.Entry; 042 043 import org.apache.hadoop.conf.Configuration; 044 import org.apache.hadoop.filecache.DistributedCache; 045 import org.apache.hadoop.fs.FileStatus; 046 import org.apache.hadoop.fs.FileSystem; 047 import org.apache.hadoop.fs.Path; 048 import org.apache.hadoop.fs.permission.AccessControlException; 049 import org.apache.hadoop.mapred.JobClient; 050 import org.apache.hadoop.mapred.JobConf; 051 import org.apache.hadoop.mapred.JobID; 052 import org.apache.hadoop.mapred.RunningJob; 053 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; 054 import org.apache.hadoop.util.DiskChecker; 055 import org.apache.oozie.WorkflowActionBean; 056 import org.apache.oozie.WorkflowJobBean; 057 import org.apache.oozie.action.ActionExecutor; 058 import org.apache.oozie.action.ActionExecutorException; 059 import org.apache.oozie.client.OozieClient; 060 import org.apache.oozie.client.WorkflowAction; 061 import org.apache.oozie.service.HadoopAccessorException; 062 import org.apache.oozie.service.HadoopAccessorService; 063 import org.apache.oozie.service.Services; 064 import org.apache.oozie.service.WorkflowAppService; 065 import org.apache.oozie.servlet.CallbackServlet; 066 import org.apache.oozie.util.ELEvaluator; 067 import org.apache.oozie.util.IOUtils; 068 import org.apache.oozie.util.PropertiesUtils; 069 import org.apache.oozie.util.XConfiguration; 070 import org.apache.oozie.util.XLog; 071 import org.apache.oozie.util.XmlUtils; 072 import org.jdom.Element; 073 import org.jdom.JDOMException; 074 import org.jdom.Namespace; 075 import org.apache.hadoop.security.token.Token; 076 import org.apache.hadoop.security.token.TokenIdentifier; 077 078 public class JavaActionExecutor extends ActionExecutor { 079 080 private static final String HADOOP_USER = "user.name"; 081 private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker"; 082 private static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address"; 083 private static final String HADOOP_YARN_RM = "yarn.resourcemanager.address"; 084 private static final String HADOOP_NAME_NODE = "fs.default.name"; 085 public static final String OOZIE_COMMON_LIBDIR = "oozie"; 086 public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE; 087 private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>(); 088 public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size"; 089 public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job"; 090 public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job"; 091 private static int maxActionOutputLen; 092 private static int maxExternalStatsSize; 093 094 private static final String SUCCEEDED = "SUCCEEDED"; 095 private static final String KILLED = "KILLED"; 096 private static final String FAILED = "FAILED"; 097 private static final String FAILED_KILLED = "FAILED/KILLED"; 098 private static final String RUNNING = "RUNNING"; 099 protected XLog log = XLog.getLog(getClass()); 100 101 static { 102 DISALLOWED_PROPERTIES.add(HADOOP_USER); 103 DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER); 104 DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE); 105 DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2); 106 DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM); 107 } 108 109 public JavaActionExecutor() { 110 this("java"); 111 requiresNNJT = true; 112 } 113 114 protected JavaActionExecutor(String type) { 115 super(type); 116 requiresNNJT = true; 117 } 118 119 protected String getLauncherJarName() { 120 return getType() + "-launcher.jar"; 121 } 122 123 protected List<Class> getLauncherClasses() { 124 List<Class> classes = new ArrayList<Class>(); 125 classes.add(LauncherMapper.class); 126 classes.add(LauncherSecurityManager.class); 127 classes.add(LauncherException.class); 128 classes.add(LauncherMainException.class); 129 classes.add(FileSystemActions.class); 130 classes.add(PrepareActionsDriver.class); 131 classes.add(ActionStats.class); 132 classes.add(ActionType.class); 133 return classes; 134 } 135 136 @Override 137 public void initActionType() { 138 XLog log = XLog.getLog(getClass()); 139 super.initActionType(); 140 maxActionOutputLen = getOozieConf() 141 .getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 142 // TODO: Remove the below config get in a subsequent release.. 143 // This other irrelevant property is only used to 144 // preserve backwards compatibility cause of a typo. 145 // See OOZIE-4. 146 getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 147 2 * 1024)); 148 //Get the limit for the maximum allowed size of action stats 149 maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT); 150 maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize; 151 try { 152 List<Class> classes = getLauncherClasses(); 153 Class[] launcherClasses = classes.toArray(new Class[classes.size()]); 154 IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses); 155 156 registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001"); 157 registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT, 158 "JA002"); 159 registerError(DiskChecker.DiskOutOfSpaceException.class.getName(), 160 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003"); 161 registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(), 162 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004"); 163 registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(), 164 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005"); 165 registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, " JA006"); 166 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007"); 167 registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008"); 168 registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009"); 169 } 170 catch (IOException ex) { 171 throw new RuntimeException(ex); 172 } 173 catch (java.lang.NoClassDefFoundError err) { 174 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 175 err.printStackTrace(new PrintStream(baos)); 176 log.warn(baos.toString()); 177 } 178 } 179 180 /** 181 * Get the maximum allowed size of stats 182 * 183 * @return maximum size of stats 184 */ 185 public static int getMaxExternalStatsSize() { 186 return maxExternalStatsSize; 187 } 188 189 static void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException { 190 for (String prop : DISALLOWED_PROPERTIES) { 191 if (conf.get(prop) != null) { 192 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010", 193 "Property [{0}] not allowed in action [{1}] configuration", prop, confName); 194 } 195 } 196 } 197 198 public JobConf createBaseHadoopConf(Context context, Element actionXml) { 199 Namespace ns = actionXml.getNamespace(); 200 String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim(); 201 String nameNode = actionXml.getChild("name-node", ns).getTextTrim(); 202 JobConf conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); 203 conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER)); 204 conf.set(HADOOP_JOB_TRACKER, jobTracker); 205 conf.set(HADOOP_JOB_TRACKER_2, jobTracker); 206 conf.set(HADOOP_YARN_RM, jobTracker); 207 conf.set(HADOOP_NAME_NODE, nameNode); 208 conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true"); 209 return conf; 210 } 211 212 private void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) { 213 for (Map.Entry<String, String> entry : srcConf) { 214 if (entry.getKey().startsWith("oozie.launcher.")) { 215 String name = entry.getKey().substring("oozie.launcher.".length()); 216 String value = entry.getValue(); 217 // setting original KEY 218 launcherConf.set(entry.getKey(), value); 219 // setting un-prefixed key (to allow Hadoop job config 220 // for the launcher job 221 launcherConf.set(name, value); 222 } 223 } 224 } 225 226 Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) 227 throws ActionExecutorException { 228 try { 229 Namespace ns = actionXml.getNamespace(); 230 Element e = actionXml.getChild("configuration", ns); 231 if (e != null) { 232 String strConf = XmlUtils.prettyPrint(e).toString(); 233 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); 234 235 XConfiguration launcherConf = new XConfiguration(); 236 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 237 XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType()); 238 injectLauncherProperties(actionDefaultConf, launcherConf); 239 injectLauncherProperties(inlineConf, launcherConf); 240 checkForDisallowedProps(launcherConf, "launcher configuration"); 241 XConfiguration.copy(launcherConf, conf); 242 } 243 return conf; 244 } 245 catch (IOException ex) { 246 throw convertException(ex); 247 } 248 } 249 250 public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf) 251 throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException { 252 Namespace ns = element.getNamespace(); 253 Iterator<Element> it = element.getChildren("job-xml", ns).iterator(); 254 while (it.hasNext()) { 255 Element e = it.next(); 256 String jobXml = e.getTextTrim(); 257 Path path = new Path(appPath, jobXml); 258 FileSystem fs = context.getAppFileSystem(); 259 Configuration jobXmlConf = new XConfiguration(fs.open(path)); 260 checkForDisallowedProps(jobXmlConf, "job-xml"); 261 XConfiguration.copy(jobXmlConf, conf); 262 } 263 Element e = element.getChild("configuration", ns); 264 if (e != null) { 265 String strConf = XmlUtils.prettyPrint(e).toString(); 266 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); 267 checkForDisallowedProps(inlineConf, "inline configuration"); 268 XConfiguration.copy(inlineConf, conf); 269 } 270 } 271 272 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) 273 throws ActionExecutorException { 274 try { 275 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 276 XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType()); 277 XConfiguration.injectDefaults(actionDefaults, actionConf); 278 279 has.checkSupportedFilesystem(appPath.toUri()); 280 281 parseJobXmlAndConfiguration(context, actionXml, appPath, actionConf); 282 return actionConf; 283 } 284 catch (IOException ex) { 285 throw convertException(ex); 286 } 287 catch (HadoopAccessorException ex) { 288 throw convertException(ex); 289 } 290 catch (URISyntaxException ex) { 291 throw convertException(ex); 292 } 293 } 294 295 Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive) 296 throws ActionExecutorException { 297 Path path = null; 298 try { 299 if (filePath.startsWith("/")) { 300 path = new Path(filePath); 301 } 302 else { 303 path = new Path(appPath, filePath); 304 } 305 URI uri = new URI(path.toUri().getPath()); 306 if (archive) { 307 DistributedCache.addCacheArchive(uri, conf); 308 } 309 else { 310 String fileName = filePath.substring(filePath.lastIndexOf("/") + 1); 311 if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files 312 uri = new Path(path.toString() + "#" + fileName).toUri(); 313 uri = new URI(uri.getPath()); 314 DistributedCache.addCacheFile(uri, conf); 315 } 316 else if (fileName.endsWith(".jar")) { // .jar files 317 if (!fileName.contains("#")) { 318 path = new Path(uri.toString()); 319 320 String user = conf.get("user.name"); 321 Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, path, conf); 322 } 323 else { 324 DistributedCache.addCacheFile(uri, conf); 325 } 326 } 327 else { // regular files 328 if (!fileName.contains("#")) { 329 uri = new Path(path.toString() + "#" + fileName).toUri(); 330 uri = new URI(uri.getPath()); 331 } 332 DistributedCache.addCacheFile(uri, conf); 333 } 334 } 335 DistributedCache.createSymlink(conf); 336 return conf; 337 } 338 catch (Exception ex) { 339 XLog.getLog(getClass()).debug( 340 "Errors when add to DistributedCache. Path=" + path + ", archive=" + archive + ", conf=" 341 + XmlUtils.prettyPrint(conf).toString()); 342 throw convertException(ex); 343 } 344 } 345 346 String getOozieLauncherJar(Context context) throws ActionExecutorException { 347 try { 348 return new Path(context.getActionDir(), getLauncherJarName()).toString(); 349 } 350 catch (Exception ex) { 351 throw convertException(ex); 352 } 353 } 354 355 public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { 356 try { 357 Path actionDir = context.getActionDir(); 358 Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp"); 359 if (!actionFs.exists(actionDir)) { 360 try { 361 actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path( 362 tempActionDir, getLauncherJarName())); 363 actionFs.rename(tempActionDir, actionDir); 364 } 365 catch (IOException ex) { 366 actionFs.delete(tempActionDir, true); 367 actionFs.delete(actionDir, true); 368 throw ex; 369 } 370 } 371 } 372 catch (Exception ex) { 373 throw convertException(ex); 374 } 375 } 376 377 void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { 378 try { 379 Path actionDir = context.getActionDir(); 380 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false) 381 && actionFs.exists(actionDir)) { 382 actionFs.delete(actionDir, true); 383 } 384 } 385 catch (Exception ex) { 386 throw convertException(ex); 387 } 388 } 389 390 protected void addShareLib(Path appPath, Configuration conf, String actionShareLibName) 391 throws ActionExecutorException { 392 if (actionShareLibName != null) { 393 try { 394 Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath(); 395 if (systemLibPath != null) { 396 Path actionLibPath = new Path(systemLibPath, actionShareLibName); 397 String user = conf.get("user.name"); 398 FileSystem fs = 399 Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf); 400 if (fs.exists(actionLibPath)) { 401 FileStatus[] files = fs.listStatus(actionLibPath); 402 for (FileStatus file : files) { 403 addToCache(conf, appPath, file.getPath().toUri().getPath(), false); 404 } 405 } 406 } 407 } 408 catch (HadoopAccessorException ex){ 409 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 410 ex.getErrorCode().toString(), ex.getMessage()); 411 } 412 catch (IOException ex){ 413 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 414 "It should never happen", ex.getMessage()); 415 } 416 } 417 } 418 419 protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException { 420 String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath"); 421 if (actionLibsStrArr != null) { 422 try { 423 for (String actionLibsStr : actionLibsStrArr) { 424 actionLibsStr = actionLibsStr.trim(); 425 if (actionLibsStr.length() > 0) 426 { 427 Path actionLibsPath = new Path(actionLibsStr); 428 String user = conf.get("user.name"); 429 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf); 430 if (fs.exists(actionLibsPath)) { 431 FileStatus[] files = fs.listStatus(actionLibsPath); 432 for (FileStatus file : files) { 433 addToCache(conf, appPath, file.getPath().toUri().getPath(), false); 434 } 435 } 436 } 437 } 438 } 439 catch (HadoopAccessorException ex){ 440 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 441 ex.getErrorCode().toString(), ex.getMessage()); 442 } 443 catch (IOException ex){ 444 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 445 "It should never happen", ex.getMessage()); 446 } 447 } 448 } 449 450 @SuppressWarnings("unchecked") 451 void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf) 452 throws ActionExecutorException { 453 Configuration proto = context.getProtoActionConf(); 454 455 // launcher JAR 456 addToCache(conf, appPath, getOozieLauncherJar(context), false); 457 458 // Workflow lib/ 459 String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST); 460 if (paths != null) { 461 for (String path : paths) { 462 addToCache(conf, appPath, path, false); 463 } 464 } 465 466 // Action libs 467 addActionLibs(appPath, conf); 468 469 // files and archives defined in the action 470 for (Element eProp : (List<Element>) actionXml.getChildren()) { 471 if (eProp.getName().equals("file")) { 472 String path = eProp.getTextTrim(); 473 addToCache(conf, appPath, path, false); 474 } 475 else { 476 if (eProp.getName().equals("archive")) { 477 String path = eProp.getTextTrim(); 478 addToCache(conf, appPath, path, true); 479 } 480 } 481 } 482 483 addAllShareLibs(appPath, conf, context, actionXml); 484 } 485 486 // Adds action specific share libs and common share libs 487 private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml) 488 throws ActionExecutorException { 489 // Add action specific share libs 490 addActionShareLib(appPath, conf, context, actionXml); 491 // Add common sharelibs for Oozie 492 addShareLib(appPath, conf, JavaActionExecutor.OOZIE_COMMON_LIBDIR); 493 } 494 495 private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml) throws ActionExecutorException { 496 XConfiguration wfJobConf = null; 497 try { 498 wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); 499 } 500 catch (IOException ioe) { 501 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen", 502 ioe.getMessage()); 503 } 504 // Action sharelibs are only added if user has specified to use system libpath 505 if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) { 506 // add action specific sharelibs 507 addShareLib(appPath, conf, getShareLibName(context, actionXml, conf)); 508 } 509 } 510 511 512 protected String getLauncherMain(Configuration launcherConf, Element actionXml) { 513 Namespace ns = actionXml.getNamespace(); 514 Element e = actionXml.getChild("main-class", ns); 515 return e.getTextTrim(); 516 } 517 518 private static final String QUEUE_NAME = "mapred.job.queue.name"; 519 520 private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>(); 521 522 static { 523 SPECIAL_PROPERTIES.add(QUEUE_NAME); 524 SPECIAL_PROPERTIES.add(ACL_VIEW_JOB); 525 SPECIAL_PROPERTIES.add(ACL_MODIFY_JOB); 526 } 527 528 @SuppressWarnings("unchecked") 529 JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf) 530 throws ActionExecutorException { 531 try { 532 533 // app path could be a file 534 Path appPathRoot = new Path(context.getWorkflow().getAppPath()); 535 if (actionFs.isFile(appPathRoot)) { 536 appPathRoot = appPathRoot.getParent(); 537 } 538 539 // launcher job configuration 540 JobConf launcherJobConf = createBaseHadoopConf(context, actionXml); 541 setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context); 542 543 String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType()); 544 if (actionShareLibProperty != null) { 545 launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty); 546 } 547 setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf); 548 String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow() 549 .getAppName(), action.getName(), context.getWorkflow().getId()); 550 launcherJobConf.setJobName(jobName); 551 552 String jobId = context.getWorkflow().getId(); 553 String actionId = action.getId(); 554 Path actionDir = context.getActionDir(); 555 String recoveryId = context.getRecoveryId(); 556 557 // Getting the prepare XML from the action XML 558 Namespace ns = actionXml.getNamespace(); 559 Element prepareElement = actionXml.getChild("prepare", ns); 560 String prepareXML = ""; 561 if (prepareElement != null) { 562 if (prepareElement.getChildren().size() > 0) { 563 prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim(); 564 } 565 } 566 LauncherMapper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf, 567 prepareXML); 568 569 LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml)); 570 LauncherMapper.setupSupportedFileSystems( 571 launcherJobConf, Services.get().getConf().get(HadoopAccessorService.SUPPORTED_FILESYSTEMS)); 572 LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen); 573 LauncherMapper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize); 574 575 List<Element> list = actionXml.getChildren("arg", ns); 576 String[] args = new String[list.size()]; 577 for (int i = 0; i < list.size(); i++) { 578 args[i] = list.get(i).getTextTrim(); 579 } 580 LauncherMapper.setupMainArguments(launcherJobConf, args); 581 582 List<Element> javaopts = actionXml.getChildren("java-opt", ns); 583 for (Element opt: javaopts) { 584 String opts = launcherJobConf.get("mapred.child.java.opts", ""); 585 opts = opts + " " + opt.getTextTrim(); 586 opts = opts.trim(); 587 launcherJobConf.set("mapred.child.java.opts", opts); 588 } 589 590 Element opt = actionXml.getChild("java-opts", ns); 591 if (opt != null) { 592 String opts = launcherJobConf.get("mapred.child.java.opts", ""); 593 opts = opts + " " + opt.getTextTrim(); 594 opts = opts.trim(); 595 launcherJobConf.set("mapred.child.java.opts", opts); 596 } 597 598 // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs) 599 // maybe we should add queue to the WF schema, below job-tracker 600 actionConfToLauncherConf(actionConf, launcherJobConf); 601 602 // to disable cancelation of delegation token on launcher job end 603 launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false); 604 605 return launcherJobConf; 606 } 607 catch (Exception ex) { 608 throw convertException(ex); 609 } 610 } 611 612 private void injectCallback(Context context, Configuration conf) { 613 String callback = context.getCallbackUrl("$jobStatus"); 614 if (conf.get("job.end.notification.url") != null) { 615 XLog.getLog(getClass()).warn("Overriding the action job end notification URI"); 616 } 617 conf.set("job.end.notification.url", callback); 618 } 619 620 void injectActionCallback(Context context, Configuration actionConf) { 621 injectCallback(context, actionConf); 622 } 623 624 void injectLauncherCallback(Context context, Configuration launcherConf) { 625 injectCallback(context, launcherConf); 626 } 627 628 private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) { 629 for (String name : SPECIAL_PROPERTIES) { 630 if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) { 631 launcherConf.set(name, actionConf.get(name)); 632 } 633 } 634 } 635 636 public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException { 637 JobClient jobClient = null; 638 boolean exception = false; 639 try { 640 Path appPathRoot = new Path(context.getWorkflow().getAppPath()); 641 642 // app path could be a file 643 if (actionFs.isFile(appPathRoot)) { 644 appPathRoot = appPathRoot.getParent(); 645 } 646 647 Element actionXml = XmlUtils.parseXml(action.getConf()); 648 649 // action job configuration 650 Configuration actionConf = createBaseHadoopConf(context, actionXml); 651 setupActionConf(actionConf, context, actionXml, appPathRoot); 652 XLog.getLog(getClass()).debug("Setting LibFilesArchives "); 653 setLibFilesArchives(context, actionXml, appPathRoot, actionConf); 654 String jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow() 655 .getAppName(), action.getName(), context.getWorkflow().getId()); 656 actionConf.set("mapred.job.name", jobName); 657 injectActionCallback(context, actionConf); 658 659 if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) { 660 // ONLY in the case where user has not given the 661 // modify-job ACL specifically 662 if (context.getWorkflow().getAcl() != null) { 663 // setting the group owning the Oozie job to allow anybody in that 664 // group to modify the jobs. 665 actionConf.set(ACL_MODIFY_JOB, context.getWorkflow().getAcl()); 666 } 667 } 668 669 // Setting the credential properties in launcher conf 670 HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context, 671 action, actionConf); 672 673 // Adding if action need to set more credential tokens 674 JobConf credentialsConf = new JobConf(false); 675 XConfiguration.copy(actionConf, credentialsConf); 676 setCredentialTokens(credentialsConf, context, action, credentialsProperties); 677 678 // insert conf to action conf from credentialsConf 679 for (Entry<String, String> entry : credentialsConf) { 680 if (actionConf.get(entry.getKey()) == null) { 681 actionConf.set(entry.getKey(), entry.getValue()); 682 } 683 } 684 685 JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); 686 injectLauncherCallback(context, launcherJobConf); 687 XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId()); 688 jobClient = createJobClient(context, launcherJobConf); 689 String launcherId = LauncherMapper.getRecoveryId(launcherJobConf, context.getActionDir(), context 690 .getRecoveryId()); 691 boolean alreadyRunning = launcherId != null; 692 RunningJob runningJob; 693 694 // if user-retry is on, always submit new launcher 695 boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry(); 696 697 if (alreadyRunning && !isUserRetry) { 698 runningJob = jobClient.getJob(JobID.forName(launcherId)); 699 if (runningJob == null) { 700 String jobTracker = launcherJobConf.get("mapred.job.tracker"); 701 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", 702 "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker); 703 } 704 } 705 else { 706 XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId()); 707 708 // setting up propagation of the delegation token. 709 Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(HadoopAccessorService 710 .getMRDelegationTokenRenewer(launcherJobConf)); 711 launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt); 712 713 // insert credentials tokens to launcher job conf if needed 714 if (needInjectCredentials()) { 715 for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) { 716 log.debug("ADDING TOKEN: " + tk.getKind().toString()); 717 launcherJobConf.getCredentials().addToken(tk.getKind(), tk); 718 } 719 } 720 else { 721 log.info("No need to inject credentials."); 722 } 723 runningJob = jobClient.submitJob(launcherJobConf); 724 if (runningJob == null) { 725 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", 726 "Error submitting launcher for action [{0}]", action.getId()); 727 } 728 launcherId = runningJob.getID().toString(); 729 XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId); 730 } 731 732 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER); 733 String consoleUrl = runningJob.getTrackingURL(); 734 context.setStartData(launcherId, jobTracker, consoleUrl); 735 } 736 catch (Exception ex) { 737 exception = true; 738 throw convertException(ex); 739 } 740 finally { 741 if (jobClient != null) { 742 try { 743 jobClient.close(); 744 } 745 catch (Exception e) { 746 if (exception) { 747 log.error("JobClient error: ", e); 748 } 749 else { 750 throw convertException(e); 751 } 752 } 753 } 754 } 755 } 756 757 private boolean needInjectCredentials() { 758 boolean methodExists = true; 759 760 Class klass; 761 try { 762 klass = Class.forName("org.apache.hadoop.mapred.JobConf"); 763 klass.getMethod("getCredentials"); 764 } 765 catch (ClassNotFoundException ex) { 766 methodExists = false; 767 } 768 catch (NoSuchMethodException ex) { 769 methodExists = false; 770 } 771 772 return methodExists; 773 } 774 775 protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context, 776 WorkflowAction action, Configuration actionConf) throws Exception { 777 HashMap<String, CredentialsProperties> credPropertiesMap = null; 778 if (context != null && action != null) { 779 credPropertiesMap = getActionCredentialsProperties(context, action); 780 if (credPropertiesMap != null) { 781 for (String key : credPropertiesMap.keySet()) { 782 CredentialsProperties prop = credPropertiesMap.get(key); 783 if (prop != null) { 784 log.debug("Credential Properties set for action : " + action.getId()); 785 for (String property : prop.getProperties().keySet()) { 786 actionConf.set(property, prop.getProperties().get(property)); 787 log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'"); 788 } 789 } 790 } 791 } 792 else { 793 log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred()); 794 } 795 } 796 else { 797 log.warn("context or action is null"); 798 } 799 return credPropertiesMap; 800 } 801 802 protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action, 803 HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception { 804 805 if (context != null && action != null && credPropertiesMap != null) { 806 for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { 807 String credName = entry.getKey(); 808 CredentialsProperties credProps = entry.getValue(); 809 if (credProps != null) { 810 CredentialsProvider credProvider = new CredentialsProvider(credProps.getType()); 811 Credentials credentialObject = credProvider.createCredentialObject(); 812 if (credentialObject != null) { 813 credentialObject.addtoJobConf(jobconf, credProps, context); 814 log.debug("Retrieved Credential '" + credName + "' for action " + action.getId()); 815 } 816 else { 817 log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType()); 818 } 819 } 820 } 821 } 822 823 } 824 825 protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context, 826 WorkflowAction action) throws Exception { 827 HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>(); 828 if (context != null && action != null) { 829 String credsInAction = action.getCred(); 830 log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId()); 831 String[] credNames = credsInAction.split(","); 832 for (String credName : credNames) { 833 CredentialsProperties credProps = getCredProperties(context, credName); 834 props.put(credName, credProps); 835 } 836 } 837 else { 838 log.warn("context or action is null"); 839 } 840 return props; 841 } 842 843 @SuppressWarnings("unchecked") 844 protected CredentialsProperties getCredProperties(Context context, String credName) 845 throws Exception { 846 CredentialsProperties credProp = null; 847 String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition(); 848 XConfiguration wfjobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); 849 Element elementJob = XmlUtils.parseXml(workflowXml); 850 Element credentials = elementJob.getChild("credentials", elementJob.getNamespace()); 851 if (credentials != null) { 852 for (Element credential : (List<Element>) credentials.getChildren("credential", credentials.getNamespace())) { 853 String name = credential.getAttributeValue("name"); 854 String type = credential.getAttributeValue("type"); 855 log.debug("getCredProperties: Name: " + name + ", Type: " + type); 856 if (name.equalsIgnoreCase(credName)) { 857 credProp = new CredentialsProperties(name, type); 858 for (Element property : (List<Element>) credential.getChildren("property", 859 credential.getNamespace())) { 860 String propertyName = property.getChildText("name", property.getNamespace()); 861 String propertyValue = property.getChildText("value", property.getNamespace()); 862 ELEvaluator eval = new ELEvaluator(); 863 for (Map.Entry<String, String> entry : wfjobConf) { 864 eval.setVariable(entry.getKey(), entry.getValue().trim()); 865 } 866 propertyName = eval.evaluate(propertyName, String.class); 867 propertyValue = eval.evaluate(propertyValue, String.class); 868 869 credProp.getProperties().put(propertyName, propertyValue); 870 log.debug("getCredProperties: Properties name :'" + propertyName + "', Value : '" 871 + propertyValue + "'"); 872 } 873 } 874 } 875 } else { 876 log.warn("credentials is null for the action"); 877 } 878 return credProp; 879 } 880 881 @Override 882 public void start(Context context, WorkflowAction action) throws ActionExecutorException { 883 try { 884 XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System"); 885 FileSystem actionFs = context.getAppFileSystem(); 886 XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir()); 887 prepareActionDir(actionFs, context); 888 XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action "); 889 submitLauncher(actionFs, context, action); 890 XLog.getLog(getClass()).debug("Action submit completed. Performing check "); 891 check(context, action); 892 XLog.getLog(getClass()).debug("Action check is done after submission"); 893 } 894 catch (Exception ex) { 895 throw convertException(ex); 896 } 897 } 898 899 @Override 900 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 901 try { 902 String externalStatus = action.getExternalStatus(); 903 WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK 904 : WorkflowAction.Status.ERROR; 905 context.setEndData(status, getActionSignal(status)); 906 } 907 catch (Exception ex) { 908 throw convertException(ex); 909 } 910 finally { 911 try { 912 FileSystem actionFs = context.getAppFileSystem(); 913 cleanUpActionDir(actionFs, context); 914 } 915 catch (Exception ex) { 916 throw convertException(ex); 917 } 918 } 919 } 920 921 /** 922 * Create job client object 923 * 924 * @param context 925 * @param jobConf 926 * @return 927 * @throws HadoopAccessorException 928 */ 929 protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException { 930 String user = context.getWorkflow().getUser(); 931 String group = context.getWorkflow().getGroup(); 932 return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); 933 } 934 935 @Override 936 public void check(Context context, WorkflowAction action) throws ActionExecutorException { 937 JobClient jobClient = null; 938 boolean exception = false; 939 try { 940 Element actionXml = XmlUtils.parseXml(action.getConf()); 941 FileSystem actionFs = context.getAppFileSystem(); 942 JobConf jobConf = createBaseHadoopConf(context, actionXml); 943 jobClient = createJobClient(context, jobConf); 944 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId())); 945 if (runningJob == null) { 946 context.setExternalStatus(FAILED); 947 context.setExecutionData(FAILED, null); 948 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", 949 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", action 950 .getExternalId(), action.getId()); 951 } 952 if (runningJob.isComplete()) { 953 Path actionDir = context.getActionDir(); 954 955 String user = context.getWorkflow().getUser(); 956 String group = context.getWorkflow().getGroup(); 957 if (LauncherMapper.hasIdSwap(runningJob, user, group, actionDir)) { 958 String launcherId = action.getExternalId(); 959 Path idSwapPath = LauncherMapper.getIdSwapPath(context.getActionDir()); 960 InputStream is = actionFs.open(idSwapPath); 961 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 962 Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen); 963 reader.close(); 964 String newId = props.getProperty("id"); 965 runningJob = jobClient.getJob(JobID.forName(newId)); 966 if (runningJob == null) { 967 context.setExternalStatus(FAILED); 968 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", 969 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId, 970 action.getId()); 971 } 972 973 context.setStartData(newId, action.getTrackerUri(), runningJob.getTrackingURL()); 974 XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId, 975 newId); 976 } 977 if (runningJob.isComplete()) { 978 XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]", 979 action.getExternalId()); 980 if (runningJob.isSuccessful() && LauncherMapper.isMainSuccessful(runningJob)) { 981 getActionData(actionFs, runningJob, action, context); 982 XLog.getLog(getClass()).info(XLog.STD, "action produced output"); 983 } 984 else { 985 XLog log = XLog.getLog(getClass()); 986 String errorReason; 987 Path actionError = LauncherMapper.getErrorPath(context.getActionDir()); 988 if (actionFs.exists(actionError)) { 989 InputStream is = actionFs.open(actionError); 990 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 991 Properties props = PropertiesUtils.readProperties(reader, -1); 992 reader.close(); 993 String errorCode = props.getProperty("error.code"); 994 if (errorCode.equals("0")) { 995 errorCode = "JA018"; 996 } 997 if (errorCode.equals("-1")) { 998 errorCode = "JA019"; 999 } 1000 errorReason = props.getProperty("error.reason"); 1001 log.warn("Launcher ERROR, reason: {0}", errorReason); 1002 String exMsg = props.getProperty("exception.message"); 1003 String errorInfo = (exMsg != null) ? exMsg : errorReason; 1004 context.setErrorInfo(errorCode, errorInfo); 1005 String exStackTrace = props.getProperty("exception.stacktrace"); 1006 if (exMsg != null) { 1007 log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace); 1008 } 1009 } 1010 else { 1011 errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action 1012 .getTrackerUri(), action.getExternalId()); 1013 log.warn(errorReason); 1014 } 1015 context.setExecutionData(FAILED_KILLED, null); 1016 } 1017 } 1018 else { 1019 context.setExternalStatus(RUNNING); 1020 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]", 1021 action.getExternalId(), action.getExternalStatus()); 1022 } 1023 } 1024 else { 1025 context.setExternalStatus(RUNNING); 1026 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]", 1027 action.getExternalId(), action.getExternalStatus()); 1028 } 1029 } 1030 catch (Exception ex) { 1031 XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex); 1032 exception = true; 1033 throw convertException(ex); 1034 } 1035 finally { 1036 if (jobClient != null) { 1037 try { 1038 jobClient.close(); 1039 } 1040 catch (Exception e) { 1041 if (exception) { 1042 log.error("JobClient error: ", e); 1043 } 1044 else { 1045 throw convertException(e); 1046 } 1047 } 1048 } 1049 } 1050 } 1051 1052 /** 1053 * Get the output data of an action. Subclasses should override this method 1054 * to get action specific output data. 1055 * 1056 * @param actionFs the FileSystem object 1057 * @param runningJob the runningJob 1058 * @param action the Workflow action 1059 * @param context executor context 1060 * 1061 */ 1062 protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) 1063 throws HadoopAccessorException, JDOMException, IOException, URISyntaxException { 1064 Properties props = null; 1065 if (getCaptureOutput(action)) { 1066 props = new Properties(); 1067 if (LauncherMapper.hasOutputData(runningJob)) { 1068 Path actionOutput = LauncherMapper.getOutputDataPath(context.getActionDir()); 1069 InputStream is = actionFs.open(actionOutput); 1070 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 1071 props = PropertiesUtils.readProperties(reader, maxActionOutputLen); 1072 reader.close(); 1073 } 1074 } 1075 context.setExecutionData(SUCCEEDED, props); 1076 } 1077 1078 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException { 1079 Element eConf = XmlUtils.parseXml(action.getConf()); 1080 Namespace ns = eConf.getNamespace(); 1081 Element captureOutput = eConf.getChild("capture-output", ns); 1082 return captureOutput != null; 1083 } 1084 1085 @Override 1086 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 1087 JobClient jobClient = null; 1088 boolean exception = false; 1089 try { 1090 Element actionXml = XmlUtils.parseXml(action.getConf()); 1091 JobConf jobConf = createBaseHadoopConf(context, actionXml); 1092 jobClient = createJobClient(context, jobConf); 1093 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId())); 1094 if (runningJob != null) { 1095 runningJob.killJob(); 1096 } 1097 context.setExternalStatus(KILLED); 1098 context.setExecutionData(KILLED, null); 1099 } 1100 catch (Exception ex) { 1101 exception = true; 1102 throw convertException(ex); 1103 } 1104 finally { 1105 try { 1106 FileSystem actionFs = context.getAppFileSystem(); 1107 cleanUpActionDir(actionFs, context); 1108 if (jobClient != null) { 1109 jobClient.close(); 1110 } 1111 } 1112 catch (Exception ex) { 1113 if (exception) { 1114 log.error("Error: ", ex); 1115 } 1116 else { 1117 throw convertException(ex); 1118 } 1119 } 1120 } 1121 } 1122 1123 private static Set<String> FINAL_STATUS = new HashSet<String>(); 1124 1125 static { 1126 FINAL_STATUS.add(SUCCEEDED); 1127 FINAL_STATUS.add(KILLED); 1128 FINAL_STATUS.add(FAILED); 1129 FINAL_STATUS.add(FAILED_KILLED); 1130 } 1131 1132 @Override 1133 public boolean isCompleted(String externalStatus) { 1134 return FINAL_STATUS.contains(externalStatus); 1135 } 1136 1137 1138 /** 1139 * Return the sharelib name for the action. 1140 * <p/> 1141 * If <code>NULL</code> or emtpy, it means that the action does not use the action 1142 * sharelib. 1143 * <p/> 1144 * If a non-empty string, i.e. <code>foo</code>, it means the action uses the 1145 * action sharelib subdirectory <code>foo</code> and all JARs in the sharelib 1146 * <code>foo</code> directory will be in the action classpath. 1147 * <p/> 1148 * The resolution is done using the following precedence order: 1149 * <ul> 1150 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the action configuration</li> 1151 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the job configuration</li> 1152 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the oozie configuration</li> 1153 * <li>Action Executor <code>getDefaultShareLibName()</code> method</li> 1154 * </ul> 1155 * 1156 * 1157 * @param context executor context. 1158 * @param actionXml 1159 *@param conf action configuration. @return the action sharelib name. 1160 */ 1161 protected String getShareLibName(Context context, Element actionXml, Configuration conf) { 1162 String name = conf.get(ACTION_SHARELIB_FOR + getType()); 1163 if (name == null) { 1164 try { 1165 XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); 1166 name = jobConf.get(ACTION_SHARELIB_FOR + getType()); 1167 if (name == null) { 1168 name = Services.get().getConf().get(ACTION_SHARELIB_FOR + getType()); 1169 if (name == null) { 1170 name = getDefaultShareLibName(actionXml); 1171 } 1172 } 1173 } 1174 catch (IOException ex) { 1175 throw new RuntimeException("It cannot happen, " + ex.toString(), ex); 1176 } 1177 } 1178 return name; 1179 } 1180 1181 private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for."; 1182 1183 1184 /** 1185 * Returns the default sharelib name for the action if any. 1186 * 1187 * @param actionXml the action XML fragment. 1188 * @return the sharelib name for the action, <code>NULL</code> if none. 1189 */ 1190 protected String getDefaultShareLibName(Element actionXml) { 1191 return null; 1192 } 1193 }