001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import java.io.BufferedReader; 021 import java.io.ByteArrayOutputStream; 022 import java.io.File; 023 import java.io.FileNotFoundException; 024 import java.io.IOException; 025 import java.io.InputStream; 026 import java.io.InputStreamReader; 027 import java.io.PrintStream; 028 import java.io.StringReader; 029 import java.net.ConnectException; 030 import java.net.URI; 031 import java.net.URISyntaxException; 032 import java.net.UnknownHostException; 033 import java.util.ArrayList; 034 import java.util.HashMap; 035 import java.util.HashSet; 036 import java.util.Iterator; 037 import java.util.List; 038 import java.util.Map; 039 import java.util.Properties; 040 import java.util.Set; 041 import java.util.Map.Entry; 042 043 import org.apache.hadoop.conf.Configuration; 044 import org.apache.hadoop.filecache.DistributedCache; 045 import org.apache.hadoop.fs.FileStatus; 046 import org.apache.hadoop.fs.FileSystem; 047 import org.apache.hadoop.fs.Path; 048 import org.apache.hadoop.fs.permission.AccessControlException; 049 import org.apache.hadoop.mapred.JobClient; 050 import org.apache.hadoop.mapred.JobConf; 051 import org.apache.hadoop.mapred.JobID; 052 import org.apache.hadoop.mapred.RunningJob; 053 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; 054 import org.apache.hadoop.util.DiskChecker; 055 import org.apache.oozie.WorkflowActionBean; 056 import org.apache.oozie.WorkflowJobBean; 057 import org.apache.oozie.action.ActionExecutor; 058 import org.apache.oozie.action.ActionExecutorException; 059 import org.apache.oozie.client.OozieClient; 060 import org.apache.oozie.client.WorkflowAction; 061 import org.apache.oozie.service.HadoopAccessorException; 062 import org.apache.oozie.service.HadoopAccessorService; 063 import org.apache.oozie.service.Services; 064 import org.apache.oozie.service.URIHandlerService; 065 import org.apache.oozie.service.WorkflowAppService; 066 import org.apache.oozie.servlet.CallbackServlet; 067 import org.apache.oozie.util.ELEvaluator; 068 import org.apache.oozie.util.IOUtils; 069 import org.apache.oozie.util.PropertiesUtils; 070 import org.apache.oozie.util.XConfiguration; 071 import org.apache.oozie.util.XLog; 072 import org.apache.oozie.util.XmlUtils; 073 import org.jdom.Element; 074 import org.jdom.JDOMException; 075 import org.jdom.Namespace; 076 import org.apache.hadoop.security.token.Token; 077 import org.apache.hadoop.security.token.TokenIdentifier; 078 079 public class JavaActionExecutor extends ActionExecutor { 080 081 private static final String HADOOP_USER = "user.name"; 082 public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker"; 083 public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address"; 084 public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address"; 085 public static final String HADOOP_NAME_NODE = "fs.default.name"; 086 private static final String HADOOP_JOB_NAME = "mapred.job.name"; 087 public static final String OOZIE_COMMON_LIBDIR = "oozie"; 088 public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE; 089 private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>(); 090 public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size"; 091 public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job"; 092 public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job"; 093 private static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable"; 094 public static final String OOZIE_ACTION_SHIP_LAUNCHER_JAR = "oozie.action.ship.launcher.jar"; 095 private boolean useLauncherJar; 096 private static int maxActionOutputLen; 097 private static int maxExternalStatsSize; 098 099 private static final String SUCCEEDED = "SUCCEEDED"; 100 private static final String KILLED = "KILLED"; 101 private static final String FAILED = "FAILED"; 102 private static final String FAILED_KILLED = "FAILED/KILLED"; 103 private static final String RUNNING = "RUNNING"; 104 protected XLog log = XLog.getLog(getClass()); 105 106 static { 107 DISALLOWED_PROPERTIES.add(HADOOP_USER); 108 DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER); 109 DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE); 110 DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2); 111 DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM); 112 } 113 114 public JavaActionExecutor() { 115 this("java"); 116 } 117 118 protected JavaActionExecutor(String type) { 119 super(type); 120 requiresNNJT = true; 121 useLauncherJar = getOozieConf().getBoolean(OOZIE_ACTION_SHIP_LAUNCHER_JAR, true); 122 } 123 124 protected String getLauncherJarName() { 125 return getType() + "-launcher.jar"; 126 } 127 128 protected List<Class> getLauncherClasses() { 129 List<Class> classes = new ArrayList<Class>(); 130 classes.add(LauncherMapper.class); 131 classes.add(LauncherSecurityManager.class); 132 classes.add(LauncherException.class); 133 classes.add(LauncherMainException.class); 134 classes.add(PrepareActionsDriver.class); 135 classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher()); 136 classes.add(ActionStats.class); 137 classes.add(ActionType.class); 138 return classes; 139 } 140 141 @Override 142 public void initActionType() { 143 XLog log = XLog.getLog(getClass()); 144 super.initActionType(); 145 maxActionOutputLen = getOozieConf() 146 .getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 147 // TODO: Remove the below config get in a subsequent release.. 148 // This other irrelevant property is only used to 149 // preserve backwards compatibility cause of a typo. 150 // See OOZIE-4. 151 getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 152 2 * 1024)); 153 //Get the limit for the maximum allowed size of action stats 154 maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT); 155 maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize; 156 157 createLauncherJar(); 158 159 registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001"); 160 registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT, 161 "JA002"); 162 registerError(DiskChecker.DiskOutOfSpaceException.class.getName(), 163 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003"); 164 registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(), 165 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004"); 166 registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(), 167 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005"); 168 registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, " JA006"); 169 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007"); 170 registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008"); 171 registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009"); 172 } 173 174 public void createLauncherJar() { 175 if (useLauncherJar) { 176 try { 177 List<Class> classes = getLauncherClasses(); 178 Class[] launcherClasses = classes.toArray(new Class[classes.size()]); 179 IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses); 180 } 181 catch (IOException ex) { 182 throw new RuntimeException(ex); 183 } 184 catch (java.lang.NoClassDefFoundError err) { 185 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 186 err.printStackTrace(new PrintStream(baos)); 187 log.warn(baos.toString()); 188 } 189 } 190 } 191 192 /** 193 * Get the maximum allowed size of stats 194 * 195 * @return maximum size of stats 196 */ 197 public static int getMaxExternalStatsSize() { 198 return maxExternalStatsSize; 199 } 200 201 static void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException { 202 for (String prop : DISALLOWED_PROPERTIES) { 203 if (conf.get(prop) != null) { 204 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010", 205 "Property [{0}] not allowed in action [{1}] configuration", prop, confName); 206 } 207 } 208 } 209 210 public JobConf createBaseHadoopConf(Context context, Element actionXml) { 211 Namespace ns = actionXml.getNamespace(); 212 String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim(); 213 String nameNode = actionXml.getChild("name-node", ns).getTextTrim(); 214 JobConf conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); 215 conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER)); 216 conf.set(HADOOP_JOB_TRACKER, jobTracker); 217 conf.set(HADOOP_JOB_TRACKER_2, jobTracker); 218 conf.set(HADOOP_YARN_RM, jobTracker); 219 conf.set(HADOOP_NAME_NODE, nameNode); 220 conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true"); 221 return conf; 222 } 223 224 private void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) { 225 for (Map.Entry<String, String> entry : srcConf) { 226 if (entry.getKey().startsWith("oozie.launcher.")) { 227 String name = entry.getKey().substring("oozie.launcher.".length()); 228 String value = entry.getValue(); 229 // setting original KEY 230 launcherConf.set(entry.getKey(), value); 231 // setting un-prefixed key (to allow Hadoop job config 232 // for the launcher job 233 launcherConf.set(name, value); 234 } 235 } 236 } 237 238 Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) 239 throws ActionExecutorException { 240 try { 241 Namespace ns = actionXml.getNamespace(); 242 Element e = actionXml.getChild("configuration", ns); 243 if (e != null) { 244 String strConf = XmlUtils.prettyPrint(e).toString(); 245 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); 246 247 XConfiguration launcherConf = new XConfiguration(); 248 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 249 XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType()); 250 injectLauncherProperties(actionDefaultConf, launcherConf); 251 injectLauncherProperties(inlineConf, launcherConf); 252 injectLauncherUseUberMode(launcherConf); 253 checkForDisallowedProps(launcherConf, "launcher configuration"); 254 XConfiguration.copy(launcherConf, conf); 255 } 256 return conf; 257 } 258 catch (IOException ex) { 259 throw convertException(ex); 260 } 261 } 262 263 void injectLauncherUseUberMode(Configuration launcherConf) { 264 // Set Uber Mode for the launcher (YARN only, ignored by MR1) if not set by action conf and not disabled in oozie-site 265 if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) { 266 if (getOozieConf().getBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable", false)) { 267 launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true); 268 } 269 } 270 } 271 272 public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf) 273 throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException { 274 Namespace ns = element.getNamespace(); 275 Iterator<Element> it = element.getChildren("job-xml", ns).iterator(); 276 while (it.hasNext()) { 277 Element e = it.next(); 278 String jobXml = e.getTextTrim(); 279 Path path = new Path(appPath, jobXml); 280 FileSystem fs = context.getAppFileSystem(); 281 Configuration jobXmlConf = new XConfiguration(fs.open(path)); 282 checkForDisallowedProps(jobXmlConf, "job-xml"); 283 XConfiguration.copy(jobXmlConf, conf); 284 } 285 Element e = element.getChild("configuration", ns); 286 if (e != null) { 287 String strConf = XmlUtils.prettyPrint(e).toString(); 288 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); 289 checkForDisallowedProps(inlineConf, "inline configuration"); 290 XConfiguration.copy(inlineConf, conf); 291 } 292 } 293 294 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) 295 throws ActionExecutorException { 296 try { 297 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 298 XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType()); 299 XConfiguration.injectDefaults(actionDefaults, actionConf); 300 301 has.checkSupportedFilesystem(appPath.toUri()); 302 303 parseJobXmlAndConfiguration(context, actionXml, appPath, actionConf); 304 return actionConf; 305 } 306 catch (IOException ex) { 307 throw convertException(ex); 308 } 309 catch (HadoopAccessorException ex) { 310 throw convertException(ex); 311 } 312 catch (URISyntaxException ex) { 313 throw convertException(ex); 314 } 315 } 316 317 Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive) 318 throws ActionExecutorException { 319 320 URI uri = null; 321 try { 322 uri = new URI(filePath); 323 URI baseUri = appPath.toUri(); 324 if (uri.getScheme() == null) { 325 String resolvedPath = uri.getPath(); 326 if (!resolvedPath.startsWith("/")) { 327 resolvedPath = baseUri.getPath() + "/" + resolvedPath; 328 } 329 uri = new URI(baseUri.getScheme(), baseUri.getAuthority(), resolvedPath, uri.getQuery(), uri.getFragment()); 330 } 331 if (archive) { 332 DistributedCache.addCacheArchive(uri.normalize(), conf); 333 } 334 else { 335 String fileName = filePath.substring(filePath.lastIndexOf("/") + 1); 336 if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files 337 uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName); 338 DistributedCache.addCacheFile(uri.normalize(), conf); 339 } 340 else if (fileName.endsWith(".jar")) { // .jar files 341 if (!fileName.contains("#")) { 342 String user = conf.get("user.name"); 343 Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, new Path(uri.normalize()), conf); 344 } 345 else { 346 DistributedCache.addCacheFile(uri, conf); 347 } 348 } 349 else { // regular files 350 if (!fileName.contains("#")) { 351 uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName); 352 } 353 DistributedCache.addCacheFile(uri.normalize(), conf); 354 } 355 } 356 DistributedCache.createSymlink(conf); 357 return conf; 358 } 359 catch (Exception ex) { 360 XLog.getLog(getClass()).debug( 361 "Errors when add to DistributedCache. Path=" + uri.toString() + ", archive=" + archive + ", conf=" 362 + XmlUtils.prettyPrint(conf).toString()); 363 throw convertException(ex); 364 } 365 } 366 367 String getOozieLauncherJar(Context context) throws ActionExecutorException { 368 try { 369 return new Path(context.getActionDir(), getLauncherJarName()).toString(); 370 } 371 catch (Exception ex) { 372 throw convertException(ex); 373 } 374 } 375 376 public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { 377 try { 378 Path actionDir = context.getActionDir(); 379 Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp"); 380 if (!actionFs.exists(actionDir)) { 381 try { 382 if (useLauncherJar) { 383 actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path( 384 tempActionDir, getLauncherJarName())); 385 } 386 else { 387 actionFs.mkdirs(tempActionDir); 388 } 389 actionFs.rename(tempActionDir, actionDir); 390 } 391 catch (IOException ex) { 392 actionFs.delete(tempActionDir, true); 393 actionFs.delete(actionDir, true); 394 throw ex; 395 } 396 } 397 } 398 catch (Exception ex) { 399 throw convertException(ex); 400 } 401 } 402 403 void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { 404 try { 405 Path actionDir = context.getActionDir(); 406 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false) 407 && actionFs.exists(actionDir)) { 408 actionFs.delete(actionDir, true); 409 } 410 } 411 catch (Exception ex) { 412 throw convertException(ex); 413 } 414 } 415 416 protected void addShareLib(Path appPath, Configuration conf, String[] actionShareLibNames) 417 throws ActionExecutorException { 418 if (actionShareLibNames != null) { 419 for (String actionShareLibName : actionShareLibNames) { 420 try { 421 Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath(); 422 if (systemLibPath != null) { 423 Path actionLibPath = new Path(systemLibPath, actionShareLibName.trim()); 424 String user = conf.get("user.name"); 425 FileSystem fs; 426 // If the actionLibPath has a valid scheme and authority, then use them to 427 // determine the filesystem that the sharelib resides on; otherwise, assume 428 // it resides on the same filesystem as the appPath and use the appPath to 429 // determine the filesystem 430 if (actionLibPath.toUri().getScheme() != null && actionLibPath.toUri().getAuthority() != null) { 431 fs = Services.get().get(HadoopAccessorService.class) 432 .createFileSystem(user, actionLibPath.toUri(), conf); 433 } 434 else { 435 fs = Services.get().get(HadoopAccessorService.class) 436 .createFileSystem(user, appPath.toUri(), conf); 437 } 438 if (fs.exists(actionLibPath)) { 439 FileStatus[] files = fs.listStatus(actionLibPath); 440 for (FileStatus file : files) { 441 addToCache(conf, actionLibPath, file.getPath().toUri().getPath(), false); 442 } 443 } 444 } 445 } 446 catch (HadoopAccessorException ex) { 447 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, ex.getErrorCode() 448 .toString(), ex.getMessage()); 449 } 450 catch (IOException ex) { 451 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 452 "It should never happen", ex.getMessage()); 453 } 454 } 455 } 456 } 457 458 protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException { 459 String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath"); 460 if (actionLibsStrArr != null) { 461 try { 462 for (String actionLibsStr : actionLibsStrArr) { 463 actionLibsStr = actionLibsStr.trim(); 464 if (actionLibsStr.length() > 0) 465 { 466 Path actionLibsPath = new Path(actionLibsStr); 467 String user = conf.get("user.name"); 468 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf); 469 if (fs.exists(actionLibsPath)) { 470 FileStatus[] files = fs.listStatus(actionLibsPath); 471 for (FileStatus file : files) { 472 addToCache(conf, appPath, file.getPath().toUri().getPath(), false); 473 } 474 } 475 } 476 } 477 } 478 catch (HadoopAccessorException ex){ 479 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 480 ex.getErrorCode().toString(), ex.getMessage()); 481 } 482 catch (IOException ex){ 483 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 484 "It should never happen", ex.getMessage()); 485 } 486 } 487 } 488 489 @SuppressWarnings("unchecked") 490 void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf) 491 throws ActionExecutorException { 492 Configuration proto = context.getProtoActionConf(); 493 494 // launcher JAR 495 if (useLauncherJar) { 496 addToCache(conf, appPath, getOozieLauncherJar(context), false); 497 } 498 499 // Workflow lib/ 500 String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST); 501 if (paths != null) { 502 for (String path : paths) { 503 addToCache(conf, appPath, path, false); 504 } 505 } 506 507 // Action libs 508 addActionLibs(appPath, conf); 509 510 // files and archives defined in the action 511 for (Element eProp : (List<Element>) actionXml.getChildren()) { 512 if (eProp.getName().equals("file")) { 513 String[] filePaths = eProp.getTextTrim().split(","); 514 for (String path : filePaths) { 515 addToCache(conf, appPath, path.trim(), false); 516 } 517 } 518 else if (eProp.getName().equals("archive")) { 519 String[] archivePaths = eProp.getTextTrim().split(","); 520 for (String path : archivePaths){ 521 addToCache(conf, appPath, path.trim(), true); 522 } 523 } 524 } 525 526 addAllShareLibs(appPath, conf, context, actionXml); 527 } 528 529 // Adds action specific share libs and common share libs 530 private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml) 531 throws ActionExecutorException { 532 // Add action specific share libs 533 addActionShareLib(appPath, conf, context, actionXml); 534 // Add common sharelibs for Oozie 535 addShareLib(appPath, conf, new String[]{JavaActionExecutor.OOZIE_COMMON_LIBDIR}); 536 } 537 538 private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml) 539 throws ActionExecutorException { 540 XConfiguration wfJobConf = null; 541 try { 542 wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); 543 } 544 catch (IOException ioe) { 545 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen", 546 ioe.getMessage()); 547 } 548 // Action sharelibs are only added if user has specified to use system libpath 549 if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) { 550 // add action specific sharelibs 551 addShareLib(appPath, conf, getShareLibNames(context, actionXml, conf)); 552 } 553 } 554 555 556 protected String getLauncherMain(Configuration launcherConf, Element actionXml) { 557 Namespace ns = actionXml.getNamespace(); 558 Element e = actionXml.getChild("main-class", ns); 559 return e.getTextTrim(); 560 } 561 562 private static final String QUEUE_NAME = "mapred.job.queue.name"; 563 564 private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>(); 565 566 static { 567 SPECIAL_PROPERTIES.add(QUEUE_NAME); 568 SPECIAL_PROPERTIES.add(ACL_VIEW_JOB); 569 SPECIAL_PROPERTIES.add(ACL_MODIFY_JOB); 570 } 571 572 @SuppressWarnings("unchecked") 573 JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf) 574 throws ActionExecutorException { 575 try { 576 577 // app path could be a file 578 Path appPathRoot = new Path(context.getWorkflow().getAppPath()); 579 if (actionFs.isFile(appPathRoot)) { 580 appPathRoot = appPathRoot.getParent(); 581 } 582 583 // launcher job configuration 584 JobConf launcherJobConf = createBaseHadoopConf(context, actionXml); 585 setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context); 586 587 String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType()); 588 if (actionShareLibProperty != null) { 589 launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty); 590 } 591 setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf); 592 593 String jobName = launcherJobConf.get(HADOOP_JOB_NAME); 594 if (jobName == null || jobName.isEmpty()) { 595 jobName = XLog.format( 596 "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), 597 context.getWorkflow().getAppName(), action.getName(), 598 context.getWorkflow().getId()); 599 launcherJobConf.setJobName(jobName); 600 } 601 602 String jobId = context.getWorkflow().getId(); 603 String actionId = action.getId(); 604 Path actionDir = context.getActionDir(); 605 String recoveryId = context.getRecoveryId(); 606 607 // Getting the prepare XML from the action XML 608 Namespace ns = actionXml.getNamespace(); 609 Element prepareElement = actionXml.getChild("prepare", ns); 610 String prepareXML = ""; 611 if (prepareElement != null) { 612 if (prepareElement.getChildren().size() > 0) { 613 prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim(); 614 } 615 } 616 LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf, 617 prepareXML); 618 619 LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml)); 620 LauncherMapperHelper.setupLauncherURIHandlerConf(launcherJobConf); 621 LauncherMapperHelper.setupMaxOutputData(launcherJobConf, maxActionOutputLen); 622 LauncherMapperHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize); 623 624 List<Element> list = actionXml.getChildren("arg", ns); 625 String[] args = new String[list.size()]; 626 for (int i = 0; i < list.size(); i++) { 627 args[i] = list.get(i).getTextTrim(); 628 } 629 LauncherMapperHelper.setupMainArguments(launcherJobConf, args); 630 631 List<Element> javaopts = actionXml.getChildren("java-opt", ns); 632 for (Element opt: javaopts) { 633 String opts = launcherJobConf.get("mapred.child.java.opts", ""); 634 opts = opts + " " + opt.getTextTrim(); 635 opts = opts.trim(); 636 launcherJobConf.set("mapred.child.java.opts", opts); 637 } 638 639 Element opt = actionXml.getChild("java-opts", ns); 640 if (opt != null) { 641 String opts = launcherJobConf.get("mapred.child.java.opts", ""); 642 opts = opts + " " + opt.getTextTrim(); 643 opts = opts.trim(); 644 launcherJobConf.set("mapred.child.java.opts", opts); 645 } 646 647 // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs) 648 // maybe we should add queue to the WF schema, below job-tracker 649 actionConfToLauncherConf(actionConf, launcherJobConf); 650 651 // to disable cancelation of delegation token on launcher job end 652 launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false); 653 654 return launcherJobConf; 655 } 656 catch (Exception ex) { 657 throw convertException(ex); 658 } 659 } 660 661 private void injectCallback(Context context, Configuration conf) { 662 String callback = context.getCallbackUrl("$jobStatus"); 663 if (conf.get("job.end.notification.url") != null) { 664 XLog.getLog(getClass()).warn("Overriding the action job end notification URI"); 665 } 666 conf.set("job.end.notification.url", callback); 667 } 668 669 void injectActionCallback(Context context, Configuration actionConf) { 670 injectCallback(context, actionConf); 671 } 672 673 void injectLauncherCallback(Context context, Configuration launcherConf) { 674 injectCallback(context, launcherConf); 675 } 676 677 private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) { 678 for (String name : SPECIAL_PROPERTIES) { 679 if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) { 680 launcherConf.set(name, actionConf.get(name)); 681 } 682 } 683 } 684 685 public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException { 686 JobClient jobClient = null; 687 boolean exception = false; 688 try { 689 Path appPathRoot = new Path(context.getWorkflow().getAppPath()); 690 691 // app path could be a file 692 if (actionFs.isFile(appPathRoot)) { 693 appPathRoot = appPathRoot.getParent(); 694 } 695 696 Element actionXml = XmlUtils.parseXml(action.getConf()); 697 698 // action job configuration 699 Configuration actionConf = createBaseHadoopConf(context, actionXml); 700 setupActionConf(actionConf, context, actionXml, appPathRoot); 701 XLog.getLog(getClass()).debug("Setting LibFilesArchives "); 702 setLibFilesArchives(context, actionXml, appPathRoot, actionConf); 703 704 String jobName = actionConf.get(HADOOP_JOB_NAME); 705 if (jobName == null || jobName.isEmpty()) { 706 jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", 707 getType(), context.getWorkflow().getAppName(), 708 action.getName(), context.getWorkflow().getId()); 709 actionConf.set(HADOOP_JOB_NAME, jobName); 710 } 711 712 injectActionCallback(context, actionConf); 713 714 if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) { 715 // ONLY in the case where user has not given the 716 // modify-job ACL specifically 717 if (context.getWorkflow().getAcl() != null) { 718 // setting the group owning the Oozie job to allow anybody in that 719 // group to modify the jobs. 720 actionConf.set(ACL_MODIFY_JOB, context.getWorkflow().getAcl()); 721 } 722 } 723 724 // Setting the credential properties in launcher conf 725 HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context, 726 action, actionConf); 727 728 // Adding if action need to set more credential tokens 729 JobConf credentialsConf = new JobConf(false); 730 XConfiguration.copy(actionConf, credentialsConf); 731 setCredentialTokens(credentialsConf, context, action, credentialsProperties); 732 733 // insert conf to action conf from credentialsConf 734 for (Entry<String, String> entry : credentialsConf) { 735 if (actionConf.get(entry.getKey()) == null) { 736 actionConf.set(entry.getKey(), entry.getValue()); 737 } 738 } 739 740 JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); 741 injectLauncherCallback(context, launcherJobConf); 742 XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId()); 743 jobClient = createJobClient(context, launcherJobConf); 744 String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context 745 .getRecoveryId()); 746 boolean alreadyRunning = launcherId != null; 747 RunningJob runningJob; 748 749 // if user-retry is on, always submit new launcher 750 boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry(); 751 752 if (alreadyRunning && !isUserRetry) { 753 runningJob = jobClient.getJob(JobID.forName(launcherId)); 754 if (runningJob == null) { 755 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER); 756 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", 757 "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker); 758 } 759 } 760 else { 761 XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId()); 762 763 // setting up propagation of the delegation token. 764 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 765 Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(has 766 .getMRDelegationTokenRenewer(launcherJobConf)); 767 launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt); 768 769 // insert credentials tokens to launcher job conf if needed 770 if (needInjectCredentials()) { 771 for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) { 772 log.debug("ADDING TOKEN: " + tk.getKind().toString()); 773 launcherJobConf.getCredentials().addToken(tk.getKind(), tk); 774 } 775 } 776 else { 777 log.info("No need to inject credentials."); 778 } 779 runningJob = jobClient.submitJob(launcherJobConf); 780 if (runningJob == null) { 781 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", 782 "Error submitting launcher for action [{0}]", action.getId()); 783 } 784 launcherId = runningJob.getID().toString(); 785 XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId); 786 } 787 788 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER); 789 String consoleUrl = runningJob.getTrackingURL(); 790 context.setStartData(launcherId, jobTracker, consoleUrl); 791 } 792 catch (Exception ex) { 793 exception = true; 794 throw convertException(ex); 795 } 796 finally { 797 if (jobClient != null) { 798 try { 799 jobClient.close(); 800 } 801 catch (Exception e) { 802 if (exception) { 803 log.error("JobClient error: ", e); 804 } 805 else { 806 throw convertException(e); 807 } 808 } 809 } 810 } 811 } 812 813 private boolean needInjectCredentials() { 814 boolean methodExists = true; 815 816 Class klass; 817 try { 818 klass = Class.forName("org.apache.hadoop.mapred.JobConf"); 819 klass.getMethod("getCredentials"); 820 } 821 catch (ClassNotFoundException ex) { 822 methodExists = false; 823 } 824 catch (NoSuchMethodException ex) { 825 methodExists = false; 826 } 827 828 return methodExists; 829 } 830 831 protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context, 832 WorkflowAction action, Configuration actionConf) throws Exception { 833 HashMap<String, CredentialsProperties> credPropertiesMap = null; 834 if (context != null && action != null) { 835 credPropertiesMap = getActionCredentialsProperties(context, action); 836 if (credPropertiesMap != null) { 837 for (String key : credPropertiesMap.keySet()) { 838 CredentialsProperties prop = credPropertiesMap.get(key); 839 if (prop != null) { 840 log.debug("Credential Properties set for action : " + action.getId()); 841 for (String property : prop.getProperties().keySet()) { 842 actionConf.set(property, prop.getProperties().get(property)); 843 log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'"); 844 } 845 } 846 } 847 } 848 else { 849 log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred()); 850 } 851 } 852 else { 853 log.warn("context or action is null"); 854 } 855 return credPropertiesMap; 856 } 857 858 protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action, 859 HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception { 860 861 if (context != null && action != null && credPropertiesMap != null) { 862 for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { 863 String credName = entry.getKey(); 864 CredentialsProperties credProps = entry.getValue(); 865 if (credProps != null) { 866 CredentialsProvider credProvider = new CredentialsProvider(credProps.getType()); 867 Credentials credentialObject = credProvider.createCredentialObject(); 868 if (credentialObject != null) { 869 credentialObject.addtoJobConf(jobconf, credProps, context); 870 log.debug("Retrieved Credential '" + credName + "' for action " + action.getId()); 871 } 872 else { 873 log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType()); 874 } 875 } 876 } 877 } 878 879 } 880 881 protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context, 882 WorkflowAction action) throws Exception { 883 HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>(); 884 if (context != null && action != null) { 885 String credsInAction = action.getCred(); 886 log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId()); 887 String[] credNames = credsInAction.split(","); 888 for (String credName : credNames) { 889 CredentialsProperties credProps = getCredProperties(context, credName); 890 props.put(credName, credProps); 891 } 892 } 893 else { 894 log.warn("context or action is null"); 895 } 896 return props; 897 } 898 899 @SuppressWarnings("unchecked") 900 protected CredentialsProperties getCredProperties(Context context, String credName) 901 throws Exception { 902 CredentialsProperties credProp = null; 903 String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition(); 904 XConfiguration wfjobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); 905 Element elementJob = XmlUtils.parseXml(workflowXml); 906 Element credentials = elementJob.getChild("credentials", elementJob.getNamespace()); 907 if (credentials != null) { 908 for (Element credential : (List<Element>) credentials.getChildren("credential", credentials.getNamespace())) { 909 String name = credential.getAttributeValue("name"); 910 String type = credential.getAttributeValue("type"); 911 log.debug("getCredProperties: Name: " + name + ", Type: " + type); 912 if (name.equalsIgnoreCase(credName)) { 913 credProp = new CredentialsProperties(name, type); 914 for (Element property : (List<Element>) credential.getChildren("property", 915 credential.getNamespace())) { 916 String propertyName = property.getChildText("name", property.getNamespace()); 917 String propertyValue = property.getChildText("value", property.getNamespace()); 918 ELEvaluator eval = new ELEvaluator(); 919 for (Map.Entry<String, String> entry : wfjobConf) { 920 eval.setVariable(entry.getKey(), entry.getValue().trim()); 921 } 922 propertyName = eval.evaluate(propertyName, String.class); 923 propertyValue = eval.evaluate(propertyValue, String.class); 924 925 credProp.getProperties().put(propertyName, propertyValue); 926 log.debug("getCredProperties: Properties name :'" + propertyName + "', Value : '" 927 + propertyValue + "'"); 928 } 929 } 930 } 931 } else { 932 log.warn("credentials is null for the action"); 933 } 934 return credProp; 935 } 936 937 @Override 938 public void start(Context context, WorkflowAction action) throws ActionExecutorException { 939 try { 940 XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System"); 941 FileSystem actionFs = context.getAppFileSystem(); 942 XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir()); 943 prepareActionDir(actionFs, context); 944 XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action "); 945 submitLauncher(actionFs, context, action); 946 XLog.getLog(getClass()).debug("Action submit completed. Performing check "); 947 check(context, action); 948 XLog.getLog(getClass()).debug("Action check is done after submission"); 949 } 950 catch (Exception ex) { 951 throw convertException(ex); 952 } 953 } 954 955 @Override 956 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 957 try { 958 String externalStatus = action.getExternalStatus(); 959 WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK 960 : WorkflowAction.Status.ERROR; 961 context.setEndData(status, getActionSignal(status)); 962 } 963 catch (Exception ex) { 964 throw convertException(ex); 965 } 966 finally { 967 try { 968 FileSystem actionFs = context.getAppFileSystem(); 969 cleanUpActionDir(actionFs, context); 970 } 971 catch (Exception ex) { 972 throw convertException(ex); 973 } 974 } 975 } 976 977 /** 978 * Create job client object 979 * 980 * @param context 981 * @param jobConf 982 * @return 983 * @throws HadoopAccessorException 984 */ 985 protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException { 986 String user = context.getWorkflow().getUser(); 987 String group = context.getWorkflow().getGroup(); 988 return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); 989 } 990 991 protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{ 992 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId())); 993 return runningJob; 994 } 995 996 @Override 997 public void check(Context context, WorkflowAction action) throws ActionExecutorException { 998 JobClient jobClient = null; 999 boolean exception = false; 1000 try { 1001 Element actionXml = XmlUtils.parseXml(action.getConf()); 1002 FileSystem actionFs = context.getAppFileSystem(); 1003 JobConf jobConf = createBaseHadoopConf(context, actionXml); 1004 jobClient = createJobClient(context, jobConf); 1005 RunningJob runningJob = getRunningJob(context, action, jobClient); 1006 if (runningJob == null) { 1007 context.setExternalStatus(FAILED); 1008 context.setExecutionData(FAILED, null); 1009 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", 1010 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", action 1011 .getExternalId(), action.getId()); 1012 } 1013 if (runningJob.isComplete()) { 1014 Path actionDir = context.getActionDir(); 1015 1016 String user = context.getWorkflow().getUser(); 1017 String group = context.getWorkflow().getGroup(); 1018 if (LauncherMapperHelper.hasIdSwap(runningJob, user, group, actionDir)) { 1019 String launcherId = action.getExternalId(); 1020 Path idSwapPath = LauncherMapperHelper.getIdSwapPath(context.getActionDir()); 1021 InputStream is = actionFs.open(idSwapPath); 1022 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 1023 Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen); 1024 reader.close(); 1025 String newId = props.getProperty("id"); 1026 runningJob = jobClient.getJob(JobID.forName(newId)); 1027 if (runningJob == null) { 1028 context.setExternalStatus(FAILED); 1029 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", 1030 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId, 1031 action.getId()); 1032 } 1033 1034 context.setExternalChildIDs(newId); 1035 XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId, 1036 newId); 1037 } 1038 if (runningJob.isComplete()) { 1039 XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]", 1040 action.getExternalId()); 1041 if (runningJob.isSuccessful() && LauncherMapperHelper.isMainSuccessful(runningJob)) { 1042 getActionData(actionFs, runningJob, action, context); 1043 XLog.getLog(getClass()).info(XLog.STD, "action produced output"); 1044 } 1045 else { 1046 XLog log = XLog.getLog(getClass()); 1047 String errorReason; 1048 Path actionError = LauncherMapperHelper.getErrorPath(context.getActionDir()); 1049 if (actionFs.exists(actionError)) { 1050 InputStream is = actionFs.open(actionError); 1051 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 1052 Properties props = PropertiesUtils.readProperties(reader, -1); 1053 reader.close(); 1054 String errorCode = props.getProperty("error.code"); 1055 if (errorCode.equals("0")) { 1056 errorCode = "JA018"; 1057 } 1058 if (errorCode.equals("-1")) { 1059 errorCode = "JA019"; 1060 } 1061 errorReason = props.getProperty("error.reason"); 1062 log.warn("Launcher ERROR, reason: {0}", errorReason); 1063 String exMsg = props.getProperty("exception.message"); 1064 String errorInfo = (exMsg != null) ? exMsg : errorReason; 1065 context.setErrorInfo(errorCode, errorInfo); 1066 String exStackTrace = props.getProperty("exception.stacktrace"); 1067 if (exMsg != null) { 1068 log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace); 1069 } 1070 } 1071 else { 1072 errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action 1073 .getTrackerUri(), action.getExternalId()); 1074 log.warn(errorReason); 1075 } 1076 context.setExecutionData(FAILED_KILLED, null); 1077 setActionCompletionData(context, actionFs); 1078 } 1079 } 1080 else { 1081 context.setExternalStatus(RUNNING); 1082 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]", 1083 action.getExternalId(), action.getExternalStatus()); 1084 } 1085 } 1086 else { 1087 context.setExternalStatus(RUNNING); 1088 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]", 1089 action.getExternalId(), action.getExternalStatus()); 1090 } 1091 } 1092 catch (Exception ex) { 1093 XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex); 1094 exception = true; 1095 throw convertException(ex); 1096 } 1097 finally { 1098 if (jobClient != null) { 1099 try { 1100 jobClient.close(); 1101 } 1102 catch (Exception e) { 1103 if (exception) { 1104 log.error("JobClient error: ", e); 1105 } 1106 else { 1107 throw convertException(e); 1108 } 1109 } 1110 } 1111 } 1112 } 1113 1114 /** 1115 * Get the output data of an action. Subclasses should override this method 1116 * to get action specific output data. 1117 * 1118 * @param actionFs the FileSystem object 1119 * @param runningJob the runningJob 1120 * @param action the Workflow action 1121 * @param context executor context 1122 * 1123 */ 1124 protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) 1125 throws HadoopAccessorException, JDOMException, IOException, URISyntaxException { 1126 Properties props = null; 1127 if (getCaptureOutput(action)) { 1128 props = new Properties(); 1129 if (LauncherMapperHelper.hasOutputData(runningJob)) { 1130 Path actionOutput = LauncherMapperHelper.getOutputDataPath(context.getActionDir()); 1131 InputStream is = actionFs.open(actionOutput); 1132 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 1133 props = PropertiesUtils.readProperties(reader, maxActionOutputLen); 1134 reader.close(); 1135 } 1136 } 1137 context.setExecutionData(SUCCEEDED, props); 1138 } 1139 1140 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException { 1141 Element eConf = XmlUtils.parseXml(action.getConf()); 1142 Namespace ns = eConf.getNamespace(); 1143 Element captureOutput = eConf.getChild("capture-output", ns); 1144 return captureOutput != null; 1145 } 1146 1147 @Override 1148 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 1149 JobClient jobClient = null; 1150 boolean exception = false; 1151 try { 1152 Element actionXml = XmlUtils.parseXml(action.getConf()); 1153 JobConf jobConf = createBaseHadoopConf(context, actionXml); 1154 jobClient = createJobClient(context, jobConf); 1155 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId())); 1156 if (runningJob != null) { 1157 runningJob.killJob(); 1158 } 1159 context.setExternalStatus(KILLED); 1160 context.setExecutionData(KILLED, null); 1161 } 1162 catch (Exception ex) { 1163 exception = true; 1164 throw convertException(ex); 1165 } 1166 finally { 1167 try { 1168 FileSystem actionFs = context.getAppFileSystem(); 1169 cleanUpActionDir(actionFs, context); 1170 if (jobClient != null) { 1171 jobClient.close(); 1172 } 1173 } 1174 catch (Exception ex) { 1175 if (exception) { 1176 log.error("Error: ", ex); 1177 } 1178 else { 1179 throw convertException(ex); 1180 } 1181 } 1182 } 1183 } 1184 1185 private static Set<String> FINAL_STATUS = new HashSet<String>(); 1186 1187 static { 1188 FINAL_STATUS.add(SUCCEEDED); 1189 FINAL_STATUS.add(KILLED); 1190 FINAL_STATUS.add(FAILED); 1191 FINAL_STATUS.add(FAILED_KILLED); 1192 } 1193 1194 @Override 1195 public boolean isCompleted(String externalStatus) { 1196 return FINAL_STATUS.contains(externalStatus); 1197 } 1198 1199 1200 /** 1201 * Return the sharelib names for the action. 1202 * <p/> 1203 * If <code>NULL</code> or empty, it means that the action does not use the action 1204 * sharelib. 1205 * <p/> 1206 * If a non-empty string, i.e. <code>foo</code>, it means the action uses the 1207 * action sharelib sub-directory <code>foo</code> and all JARs in the sharelib 1208 * <code>foo</code> directory will be in the action classpath. Multiple sharelib 1209 * sub-directories can be specified as a comma separated list. 1210 * <p/> 1211 * The resolution is done using the following precedence order: 1212 * <ul> 1213 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the action configuration</li> 1214 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the job configuration</li> 1215 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the oozie configuration</li> 1216 * <li>Action Executor <code>getDefaultShareLibName()</code> method</li> 1217 * </ul> 1218 * 1219 * 1220 * @param context executor context. 1221 * @param actionXml 1222 * @param conf action configuration. 1223 * @return the action sharelib names. 1224 */ 1225 protected String[] getShareLibNames(Context context, Element actionXml, Configuration conf) { 1226 String[] names = conf.getStrings(ACTION_SHARELIB_FOR + getType()); 1227 if (names == null || names.length == 0) { 1228 try { 1229 XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); 1230 names = jobConf.getStrings(ACTION_SHARELIB_FOR + getType()); 1231 if (names == null || names.length == 0) { 1232 names = Services.get().getConf().getStrings(ACTION_SHARELIB_FOR + getType()); 1233 if (names == null || names.length == 0) { 1234 String name = getDefaultShareLibName(actionXml); 1235 if (name != null) { 1236 names = new String[] { name }; 1237 } 1238 } 1239 } 1240 } 1241 catch (IOException ex) { 1242 throw new RuntimeException("It cannot happen, " + ex.toString(), ex); 1243 } 1244 } 1245 return names; 1246 } 1247 1248 private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for."; 1249 1250 1251 /** 1252 * Returns the default sharelib name for the action if any. 1253 * 1254 * @param actionXml the action XML fragment. 1255 * @return the sharelib name for the action, <code>NULL</code> if none. 1256 */ 1257 protected String getDefaultShareLibName(Element actionXml) { 1258 return null; 1259 } 1260 1261 /** 1262 * Sets some data for the action on completion 1263 * 1264 * @param context executor context 1265 * @param actionFs the FileSystem object 1266 */ 1267 protected void setActionCompletionData(Context context, FileSystem actionFs) throws IOException, 1268 HadoopAccessorException, URISyntaxException { 1269 } 1270 }