001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.action.hadoop; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.StringReader; 024import java.net.ConnectException; 025import java.net.URI; 026import java.net.URISyntaxException; 027import java.net.UnknownHostException; 028import java.security.PrivilegedExceptionAction; 029import java.text.MessageFormat; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.Iterator; 035import java.util.List; 036import java.util.Map; 037import java.util.Map.Entry; 038import java.util.Properties; 039import java.util.Set; 040import java.util.regex.Matcher; 041import java.util.regex.Pattern; 042 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.filecache.DistributedCache; 045import org.apache.hadoop.fs.FileStatus; 046import org.apache.hadoop.fs.FileSystem; 047import org.apache.hadoop.fs.Path; 048import org.apache.hadoop.fs.permission.AccessControlException; 049import org.apache.oozie.hadoop.utils.HadoopShims; 050import org.apache.hadoop.io.Text; 051import 
org.apache.hadoop.mapred.JobClient; 052import org.apache.hadoop.mapred.JobConf; 053import org.apache.hadoop.mapred.JobID; 054import org.apache.hadoop.mapred.RunningJob; 055import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; 056import org.apache.hadoop.security.UserGroupInformation; 057import org.apache.hadoop.security.token.Token; 058import org.apache.hadoop.security.token.TokenIdentifier; 059import org.apache.hadoop.util.DiskChecker; 060import org.apache.oozie.WorkflowActionBean; 061import org.apache.oozie.WorkflowJobBean; 062import org.apache.oozie.action.ActionExecutor; 063import org.apache.oozie.action.ActionExecutorException; 064import org.apache.oozie.client.OozieClient; 065import org.apache.oozie.client.WorkflowAction; 066import org.apache.oozie.client.WorkflowJob; 067import org.apache.oozie.command.coord.CoordActionStartXCommand; 068import org.apache.oozie.service.ConfigurationService; 069import org.apache.oozie.service.HadoopAccessorException; 070import org.apache.oozie.service.HadoopAccessorService; 071import org.apache.oozie.service.Services; 072import org.apache.oozie.service.ShareLibService; 073import org.apache.oozie.service.URIHandlerService; 074import org.apache.oozie.service.UserGroupInformationService; 075import org.apache.oozie.service.WorkflowAppService; 076import org.apache.oozie.util.ELEvaluationException; 077import org.apache.oozie.util.ELEvaluator; 078import org.apache.oozie.util.JobUtils; 079import org.apache.oozie.util.LogUtils; 080import org.apache.oozie.util.PropertiesUtils; 081import org.apache.oozie.util.XConfiguration; 082import org.apache.oozie.util.XLog; 083import org.apache.oozie.util.XmlUtils; 084import org.jdom.Element; 085import org.jdom.JDOMException; 086import org.jdom.Namespace; 087 088 089public class JavaActionExecutor extends ActionExecutor { 090 091 protected static final String HADOOP_USER = "user.name"; 092 public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker"; 093 
public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
    public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
    public static final String HADOOP_NAME_NODE = "fs.default.name";
    private static final String HADOOP_JOB_NAME = "mapred.job.name";
    public static final String OOZIE_COMMON_LIBDIR = "oozie";
    // Properties users may never set in action/launcher config; populated in the static block below.
    private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
    public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
    public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
    public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
    public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = "yarn.timeline-service.enabled";
    public static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable";
    public static final String HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART = "oozie.action.launcher.am.restart.kill.childjobs";
    public static final String HADOOP_MAP_MEMORY_MB = "mapreduce.map.memory.mb";
    public static final String HADOOP_CHILD_JAVA_OPTS = "mapred.child.java.opts";
    public static final String HADOOP_MAP_JAVA_OPTS = "mapreduce.map.java.opts";
    public static final String HADOOP_REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts";
    public static final String HADOOP_CHILD_JAVA_ENV = "mapred.child.env";
    public static final String HADOOP_MAP_JAVA_ENV = "mapreduce.map.env";
    public static final String YARN_AM_RESOURCE_MB = "yarn.app.mapreduce.am.resource.mb";
    public static final String YARN_AM_COMMAND_OPTS = "yarn.app.mapreduce.am.command-opts";
    public static final String YARN_AM_ENV = "yarn.app.mapreduce.am.env";
    private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain";
    public static final int YARN_MEMORY_MB_MIN = 512;
    // Limits resolved from ConfigurationService in initActionType().
    private static int maxActionOutputLen;
    private static int maxExternalStatsSize;
    private static int maxFSGlobMax;
    // External status strings reported for a finished launcher job.
    private static final String SUCCEEDED = "SUCCEEDED";
    private static final String KILLED = "KILLED";
    private static final String FAILED = "FAILED";
    private static final String FAILED_KILLED = "FAILED/KILLED";
    protected XLog LOG = XLog.getLog(getClass());
    // Matches an -Xmx<n>[mMgG] heap option; group(2) is the numeric part. Used by extractHeapSizeMB().
    private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
    private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir=";
    public static final String CONF_HADOOP_YARN_UBER_MODE = "oozie.action.launcher." + HADOOP_YARN_UBER_MODE;
    public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader";
    public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first";
    public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip";

    public XConfiguration workflowConf = null;

    static {
        // User identity and cluster endpoints are controlled by Oozie itself and must not
        // be overridden from action configuration (enforced by checkForDisallowedProps()).
        DISALLOWED_PROPERTIES.add(HADOOP_USER);
        DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
        DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE);
        DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2);
        DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM);
    }

    public JavaActionExecutor() {
        this("java");
    }

    protected JavaActionExecutor(String type) {
        super(type);
    }

    /**
     * Classes every action launcher needs on its classpath, independent of the action type.
     *
     * @return launcher infrastructure classes plus URI-handler launcher classes
     */
    public static List<Class> getCommonLauncherClasses() {
        List<Class> classes = new ArrayList<Class>();
        classes.add(LauncherMapper.class);
        classes.add(OozieLauncherInputFormat.class);
        classes.add(OozieLauncherOutputFormat.class);
        classes.add(OozieLauncherOutputCommitter.class);
        classes.add(LauncherMainHadoopUtils.class);
        classes.add(HadoopShims.class);
        classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
        return classes;
    }

    /**
     * Action-type specific launcher classes; for the plain java action this is JavaMain,
     * loaded reflectively by name.
     */
    public List<Class> getLauncherClasses() {
        List<Class> classes = new ArrayList<Class>();
        try {
            classes.add(Class.forName(JAVA_MAIN_CLASS_NAME));
        }
        catch (ClassNotFoundException
e) { 167 throw new RuntimeException("Class not found", e); 168 } 169 return classes; 170 } 171 172 @Override 173 public void initActionType() { 174 super.initActionType(); 175 maxActionOutputLen = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA); 176 //Get the limit for the maximum allowed size of action stats 177 maxExternalStatsSize = ConfigurationService.getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE); 178 maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize; 179 //Get the limit for the maximum number of globbed files/dirs for FS operation 180 maxFSGlobMax = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX); 181 182 registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001"); 183 registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT, 184 "JA002"); 185 registerError(DiskChecker.DiskOutOfSpaceException.class.getName(), 186 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003"); 187 registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(), 188 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004"); 189 registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(), 190 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005"); 191 registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, " JA006"); 192 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007"); 193 registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008"); 194 registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009"); 195 } 196 197 198 /** 199 * Get the maximum allowed size of stats 200 * 201 * @return maximum size of stats 202 */ 203 public static int getMaxExternalStatsSize() { 204 return 
maxExternalStatsSize;
    }

    /**
     * Fails if the given configuration sets any property Oozie reserves for itself
     * (user name, job tracker / RM and name node endpoints).
     *
     * @param conf configuration to validate
     * @param confName label used in the error message (e.g. "launcher configuration")
     * @throws ActionExecutorException JA010 when a disallowed property is present
     */
    static void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException {
        for (String prop : DISALLOWED_PROPERTIES) {
            if (conf.get(prop) != null) {
                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010",
                        "Property [{0}] not allowed in action [{1}] configuration", prop, confName);
            }
        }
    }

    public JobConf createBaseHadoopConf(Context context, Element actionXml) {
        return createBaseHadoopConf(context, actionXml, true);
    }

    /**
     * Builds the base JobConf for this action from the job-tracker and name-node
     * elements of the action XML.
     *
     * @param loadResources when true the conf is seeded from the HadoopAccessorService
     *        cluster resources; when false an empty JobConf is created
     */
    protected JobConf createBaseHadoopConf(Context context, Element actionXml, boolean loadResources) {
        Namespace ns = actionXml.getNamespace();
        String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
        String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
        JobConf conf = null;
        if (loadResources) {
            conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
        }
        else {
            conf = new JobConf(false);
        }
        conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
        // Set all three endpoint keys (MR1, MR2, YARN) so the conf works against either generation.
        conf.set(HADOOP_JOB_TRACKER, jobTracker);
        conf.set(HADOOP_JOB_TRACKER_2, jobTracker);
        conf.set(HADOOP_YARN_RM, jobTracker);
        conf.set(HADOOP_NAME_NODE, nameNode);
        conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
        return conf;
    }

    protected JobConf loadHadoopDefaultResources(Context context, Element actionXml) {
        return createBaseHadoopConf(context, actionXml);
    }

    /**
     * Copies every "oozie.launcher."-prefixed property from srcConf into launcherConf,
     * both under the original key and under the key with the prefix stripped.
     */
    private static void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) {
        for (Map.Entry<String, String> entry : srcConf) {
            if (entry.getKey().startsWith("oozie.launcher.")) {
                String name = entry.getKey().substring("oozie.launcher.".length());
                String value = entry.getValue();
                // setting original KEY
                launcherConf.set(entry.getKey(), value);
                // setting un-prefixed key
(to allow Hadoop job config
                // for the launcher job
                launcherConf.set(name, value);
            }
        }
    }

    /**
     * Builds the launcher job configuration for an action: action-type launcher defaults,
     * job-xml / inline configuration overrides (oozie.launcher.* properties only),
     * uber-mode selection and the optional config-class handed to the launcher.
     *
     * @return the same conf instance, updated in place
     * @throws ActionExecutorException when a disallowed property is set or the job-xml
     *         cannot be read or parsed
     */
    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
            throws ActionExecutorException {
        try {
            Namespace ns = actionXml.getNamespace();
            XConfiguration launcherConf = new XConfiguration();
            // Inject action defaults for launcher
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType());
            injectLauncherProperties(actionDefaultConf, launcherConf);
            // Inject <job-xml> and <configuration> for launcher
            try {
                parseJobXmlAndConfiguration(context, actionXml, appPath, launcherConf, true);
            } catch (HadoopAccessorException ex) {
                throw convertException(ex);
            } catch (URISyntaxException ex) {
                throw convertException(ex);
            }
            // Inject use uber mode for launcher
            injectLauncherUseUberMode(launcherConf);
            XConfiguration.copy(launcherConf, conf);
            checkForDisallowedProps(launcherConf, "launcher configuration");
            // Inject config-class for launcher to use for action
            Element e = actionXml.getChild("config-class", ns);
            if (e != null) {
                conf.set(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim());
            }
            return conf;
        }
        catch (IOException ex) {
            throw convertException(ex);
        }
    }

    void injectLauncherUseUberMode(Configuration launcherConf) {
        // Set Uber Mode for the launcher (YARN only, ignored by MR1)
        // Priority:
        // 1. action's <configuration>
        // 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable
        // 3. oozie.action.launcher.mapreduce.job.ubertask.enable
        if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
            if (ConfigurationService.get("oozie.action." + getType() + ".launcher."
+ HADOOP_YARN_UBER_MODE).length() > 0) {
                if (ConfigurationService.getBoolean("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE)) {
                    launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
                }
            } else {
                if (ConfigurationService.getBoolean("oozie.action.launcher." + HADOOP_YARN_UBER_MODE)) {
                    launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
                }
            }
        }
    }

    /**
     * Turns on the YARN timeline service for the launcher when tez-site.xml is present
     * in the distributed cache, unless the action configuration or server default
     * explicitly decided it already.
     */
    void injectLauncherTimelineServiceEnabled(Configuration launcherConf, Configuration actionConf) {
        // Getting delegation token for ATS. If tez-site.xml is present in distributed cache, turn on timeline service.
        if (actionConf.get("oozie.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED) == null
                && ConfigurationService.getBoolean("oozie.action.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED)) {
            String cacheFiles = launcherConf.get("mapred.cache.files");
            if (cacheFiles != null && cacheFiles.contains("tez-site.xml")) {
                launcherConf.setBoolean(HADOOP_YARN_TIMELINE_SERVICE_ENABLED, true);
            }
        }
    }

    /**
     * Reconciles launcher map-task settings (env, memory, java opts) with the MR AM
     * settings so the launcher can run in uber mode; disables uber mode instead when
     * the map env and AM env conflict.
     */
    void updateConfForUberMode(Configuration launcherConf) {

        // child.env
        boolean hasConflictEnv = false;
        String launcherMapEnv = launcherConf.get(HADOOP_MAP_JAVA_ENV);
        if (launcherMapEnv == null) {
            launcherMapEnv = launcherConf.get(HADOOP_CHILD_JAVA_ENV);
        }
        String amEnv = launcherConf.get(YARN_AM_ENV);
        StringBuffer envStr = new StringBuffer();
        HashMap<String, List<String>> amEnvMap = null;
        HashMap<String, List<String>> launcherMapEnvMap = null;
        if (amEnv != null) {
            envStr.append(amEnv);
            amEnvMap = populateEnvMap(amEnv);
        }
        if (launcherMapEnv != null) {
            launcherMapEnvMap = populateEnvMap(launcherMapEnv);
            if (amEnvMap != null) {
                // For keys defined on both sides: drop map values already present in the
                // AM env; a map value missing from the AM env is a conflict and vetoes
                // uber mode below.
                Iterator<String> envKeyItr = launcherMapEnvMap.keySet().iterator();
                while (envKeyItr.hasNext()) {
                    String envKey = envKeyItr.next();
                    if (amEnvMap.containsKey(envKey)) {
                        List<String> amValList = amEnvMap.get(envKey);
                        List<String> launcherValList = launcherMapEnvMap.get(envKey);
                        Iterator<String> valItr = launcherValList.iterator();
                        while (valItr.hasNext()) {
                            String val = valItr.next();
                            if (!amValList.contains(val)) {
                                hasConflictEnv = true;
                                break;
                            }
                            else {
                                valItr.remove();
                            }
                        }
                        if (launcherValList.isEmpty()) {
                            envKeyItr.remove();
                        }
                    }
                }
            }
        }
        if (hasConflictEnv) {
            launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
        }
        else {
            // No conflicts: merge remaining map env entries into the AM env string.
            if (launcherMapEnvMap != null) {
                for (String key : launcherMapEnvMap.keySet()) {
                    List<String> launcherValList = launcherMapEnvMap.get(key);
                    for (String val : launcherValList) {
                        if (envStr.length() > 0) {
                            envStr.append(",");
                        }
                        envStr.append(key).append("=").append(val);
                    }
                }
            }

            launcherConf.set(YARN_AM_ENV, envStr.toString());

            // memory.mb
            int launcherMapMemoryMB = launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 1536);
            int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536);
            // YARN_MEMORY_MB_MIN to provide buffer.
386 // suppose launcher map aggressively use high memory, need some 387 // headroom for AM 388 int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + YARN_MEMORY_MB_MIN; 389 // limit to 4096 in case of 32 bit 390 if (launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 4096) { 391 memoryMB = 4096; 392 } 393 launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB); 394 395 // We already made mapred.child.java.opts and 396 // mapreduce.map.java.opts equal, so just start with one of them 397 String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS, ""); 398 String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS); 399 StringBuilder optsStr = new StringBuilder(); 400 int heapSizeForMap = extractHeapSizeMB(launcherMapOpts); 401 int heapSizeForAm = extractHeapSizeMB(amChildOpts); 402 int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + YARN_MEMORY_MB_MIN; 403 // limit to 3584 in case of 32 bit 404 if (heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 3584) { 405 heapSize = 3584; 406 } 407 if (amChildOpts != null) { 408 optsStr.append(amChildOpts); 409 } 410 optsStr.append(" ").append(launcherMapOpts.trim()); 411 if (heapSize > 0) { 412 // append calculated total heap size to the end 413 optsStr.append(" ").append("-Xmx").append(heapSize).append("m"); 414 } 415 launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString().trim()); 416 } 417 } 418 419 void updateConfForJavaTmpDir(Configuration conf) { 420 String amChildOpts = conf.get(YARN_AM_COMMAND_OPTS); 421 String oozieJavaTmpDirSetting = "-Djava.io.tmpdir=./tmp"; 422 if (amChildOpts != null && !amChildOpts.contains(JAVA_TMP_DIR_SETTINGS)) { 423 conf.set(YARN_AM_COMMAND_OPTS, amChildOpts + " " + oozieJavaTmpDirSetting); 424 } 425 } 426 427 private HashMap<String, List<String>> populateEnvMap(String input) { 428 HashMap<String, List<String>> envMaps = new HashMap<String, List<String>>(); 429 String[] envEntries = input.split(","); 430 for (String envEntry : envEntries) { 431 String[] envKeyVal 
= envEntry.split("="); 432 String envKey = envKeyVal[0].trim(); 433 List<String> valList = envMaps.get(envKey); 434 if (valList == null) { 435 valList = new ArrayList<String>(); 436 } 437 valList.add(envKeyVal[1].trim()); 438 envMaps.put(envKey, valList); 439 } 440 return envMaps; 441 } 442 443 public int extractHeapSizeMB(String input) { 444 int ret = 0; 445 if(input == null || input.equals("")) 446 return ret; 447 Matcher m = heapPattern.matcher(input); 448 String heapStr = null; 449 String heapNum = null; 450 // Grabs the last match which takes effect (in case that multiple Xmx options specified) 451 while (m.find()) { 452 heapStr = m.group(1); 453 heapNum = m.group(2); 454 } 455 if (heapStr != null) { 456 // when Xmx specified in Gigabyte 457 if(heapStr.endsWith("g") || heapStr.endsWith("G")) { 458 ret = Integer.parseInt(heapNum) * 1024; 459 } else { 460 ret = Integer.parseInt(heapNum); 461 } 462 } 463 return ret; 464 } 465 466 public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf) 467 throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException { 468 parseJobXmlAndConfiguration(context, element, appPath, conf, false); 469 } 470 471 public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf, 472 boolean isLauncher) throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException { 473 Namespace ns = element.getNamespace(); 474 Iterator<Element> it = element.getChildren("job-xml", ns).iterator(); 475 HashMap<String, FileSystem> filesystemsMap = new HashMap<String, FileSystem>(); 476 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 477 while (it.hasNext()) { 478 Element e = it.next(); 479 String jobXml = e.getTextTrim(); 480 Path pathSpecified = new Path(jobXml); 481 Path path = pathSpecified.isAbsolute() ? 
pathSpecified : new Path(appPath, jobXml);
            FileSystem fs;
            // One cached FileSystem per URI authority; a null authority means the app filesystem.
            if (filesystemsMap.containsKey(path.toUri().getAuthority())) {
                fs = filesystemsMap.get(path.toUri().getAuthority());
            }
            else {
                if (path.toUri().getAuthority() != null) {
                    fs = has.createFileSystem(context.getWorkflow().getUser(), path.toUri(),
                            has.createJobConf(path.toUri().getAuthority()));
                }
                else {
                    fs = context.getAppFileSystem();
                }
                filesystemsMap.put(path.toUri().getAuthority(), fs);
            }
            Configuration jobXmlConf = new XConfiguration(fs.open(path));
            try {
                // Strip comments and run EL evaluation over the raw job-xml text,
                // then re-parse the evaluated result.
                String jobXmlConfString = XmlUtils.prettyPrint(jobXmlConf).toString();
                jobXmlConfString = XmlUtils.removeComments(jobXmlConfString);
                jobXmlConfString = context.getELEvaluator().evaluate(jobXmlConfString, String.class);
                jobXmlConf = new XConfiguration(new StringReader(jobXmlConfString));
            }
            catch (ELEvaluationException ex) {
                throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, "EL_EVAL_ERROR", ex
                        .getMessage(), ex);
            }
            catch (Exception ex) {
                // NOTE(review): non-EL failures are only recorded on the context and the
                // un-evaluated job-xml is used as-is — presumably intentional best-effort
                // behavior; confirm.
                context.setErrorInfo("EL_ERROR", ex.getMessage());
            }
            checkForDisallowedProps(jobXmlConf, "job-xml");
            if (isLauncher) {
                injectLauncherProperties(jobXmlConf, conf);
            } else {
                XConfiguration.copy(jobXmlConf, conf);
            }
        }
        Element e = element.getChild("configuration", ns);
        if (e != null) {
            String strConf = XmlUtils.prettyPrint(e).toString();
            XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
            checkForDisallowedProps(inlineConf, "inline configuration");
            if (isLauncher) {
                injectLauncherProperties(inlineConf, conf);
            } else {
                XConfiguration.copy(inlineConf, conf);
            }
        }
    }

    /**
     * Builds the action configuration: action-type defaults, job-xml / inline
     * configuration content, the java main class, tmp dir and root logger settings.
     */
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        try {
            HadoopAccessorService has =
Services.get().get(HadoopAccessorService.class);
            XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType());
            XConfiguration.injectDefaults(actionDefaults, actionConf);
            has.checkSupportedFilesystem(appPath.toUri());

            // Set the Java Main Class for the Java action to give to the Java launcher
            setJavaMain(actionConf, actionXml);

            parseJobXmlAndConfiguration(context, actionXml, appPath, actionConf);

            // set cancel.delegation.token in actionConf that child job doesn't cancel delegation token
            actionConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
            updateConfForJavaTmpDir(actionConf);
            setRootLoggerLevel(actionConf);
            return actionConf;
        }
        catch (IOException ex) {
            throw convertException(ex);
        }
        catch (HadoopAccessorException ex) {
            throw convertException(ex);
        }
        catch (URISyntaxException ex) {
            throw convertException(ex);
        }
    }

    /**
     * Set root log level property in actionConf. Precedence: the action's own
     * configuration (type-specific key, then generic key), then the Oozie server
     * defaults (type-specific, then generic).
     *
     * @param actionConf action configuration, updated in place
     */
    void setRootLoggerLevel(Configuration actionConf) {
        String oozieActionTypeRootLogger = "oozie.action." + getType() + LauncherMapper.ROOT_LOGGER_LEVEL;
        String oozieActionRootLogger = "oozie.action." + LauncherMapper.ROOT_LOGGER_LEVEL;

        // check if root log level has already mentioned in action configuration
        String rootLogLevel = actionConf.get(oozieActionTypeRootLogger, actionConf.get(oozieActionRootLogger));
        if (rootLogLevel != null) {
            // root log level is mentioned in action configuration
            return;
        }

        // set the root log level which is mentioned in oozie default
        rootLogLevel = ConfigurationService.get(oozieActionTypeRootLogger);
        if (rootLogLevel != null && rootLogLevel.length() > 0) {
            actionConf.set(oozieActionRootLogger, rootLogLevel);
        }
        else {
            rootLogLevel = ConfigurationService.get(oozieActionRootLogger);
            if (rootLogLevel != null && rootLogLevel.length() > 0) {
                actionConf.set(oozieActionRootLogger, rootLogLevel);
            }
        }
    }

    /**
     * Adds a file or archive to the job's DistributedCache, resolving relative paths
     * against the application directory. Jars without a '#' fragment go on the job
     * classpath; .so files and regular files are symlinked under their plain file name.
     *
     * @param archive true to add as an archive, false to add as a file
     * @throws ActionExecutorException when the path cannot be added to the cache
     */
    Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive)
            throws ActionExecutorException {

        URI uri = null;
        try {
            uri = new URI(filePath);
            URI baseUri = appPath.toUri();
            if (uri.getScheme() == null) {
                // Relative, scheme-less path: resolve against the application directory.
                String resolvedPath = uri.getPath();
                if (!resolvedPath.startsWith("/")) {
                    resolvedPath = baseUri.getPath() + "/" + resolvedPath;
                }
                uri = new URI(baseUri.getScheme(), baseUri.getAuthority(), resolvedPath, uri.getQuery(), uri.getFragment());
            }
            if (archive) {
                DistributedCache.addCacheArchive(uri.normalize(), conf);
            }
            else {
                String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
                if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files
                    uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName);
                    DistributedCache.addCacheFile(uri.normalize(), conf);
                }
                else if (fileName.endsWith(".jar")) { // .jar files
                    if (!fileName.contains("#")) {
                        String user = conf.get("user.name");
                        Path pathToAdd = new Path(uri.normalize());
Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, pathToAdd, conf); 616 } 617 else { 618 DistributedCache.addCacheFile(uri.normalize(), conf); 619 } 620 } 621 else { // regular files 622 if (!fileName.contains("#")) { 623 uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName); 624 } 625 DistributedCache.addCacheFile(uri.normalize(), conf); 626 } 627 } 628 DistributedCache.createSymlink(conf); 629 return conf; 630 } 631 catch (Exception ex) { 632 LOG.debug( 633 "Errors when add to DistributedCache. Path=" + uri.toString() + ", archive=" + archive + ", conf=" 634 + XmlUtils.prettyPrint(conf).toString()); 635 throw convertException(ex); 636 } 637 } 638 639 public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { 640 try { 641 Path actionDir = context.getActionDir(); 642 Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp"); 643 if (!actionFs.exists(actionDir)) { 644 try { 645 actionFs.mkdirs(tempActionDir); 646 actionFs.rename(tempActionDir, actionDir); 647 } 648 catch (IOException ex) { 649 actionFs.delete(tempActionDir, true); 650 actionFs.delete(actionDir, true); 651 throw ex; 652 } 653 } 654 } 655 catch (Exception ex) { 656 throw convertException(ex); 657 } 658 } 659 660 void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { 661 try { 662 Path actionDir = context.getActionDir(); 663 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false) 664 && actionFs.exists(actionDir)) { 665 actionFs.delete(actionDir, true); 666 } 667 } 668 catch (Exception ex) { 669 throw convertException(ex); 670 } 671 } 672 673 protected void addShareLib(Configuration conf, String[] actionShareLibNames) 674 throws ActionExecutorException { 675 Set<String> confSet = new HashSet<String>(Arrays.asList(getShareLibFilesForActionConf() == null ? 
new String[0]
                : getShareLibFilesForActionConf()));

        Set<Path> sharelibList = new HashSet<Path>();

        if (actionShareLibNames != null) {
            try {
                ShareLibService shareLibService = Services.get().get(ShareLibService.class);
                FileSystem fs = shareLibService.getFileSystem();
                if (fs != null) {
                    for (String actionShareLibName : actionShareLibNames) {
                        List<Path> listOfPaths = shareLibService.getShareLibJars(actionShareLibName);
                        if (listOfPaths != null && !listOfPaths.isEmpty()) {
                            for (Path actionLibPath : listOfPaths) {
                                String fragmentName = new URI(actionLibPath.toString()).getFragment();
                                String fileName = fragmentName == null ? actionLibPath.getName() : fragmentName;
                                if (confSet.contains(fileName)) {
                                    // Config-carrying sharelib entry: merge its properties
                                    // into the job conf instead of caching the file.
                                    Configuration jobXmlConf = shareLibService.getShareLibConf(actionShareLibName,
                                            actionLibPath);
                                    if (jobXmlConf != null) {
                                        checkForDisallowedProps(jobXmlConf, actionLibPath.getName());
                                        XConfiguration.injectDefaults(jobXmlConf, conf);
                                        LOG.trace("Adding properties of " + actionLibPath + " to job conf");
                                    }
                                }
                                else {
                                    // Filtering out duplicate jars or files
                                    // (anonymous Path subclass whose identity is the file name —
                                    // or the URI fragment when present — so the HashSet
                                    // de-duplicates entries by name rather than full path).
                                    sharelibList.add(new Path(actionLibPath.toUri()) {
                                        @Override
                                        public int hashCode() {
                                            return getName().hashCode();
                                        }
                                        @Override
                                        public String getName() {
                                            try {
                                                return (new URI(toString())).getFragment() == null ?
                                                        new Path(toUri()).getName()
                                                        : (new URI(toString())).getFragment();
                                            }
                                            catch (URISyntaxException e) {
                                                throw new RuntimeException(e);
                                            }
                                        }
                                        @Override
                                        public boolean equals(Object input) {
                                            if (input == null) {
                                                return false;
                                            }
                                            if (input == this) {
                                                return true;
                                            }
                                            if (!(input instanceof Path)) {
                                                return false;
                                            }
                                            return getName().equals(((Path) input).getName());
                                        }
                                    });
                                }
                            }
                        }
                    }
                }
                for (Path libPath : sharelibList) {
                    addToCache(conf, libPath, libPath.toUri().getPath(), false);
                }
            }
            catch (URISyntaxException ex) {
                // NOTE(review): the second argument looks like it should be an error code
                // (cf. "JA010"/"EJ001" elsewhere) and the cause is not chained — confirm.
                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "Error configuring sharelib",
                        ex.getMessage());
            }
            catch (IOException ex) {
                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
                        ex.getMessage());
            }
        }
    }

    /**
     * Adds the system "oozie" sharelib jars plus any jars registered for this action
     * type to the job classpath; fails with EJ001 when the sharelib cannot be located.
     */
    protected void addSystemShareLibForAction(Configuration conf) throws ActionExecutorException {
        ShareLibService shareLibService = Services.get().get(ShareLibService.class);
        // ShareLibService is null for test cases
        if (shareLibService != null) {
            try {
                List<Path> listOfPaths = shareLibService.getSystemLibJars(JavaActionExecutor.OOZIE_COMMON_LIBDIR);
                if (listOfPaths == null || listOfPaths.isEmpty()) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "EJ001",
                            "Could not locate Oozie sharelib");
                }
                FileSystem fs = listOfPaths.get(0).getFileSystem(conf);
                for (Path actionLibPath : listOfPaths) {
                    JobUtils.addFileToClassPath(actionLibPath, conf, fs);
                    DistributedCache.createSymlink(conf);
                }
                listOfPaths = shareLibService.getSystemLibJars(getType());
                if (listOfPaths != null) {
                    for (Path actionLibPath : listOfPaths) {
                        JobUtils.addFileToClassPath(actionLibPath, conf, fs);
                        DistributedCache.createSymlink(conf);
                    }
                }
            }
            catch (IOException
ex) {
                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
                        ex.getMessage());
            }
        }
    }

    /**
     * Adds every file found under the directories listed in the
     * oozie.launcher.oozie.libpath property to the DistributedCache.
     */
    protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException {
        String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath");
        if (actionLibsStrArr != null) {
            try {
                for (String actionLibsStr : actionLibsStrArr) {
                    actionLibsStr = actionLibsStr.trim();
                    if (actionLibsStr.length() > 0)
                    {
                        Path actionLibsPath = new Path(actionLibsStr);
                        String user = conf.get("user.name");
                        FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
                        if (fs.exists(actionLibsPath)) {
                            FileStatus[] files = fs.listStatus(actionLibsPath);
                            for (FileStatus file : files) {
                                addToCache(conf, appPath, file.getPath().toUri().getPath(), false);
                            }
                        }
                    }
                }
            }
            catch (HadoopAccessorException ex){
                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
                        ex.getErrorCode().toString(), ex.getMessage());
            }
            catch (IOException ex){
                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
                        "It should never happen", ex.getMessage());
            }
        }
    }

    /**
     * Populates the DistributedCache for the action: workflow lib/ entries, action libs,
     * the action's file/archive elements and all applicable sharelibs.
     */
    @SuppressWarnings("unchecked")
    public void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf)
            throws ActionExecutorException {
        Configuration proto = context.getProtoActionConf();

        // Workflow lib/
        String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
        if (paths != null) {
            for (String path : paths) {
                addToCache(conf, appPath, path, false);
            }
        }

        // Action libs
        addActionLibs(appPath, conf);

        // files and archives defined in the action
        for (Element eProp : (List<Element>) actionXml.getChildren()) {
            if (eProp.getName().equals("file")) {
                String[]
filePaths = eProp.getTextTrim().split(","); 832 for (String path : filePaths) { 833 addToCache(conf, appPath, path.trim(), false); 834 } 835 } 836 else if (eProp.getName().equals("archive")) { 837 String[] archivePaths = eProp.getTextTrim().split(","); 838 for (String path : archivePaths){ 839 addToCache(conf, appPath, path.trim(), true); 840 } 841 } 842 } 843 844 addAllShareLibs(appPath, conf, context, actionXml); 845 } 846 847 // Adds action specific share libs and common share libs 848 private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml) 849 throws ActionExecutorException { 850 // Add action specific share libs 851 addActionShareLib(appPath, conf, context, actionXml); 852 // Add common sharelibs for Oozie and launcher jars 853 addSystemShareLibForAction(conf); 854 } 855 856 private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml) 857 throws ActionExecutorException { 858 XConfiguration wfJobConf = null; 859 try { 860 wfJobConf = getWorkflowConf(context); 861 } 862 catch (IOException ioe) { 863 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen", 864 ioe.getMessage()); 865 } 866 // Action sharelibs are only added if user has specified to use system libpath 867 if (conf.get(OozieClient.USE_SYSTEM_LIBPATH) == null) { 868 if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, 869 ConfigurationService.getBoolean(OozieClient.USE_SYSTEM_LIBPATH))) { 870 // add action specific sharelibs 871 addShareLib(conf, getShareLibNames(context, actionXml, conf)); 872 } 873 } 874 else { 875 if (conf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) { 876 // add action specific sharelibs 877 addShareLib(conf, getShareLibNames(context, actionXml, conf)); 878 } 879 } 880 } 881 882 883 protected String getLauncherMain(Configuration launcherConf, Element actionXml) { 884 return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, 
JavaMain.class.getName());
    }

    /**
     * Records the &lt;main-class&gt; element (if present) in the action configuration
     * so JavaMain knows which user class to invoke.
     */
    private void setJavaMain(Configuration actionConf, Element actionXml) {
        Namespace ns = actionXml.getNamespace();
        Element e = actionXml.getChild("main-class", ns);
        if (e != null) {
            actionConf.set(JavaMain.JAVA_MAIN_CLASS, e.getTextTrim());
        }
    }

    private static final String QUEUE_NAME = "mapred.job.queue.name";

    // Action-conf properties that must also be propagated to the launcher job
    // (queue and job ACLs); see actionConfToLauncherConf().
    private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>();

    static {
        SPECIAL_PROPERTIES.add(QUEUE_NAME);
        SPECIAL_PROPERTIES.add(ACL_VIEW_JOB);
        SPECIAL_PROPERTIES.add(ACL_MODIFY_JOB);
    }

    /**
     * Builds the full launcher JobConf for the action: base Hadoop conf, YARN restart
     * handling, lib/archive cache entries, launcher main class, arguments, java opts,
     * uber-mode tuning and propagated action properties.
     *
     * @return the configured launcher JobConf.
     * @throws ActionExecutorException wrapping any failure via convertException().
     */
    @SuppressWarnings("unchecked")
    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
            throws ActionExecutorException {
        try {

            // app path could be a file
            Path appPathRoot = new Path(context.getWorkflow().getAppPath());
            if (actionFs.isFile(appPathRoot)) {
                appPathRoot = appPathRoot.getParent();
            }

            // launcher job configuration
            JobConf launcherJobConf = createBaseHadoopConf(context, actionXml);
            // cancel delegation token on a launcher job which stays alive till child job(s) finishes
            // otherwise (in mapred action), doesn't cancel not to disturb running child job
            launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
            setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);

            // Properties for when a launcher job's AM gets restarted
            if (ConfigurationService.getBoolean(HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART)) {
                // launcher time filter is required to prune the search of launcher tag.
                // Setting coordinator action nominal time as launcher time as it child job cannot launch before nominal
                // time. Workflow created time is good enough when workflow is running independently or workflow is
                // rerunning from failed node.
                long launcherTime = System.currentTimeMillis();
                String coordActionNominalTime = context.getProtoActionConf().get(
                        CoordActionStartXCommand.OOZIE_COORD_ACTION_NOMINAL_TIME);
                if (coordActionNominalTime != null) {
                    launcherTime = Long.parseLong(coordActionNominalTime);
                }
                else if (context.getWorkflow().getCreatedTime() != null) {
                    launcherTime = context.getWorkflow().getCreatedTime().getTime();
                }
                String actionYarnTag = getActionYarnTag(getWorkflowConf(context), context.getWorkflow(), action);
                LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, actionYarnTag, launcherTime);
            }
            else {
                LOG.info(MessageFormat.format("{0} is set to false, not setting YARN restart properties",
                        HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART));
            }

            // Carry the per-action sharelib override into the launcher conf too.
            String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType());
            if (actionShareLibProperty != null) {
                launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty);
            }
            setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);

            // Default launcher job name if the user did not set one.
            String jobName = launcherJobConf.get(HADOOP_JOB_NAME);
            if (jobName == null || jobName.isEmpty()) {
                jobName = XLog.format(
                        "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
                        context.getWorkflow().getAppName(), action.getName(),
                        context.getWorkflow().getId());
                launcherJobConf.setJobName(jobName);
            }

            // Inject Oozie job information if enabled.
            injectJobInfo(launcherJobConf, actionConf, context, action);

            injectLauncherCallback(context, launcherJobConf);

            String jobId = context.getWorkflow().getId();
            String actionId = action.getId();
            Path actionDir = context.getActionDir();
            String recoveryId = context.getRecoveryId();

            // Getting the prepare XML from the action XML
            Namespace ns = actionXml.getNamespace();
            Element prepareElement = actionXml.getChild("prepare", ns);
            String prepareXML = "";
            if (prepareElement != null) {
                if (prepareElement.getChildren().size() > 0) {
                    prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
                }
            }
            LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
                    prepareXML);

            // Set the launcher Main Class
            LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
            LauncherMapperHelper.setupLauncherURIHandlerConf(launcherJobConf);
            LauncherMapperHelper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
            LauncherMapperHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
            LauncherMapperHelper.setupMaxFSGlob(launcherJobConf, maxFSGlobMax);

            // <arg> elements become the launcher main's argv, in document order.
            List<Element> list = actionXml.getChildren("arg", ns);
            String[] args = new String[list.size()];
            for (int i = 0; i < list.size(); i++) {
                args[i] = list.get(i).getTextTrim();
            }
            LauncherMapperHelper.setupMainArguments(launcherJobConf, args);

            // Make mapred.child.java.opts and mapreduce.map.java.opts equal, but give values from the latter priority; also append
            // <java-opt> and <java-opts> and give those highest priority
            StringBuilder opts = new StringBuilder(launcherJobConf.get(HADOOP_CHILD_JAVA_OPTS, ""));
            if (launcherJobConf.get(HADOOP_MAP_JAVA_OPTS) != null) {
                opts.append(" ").append(launcherJobConf.get(HADOOP_MAP_JAVA_OPTS));
            }
            List<Element> javaopts = actionXml.getChildren("java-opt", ns);
            for (Element opt: javaopts) {
                opts.append(" ").append(opt.getTextTrim());
            }
            Element opt = actionXml.getChild("java-opts", ns);
            if (opt != null) {
                opts.append(" ").append(opt.getTextTrim());
            }
            launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim());
            launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, opts.toString().trim());

            // setting for uber mode
            if (launcherJobConf.getBoolean(HADOOP_YARN_UBER_MODE, false)) {
                // Uber mode is incompatible with classloader/classpath-first settings.
                if (checkPropertiesToDisableUber(launcherJobConf)) {
                    launcherJobConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
                }
                else {
                    updateConfForUberMode(launcherJobConf);
                }
            }
            updateConfForJavaTmpDir(launcherJobConf);
            injectLauncherTimelineServiceEnabled(launcherJobConf, actionConf);

            // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs)
            // maybe we should add queue to the WF schema, below job-tracker
            actionConfToLauncherConf(actionConf, launcherJobConf);

            return launcherJobConf;
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    /**
     * Returns true when launcher settings (job classloader or user-classpath-first)
     * force uber mode to be disabled.
     */
    private boolean checkPropertiesToDisableUber(Configuration launcherConf) {
        boolean disable = false;
        if (launcherConf.getBoolean(HADOOP_JOB_CLASSLOADER, false)) {
            disable = true;
        }
        else if (launcherConf.getBoolean(HADOOP_USER_CLASSPATH_FIRST, false)) {
            disable = true;
        }
        return disable;
    }

    /**
     * Sets the job-end notification URL to the Oozie callback for this action,
     * overriding (with a warning) any user-provided value.
     */
    protected void injectCallback(Context context, Configuration conf) {
        String callback = context.getCallbackUrl("$jobStatus");
        if (conf.get("job.end.notification.url") != null) {
            LOG.warn("Overriding the action job end notification URI");
        }
        conf.set("job.end.notification.url", callback);
    }

    void injectActionCallback(Context context, Configuration actionConf) {
        // action callback needs to be injected only for mapreduce actions.
1058 } 1059 1060 void injectLauncherCallback(Context context, Configuration launcherConf) { 1061 injectCallback(context, launcherConf); 1062 } 1063 1064 private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) { 1065 for (String name : SPECIAL_PROPERTIES) { 1066 if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) { 1067 launcherConf.set(name, actionConf.get(name)); 1068 } 1069 } 1070 } 1071 1072 public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException { 1073 JobClient jobClient = null; 1074 boolean exception = false; 1075 try { 1076 Path appPathRoot = new Path(context.getWorkflow().getAppPath()); 1077 1078 // app path could be a file 1079 if (actionFs.isFile(appPathRoot)) { 1080 appPathRoot = appPathRoot.getParent(); 1081 } 1082 1083 Element actionXml = XmlUtils.parseXml(action.getConf()); 1084 1085 // action job configuration 1086 Configuration actionConf = loadHadoopDefaultResources(context, actionXml); 1087 setupActionConf(actionConf, context, actionXml, appPathRoot); 1088 LOG.debug("Setting LibFilesArchives "); 1089 setLibFilesArchives(context, actionXml, appPathRoot, actionConf); 1090 1091 String jobName = actionConf.get(HADOOP_JOB_NAME); 1092 if (jobName == null || jobName.isEmpty()) { 1093 jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", 1094 getType(), context.getWorkflow().getAppName(), 1095 action.getName(), context.getWorkflow().getId()); 1096 actionConf.set(HADOOP_JOB_NAME, jobName); 1097 } 1098 1099 injectActionCallback(context, actionConf); 1100 1101 if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) { 1102 // ONLY in the case where user has not given the 1103 // modify-job ACL specifically 1104 if (context.getWorkflow().getAcl() != null) { 1105 // setting the group owning the Oozie job to allow anybody in that 1106 // group to modify the jobs. 
1107 actionConf.set(ACL_MODIFY_JOB, context.getWorkflow().getAcl()); 1108 } 1109 } 1110 1111 // Setting the credential properties in launcher conf 1112 JobConf credentialsConf = null; 1113 HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context, 1114 action, actionConf); 1115 if (credentialsProperties != null) { 1116 1117 // Adding if action need to set more credential tokens 1118 credentialsConf = new JobConf(false); 1119 XConfiguration.copy(actionConf, credentialsConf); 1120 setCredentialTokens(credentialsConf, context, action, credentialsProperties); 1121 1122 // insert conf to action conf from credentialsConf 1123 for (Entry<String, String> entry : credentialsConf) { 1124 if (actionConf.get(entry.getKey()) == null) { 1125 actionConf.set(entry.getKey(), entry.getValue()); 1126 } 1127 } 1128 } 1129 1130 JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); 1131 1132 LOG.debug("Creating Job Client for action " + action.getId()); 1133 jobClient = createJobClient(context, launcherJobConf); 1134 String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context 1135 .getRecoveryId()); 1136 boolean alreadyRunning = launcherId != null; 1137 RunningJob runningJob; 1138 1139 // if user-retry is on, always submit new launcher 1140 boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry(); 1141 1142 if (alreadyRunning && !isUserRetry) { 1143 runningJob = jobClient.getJob(JobID.forName(launcherId)); 1144 if (runningJob == null) { 1145 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER); 1146 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", 1147 "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker); 1148 } 1149 } 1150 else { 1151 LOG.debug("Submitting the job through Job Client for action " + action.getId()); 1152 1153 // setting up propagation of the delegation token. 
1154 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 1155 Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(has 1156 .getMRDelegationTokenRenewer(launcherJobConf)); 1157 launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt); 1158 1159 // insert credentials tokens to launcher job conf if needed 1160 if (needInjectCredentials() && credentialsConf != null) { 1161 for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) { 1162 Text fauxAlias = new Text(tk.getKind() + "_" + tk.getService()); 1163 LOG.debug("ADDING TOKEN: " + fauxAlias); 1164 launcherJobConf.getCredentials().addToken(fauxAlias, tk); 1165 } 1166 if (credentialsConf.getCredentials().numberOfSecretKeys() > 0) { 1167 for (Entry<String, CredentialsProperties> entry : credentialsProperties.entrySet()) { 1168 CredentialsProperties credProps = entry.getValue(); 1169 if (credProps != null) { 1170 Text credName = new Text(credProps.getName()); 1171 byte[] secKey = credentialsConf.getCredentials().getSecretKey(credName); 1172 if (secKey != null) { 1173 LOG.debug("ADDING CREDENTIAL: " + credProps.getName()); 1174 launcherJobConf.getCredentials().addSecretKey(credName, secKey); 1175 } 1176 } 1177 } 1178 } 1179 } 1180 else { 1181 LOG.info("No need to inject credentials."); 1182 } 1183 runningJob = jobClient.submitJob(launcherJobConf); 1184 if (runningJob == null) { 1185 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", 1186 "Error submitting launcher for action [{0}]", action.getId()); 1187 } 1188 launcherId = runningJob.getID().toString(); 1189 LOG.debug("After submission get the launcherId " + launcherId); 1190 } 1191 1192 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER); 1193 String consoleUrl = runningJob.getTrackingURL(); 1194 context.setStartData(launcherId, jobTracker, consoleUrl); 1195 } 1196 catch (Exception ex) { 1197 exception = true; 1198 throw 
convertException(ex); 1199 } 1200 finally { 1201 if (jobClient != null) { 1202 try { 1203 jobClient.close(); 1204 } 1205 catch (Exception e) { 1206 if (exception) { 1207 LOG.error("JobClient error: ", e); 1208 } 1209 else { 1210 throw convertException(e); 1211 } 1212 } 1213 } 1214 } 1215 } 1216 private boolean needInjectCredentials() { 1217 boolean methodExists = true; 1218 1219 Class klass; 1220 try { 1221 klass = Class.forName("org.apache.hadoop.mapred.JobConf"); 1222 klass.getMethod("getCredentials"); 1223 } 1224 catch (ClassNotFoundException ex) { 1225 methodExists = false; 1226 } 1227 catch (NoSuchMethodException ex) { 1228 methodExists = false; 1229 } 1230 1231 return methodExists; 1232 } 1233 1234 protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context, 1235 WorkflowAction action, Configuration actionConf) throws Exception { 1236 HashMap<String, CredentialsProperties> credPropertiesMap = null; 1237 if (context != null && action != null) { 1238 if (!"true".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP))) { 1239 XConfiguration wfJobConf = getWorkflowConf(context); 1240 if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) || 1241 !wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) { 1242 credPropertiesMap = getActionCredentialsProperties(context, action); 1243 if (credPropertiesMap != null) { 1244 for (String key : credPropertiesMap.keySet()) { 1245 CredentialsProperties prop = credPropertiesMap.get(key); 1246 if (prop != null) { 1247 LOG.debug("Credential Properties set for action : " + action.getId()); 1248 for (String property : prop.getProperties().keySet()) { 1249 actionConf.set(property, prop.getProperties().get(property)); 1250 LOG.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) 1251 + "'"); 1252 } 1253 } 1254 } 1255 } else { 1256 LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + 
                                action.getCred());
                    }
                } else {
                    LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
                }
            } else {
                LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
            }
        } else {
            LOG.warn("context or action is null");
        }
        return credPropertiesMap;
    }

    /**
     * Acquires a credential token/secret for every resolved credential and stores it
     * in the given job configuration via the configured CredentialsProvider.
     *
     * @throws ActionExecutorException JA020 when a provider type is not configured.
     */
    protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
            HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {

        if (context != null && action != null && credPropertiesMap != null) {
            // Make sure we're logged into Kerberos; if not, or near expiration, it will relogin
            CredentialsProvider.ensureKerberosLogin();
            for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
                String credName = entry.getKey();
                CredentialsProperties credProps = entry.getValue();
                if (credProps != null) {
                    CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
                    Credentials credentialObject = credProvider.createCredentialObject();
                    if (credentialObject != null) {
                        credentialObject.addtoJobConf(jobconf, credProps, context);
                        LOG.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
                    }
                    else {
                        LOG.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA020",
                                "Could not load credentials of type [{0}] with name [{1}]]; perhaps it was not defined"
                                        + " in oozie-site.xml?", credProps.getType(), credName);
                    }
                }
            }
        }

    }

    /**
     * Parses the comma-separated credential names declared on the action and resolves
     * each one against the workflow's &lt;credentials&gt; section.
     *
     * @return map of credential name to its resolved properties (possibly empty).
     */
    protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
            WorkflowAction action) throws Exception {
        HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();
        if (context != null && action != null) {
            String credsInAction = action.getCred();
            if (credsInAction != null) {
                LOG.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId());
                String[] credNames = credsInAction.split(",");
                for (String credName : credNames) {
                    CredentialsProperties credProps = getCredProperties(context, credName);
                    props.put(credName, credProps);
                }
            }
        }
        else {
            LOG.warn("context or action is null");
        }
        return props;
    }

    /**
     * Finds the named &lt;credential&gt; element in the workflow definition and
     * returns its properties with EL expressions evaluated against the job conf.
     *
     * @throws ActionExecutorException JA021 when the named credential is not defined.
     */
    @SuppressWarnings("unchecked")
    protected CredentialsProperties getCredProperties(Context context, String credName)
            throws Exception {
        CredentialsProperties credProp = null;
        String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
        XConfiguration wfjobConf = getWorkflowConf(context);
        Element elementJob = XmlUtils.parseXml(workflowXml);
        Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
        if (credentials != null) {
            for (Element credential : (List<Element>) credentials.getChildren("credential", credentials.getNamespace())) {
                String name = credential.getAttributeValue("name");
                String type = credential.getAttributeValue("type");
                LOG.debug("getCredProperties: Name: " + name + ", Type: " + type);
                if (name.equalsIgnoreCase(credName)) {
                    credProp = new CredentialsProperties(name, type);
                    for (Element property : (List<Element>) credential.getChildren("property",
                            credential.getNamespace())) {
                        String propertyName = property.getChildText("name", property.getNamespace());
                        String propertyValue = property.getChildText("value", property.getNamespace());
                        // Evaluate EL in both name and value against the workflow conf.
                        ELEvaluator eval = new ELEvaluator();
                        for (Map.Entry<String, String> entry : wfjobConf) {
                            eval.setVariable(entry.getKey(), entry.getValue().trim());
                        }
                        propertyName = eval.evaluate(propertyName, String.class);
                        propertyValue = eval.evaluate(propertyValue, String.class);

                        credProp.getProperties().put(propertyName, propertyValue);
                        LOG.debug("getCredProperties: Properties name :'" + propertyName + "', Value : '"
                                + propertyValue + "'");
                    }
                }
            }
            if (credProp == null && credName != null) {
                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA021",
                        "Could not load credentials with name [{0}]].", credName);
            }
        } else {
            LOG.debug("credentials is null for the action");
        }
        return credProp;
    }

    /**
     * Starts the action: prepares the action directory, submits the launcher job and
     * immediately runs a check so quick completions are observed.
     */
    @Override
    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
        LogUtils.setLogInfo(action);
        try {
            LOG.debug("Starting action " + action.getId() + " getting Action File System");
            FileSystem actionFs = context.getAppFileSystem();
            LOG.debug("Preparing action Dir through copying " + context.getActionDir());
            prepareActionDir(actionFs, context);
            LOG.debug("Action Dir is ready. Submitting the action ");
            submitLauncher(actionFs, context, action);
            LOG.debug("Action submit completed. Performing check ");
            check(context, action);
            LOG.debug("Action check is done after submission");
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    /**
     * Ends the action: maps the external status to OK/ERROR, records the end data,
     * and always cleans up the action directory.
     */
    @Override
    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
        try {
            String externalStatus = action.getExternalStatus();
            WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ?
WorkflowAction.Status.OK
                    : WorkflowAction.Status.ERROR;
            context.setEndData(status, getActionSignal(status));
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
        finally {
            // Always remove the action directory, even when setEndData failed.
            try {
                FileSystem actionFs = context.getAppFileSystem();
                cleanUpActionDir(actionFs, context);
            }
            catch (Exception ex) {
                throw convertException(ex);
            }
        }
    }

    /**
     * Create job client object
     *
     * @param context executor context providing the workflow user.
     * @param jobConf job configuration to connect with.
     * @return JobClient
     * @throws HadoopAccessorException if the client cannot be created.
     */
    protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
        String user = context.getWorkflow().getUser();
        // NOTE(review): 'group' is never used below — candidate for removal.
        String group = context.getWorkflow().getGroup();
        return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
    }

    /**
     * Looks up the RunningJob for this action's external id; may return null when
     * the job is unknown to the cluster.
     */
    protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
        RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
        return runningJob;
    }

    /**
     * Useful for overriding in actions that do subsequent job runs
     * such as the MapReduce Action, where the launcher job is not the
     * actual job that then gets monitored.
     */
    protected String getActualExternalId(WorkflowAction action) {
        return action.getExternalId();
    }

    /**
     * Polls the launcher (and, after an id swap, the child job) and updates the
     * context: external child ids, output/stats on success, error info on failure,
     * or RUNNING while the job is still in flight.
     */
    @Override
    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
        JobClient jobClient = null;
        boolean exception = false;
        LogUtils.setLogInfo(action);
        try {
            Element actionXml = XmlUtils.parseXml(action.getConf());
            FileSystem actionFs = context.getAppFileSystem();
            JobConf jobConf = createBaseHadoopConf(context, actionXml);
            jobClient = createJobClient(context, jobConf);
            RunningJob runningJob = getRunningJob(context, action, jobClient);
            if (runningJob == null) {
                context.setExecutionData(FAILED, null);
                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
                        "Could not lookup launched hadoop Job ID [{0}] which was associated with " +
                        " action [{1}].  Failing this action!", getActualExternalId(action), action.getId());
            }
            if (runningJob.isComplete()) {
                Path actionDir = context.getActionDir();
                String newId = null;
                // load sequence file into object
                Map<String, String> actionData = LauncherMapperHelper.getActionData(actionFs, actionDir, jobConf);
                if (actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID)) {
                    // The launcher spawned a child job; track that job from now on.
                    newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID);
                    String launcherId = action.getExternalId();
                    runningJob = jobClient.getJob(JobID.forName(newId));
                    if (runningJob == null) {
                        context.setExternalStatus(FAILED);
                        throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
                                "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", newId,
                                action.getId());
                    }
                    context.setExternalChildIDs(newId);
                    LOG.info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
                            newId);
                }
                else {
                    String externalIDs = actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
                    if (externalIDs != null) {
                        context.setExternalChildIDs(externalIDs);
                        LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
                    }
                    else if (LauncherMapperHelper.hasOutputData(actionData)) {
                        // Load stored Hadoop jobs ids and promote them as external child ids
                        // This is for jobs launched with older release during upgrade to Oozie 4.3
                        Properties props = PropertiesUtils.stringToProperties(actionData
                                .get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS));
                        if (props.get(LauncherMain.HADOOP_JOBS) != null) {
                            externalIDs = (String) props.get(LauncherMain.HADOOP_JOBS);
                            context.setExternalChildIDs(externalIDs);
                            LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
                        }
                    }
                }
                if (runningJob.isComplete()) {
                    // fetching action output and stats for the Map-Reduce action.
                    if (newId != null) {
                        // Reload action data after the id swap so output/stats are fresh.
                        actionData = LauncherMapperHelper.getActionData(actionFs, context.getActionDir(), jobConf);
                    }
                    LOG.info(XLog.STD, "action completed, external ID [{0}]",
                            action.getExternalId());
                    if (LauncherMapperHelper.isMainSuccessful(runningJob)) {
                        if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) {
                            context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
                                    .get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS)));
                            LOG.info(XLog.STD, "action produced output");
                        }
                        else {
                            context.setExecutionData(SUCCEEDED, null);
                        }
                        if (LauncherMapperHelper.hasStatsData(actionData)) {
                            context.setExecutionStats(actionData.get(LauncherMapper.ACTION_DATA_STATS));
                            LOG.info(XLog.STD, "action produced stats");
                        }
                        getActionData(actionFs, runningJob, action, context);
                    }
                    else {
                        String errorReason;
                        if (actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) {
                            Properties props = PropertiesUtils.stringToProperties(actionData
                                    .get(LauncherMapper.ACTION_DATA_ERROR_PROPS));
                            String errorCode = props.getProperty("error.code");
                            // Normalize launcher sentinel codes to Oozie error codes.
                            if ("0".equals(errorCode)) {
                                errorCode = "JA018";
                            }
                            if ("-1".equals(errorCode)) {
                                errorCode = "JA019";
                            }
                            errorReason = props.getProperty("error.reason");
                            LOG.warn("Launcher ERROR, reason: {0}", errorReason);
                            String exMsg = props.getProperty("exception.message");
                            String errorInfo = (exMsg != null) ? exMsg : errorReason;
                            context.setErrorInfo(errorCode, errorInfo);
                            String exStackTrace = props.getProperty("exception.stacktrace");
                            if (exMsg != null) {
                                LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
                            }
                        }
                        else {
                            errorReason = XLog.format("LauncherMapper died, check Hadoop LOG for job [{0}:{1}]", action
                                    .getTrackerUri(), action.getExternalId());
                            LOG.warn(errorReason);
                        }
                        context.setExecutionData(FAILED_KILLED, null);
                    }
                }
                else {
                    context.setExternalStatus("RUNNING");
                    LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
                            runningJob.getID());
                }
            }
            else {
                context.setExternalStatus("RUNNING");
                LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
                        runningJob.getID());
            }
        }
        catch (Exception ex) {
            LOG.warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
            exception = true;
            throw convertException(ex);
        }
        finally {
            if (jobClient != null) {
                try {
                    jobClient.close();
                }
                catch (Exception e) {
                    if (exception) {
                        // Original exception is already propagating; just log this one.
                        LOG.error("JobClient error: ", e);
                    }
                    else {
                        throw convertException(e);
                    }
                }
            }
        }
    }

    /**
     * Get the output data of an action. Subclasses should override this method
     * to get action specific output data.
1572 * 1573 * @param actionFs the FileSystem object 1574 * @param runningJob the runningJob 1575 * @param action the Workflow action 1576 * @param context executor context 1577 * 1578 */ 1579 protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) 1580 throws HadoopAccessorException, JDOMException, IOException, URISyntaxException { 1581 } 1582 1583 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException { 1584 Element eConf = XmlUtils.parseXml(action.getConf()); 1585 Namespace ns = eConf.getNamespace(); 1586 Element captureOutput = eConf.getChild("capture-output", ns); 1587 return captureOutput != null; 1588 } 1589 1590 @Override 1591 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 1592 JobClient jobClient = null; 1593 boolean exception = false; 1594 try { 1595 Element actionXml = XmlUtils.parseXml(action.getConf()); 1596 final JobConf jobConf = createBaseHadoopConf(context, actionXml); 1597 WorkflowJob wfJob = context.getWorkflow(); 1598 Configuration conf = null; 1599 if ( wfJob.getConf() != null ) { 1600 conf = new XConfiguration(new StringReader(wfJob.getConf())); 1601 } 1602 String launcherTag = LauncherMapperHelper.getActionYarnTag(conf, wfJob.getParentId(), action); 1603 jobConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, LauncherMapperHelper.getTag(launcherTag)); 1604 jobConf.set(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, Long.toString(action.getStartTime().getTime())); 1605 UserGroupInformation ugi = Services.get().get(UserGroupInformationService.class) 1606 .getProxyUser(context.getWorkflow().getUser()); 1607 ugi.doAs(new PrivilegedExceptionAction<Void>() { 1608 @Override 1609 public Void run() throws Exception { 1610 LauncherMainHadoopUtils.killChildYarnJobs(jobConf); 1611 return null; 1612 } 1613 }); 1614 jobClient = createJobClient(context, jobConf); 1615 RunningJob runningJob = getRunningJob(context, action, 
jobClient); 1616 if (runningJob != null) { 1617 runningJob.killJob(); 1618 } 1619 context.setExternalStatus(KILLED); 1620 context.setExecutionData(KILLED, null); 1621 } 1622 catch (Exception ex) { 1623 exception = true; 1624 throw convertException(ex); 1625 } 1626 finally { 1627 try { 1628 FileSystem actionFs = context.getAppFileSystem(); 1629 cleanUpActionDir(actionFs, context); 1630 if (jobClient != null) { 1631 jobClient.close(); 1632 } 1633 } 1634 catch (Exception ex) { 1635 if (exception) { 1636 LOG.error("Error: ", ex); 1637 } 1638 else { 1639 throw convertException(ex); 1640 } 1641 } 1642 } 1643 } 1644 1645 private static Set<String> FINAL_STATUS = new HashSet<String>(); 1646 1647 static { 1648 FINAL_STATUS.add(SUCCEEDED); 1649 FINAL_STATUS.add(KILLED); 1650 FINAL_STATUS.add(FAILED); 1651 FINAL_STATUS.add(FAILED_KILLED); 1652 } 1653 1654 @Override 1655 public boolean isCompleted(String externalStatus) { 1656 return FINAL_STATUS.contains(externalStatus); 1657 } 1658 1659 1660 /** 1661 * Return the sharelib names for the action. 1662 * <p> 1663 * If <code>NULL</code> or empty, it means that the action does not use the action 1664 * sharelib. 1665 * <p> 1666 * If a non-empty string, i.e. <code>foo</code>, it means the action uses the 1667 * action sharelib sub-directory <code>foo</code> and all JARs in the sharelib 1668 * <code>foo</code> directory will be in the action classpath. Multiple sharelib 1669 * sub-directories can be specified as a comma separated list. 1670 * <p> 1671 * The resolution is done using the following precedence order: 1672 * <ul> 1673 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the action configuration</li> 1674 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the job configuration</li> 1675 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the oozie configuration</li> 1676 * <li>Action Executor <code>getDefaultShareLibName()</code> method</li> 1677 * </ul> 1678 * 1679 * 1680 * @param context executor context. 
1681 * @param actionXml 1682 * @param conf action configuration. 1683 * @return the action sharelib names. 1684 */ 1685 protected String[] getShareLibNames(Context context, Element actionXml, Configuration conf) { 1686 String[] names = conf.getStrings(ACTION_SHARELIB_FOR + getType()); 1687 if (names == null || names.length == 0) { 1688 try { 1689 XConfiguration jobConf = getWorkflowConf(context); 1690 names = jobConf.getStrings(ACTION_SHARELIB_FOR + getType()); 1691 if (names == null || names.length == 0) { 1692 names = Services.get().getConf().getStrings(ACTION_SHARELIB_FOR + getType()); 1693 if (names == null || names.length == 0) { 1694 String name = getDefaultShareLibName(actionXml); 1695 if (name != null) { 1696 names = new String[] { name }; 1697 } 1698 } 1699 } 1700 } 1701 catch (IOException ex) { 1702 throw new RuntimeException("It cannot happen, " + ex.toString(), ex); 1703 } 1704 } 1705 return names; 1706 } 1707 1708 private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for."; 1709 1710 1711 /** 1712 * Returns the default sharelib name for the action if any. 1713 * 1714 * @param actionXml the action XML fragment. 1715 * @return the sharelib name for the action, <code>NULL</code> if none. 
1716 */ 1717 protected String getDefaultShareLibName(Element actionXml) { 1718 return null; 1719 } 1720 1721 public String[] getShareLibFilesForActionConf() { 1722 return null; 1723 } 1724 1725 /** 1726 * Sets some data for the action on completion 1727 * 1728 * @param context executor context 1729 * @param actionFs the FileSystem object 1730 */ 1731 protected void setActionCompletionData(Context context, FileSystem actionFs) throws IOException, 1732 HadoopAccessorException, URISyntaxException { 1733 } 1734 1735 private void injectJobInfo(JobConf launcherJobConf, Configuration actionConf, Context context, WorkflowAction action) { 1736 if (OozieJobInfo.isJobInfoEnabled()) { 1737 try { 1738 OozieJobInfo jobInfo = new OozieJobInfo(actionConf, context, action); 1739 String jobInfoStr = jobInfo.getJobInfo(); 1740 launcherJobConf.set(OozieJobInfo.JOB_INFO_KEY, jobInfoStr + "launcher=true"); 1741 actionConf.set(OozieJobInfo.JOB_INFO_KEY, jobInfoStr + "launcher=false"); 1742 } 1743 catch (Exception e) { 1744 // Just job info, should not impact the execution. 1745 LOG.error("Error while populating job info", e); 1746 } 1747 } 1748 } 1749 1750 @Override 1751 public boolean requiresNameNodeJobTracker() { 1752 return true; 1753 } 1754 1755 @Override 1756 public boolean supportsConfigurationJobXML() { 1757 return true; 1758 } 1759 1760 private XConfiguration getWorkflowConf(Context context) throws IOException { 1761 if (workflowConf == null) { 1762 workflowConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); 1763 } 1764 return workflowConf; 1765 1766 } 1767}