001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import java.io.BufferedReader; 021 import java.io.ByteArrayOutputStream; 022 import java.io.File; 023 import java.io.FileNotFoundException; 024 import java.io.IOException; 025 import java.io.InputStream; 026 import java.io.InputStreamReader; 027 import java.io.PrintStream; 028 import java.io.StringReader; 029 import java.net.ConnectException; 030 import java.net.URI; 031 import java.net.URISyntaxException; 032 import java.net.UnknownHostException; 033 import java.util.ArrayList; 034 import java.util.HashMap; 035 import java.util.HashSet; 036 import java.util.Iterator; 037 import java.util.List; 038 import java.util.Map; 039 import java.util.Properties; 040 import java.util.Set; 041 import java.util.Map.Entry; 042 043 import org.apache.hadoop.conf.Configuration; 044 import org.apache.hadoop.filecache.DistributedCache; 045 import org.apache.hadoop.fs.FileStatus; 046 import org.apache.hadoop.fs.FileSystem; 047 import org.apache.hadoop.fs.Path; 048 import org.apache.hadoop.fs.permission.AccessControlException; 049 import org.apache.hadoop.io.Text; 050 import org.apache.hadoop.mapred.JobClient; 051 import org.apache.hadoop.mapred.JobConf; 052 import org.apache.hadoop.mapred.JobID; 053 import org.apache.hadoop.mapred.RunningJob; 054 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; 055 import org.apache.hadoop.util.DiskChecker; 056 import org.apache.oozie.WorkflowActionBean; 057 import org.apache.oozie.WorkflowJobBean; 058 import org.apache.oozie.action.ActionExecutor; 059 import org.apache.oozie.action.ActionExecutorException; 060 import org.apache.oozie.client.OozieClient; 061 import org.apache.oozie.client.WorkflowAction; 062 import org.apache.oozie.service.HadoopAccessorException; 063 import org.apache.oozie.service.HadoopAccessorService; 064 import org.apache.oozie.service.Services; 065 import org.apache.oozie.service.WorkflowAppService; 066 import org.apache.oozie.servlet.CallbackServlet; 067 import org.apache.oozie.util.ELEvaluator; 068 import org.apache.oozie.util.IOUtils; 069 import org.apache.oozie.util.PropertiesUtils; 070 import org.apache.oozie.util.XConfiguration; 071 import org.apache.oozie.util.XLog; 072 import org.apache.oozie.util.XmlUtils; 073 import org.jdom.Element; 074 import org.jdom.JDOMException; 075 import org.jdom.Namespace; 076 import org.apache.hadoop.security.token.Token; 077 import org.apache.hadoop.security.token.TokenIdentifier; 078 079 public class JavaActionExecutor extends ActionExecutor { 080 081 private static final String HADOOP_USER = "user.name"; 082 private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker"; 083 private static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address"; 084 private static final String HADOOP_YARN_RM = "yarn.resourcemanager.address"; 085 private static final String HADOOP_NAME_NODE = "fs.default.name"; 086 public static final String OOZIE_COMMON_LIBDIR = "oozie"; 087 public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE; 088 private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>(); 089 public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size"; 090 public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job"; 091 public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job"; 092 private static int maxActionOutputLen; 093 private static int maxExternalStatsSize; 094 095 private static final String SUCCEEDED = "SUCCEEDED"; 096 private static final String KILLED = "KILLED"; 097 private static final String FAILED = "FAILED"; 098 private static final String FAILED_KILLED = "FAILED/KILLED"; 099 private static final String RUNNING = "RUNNING"; 100 protected XLog log = XLog.getLog(getClass()); 101 102 static { 103 DISALLOWED_PROPERTIES.add(HADOOP_USER); 104 DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER); 105 DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE); 106 DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2); 107 DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM); 108 } 109 110 public JavaActionExecutor() { 111 this("java"); 112 requiresNNJT = true; 113 } 114 115 protected JavaActionExecutor(String type) { 116 super(type); 117 requiresNNJT = true; 118 } 119 120 protected String getLauncherJarName() { 121 return getType() + "-launcher.jar"; 122 } 123 124 protected List<Class> getLauncherClasses() { 125 List<Class> classes = new ArrayList<Class>(); 126 classes.add(LauncherMapper.class); 127 classes.add(LauncherSecurityManager.class); 128 classes.add(LauncherException.class); 129 classes.add(LauncherMainException.class); 130 classes.add(FileSystemActions.class); 131 classes.add(PrepareActionsDriver.class); 132 classes.add(ActionStats.class); 133 classes.add(ActionType.class); 134 return classes; 135 } 136 137 @Override 138 public void initActionType() { 139 XLog log = XLog.getLog(getClass()); 140 super.initActionType(); 141 maxActionOutputLen = getOozieConf() 142 .getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 143 // TODO: Remove the below config get in a subsequent release.. 144 // This other irrelevant property is only used to 145 // preserve backwards compatibility cause of a typo. 146 // See OOZIE-4. 147 getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 148 2 * 1024)); 149 //Get the limit for the maximum allowed size of action stats 150 maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT); 151 maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize; 152 try { 153 List<Class> classes = getLauncherClasses(); 154 Class[] launcherClasses = classes.toArray(new Class[classes.size()]); 155 IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses); 156 157 registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001"); 158 registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT, 159 "JA002"); 160 registerError(DiskChecker.DiskOutOfSpaceException.class.getName(), 161 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003"); 162 registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(), 163 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004"); 164 registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(), 165 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005"); 166 registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, " JA006"); 167 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007"); 168 registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008"); 169 registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009"); 170 } 171 catch (IOException ex) { 172 throw new RuntimeException(ex); 173 } 174 catch (java.lang.NoClassDefFoundError err) { 175 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 176 err.printStackTrace(new PrintStream(baos)); 177 log.warn(baos.toString()); 178 } 179 } 180 181 /** 182 * Get the maximum allowed size of stats 183 * 184 * @return maximum size of stats 185 */ 186 public static int getMaxExternalStatsSize() { 187 return maxExternalStatsSize; 188 } 189 190 static void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException { 191 for (String prop : DISALLOWED_PROPERTIES) { 192 if (conf.get(prop) != null) { 193 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010", 194 "Property [{0}] not allowed in action [{1}] configuration", prop, confName); 195 } 196 } 197 } 198 199 public JobConf createBaseHadoopConf(Context context, Element actionXml) { 200 Namespace ns = actionXml.getNamespace(); 201 String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim(); 202 String nameNode = actionXml.getChild("name-node", ns).getTextTrim(); 203 JobConf conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); 204 conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER)); 205 conf.set(HADOOP_JOB_TRACKER, jobTracker); 206 conf.set(HADOOP_JOB_TRACKER_2, jobTracker); 207 conf.set(HADOOP_YARN_RM, jobTracker); 208 conf.set(HADOOP_NAME_NODE, nameNode); 209 conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true"); 210 return conf; 211 } 212 213 private void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) { 214 for (Map.Entry<String, String> entry : srcConf) { 215 if (entry.getKey().startsWith("oozie.launcher.")) { 216 String name = entry.getKey().substring("oozie.launcher.".length()); 217 String value = entry.getValue(); 218 // setting original KEY 219 launcherConf.set(entry.getKey(), value); 220 // setting un-prefixed key (to allow Hadoop job config 221 // for the launcher job 222 launcherConf.set(name, value); 223 } 224 } 225 } 226 227 Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) 228 throws ActionExecutorException { 229 try { 230 Namespace ns = actionXml.getNamespace(); 231 Element e = actionXml.getChild("configuration", ns); 232 if (e != null) { 233 String strConf = XmlUtils.prettyPrint(e).toString(); 234 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); 235 236 XConfiguration launcherConf = new XConfiguration(); 237 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 238 XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType()); 239 injectLauncherProperties(actionDefaultConf, launcherConf); 240 injectLauncherProperties(inlineConf, launcherConf); 241 checkForDisallowedProps(launcherConf, "launcher configuration"); 242 XConfiguration.copy(launcherConf, conf); 243 } 244 return conf; 245 } 246 catch (IOException ex) { 247 throw convertException(ex); 248 } 249 } 250 251 public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf) 252 throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException { 253 Namespace ns = element.getNamespace(); 254 Iterator<Element> it = element.getChildren("job-xml", ns).iterator(); 255 while (it.hasNext()) { 256 Element e = it.next(); 257 String jobXml = e.getTextTrim(); 258 Path path = new Path(appPath, jobXml); 259 FileSystem fs = context.getAppFileSystem(); 260 Configuration jobXmlConf = new XConfiguration(fs.open(path)); 261 checkForDisallowedProps(jobXmlConf, "job-xml"); 262 XConfiguration.copy(jobXmlConf, conf); 263 } 264 Element e = element.getChild("configuration", ns); 265 if (e != null) { 266 String strConf = XmlUtils.prettyPrint(e).toString(); 267 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); 268 checkForDisallowedProps(inlineConf, "inline configuration"); 269 XConfiguration.copy(inlineConf, conf); 270 } 271 } 272 273 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) 274 throws ActionExecutorException { 275 try { 276 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 277 XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType()); 278 XConfiguration.injectDefaults(actionDefaults, actionConf); 279 280 has.checkSupportedFilesystem(appPath.toUri()); 281 282 parseJobXmlAndConfiguration(context, actionXml, appPath, actionConf); 283 return actionConf; 284 } 285 catch (IOException ex) { 286 throw convertException(ex); 287 } 288 catch (HadoopAccessorException ex) { 289 throw convertException(ex); 290 } 291 catch (URISyntaxException ex) { 292 throw convertException(ex); 293 } 294 } 295 296 Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive) 297 throws ActionExecutorException { 298 Path path = null; 299 try { 300 if (filePath.startsWith("/")) { 301 path = new Path(filePath); 302 } 303 else { 304 path = new Path(appPath, filePath); 305 } 306 URI uri = new URI(path.toUri().getPath()); 307 if (archive) { 308 DistributedCache.addCacheArchive(uri, conf); 309 } 310 else { 311 String fileName = filePath.substring(filePath.lastIndexOf("/") + 1); 312 if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files 313 uri = new Path(path.toString() + "#" + fileName).toUri(); 314 uri = new URI(uri.getPath()); 315 DistributedCache.addCacheFile(uri, conf); 316 } 317 else if (fileName.endsWith(".jar")) { // .jar files 318 if (!fileName.contains("#")) { 319 path = new Path(uri.toString()); 320 321 String user = conf.get("user.name"); 322 Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, path, conf); 323 } 324 else { 325 DistributedCache.addCacheFile(uri, conf); 326 } 327 } 328 else { // regular files 329 if (!fileName.contains("#")) { 330 uri = new Path(path.toString() + "#" + fileName).toUri(); 331 uri = new URI(uri.getPath()); 332 } 333 DistributedCache.addCacheFile(uri, conf); 334 } 335 } 336 DistributedCache.createSymlink(conf); 337 return conf; 338 } 339 catch (Exception ex) { 340 XLog.getLog(getClass()).debug( 341 "Errors when add to DistributedCache. Path=" + path + ", archive=" + archive + ", conf=" 342 + XmlUtils.prettyPrint(conf).toString()); 343 throw convertException(ex); 344 } 345 } 346 347 String getOozieLauncherJar(Context context) throws ActionExecutorException { 348 try { 349 return new Path(context.getActionDir(), getLauncherJarName()).toString(); 350 } 351 catch (Exception ex) { 352 throw convertException(ex); 353 } 354 } 355 356 public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { 357 try { 358 Path actionDir = context.getActionDir(); 359 Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp"); 360 if (!actionFs.exists(actionDir)) { 361 try { 362 actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path( 363 tempActionDir, getLauncherJarName())); 364 actionFs.rename(tempActionDir, actionDir); 365 } 366 catch (IOException ex) { 367 actionFs.delete(tempActionDir, true); 368 actionFs.delete(actionDir, true); 369 throw ex; 370 } 371 } 372 } 373 catch (Exception ex) { 374 throw convertException(ex); 375 } 376 } 377 378 void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { 379 try { 380 Path actionDir = context.getActionDir(); 381 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false) 382 && actionFs.exists(actionDir)) { 383 actionFs.delete(actionDir, true); 384 } 385 } 386 catch (Exception ex) { 387 throw convertException(ex); 388 } 389 } 390 391 protected void addShareLib(Path appPath, Configuration conf, String actionShareLibName) 392 throws ActionExecutorException { 393 if (actionShareLibName != null) { 394 try { 395 Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath(); 396 if (systemLibPath != null) { 397 Path actionLibPath = new Path(systemLibPath, actionShareLibName); 398 String user = conf.get("user.name"); 399 FileSystem fs = 400 Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf); 401 if (fs.exists(actionLibPath)) { 402 FileStatus[] files = fs.listStatus(actionLibPath); 403 for (FileStatus file : files) { 404 addToCache(conf, appPath, file.getPath().toUri().getPath(), false); 405 } 406 } 407 } 408 } 409 catch (HadoopAccessorException ex){ 410 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 411 ex.getErrorCode().toString(), ex.getMessage()); 412 } 413 catch (IOException ex){ 414 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 415 "It should never happen", ex.getMessage()); 416 } 417 } 418 } 419 420 protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException { 421 String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath"); 422 if (actionLibsStrArr != null) { 423 try { 424 for (String actionLibsStr : actionLibsStrArr) { 425 actionLibsStr = actionLibsStr.trim(); 426 if (actionLibsStr.length() > 0) 427 { 428 Path actionLibsPath = new Path(actionLibsStr); 429 String user = conf.get("user.name"); 430 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf); 431 if (fs.exists(actionLibsPath)) { 432 FileStatus[] files = fs.listStatus(actionLibsPath); 433 for (FileStatus file : files) { 434 addToCache(conf, appPath, file.getPath().toUri().getPath(), false); 435 } 436 } 437 } 438 } 439 } 440 catch (HadoopAccessorException ex){ 441 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 442 ex.getErrorCode().toString(), ex.getMessage()); 443 } 444 catch (IOException ex){ 445 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 446 "It should never happen", ex.getMessage()); 447 } 448 } 449 } 450 451 @SuppressWarnings("unchecked") 452 void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf) 453 throws ActionExecutorException { 454 Configuration proto = context.getProtoActionConf(); 455 456 // launcher JAR 457 addToCache(conf, appPath, getOozieLauncherJar(context), false); 458 459 // Workflow lib/ 460 String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST); 461 if (paths != null) { 462 for (String path : paths) { 463 addToCache(conf, appPath, path, false); 464 } 465 } 466 467 // Action libs 468 addActionLibs(appPath, conf); 469 470 // files and archives defined in the action 471 for (Element eProp : (List<Element>) actionXml.getChildren()) { 472 if (eProp.getName().equals("file")) { 473 String path = eProp.getTextTrim(); 474 addToCache(conf, appPath, path, false); 475 } 476 else { 477 if (eProp.getName().equals("archive")) { 478 String path = eProp.getTextTrim(); 479 addToCache(conf, appPath, path, true); 480 } 481 } 482 } 483 484 addAllShareLibs(appPath, conf, context, actionXml); 485 } 486 487 // Adds action specific share libs and common share libs 488 private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml) 489 throws ActionExecutorException { 490 // Add action specific share libs 491 addActionShareLib(appPath, conf, context, actionXml); 492 // Add common sharelibs for Oozie 493 addShareLib(appPath, conf, JavaActionExecutor.OOZIE_COMMON_LIBDIR); 494 } 495 496 private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml) throws ActionExecutorException { 497 XConfiguration wfJobConf = null; 498 try { 499 wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); 500 } 501 catch (IOException ioe) { 502 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen", 503 ioe.getMessage()); 504 } 505 // Action sharelibs are only added if user has specified to use system libpath 506 if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) { 507 // add action specific sharelibs 508 addShareLib(appPath, conf, getShareLibName(context, actionXml, conf)); 509 } 510 } 511 512 513 protected String getLauncherMain(Configuration launcherConf, Element actionXml) { 514 Namespace ns = actionXml.getNamespace(); 515 Element e = actionXml.getChild("main-class", ns); 516 return e.getTextTrim(); 517 } 518 519 private static final String QUEUE_NAME = "mapred.job.queue.name"; 520 521 private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>(); 522 523 static { 524 SPECIAL_PROPERTIES.add(QUEUE_NAME); 525 SPECIAL_PROPERTIES.add(ACL_VIEW_JOB); 526 SPECIAL_PROPERTIES.add(ACL_MODIFY_JOB); 527 } 528 529 @SuppressWarnings("unchecked") 530 JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf) 531 throws ActionExecutorException { 532 try { 533 534 // app path could be a file 535 Path appPathRoot = new Path(context.getWorkflow().getAppPath()); 536 if (actionFs.isFile(appPathRoot)) { 537 appPathRoot = appPathRoot.getParent(); 538 } 539 540 // launcher job configuration 541 JobConf launcherJobConf = createBaseHadoopConf(context, actionXml); 542 setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context); 543 544 String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType()); 545 if (actionShareLibProperty != null) { 546 launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty); 547 } 548 setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf); 549 String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow() 550 .getAppName(), action.getName(), context.getWorkflow().getId()); 551 launcherJobConf.setJobName(jobName); 552 553 String jobId = context.getWorkflow().getId(); 554 String actionId = action.getId(); 555 Path actionDir = context.getActionDir(); 556 String recoveryId = context.getRecoveryId(); 557 558 // Getting the prepare XML from the action XML 559 Namespace ns = actionXml.getNamespace(); 560 Element prepareElement = actionXml.getChild("prepare", ns); 561 String prepareXML = ""; 562 if (prepareElement != null) { 563 if (prepareElement.getChildren().size() > 0) { 564 prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim(); 565 } 566 } 567 LauncherMapper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf, 568 prepareXML); 569 570 LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml)); 571 LauncherMapper.setupSupportedFileSystems( 572 launcherJobConf, Services.get().getConf().get(HadoopAccessorService.SUPPORTED_FILESYSTEMS)); 573 LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen); 574 LauncherMapper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize); 575 576 List<Element> list = actionXml.getChildren("arg", ns); 577 String[] args = new String[list.size()]; 578 for (int i = 0; i < list.size(); i++) { 579 args[i] = list.get(i).getTextTrim(); 580 } 581 LauncherMapper.setupMainArguments(launcherJobConf, args); 582 583 List<Element> javaopts = actionXml.getChildren("java-opt", ns); 584 for (Element opt: javaopts) { 585 String opts = launcherJobConf.get("mapred.child.java.opts", ""); 586 opts = opts + " " + opt.getTextTrim(); 587 opts = opts.trim(); 588 launcherJobConf.set("mapred.child.java.opts", opts); 589 } 590 591 Element opt = actionXml.getChild("java-opts", ns); 592 if (opt != null) { 593 String opts = launcherJobConf.get("mapred.child.java.opts", ""); 594 opts = opts + " " + opt.getTextTrim(); 595 opts = opts.trim(); 596 launcherJobConf.set("mapred.child.java.opts", opts); 597 } 598 599 // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs) 600 // maybe we should add queue to the WF schema, below job-tracker 601 actionConfToLauncherConf(actionConf, launcherJobConf); 602 603 // to disable cancelation of delegation token on launcher job end 604 launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false); 605 606 return launcherJobConf; 607 } 608 catch (Exception ex) { 609 throw convertException(ex); 610 } 611 } 612 613 private void injectCallback(Context context, Configuration conf) { 614 String callback = context.getCallbackUrl("$jobStatus"); 615 if (conf.get("job.end.notification.url") != null) { 616 XLog.getLog(getClass()).warn("Overriding the action job end notification URI"); 617 } 618 conf.set("job.end.notification.url", callback); 619 } 620 621 void injectActionCallback(Context context, Configuration actionConf) { 622 injectCallback(context, actionConf); 623 } 624 625 void injectLauncherCallback(Context context, Configuration launcherConf) { 626 injectCallback(context, launcherConf); 627 } 628 629 private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) { 630 for (String name : SPECIAL_PROPERTIES) { 631 if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) { 632 launcherConf.set(name, actionConf.get(name)); 633 } 634 } 635 } 636 637 public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException { 638 JobClient jobClient = null; 639 boolean exception = false; 640 try { 641 Path appPathRoot = new Path(context.getWorkflow().getAppPath()); 642 643 // app path could be a file 644 if (actionFs.isFile(appPathRoot)) { 645 appPathRoot = appPathRoot.getParent(); 646 } 647 648 Element actionXml = XmlUtils.parseXml(action.getConf()); 649 650 // action job configuration 651 Configuration actionConf = createBaseHadoopConf(context, actionXml); 652 setupActionConf(actionConf, context, actionXml, appPathRoot); 653 XLog.getLog(getClass()).debug("Setting LibFilesArchives "); 654 setLibFilesArchives(context, actionXml, appPathRoot, actionConf); 655 String jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow() 656 .getAppName(), action.getName(), context.getWorkflow().getId()); 657 actionConf.set("mapred.job.name", jobName); 658 injectActionCallback(context, actionConf); 659 660 if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) { 661 // ONLY in the case where user has not given the 662 // modify-job ACL specifically 663 if (context.getWorkflow().getAcl() != null) { 664 // setting the group owning the Oozie job to allow anybody in that 665 // group to modify the jobs. 666 actionConf.set(ACL_MODIFY_JOB, context.getWorkflow().getAcl()); 667 } 668 } 669 670 // Setting the credential properties in launcher conf 671 HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context, 672 action, actionConf); 673 674 // Adding if action need to set more credential tokens 675 JobConf credentialsConf = new JobConf(false); 676 XConfiguration.copy(actionConf, credentialsConf); 677 setCredentialTokens(credentialsConf, context, action, credentialsProperties); 678 679 // insert conf to action conf from credentialsConf 680 for (Entry<String, String> entry : credentialsConf) { 681 if (actionConf.get(entry.getKey()) == null) { 682 actionConf.set(entry.getKey(), entry.getValue()); 683 } 684 } 685 686 JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); 687 injectLauncherCallback(context, launcherJobConf); 688 XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId()); 689 jobClient = createJobClient(context, launcherJobConf); 690 String launcherId = LauncherMapper.getRecoveryId(launcherJobConf, context.getActionDir(), context 691 .getRecoveryId()); 692 boolean alreadyRunning = launcherId != null; 693 RunningJob runningJob; 694 695 // if user-retry is on, always submit new launcher 696 boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry(); 697 698 if (alreadyRunning && !isUserRetry) { 699 runningJob = jobClient.getJob(JobID.forName(launcherId)); 700 if (runningJob == null) { 701 String jobTracker = launcherJobConf.get("mapred.job.tracker"); 702 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", 703 "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker); 704 } 705 } 706 else { 707 XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId()); 708 709 // setting up propagation of the delegation token. 710 Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("mr token")); 711 launcherJobConf.getCredentials().addToken(new Text("mr token"), mrdt); 712 713 // insert credentials tokens to launcher job conf if needed 714 if (needInjectCredentials()) { 715 for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) { 716 log.debug("ADDING TOKEN: " + tk.getKind().toString()); 717 launcherJobConf.getCredentials().addToken(tk.getKind(), tk); 718 } 719 } 720 else { 721 log.info("No need to inject credentials."); 722 } 723 runningJob = jobClient.submitJob(launcherJobConf); 724 if (runningJob == null) { 725 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", 726 "Error submitting launcher for action [{0}]", action.getId()); 727 } 728 launcherId = runningJob.getID().toString(); 729 XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId); 730 } 731 732 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER); 733 String consoleUrl = runningJob.getTrackingURL(); 734 context.setStartData(launcherId, jobTracker, consoleUrl); 735 } 736 catch (Exception ex) { 737 exception = true; 738 throw convertException(ex); 739 } 740 finally { 741 if (jobClient != null) { 742 try { 743 jobClient.close(); 744 } 745 catch (Exception e) { 746 if (exception) { 747 log.error("JobClient error: ", e); 748 } 749 else { 750 throw convertException(e); 751 } 752 } 753 } 754 } 755 } 756 757 private boolean needInjectCredentials() { 758 boolean methodExists = true; 759 760 Class klass; 761 try { 762 klass = Class.forName("org.apache.hadoop.mapred.JobConf"); 763 klass.getMethod("getCredentials"); 764 } 765 catch (ClassNotFoundException ex) { 766 methodExists = false; 767 } 768 catch (NoSuchMethodException ex) { 769 methodExists = false; 770 } 771 772 return methodExists; 773 } 774 775 protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context, 776 WorkflowAction action, Configuration actionConf) throws Exception { 777 HashMap<String, CredentialsProperties> credPropertiesMap = null; 778 if (context != null && action != null) { 779 credPropertiesMap = getActionCredentialsProperties(context, action, actionConf); 780 if (credPropertiesMap != null) { 781 for (String key : credPropertiesMap.keySet()) { 782 CredentialsProperties prop = credPropertiesMap.get(key); 783 if (prop != null) { 784 log.debug("Credential Properties set for action : " + action.getId()); 785 for (String property : prop.getProperties().keySet()) { 786 actionConf.set(property, prop.getProperties().get(property)); 787 log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'"); 788 } 789 } 790 } 791 } 792 else { 793 log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred()); 794 } 795 } 796 else { 797 log.warn("context or action is null"); 798 } 799 return credPropertiesMap; 800 } 801 802 protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action, 803 HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception { 804 805 if (context != null && action != null && credPropertiesMap != null) { 806 for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { 807 String credName = entry.getKey(); 808 CredentialsProperties credProps = entry.getValue(); 809 if (credProps != null) { 810 CredentialsProvider credProvider = new CredentialsProvider(credProps.getType()); 811 Credentials credentialObject = credProvider.createCredentialObject(); 812 if (credentialObject != null) { 813 credentialObject.addtoJobConf(jobconf, credProps, context); 814 log.debug("Retrieved Credential '" + credName + "' for action " + action.getId()); 815 } 816 else { 817 log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType()); 818 } 819 } 820 } 821 } 822 823 } 824 825 protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context, 826 WorkflowAction action, Configuration conf) throws Exception { 827 HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>(); 828 if (context != null && action != null) { 829 String credsInAction = action.getCred(); 830 log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId()); 831 String[] credNames = credsInAction.split(","); 832 for (String credName : credNames) { 833 CredentialsProperties credProps = getCredProperties(context, credName, conf); 834 props.put(credName, credProps); 835 } 836 } 837 else { 838 log.warn("context or action is null"); 839 } 840 return props; 841 } 842 843 @SuppressWarnings("unchecked") 844 protected CredentialsProperties getCredProperties(Context context, String credName, Configuration conf) 845 throws Exception { 846 CredentialsProperties credProp = null; 847 String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition(); 848 Element elementJob = XmlUtils.parseXml(workflowXml); 849 Element credentials = elementJob.getChild("credentials", elementJob.getNamespace()); 850 if (credentials != null) { 851 for (Element credential : (List<Element>) credentials.getChildren("credential", credentials.getNamespace())) { 852 String name = credential.getAttributeValue("name"); 853 String type = credential.getAttributeValue("type"); 854 log.debug("getCredProperties: Name: " + name + ", Type: " + type); 855 if (name.equalsIgnoreCase(credName)) { 856 credProp = new CredentialsProperties(name, type); 857 for (Element property : (List<Element>) credential.getChildren("property", 858 credential.getNamespace())) { 859 String propertyName = property.getChildText("name", property.getNamespace()); 860 String propertyValue = property.getChildText("value", property.getNamespace()); 861 ELEvaluator eval = new ELEvaluator(); 862 for (Map.Entry<String, String> entry : conf) { 863 eval.setVariable(entry.getKey(), entry.getValue().trim()); 864 } 865 propertyName = eval.evaluate(propertyName, String.class); 866 propertyValue = eval.evaluate(propertyValue, String.class); 867 868 credProp.getProperties().put(propertyName, propertyValue); 869 log.debug("getCredProperties: Properties name :'" + propertyName + "', Value : '" 870 + propertyValue + "'"); 871 } 872 } 873 } 874 } else { 875 log.warn("credentials is null for the action"); 876 } 877 return credProp; 878 } 879 880 @Override 881 public void start(Context context, WorkflowAction action) throws ActionExecutorException { 882 try { 883 XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System"); 884 FileSystem actionFs = context.getAppFileSystem(); 885 XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir()); 886 prepareActionDir(actionFs, context); 887 XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action "); 888 submitLauncher(actionFs, context, action); 889 XLog.getLog(getClass()).debug("Action submit completed. Performing check "); 890 check(context, action); 891 XLog.getLog(getClass()).debug("Action check is done after submission"); 892 } 893 catch (Exception ex) { 894 throw convertException(ex); 895 } 896 } 897 898 @Override 899 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 900 try { 901 String externalStatus = action.getExternalStatus(); 902 WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK 903 : WorkflowAction.Status.ERROR; 904 context.setEndData(status, getActionSignal(status)); 905 } 906 catch (Exception ex) { 907 throw convertException(ex); 908 } 909 finally { 910 try { 911 FileSystem actionFs = context.getAppFileSystem(); 912 cleanUpActionDir(actionFs, context); 913 } 914 catch (Exception ex) { 915 throw convertException(ex); 916 } 917 } 918 } 919 920 /** 921 * Create job client object 922 * 923 * @param context 924 * @param jobConf 925 * @return 926 * @throws HadoopAccessorException 927 */ 928 protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException { 929 String user = context.getWorkflow().getUser(); 930 String group = context.getWorkflow().getGroup(); 931 return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); 932 } 933 934 @Override 935 public void check(Context context, WorkflowAction action) throws ActionExecutorException { 936 JobClient jobClient = null; 937 boolean exception = false; 938 try { 939 Element actionXml = XmlUtils.parseXml(action.getConf()); 940 FileSystem actionFs = context.getAppFileSystem(); 941 JobConf jobConf = createBaseHadoopConf(context, actionXml); 942 jobClient = createJobClient(context, jobConf); 943 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId())); 944 if (runningJob == null) { 945 context.setExternalStatus(FAILED); 946 context.setExecutionData(FAILED, null); 947 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", 948 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", action 949 .getExternalId(), action.getId()); 950 } 951 if (runningJob.isComplete()) { 952 Path actionDir = context.getActionDir(); 953 954 String user = context.getWorkflow().getUser(); 955 String group = context.getWorkflow().getGroup(); 956 if (LauncherMapper.hasIdSwap(runningJob, user, group, actionDir)) { 957 String launcherId = action.getExternalId(); 958 Path idSwapPath = LauncherMapper.getIdSwapPath(context.getActionDir()); 959 InputStream is = actionFs.open(idSwapPath); 960 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 961 Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen); 962 reader.close(); 963 String newId = props.getProperty("id"); 964 runningJob = jobClient.getJob(JobID.forName(newId)); 965 if (runningJob == null) { 966 context.setExternalStatus(FAILED); 967 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", 968 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId, 969 action.getId()); 970 } 971 972 context.setStartData(newId, action.getTrackerUri(), runningJob.getTrackingURL()); 973 XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId, 974 newId); 975 } 976 if (runningJob.isComplete()) { 977 XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]", 978 action.getExternalId()); 979 if (runningJob.isSuccessful() && LauncherMapper.isMainSuccessful(runningJob)) { 980 getActionData(actionFs, runningJob, action, context); 981 XLog.getLog(getClass()).info(XLog.STD, "action produced output"); 982 } 983 else { 984 XLog log = XLog.getLog(getClass()); 985 String errorReason; 986 Path actionError = LauncherMapper.getErrorPath(context.getActionDir()); 987 if (actionFs.exists(actionError)) { 988 InputStream is = actionFs.open(actionError); 989 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 990 Properties props = PropertiesUtils.readProperties(reader, -1); 991 reader.close(); 992 String errorCode = props.getProperty("error.code"); 993 if (errorCode.equals("0")) { 994 errorCode = "JA018"; 995 } 996 if (errorCode.equals("-1")) { 997 errorCode = "JA019"; 998 } 999 errorReason = props.getProperty("error.reason"); 1000 log.warn("Launcher ERROR, reason: {0}", errorReason); 1001 String exMsg = props.getProperty("exception.message"); 1002 String errorInfo = (exMsg != null) ? exMsg : errorReason; 1003 context.setErrorInfo(errorCode, errorInfo); 1004 String exStackTrace = props.getProperty("exception.stacktrace"); 1005 if (exMsg != null) { 1006 log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace); 1007 } 1008 } 1009 else { 1010 errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action 1011 .getTrackerUri(), action.getExternalId()); 1012 log.warn(errorReason); 1013 } 1014 context.setExecutionData(FAILED_KILLED, null); 1015 } 1016 } 1017 else { 1018 context.setExternalStatus(RUNNING); 1019 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]", 1020 action.getExternalId(), action.getExternalStatus()); 1021 } 1022 } 1023 else { 1024 context.setExternalStatus(RUNNING); 1025 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]", 1026 action.getExternalId(), action.getExternalStatus()); 1027 } 1028 } 1029 catch (Exception ex) { 1030 XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex); 1031 exception = true; 1032 throw convertException(ex); 1033 } 1034 finally { 1035 if (jobClient != null) { 1036 try { 1037 jobClient.close(); 1038 } 1039 catch (Exception e) { 1040 if (exception) { 1041 log.error("JobClient error: ", e); 1042 } 1043 else { 1044 throw convertException(e); 1045 } 1046 } 1047 } 1048 } 1049 } 1050 1051 /** 1052 * Get the output data of an action. Subclasses should override this method 1053 * to get action specific output data. 1054 * 1055 * @param actionFs the FileSystem object 1056 * @param runningJob the runningJob 1057 * @param action the Workflow action 1058 * @param context executor context 1059 * 1060 */ 1061 protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) 1062 throws HadoopAccessorException, JDOMException, IOException, URISyntaxException { 1063 Properties props = null; 1064 if (getCaptureOutput(action)) { 1065 props = new Properties(); 1066 if (LauncherMapper.hasOutputData(runningJob)) { 1067 Path actionOutput = LauncherMapper.getOutputDataPath(context.getActionDir()); 1068 InputStream is = actionFs.open(actionOutput); 1069 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 1070 props = PropertiesUtils.readProperties(reader, maxActionOutputLen); 1071 reader.close(); 1072 } 1073 } 1074 context.setExecutionData(SUCCEEDED, props); 1075 } 1076 1077 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException { 1078 Element eConf = XmlUtils.parseXml(action.getConf()); 1079 Namespace ns = eConf.getNamespace(); 1080 Element captureOutput = eConf.getChild("capture-output", ns); 1081 return captureOutput != null; 1082 } 1083 1084 @Override 1085 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 1086 JobClient jobClient = null; 1087 boolean exception = false; 1088 try { 1089 Element actionXml = XmlUtils.parseXml(action.getConf()); 1090 JobConf jobConf = createBaseHadoopConf(context, actionXml); 1091 jobClient = createJobClient(context, jobConf); 1092 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId())); 1093 if (runningJob != null) { 1094 runningJob.killJob(); 1095 } 1096 context.setExternalStatus(KILLED); 1097 context.setExecutionData(KILLED, null); 1098 } 1099 catch (Exception ex) { 1100 exception = true; 1101 throw convertException(ex); 1102 } 1103 finally { 1104 try { 1105 FileSystem actionFs = context.getAppFileSystem(); 1106 cleanUpActionDir(actionFs, context); 1107 if (jobClient != null) { 1108 jobClient.close(); 1109 } 1110 } 1111 catch (Exception ex) { 1112 if (exception) { 1113 log.error("Error: ", ex); 1114 } 1115 else { 1116 throw convertException(ex); 1117 } 1118 } 1119 } 1120 } 1121 1122 private static Set<String> FINAL_STATUS = new HashSet<String>(); 1123 1124 static { 1125 FINAL_STATUS.add(SUCCEEDED); 1126 FINAL_STATUS.add(KILLED); 1127 FINAL_STATUS.add(FAILED); 1128 FINAL_STATUS.add(FAILED_KILLED); 1129 } 1130 1131 @Override 1132 public boolean isCompleted(String externalStatus) { 1133 return FINAL_STATUS.contains(externalStatus); 1134 } 1135 1136 1137 /** 1138 * Return the sharelib name for the action. 1139 * <p/> 1140 * If <code>NULL</code> or emtpy, it means that the action does not use the action 1141 * sharelib. 1142 * <p/> 1143 * If a non-empty string, i.e. <code>foo</code>, it means the action uses the 1144 * action sharelib subdirectory <code>foo</code> and all JARs in the sharelib 1145 * <code>foo</code> directory will be in the action classpath. 1146 * <p/> 1147 * The resolution is done using the following precedence order: 1148 * <ul> 1149 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the action configuration</li> 1150 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the job configuration</li> 1151 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the oozie configuration</li> 1152 * <li>Action Executor <code>getDefaultShareLibName()</code> method</li> 1153 * </ul> 1154 * 1155 * 1156 * @param context executor context. 1157 * @param actionXml 1158 *@param conf action configuration. @return the action sharelib name. 1159 */ 1160 protected String getShareLibName(Context context, Element actionXml, Configuration conf) { 1161 String name = conf.get(ACTION_SHARELIB_FOR + getType()); 1162 if (name == null) { 1163 try { 1164 XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); 1165 name = jobConf.get(ACTION_SHARELIB_FOR + getType()); 1166 if (name == null) { 1167 name = Services.get().getConf().get(ACTION_SHARELIB_FOR + getType()); 1168 if (name == null) { 1169 name = getDefaultShareLibName(actionXml); 1170 } 1171 } 1172 } 1173 catch (IOException ex) { 1174 throw new RuntimeException("It cannot happen, " + ex.toString(), ex); 1175 } 1176 } 1177 return name; 1178 } 1179 1180 private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for."; 1181 1182 1183 /** 1184 * Returns the default sharelib name for the action if any. 1185 * 1186 * @param actionXml the action XML fragment. 1187 * @return the sharelib name for the action, <code>NULL</code> if none. 1188 */ 1189 protected String getDefaultShareLibName(Element actionXml) { 1190 return null; 1191 } 1192 }