001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.action.hadoop; 019 020import java.io.ByteArrayOutputStream; 021import java.io.File; 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.io.PrintStream; 025import java.io.StringReader; 026import java.net.ConnectException; 027import java.net.URI; 028import java.net.URISyntaxException; 029import java.net.UnknownHostException; 030import java.util.ArrayList; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Map; 036import java.util.Properties; 037import java.util.Set; 038import java.util.Map.Entry; 039import java.util.regex.Matcher; 040import java.util.regex.Pattern; 041 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.filecache.DistributedCache; 044import org.apache.hadoop.fs.FileStatus; 045import org.apache.hadoop.fs.FileSystem; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.fs.permission.AccessControlException; 048import org.apache.hadoop.io.Text; 049import org.apache.hadoop.mapred.JobClient; 050import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.util.DiskChecker;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.ShareLibService;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.servlet.CallbackServlet;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.ELEvaluationException;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

/**
 * ActionExecutor that runs a Java action through a Hadoop "launcher" job.
 * Subclasses (pig, hive, etc.) reuse the launcher-setup machinery defined here
 * and override the main class / launcher classes as needed.
 */
public class JavaActionExecutor extends ActionExecutor {

    // Hadoop property carrying the submitting user; user-supplied configs may
    // never override it (enforced via DISALLOWED_PROPERTIES below).
    private static final String HADOOP_USER = "user.name";
    // Job-tracker / RM address keys: MR1 name, MR2 name, and the YARN RM key.
    // All three are set from the action's <job-tracker> element.
    public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
    public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
    public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
    public static final String HADOOP_NAME_NODE = "fs.default.name";
    private static final String HADOOP_JOB_NAME = "mapred.job.name";
    // Name of the sharelib subdirectory holding the common Oozie jars.
    public static final String OOZIE_COMMON_LIBDIR = "oozie";
    public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE;
    // Properties that user configuration must not set; populated in the static block.
    private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
    public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
    public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
    public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
    public static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable";
    // Launcher map-task / AM resource and JVM-options keys used when merging
    // map settings into the AM for uber mode (see updateConfForUberMode).
    public static final String HADOOP_MAP_MEMORY_MB = "mapreduce.map.memory.mb";
    public static final String HADOOP_CHILD_JAVA_OPTS = "mapred.child.java.opts";
    public static final String HADOOP_MAP_JAVA_OPTS = "mapreduce.map.java.opts";
    public static final String HADOOP_CHILD_JAVA_ENV = "mapred.child.env";
    public static final String HADOOP_MAP_JAVA_ENV = "mapreduce.map.env";
    public static final String YARN_AM_RESOURCE_MB = "yarn.app.mapreduce.am.resource.mb";
    public static final String YARN_AM_COMMAND_OPTS = "yarn.app.mapreduce.am.command-opts";
    public static final String YARN_AM_ENV = "yarn.app.mapreduce.am.env";
    private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain";
    // Headroom (MB) added on top of the larger of map/AM memory in uber mode.
    public static final int YARN_MEMORY_MB_MIN = 512;
    // Limits read from oozie-site in initActionType().
    private static int maxActionOutputLen;
    private static int maxExternalStatsSize;
    private static int maxFSGlobMax;
    private static final String SUCCEEDED = "SUCCEEDED";
    private static final String KILLED = "KILLED";
    private static final String FAILED = "FAILED";
    private static final String FAILED_KILLED = "FAILED/KILLED";
    protected XLog LOG = XLog.getLog(getClass());
    // Matches -Xmx values like -Xmx512m / -Xmx2G; group(2) is the number,
    // group(1) includes the unit suffix. Used by extractHeapSizeMB().
    private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])");

    static {
        DISALLOWED_PROPERTIES.add(HADOOP_USER);
        DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
        DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE);
DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2); 122 DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM); 123 } 124 125 public JavaActionExecutor() { 126 this("java"); 127 } 128 129 protected JavaActionExecutor(String type) { 130 super(type); 131 requiresNNJT = true; 132 } 133 134 public static List<Class> getCommonLauncherClasses() { 135 List<Class> classes = new ArrayList<Class>(); 136 classes.add(LauncherMapper.class); 137 classes.add(OozieLauncherInputFormat.class); 138 classes.add(LauncherMainHadoopUtils.class); 139 classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher()); 140 return classes; 141 } 142 143 public List<Class> getLauncherClasses() { 144 List<Class> classes = new ArrayList<Class>(); 145 try { 146 classes.add(Class.forName(JAVA_MAIN_CLASS_NAME)); 147 } 148 catch (ClassNotFoundException e) { 149 throw new RuntimeException("Class not found", e); 150 } 151 return classes; 152 } 153 154 @Override 155 public void initActionType() { 156 super.initActionType(); 157 maxActionOutputLen = getOozieConf() 158 .getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 159 // TODO: Remove the below config get in a subsequent release.. 160 // This other irrelevant property is only used to 161 // preserve backwards compatibility cause of a typo. 162 // See OOZIE-4. 163 getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 164 2 * 1024)); 165 //Get the limit for the maximum allowed size of action stats 166 maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT); 167 maxExternalStatsSize = (maxExternalStatsSize == -1) ? 
Integer.MAX_VALUE : maxExternalStatsSize; 168 //Get the limit for the maximum number of globbed files/dirs for FS operation 169 maxFSGlobMax = getOozieConf().getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, LauncherMapper.GLOB_MAX_DEFAULT); 170 171 registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001"); 172 registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT, 173 "JA002"); 174 registerError(DiskChecker.DiskOutOfSpaceException.class.getName(), 175 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003"); 176 registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(), 177 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004"); 178 registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(), 179 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005"); 180 registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, " JA006"); 181 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007"); 182 registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008"); 183 registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009"); 184 } 185 186 187 /** 188 * Get the maximum allowed size of stats 189 * 190 * @return maximum size of stats 191 */ 192 public static int getMaxExternalStatsSize() { 193 return maxExternalStatsSize; 194 } 195 196 static void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException { 197 for (String prop : DISALLOWED_PROPERTIES) { 198 if (conf.get(prop) != null) { 199 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010", 200 "Property [{0}] not allowed in action [{1}] configuration", prop, confName); 201 } 202 } 203 } 204 205 public JobConf createBaseHadoopConf(Context context, Element 
actionXml) {
        // Builds the base JobConf for either the launcher or the action itself:
        // JT/RM and NN taken from the action XML, user from the proto conf.
        Namespace ns = actionXml.getNamespace();
        String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
        String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
        JobConf conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
        conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
        // Set all three JT/RM address aliases so MR1, MR2 and YARN all resolve.
        conf.set(HADOOP_JOB_TRACKER, jobTracker);
        conf.set(HADOOP_JOB_TRACKER_2, jobTracker);
        conf.set(HADOOP_YARN_RM, jobTracker);
        conf.set(HADOOP_NAME_NODE, nameNode);
        // NOTE(review): upstream Oozie sets this property to "false" so launcher
        // jobs do not emit _SUCCESS marker files — confirm "true" is intended here.
        conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
        return conf;
    }

    /**
     * Copies every "oozie.launcher.*" property from srcConf into launcherConf,
     * both under its original prefixed key and under the un-prefixed key so the
     * value also takes effect as a plain Hadoop job property of the launcher.
     */
    private void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) {
        for (Map.Entry<String, String> entry : srcConf) {
            if (entry.getKey().startsWith("oozie.launcher.")) {
                String name = entry.getKey().substring("oozie.launcher.".length());
                String value = entry.getValue();
                // setting original KEY
                launcherConf.set(entry.getKey(), value);
                // setting un-prefixed key (to allow Hadoop job config
                // for the launcher job
                launcherConf.set(name, value);
            }
        }
    }

    /**
     * Merges launcher configuration into conf. Precedence (later wins):
     * action-default conf, then the action's inline <configuration>; uber-mode
     * default is applied afterwards only if not already set. Disallowed
     * properties are rejected before the merge into conf.
     */
    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
            throws ActionExecutorException {
        try {
            Namespace ns = actionXml.getNamespace();
            Element e = actionXml.getChild("configuration", ns);
            if (e != null) {
                String strConf = XmlUtils.prettyPrint(e).toString();
                XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));

                XConfiguration launcherConf = new XConfiguration();
                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
                XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType());
                injectLauncherProperties(actionDefaultConf, launcherConf);
                injectLauncherProperties(inlineConf, launcherConf);
                injectLauncherUseUberMode(launcherConf);
                checkForDisallowedProps(launcherConf, "launcher configuration");
                XConfiguration.copy(launcherConf, conf);
            } else {
                // No inline configuration: still apply the uber-mode default.
                XConfiguration launcherConf = new XConfiguration();
                injectLauncherUseUberMode(launcherConf);
                XConfiguration.copy(launcherConf, conf);
            }
            return conf;
        }
        catch (IOException ex) {
            throw convertException(ex);
        }
    }

    void injectLauncherUseUberMode(Configuration launcherConf) {
        // Set Uber Mode for the launcher (YARN only, ignored by MR1) if not set by action conf and not disabled in oozie-site
        if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
            if (getOozieConf().getBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable", false)) {
                launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
            }
        }
    }

    /**
     * Reconciles map-task settings (env, memory, java opts) into the YARN AM
     * settings so the launcher can run as an uber task. If the map env
     * contains an entry that conflicts with the AM env (same key, value not
     * present in the AM's values), uber mode is disabled instead.
     */
    void updateConfForUberMode(Configuration launcherConf) {

        // child.env
        boolean hasConflictEnv = false;
        // Prefer the MR2 key, fall back to the deprecated MR1 key.
        String launcherMapEnv = launcherConf.get(HADOOP_MAP_JAVA_ENV);
        if (launcherMapEnv == null) {
            launcherMapEnv = launcherConf.get(HADOOP_CHILD_JAVA_ENV);
        }
        String amEnv = launcherConf.get(YARN_AM_ENV);
        StringBuffer envStr = new StringBuffer();
        HashMap<String, List<String>> amEnvMap = null;
        HashMap<String, List<String>> launcherMapEnvMap = null;
        if (amEnv != null) {
            envStr.append(amEnv);
            amEnvMap = populateEnvMap(amEnv);
        }
        if (launcherMapEnv != null) {
            launcherMapEnvMap = populateEnvMap(launcherMapEnv);
            if (amEnvMap != null) {
                // Remove from the map-env every key/value already covered by the
                // AM env; a map value missing from the AM's values is a conflict.
                Iterator<String> envKeyItr = launcherMapEnvMap.keySet().iterator();
                while (envKeyItr.hasNext()) {
                    String envKey = envKeyItr.next();
                    if (amEnvMap.containsKey(envKey)) {
                        List<String> amValList = amEnvMap.get(envKey);
                        List<String> launcherValList = launcherMapEnvMap.get(envKey);
                        Iterator<String> valItr = launcherValList.iterator();
                        while (valItr.hasNext()) {
                            String val = valItr.next();
                            if
(!amValList.contains(val)) { 300 hasConflictEnv = true; 301 break; 302 } 303 else { 304 valItr.remove(); 305 } 306 } 307 if (launcherValList.isEmpty()) { 308 envKeyItr.remove(); 309 } 310 } 311 } 312 } 313 } 314 if (hasConflictEnv) { 315 launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, false); 316 } 317 else { 318 if (launcherMapEnvMap != null) { 319 for (String key : launcherMapEnvMap.keySet()) { 320 List<String> launcherValList = launcherMapEnvMap.get(key); 321 for (String val : launcherValList) { 322 if (envStr.length() > 0) { 323 envStr.append(","); 324 } 325 envStr.append(key).append("=").append(val); 326 } 327 } 328 } 329 330 launcherConf.set(YARN_AM_ENV, envStr.toString()); 331 332 // memory.mb 333 int launcherMapMemoryMB = launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 1536); 334 int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536); 335 // YARN_MEMORY_MB_MIN to provide buffer. 336 // suppose launcher map aggressively use high memory, need some 337 // headroom for AM 338 int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + YARN_MEMORY_MB_MIN; 339 // limit to 4096 in case of 32 bit 340 if (launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 4096) { 341 memoryMB = 4096; 342 } 343 launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB); 344 345 // We already made mapred.child.java.opts and 346 // mapreduce.map.java.opts equal, so just start with one of them 347 String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS, ""); 348 String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS); 349 StringBuilder optsStr = new StringBuilder(); 350 int heapSizeForMap = extractHeapSizeMB(launcherMapOpts); 351 int heapSizeForAm = extractHeapSizeMB(amChildOpts); 352 int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + YARN_MEMORY_MB_MIN; 353 // limit to 3584 in case of 32 bit 354 if (heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 3584) { 355 heapSize = 3584; 356 } 357 if (amChildOpts != null) { 358 optsStr.append(amChildOpts); 359 } 360 
optsStr.append(" ").append(launcherMapOpts.trim()); 361 if (heapSize > 0) { 362 // append calculated total heap size to the end 363 optsStr.append(" ").append("-Xmx").append(heapSize).append("m"); 364 } 365 launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString().trim()); 366 } 367 } 368 369 private HashMap<String, List<String>> populateEnvMap(String input) { 370 HashMap<String, List<String>> envMaps = new HashMap<String, List<String>>(); 371 String[] envEntries = input.split(","); 372 for (String envEntry : envEntries) { 373 String[] envKeyVal = envEntry.split("="); 374 String envKey = envKeyVal[0].trim(); 375 List<String> valList = envMaps.get(envKey); 376 if (valList == null) { 377 valList = new ArrayList<String>(); 378 } 379 valList.add(envKeyVal[1].trim()); 380 envMaps.put(envKey, valList); 381 } 382 return envMaps; 383 } 384 385 public int extractHeapSizeMB(String input) { 386 int ret = 0; 387 if(input == null || input.equals("")) 388 return ret; 389 Matcher m = heapPattern.matcher(input); 390 String heapStr = null; 391 String heapNum = null; 392 // Grabs the last match which takes effect (in case that multiple Xmx options specified) 393 while (m.find()) { 394 heapStr = m.group(1); 395 heapNum = m.group(2); 396 } 397 if (heapStr != null) { 398 // when Xmx specified in Gigabyte 399 if(heapStr.endsWith("g") || heapStr.endsWith("G")) { 400 ret = Integer.parseInt(heapNum) * 1024; 401 } else { 402 ret = Integer.parseInt(heapNum); 403 } 404 } 405 return ret; 406 } 407 408 public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf) 409 throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException { 410 Namespace ns = element.getNamespace(); 411 Iterator<Element> it = element.getChildren("job-xml", ns).iterator(); 412 HashMap<String, FileSystem> filesystemsMap = new HashMap<String, FileSystem>(); 413 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 414 while 
(it.hasNext()) {
            Element e = it.next();
            String jobXml = e.getTextTrim();
            // Relative job-xml paths are resolved against the application path.
            Path pathSpecified = new Path(jobXml);
            Path path = pathSpecified.isAbsolute() ? pathSpecified : new Path(appPath, jobXml);
            FileSystem fs;
            // Cache one FileSystem per URI authority; a null authority maps to
            // the workflow application's filesystem.
            if (filesystemsMap.containsKey(path.toUri().getAuthority())) {
                fs = filesystemsMap.get(path.toUri().getAuthority());
            }
            else {
                if (path.toUri().getAuthority() != null) {
                    fs = has.createFileSystem(context.getWorkflow().getUser(), path.toUri(),
                            has.createJobConf(path.toUri().getAuthority()));
                }
                else {
                    fs = context.getAppFileSystem();
                }
                filesystemsMap.put(path.toUri().getAuthority(), fs);
            }
            Configuration jobXmlConf = new XConfiguration(fs.open(path));
            try {
                // Strip XML comments and run EL evaluation over the job-xml body
                // before re-parsing it as a configuration.
                String jobXmlConfString = XmlUtils.prettyPrint(jobXmlConf).toString();
                jobXmlConfString = XmlUtils.removeComments(jobXmlConfString);
                jobXmlConfString = context.getELEvaluator().evaluate(jobXmlConfString, String.class);
                jobXmlConf = new XConfiguration(new StringReader(jobXmlConfString));
            }
            catch (ELEvaluationException ex) {
                throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, "EL_EVAL_ERROR", ex
                        .getMessage(), ex);
            }
            catch (Exception ex) {
                // Non-EL failures are recorded on the context but do not abort;
                // the un-evaluated jobXmlConf is used as-is in that case.
                context.setErrorInfo("EL_ERROR", ex.getMessage());
            }
            checkForDisallowedProps(jobXmlConf, "job-xml");
            XConfiguration.copy(jobXmlConf, conf);
        }
        Element e = element.getChild("configuration", ns);
        if (e != null) {
            // Inline <configuration> is merged last, so it overrides job-xml values.
            String strConf = XmlUtils.prettyPrint(e).toString();
            XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
            checkForDisallowedProps(inlineConf, "inline configuration");
            XConfiguration.copy(inlineConf, conf);
        }
    }

    /**
     * Prepares the action's own configuration: injects action-default values,
     * verifies the app path filesystem is supported, records the Java main
     * class, and merges job-xml / inline configuration.
     */
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        try {
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType());
            // injectDefaults: existing actionConf values win over the defaults.
            XConfiguration.injectDefaults(actionDefaults, actionConf);

            has.checkSupportedFilesystem(appPath.toUri());

            // Set the Java Main Class for the Java action to give to the Java launcher
            setJavaMain(actionConf, actionXml);

            parseJobXmlAndConfiguration(context, actionXml, appPath, actionConf);
            return actionConf;
        }
        catch (IOException ex) {
            throw convertException(ex);
        }
        catch (HadoopAccessorException ex) {
            throw convertException(ex);
        }
        catch (URISyntaxException ex) {
            throw convertException(ex);
        }
    }

    /**
     * Adds a file or archive to the job's DistributedCache. Relative paths are
     * resolved against the application path. Files are dispatched by name:
     * native .so libraries and fragment-less regular files are symlinked by
     * their base name, plain .jar files go on the classpath, and anything with
     * an explicit '#' fragment is cached under that fragment.
     */
    Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive)
            throws ActionExecutorException {

        URI uri = null;
        try {
            uri = new URI(filePath);
            URI baseUri = appPath.toUri();
            if (uri.getScheme() == null) {
                String resolvedPath = uri.getPath();
                if (!resolvedPath.startsWith("/")) {
                    resolvedPath = baseUri.getPath() + "/" + resolvedPath;
                }
                uri = new URI(baseUri.getScheme(), baseUri.getAuthority(), resolvedPath, uri.getQuery(), uri.getFragment());
            }
            if (archive) {
                DistributedCache.addCacheArchive(uri.normalize(), conf);
            }
            else {
                String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
                if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files
                    // Symlink the shared library under its own file name.
                    uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName);
                    DistributedCache.addCacheFile(uri.normalize(), conf);
                }
                else if (fileName.endsWith(".jar")) { // .jar files
                    if (!fileName.contains("#")) {
                        String user = conf.get("user.name");
                        Path pathToAdd = new Path(uri.normalize());
                        Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, pathToAdd, conf);
                    }
                    else {
                        DistributedCache.addCacheFile(uri.normalize(), conf);
                    }
                }
                else
{ // regular files 519 if (!fileName.contains("#")) { 520 uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName); 521 } 522 DistributedCache.addCacheFile(uri.normalize(), conf); 523 } 524 } 525 DistributedCache.createSymlink(conf); 526 return conf; 527 } 528 catch (Exception ex) { 529 LOG.debug( 530 "Errors when add to DistributedCache. Path=" + uri.toString() + ", archive=" + archive + ", conf=" 531 + XmlUtils.prettyPrint(conf).toString()); 532 throw convertException(ex); 533 } 534 } 535 536 public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { 537 try { 538 Path actionDir = context.getActionDir(); 539 Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp"); 540 if (!actionFs.exists(actionDir)) { 541 try { 542 actionFs.mkdirs(tempActionDir); 543 actionFs.rename(tempActionDir, actionDir); 544 } 545 catch (IOException ex) { 546 actionFs.delete(tempActionDir, true); 547 actionFs.delete(actionDir, true); 548 throw ex; 549 } 550 } 551 } 552 catch (Exception ex) { 553 throw convertException(ex); 554 } 555 } 556 557 void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { 558 try { 559 Path actionDir = context.getActionDir(); 560 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false) 561 && actionFs.exists(actionDir)) { 562 actionFs.delete(actionDir, true); 563 } 564 } 565 catch (Exception ex) { 566 throw convertException(ex); 567 } 568 } 569 570 protected void addShareLib(Configuration conf, String[] actionShareLibNames) 571 throws ActionExecutorException { 572 if (actionShareLibNames != null) { 573 try { 574 ShareLibService shareLibService = Services.get().get(ShareLibService.class); 575 FileSystem fs = shareLibService.getFileSystem(); 576 if (fs != null) { 577 for (String actionShareLibName : actionShareLibNames) { 578 List<Path> listOfPaths = shareLibService.getShareLibJars(actionShareLibName); 
579 if (listOfPaths != null && !listOfPaths.isEmpty()) { 580 581 for (Path actionLibPath : listOfPaths) { 582 JobUtils.addFileToClassPath(actionLibPath, conf, fs); 583 DistributedCache.createSymlink(conf); 584 } 585 } 586 } 587 } 588 } 589 catch (IOException ex) { 590 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen", 591 ex.getMessage()); 592 } 593 } 594 } 595 596 protected void addSystemShareLibForAction(Configuration conf) throws ActionExecutorException { 597 ShareLibService shareLibService = Services.get().get(ShareLibService.class); 598 // ShareLibService is null for test cases 599 if (shareLibService != null) { 600 try { 601 List<Path> listOfPaths = shareLibService.getSystemLibJars(JavaActionExecutor.OOZIE_COMMON_LIBDIR); 602 if (listOfPaths == null || listOfPaths.isEmpty()) { 603 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "EJ001", 604 "Could not locate Oozie sharelib"); 605 } 606 FileSystem fs = listOfPaths.get(0).getFileSystem(conf); 607 for (Path actionLibPath : listOfPaths) { 608 JobUtils.addFileToClassPath(actionLibPath, conf, fs); 609 DistributedCache.createSymlink(conf); 610 } 611 listOfPaths = shareLibService.getSystemLibJars(getType()); 612 if (listOfPaths != null) { 613 for (Path actionLibPath : listOfPaths) { 614 JobUtils.addFileToClassPath(actionLibPath, conf, fs); 615 DistributedCache.createSymlink(conf); 616 } 617 } 618 } 619 catch (IOException ex) { 620 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen", 621 ex.getMessage()); 622 } 623 } 624 } 625 626 protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException { 627 String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath"); 628 if (actionLibsStrArr != null) { 629 try { 630 for (String actionLibsStr : actionLibsStrArr) { 631 actionLibsStr = actionLibsStr.trim(); 632 if (actionLibsStr.length() > 0) 633 { 634 Path 
actionLibsPath = new Path(actionLibsStr);
                        String user = conf.get("user.name");
                        FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
                        if (fs.exists(actionLibsPath)) {
                            FileStatus[] files = fs.listStatus(actionLibsPath);
                            for (FileStatus file : files) {
                                addToCache(conf, appPath, file.getPath().toUri().getPath(), false);
                            }
                        }
                    }
                }
            }
            catch (HadoopAccessorException ex){
                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
                        ex.getErrorCode().toString(), ex.getMessage());
            }
            catch (IOException ex){
                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
                        "It should never happen", ex.getMessage());
            }
        }
    }

    /**
     * Populates the job's DistributedCache with, in order: the workflow lib/
     * jars, the configured action libs, the action's own <file>/<archive>
     * entries (comma-separated lists supported), and finally the sharelibs.
     */
    @SuppressWarnings("unchecked")
    public void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf)
            throws ActionExecutorException {
        Configuration proto = context.getProtoActionConf();

        // Workflow lib/
        String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
        if (paths != null) {
            for (String path : paths) {
                addToCache(conf, appPath, path, false);
            }
        }

        // Action libs
        addActionLibs(appPath, conf);

        // files and archives defined in the action
        for (Element eProp : (List<Element>) actionXml.getChildren()) {
            if (eProp.getName().equals("file")) {
                String[] filePaths = eProp.getTextTrim().split(",");
                for (String path : filePaths) {
                    addToCache(conf, appPath, path.trim(), false);
                }
            }
            else if (eProp.getName().equals("archive")) {
                String[] archivePaths = eProp.getTextTrim().split(",");
                for (String path : archivePaths){
                    addToCache(conf, appPath, path.trim(), true);
                }
            }
        }

        addAllShareLibs(appPath, conf, context, actionXml);
    }

    // Adds action specific share libs and common share libs
    private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml)
            throws ActionExecutorException {
        // Add action specific share libs
        addActionShareLib(appPath, conf, context, actionXml);
        // Add common sharelibs for Oozie and launcher jars
        addSystemShareLibForAction(conf);
    }

    /**
     * Adds the action-type sharelibs, but only when the workflow job opted in
     * via oozie.use.system.libpath.
     */
    private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml)
            throws ActionExecutorException {
        XConfiguration wfJobConf = null;
        try {
            wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
        }
        catch (IOException ioe) {
            throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
                    ioe.getMessage());
        }
        // Action sharelibs are only added if user has specified to use system libpath
        if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
            // add action specific sharelibs
            addShareLib(conf, getShareLibNames(context, actionXml, conf));
        }
    }


    /**
     * Returns the launcher's main class; defaults to JavaMain unless
     * overridden via the launcher configuration.
     */
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName());
    }

    // Records the action's <main-class> (if any) so JavaMain can invoke it.
    private void setJavaMain(Configuration actionConf, Element actionXml) {
        Namespace ns = actionXml.getNamespace();
        Element e = actionXml.getChild("main-class", ns);
        if (e != null) {
            actionConf.set(JavaMain.JAVA_MAIN_CLASS, e.getTextTrim());
        }
    }

    private static final String QUEUE_NAME = "mapred.job.queue.name";

    // Action-conf properties mirrored onto the launcher job unless the user
    // provided an explicit "oozie.launcher." override (see actionConfToLauncherConf).
    private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>();

    static {
        SPECIAL_PROPERTIES.add(QUEUE_NAME);
        SPECIAL_PROPERTIES.add(ACL_VIEW_JOB);
        SPECIAL_PROPERTIES.add(ACL_MODIFY_JOB);
    }

    @SuppressWarnings("unchecked")
    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
            throws
ActionExecutorException {
        try {

            // app path could be a file
            Path appPathRoot = new Path(context.getWorkflow().getAppPath());
            if (actionFs.isFile(appPathRoot)) {
                appPathRoot = appPathRoot.getParent();
            }

            // launcher job configuration
            JobConf launcherJobConf = createBaseHadoopConf(context, actionXml);
            setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);

            // Properties for when a launcher job's AM gets restarted
            LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, action.getId());

            // Propagate a per-action sharelib override from the action conf.
            String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType());
            if (actionShareLibProperty != null) {
                launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty);
            }
            setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);

            // Default job name encodes type/app/action/workflow-id for log correlation.
            String jobName = launcherJobConf.get(HADOOP_JOB_NAME);
            if (jobName == null || jobName.isEmpty()) {
                jobName = XLog.format(
                        "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
                        context.getWorkflow().getAppName(), action.getName(),
                        context.getWorkflow().getId());
                launcherJobConf.setJobName(jobName);
            }

            String jobId = context.getWorkflow().getId();
            String actionId = action.getId();
            Path actionDir = context.getActionDir();
            String recoveryId = context.getRecoveryId();

            // Getting the prepare XML from the action XML
            Namespace ns = actionXml.getNamespace();
            Element prepareElement = actionXml.getChild("prepare", ns);
            String prepareXML = "";
            if (prepareElement != null) {
                if (prepareElement.getChildren().size() > 0) {
                    prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
                }
            }
            LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
                    prepareXML);

            // Set the launcher Main Class
            LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
            LauncherMapperHelper.setupLauncherURIHandlerConf(launcherJobConf);
            LauncherMapperHelper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
            LauncherMapperHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
            // Only push the FS glob limit when it was explicitly configured.
            if (getOozieConf().get(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX) != null) {
                LauncherMapperHelper.setupMaxFSGlob(launcherJobConf, maxFSGlobMax);
            }

            // Collect the action's <arg> elements, in document order.
            List<Element> list = actionXml.getChildren("arg", ns);
            String[] args = new String[list.size()];
            for (int i = 0; i < list.size(); i++) {
                args[i] = list.get(i).getTextTrim();
            }
            LauncherMapperHelper.setupMainArguments(launcherJobConf, args);

            // Make mapred.child.java.opts and mapreduce.map.java.opts equal, but give values from the latter priority; also append
            // <java-opt> and <java-opts> and give those highest priority
            StringBuilder opts = new StringBuilder(launcherJobConf.get(HADOOP_CHILD_JAVA_OPTS, ""));
            if (launcherJobConf.get(HADOOP_MAP_JAVA_OPTS) != null) {
                opts.append(" ").append(launcherJobConf.get(HADOOP_MAP_JAVA_OPTS));
            }
            List<Element> javaopts = actionXml.getChildren("java-opt", ns);
            for (Element opt: javaopts) {
                opts.append(" ").append(opt.getTextTrim());
            }
            Element opt = actionXml.getChild("java-opts", ns);
            if (opt != null) {
                opts.append(" ").append(opt.getTextTrim());
            }
            launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim());
            launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, opts.toString().trim());

            // setting for uber mode
            if (launcherJobConf.getBoolean(HADOOP_YARN_UBER_MODE, false)) {
                updateConfForUberMode(launcherJobConf);
            }

            // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs)
            // maybe we should add queue to the WF schema, below job-tracker
            actionConfToLauncherConf(actionConf, launcherJobConf);

            // to disable cancelation of delegation token on launcher job end
            launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);

            return launcherJobConf;
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    // Points the job-end notification at the Oozie callback servlet; the
    // "$jobStatus" token is substituted by Hadoop at notification time.
    private void injectCallback(Context context, Configuration conf) {
        String callback = context.getCallbackUrl("$jobStatus");
        if (conf.get("job.end.notification.url") != null) {
            LOG.warn("Overriding the action job end notification URI");
        }
        conf.set("job.end.notification.url", callback);
    }

    void injectActionCallback(Context context, Configuration actionConf) {
        injectCallback(context, actionConf);
    }

    void injectLauncherCallback(Context context, Configuration launcherConf) {
        injectCallback(context, launcherConf);
    }

    // Mirrors SPECIAL_PROPERTIES (queue name, job ACLs) from the action conf
    // onto the launcher, unless an explicit "oozie.launcher." override exists.
    private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) {
        for (String name : SPECIAL_PROPERTIES) {
            if (actionConf.get(name) != null && launcherConf.get("oozie.launcher."
+ name) == null) { 862 launcherConf.set(name, actionConf.get(name)); 863 } 864 } 865 } 866 867 public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException { 868 JobClient jobClient = null; 869 boolean exception = false; 870 try { 871 Path appPathRoot = new Path(context.getWorkflow().getAppPath()); 872 873 // app path could be a file 874 if (actionFs.isFile(appPathRoot)) { 875 appPathRoot = appPathRoot.getParent(); 876 } 877 878 Element actionXml = XmlUtils.parseXml(action.getConf()); 879 880 // action job configuration 881 Configuration actionConf = createBaseHadoopConf(context, actionXml); 882 setupActionConf(actionConf, context, actionXml, appPathRoot); 883 LOG.debug("Setting LibFilesArchives "); 884 setLibFilesArchives(context, actionXml, appPathRoot, actionConf); 885 886 String jobName = actionConf.get(HADOOP_JOB_NAME); 887 if (jobName == null || jobName.isEmpty()) { 888 jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", 889 getType(), context.getWorkflow().getAppName(), 890 action.getName(), context.getWorkflow().getId()); 891 actionConf.set(HADOOP_JOB_NAME, jobName); 892 } 893 894 injectActionCallback(context, actionConf); 895 896 if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) { 897 // ONLY in the case where user has not given the 898 // modify-job ACL specifically 899 if (context.getWorkflow().getAcl() != null) { 900 // setting the group owning the Oozie job to allow anybody in that 901 // group to modify the jobs. 
902 actionConf.set(ACL_MODIFY_JOB, context.getWorkflow().getAcl()); 903 } 904 } 905 906 // Setting the credential properties in launcher conf 907 HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context, 908 action, actionConf); 909 910 // Adding if action need to set more credential tokens 911 JobConf credentialsConf = new JobConf(false); 912 XConfiguration.copy(actionConf, credentialsConf); 913 setCredentialTokens(credentialsConf, context, action, credentialsProperties); 914 915 // insert conf to action conf from credentialsConf 916 for (Entry<String, String> entry : credentialsConf) { 917 if (actionConf.get(entry.getKey()) == null) { 918 actionConf.set(entry.getKey(), entry.getValue()); 919 } 920 } 921 922 JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); 923 924 injectJobInfo(launcherJobConf, actionConf, context, action); 925 injectLauncherCallback(context, launcherJobConf); 926 LOG.debug("Creating Job Client for action " + action.getId()); 927 jobClient = createJobClient(context, launcherJobConf); 928 String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context 929 .getRecoveryId()); 930 boolean alreadyRunning = launcherId != null; 931 RunningJob runningJob; 932 933 // if user-retry is on, always submit new launcher 934 boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry(); 935 936 if (alreadyRunning && !isUserRetry) { 937 runningJob = jobClient.getJob(JobID.forName(launcherId)); 938 if (runningJob == null) { 939 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER); 940 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", 941 "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker); 942 } 943 } 944 else { 945 LOG.debug("Submitting the job through Job Client for action " + action.getId()); 946 947 // setting up propagation of the delegation token. 
948 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 949 Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(has 950 .getMRDelegationTokenRenewer(launcherJobConf)); 951 launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt); 952 953 // insert credentials tokens to launcher job conf if needed 954 if (needInjectCredentials()) { 955 for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) { 956 Text fauxAlias = new Text(tk.getKind() + "_" + tk.getService()); 957 LOG.debug("ADDING TOKEN: " + fauxAlias); 958 launcherJobConf.getCredentials().addToken(fauxAlias, tk); 959 } 960 } 961 else { 962 LOG.info("No need to inject credentials."); 963 } 964 runningJob = jobClient.submitJob(launcherJobConf); 965 if (runningJob == null) { 966 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", 967 "Error submitting launcher for action [{0}]", action.getId()); 968 } 969 launcherId = runningJob.getID().toString(); 970 LOG.debug("After submission get the launcherId " + launcherId); 971 } 972 973 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER); 974 String consoleUrl = runningJob.getTrackingURL(); 975 context.setStartData(launcherId, jobTracker, consoleUrl); 976 } 977 catch (Exception ex) { 978 exception = true; 979 throw convertException(ex); 980 } 981 finally { 982 if (jobClient != null) { 983 try { 984 jobClient.close(); 985 } 986 catch (Exception e) { 987 if (exception) { 988 LOG.error("JobClient error: ", e); 989 } 990 else { 991 throw convertException(e); 992 } 993 } 994 } 995 } 996 } 997 private boolean needInjectCredentials() { 998 boolean methodExists = true; 999 1000 Class klass; 1001 try { 1002 klass = Class.forName("org.apache.hadoop.mapred.JobConf"); 1003 klass.getMethod("getCredentials"); 1004 } 1005 catch (ClassNotFoundException ex) { 1006 methodExists = false; 1007 } 1008 catch (NoSuchMethodException ex) { 1009 
methodExists = false; 1010 } 1011 1012 return methodExists; 1013 } 1014 1015 protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context, 1016 WorkflowAction action, Configuration actionConf) throws Exception { 1017 HashMap<String, CredentialsProperties> credPropertiesMap = null; 1018 if (context != null && action != null) { 1019 credPropertiesMap = getActionCredentialsProperties(context, action); 1020 if (credPropertiesMap != null) { 1021 for (String key : credPropertiesMap.keySet()) { 1022 CredentialsProperties prop = credPropertiesMap.get(key); 1023 if (prop != null) { 1024 LOG.debug("Credential Properties set for action : " + action.getId()); 1025 for (String property : prop.getProperties().keySet()) { 1026 actionConf.set(property, prop.getProperties().get(property)); 1027 LOG.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'"); 1028 } 1029 } 1030 } 1031 } 1032 else { 1033 LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred()); 1034 } 1035 } 1036 else { 1037 LOG.warn("context or action is null"); 1038 } 1039 return credPropertiesMap; 1040 } 1041 1042 protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action, 1043 HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception { 1044 1045 if (context != null && action != null && credPropertiesMap != null) { 1046 for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { 1047 String credName = entry.getKey(); 1048 CredentialsProperties credProps = entry.getValue(); 1049 if (credProps != null) { 1050 CredentialsProvider credProvider = new CredentialsProvider(credProps.getType()); 1051 Credentials credentialObject = credProvider.createCredentialObject(); 1052 if (credentialObject != null) { 1053 credentialObject.addtoJobConf(jobconf, credProps, context); 1054 LOG.debug("Retrieved Credential '" + credName + "' for action 
" + action.getId()); 1055 } 1056 else { 1057 LOG.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType()); 1058 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA020", 1059 "Could not load credentials of type [{0}] with name [{1}]]; perhaps it was not defined" 1060 + " in oozie-site.xml?", credProps.getType(), credName); 1061 } 1062 } 1063 } 1064 } 1065 1066 } 1067 1068 protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context, 1069 WorkflowAction action) throws Exception { 1070 HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>(); 1071 if (context != null && action != null) { 1072 String credsInAction = action.getCred(); 1073 LOG.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId()); 1074 String[] credNames = credsInAction.split(","); 1075 for (String credName : credNames) { 1076 CredentialsProperties credProps = getCredProperties(context, credName); 1077 props.put(credName, credProps); 1078 } 1079 } 1080 else { 1081 LOG.warn("context or action is null"); 1082 } 1083 return props; 1084 } 1085 1086 @SuppressWarnings("unchecked") 1087 protected CredentialsProperties getCredProperties(Context context, String credName) 1088 throws Exception { 1089 CredentialsProperties credProp = null; 1090 String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition(); 1091 XConfiguration wfjobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); 1092 Element elementJob = XmlUtils.parseXml(workflowXml); 1093 Element credentials = elementJob.getChild("credentials", elementJob.getNamespace()); 1094 if (credentials != null) { 1095 for (Element credential : (List<Element>) credentials.getChildren("credential", credentials.getNamespace())) { 1096 String name = credential.getAttributeValue("name"); 1097 String type = 
credential.getAttributeValue("type"); 1098 LOG.debug("getCredProperties: Name: " + name + ", Type: " + type); 1099 if (name.equalsIgnoreCase(credName)) { 1100 credProp = new CredentialsProperties(name, type); 1101 for (Element property : (List<Element>) credential.getChildren("property", 1102 credential.getNamespace())) { 1103 String propertyName = property.getChildText("name", property.getNamespace()); 1104 String propertyValue = property.getChildText("value", property.getNamespace()); 1105 ELEvaluator eval = new ELEvaluator(); 1106 for (Map.Entry<String, String> entry : wfjobConf) { 1107 eval.setVariable(entry.getKey(), entry.getValue().trim()); 1108 } 1109 propertyName = eval.evaluate(propertyName, String.class); 1110 propertyValue = eval.evaluate(propertyValue, String.class); 1111 1112 credProp.getProperties().put(propertyName, propertyValue); 1113 LOG.debug("getCredProperties: Properties name :'" + propertyName + "', Value : '" 1114 + propertyValue + "'"); 1115 } 1116 } 1117 } 1118 } else { 1119 LOG.debug("credentials is null for the action"); 1120 } 1121 return credProp; 1122 } 1123 1124 @Override 1125 public void start(Context context, WorkflowAction action) throws ActionExecutorException { 1126 LOG = XLog.resetPrefix(LOG); 1127 LogUtils.setLogInfo(action, new XLog.Info()); 1128 try { 1129 LOG.debug("Starting action " + action.getId() + " getting Action File System"); 1130 FileSystem actionFs = context.getAppFileSystem(); 1131 LOG.debug("Preparing action Dir through copying " + context.getActionDir()); 1132 prepareActionDir(actionFs, context); 1133 LOG.debug("Action Dir is ready. Submitting the action "); 1134 submitLauncher(actionFs, context, action); 1135 LOG.debug("Action submit completed. 
Performing check "); 1136 check(context, action); 1137 LOG.debug("Action check is done after submission"); 1138 } 1139 catch (Exception ex) { 1140 throw convertException(ex); 1141 } 1142 } 1143 1144 @Override 1145 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 1146 try { 1147 String externalStatus = action.getExternalStatus(); 1148 WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK 1149 : WorkflowAction.Status.ERROR; 1150 context.setEndData(status, getActionSignal(status)); 1151 } 1152 catch (Exception ex) { 1153 throw convertException(ex); 1154 } 1155 finally { 1156 try { 1157 FileSystem actionFs = context.getAppFileSystem(); 1158 cleanUpActionDir(actionFs, context); 1159 } 1160 catch (Exception ex) { 1161 throw convertException(ex); 1162 } 1163 } 1164 } 1165 1166 /** 1167 * Create job client object 1168 * 1169 * @param context 1170 * @param jobConf 1171 * @return JobClient 1172 * @throws HadoopAccessorException 1173 */ 1174 protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException { 1175 String user = context.getWorkflow().getUser(); 1176 String group = context.getWorkflow().getGroup(); 1177 return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); 1178 } 1179 1180 protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{ 1181 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId())); 1182 return runningJob; 1183 } 1184 1185 @Override 1186 public void check(Context context, WorkflowAction action) throws ActionExecutorException { 1187 JobClient jobClient = null; 1188 boolean exception = false; 1189 LOG = XLog.resetPrefix(LOG); 1190 LogUtils.setLogInfo(action, new XLog.Info()); 1191 try { 1192 Element actionXml = XmlUtils.parseXml(action.getConf()); 1193 FileSystem actionFs = context.getAppFileSystem(); 1194 JobConf jobConf = 
createBaseHadoopConf(context, actionXml); 1195 jobClient = createJobClient(context, jobConf); 1196 RunningJob runningJob = getRunningJob(context, action, jobClient); 1197 if (runningJob == null) { 1198 context.setExecutionData(FAILED, null); 1199 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", 1200 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", action 1201 .getExternalId(), action.getId()); 1202 } 1203 if (runningJob.isComplete()) { 1204 Path actionDir = context.getActionDir(); 1205 String newId = null; 1206 // load sequence file into object 1207 Map<String, String> actionData = LauncherMapperHelper.getActionData(actionFs, actionDir, jobConf); 1208 if (actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID)) { 1209 newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID); 1210 String launcherId = action.getExternalId(); 1211 runningJob = jobClient.getJob(JobID.forName(newId)); 1212 if (runningJob == null) { 1213 context.setExternalStatus(FAILED); 1214 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", 1215 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId, 1216 action.getId()); 1217 } 1218 context.setExternalChildIDs(newId); 1219 LOG.info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId, 1220 newId); 1221 } 1222 else { 1223 String externalIDs = actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS); 1224 if (externalIDs != null) { 1225 context.setExternalChildIDs(externalIDs); 1226 LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs); 1227 } 1228 } 1229 if (runningJob.isComplete()) { 1230 // fetching action output and stats for the Map-Reduce action. 
1231 if (newId != null) { 1232 actionData = LauncherMapperHelper.getActionData(actionFs, context.getActionDir(), jobConf); 1233 } 1234 LOG.info(XLog.STD, "action completed, external ID [{0}]", 1235 action.getExternalId()); 1236 if (LauncherMapperHelper.isMainSuccessful(runningJob)) { 1237 if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) { 1238 context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData 1239 .get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS))); 1240 LOG.info(XLog.STD, "action produced output"); 1241 } 1242 else { 1243 context.setExecutionData(SUCCEEDED, null); 1244 } 1245 if (LauncherMapperHelper.hasStatsData(actionData)) { 1246 context.setExecutionStats(actionData.get(LauncherMapper.ACTION_DATA_STATS)); 1247 LOG.info(XLog.STD, "action produced stats"); 1248 } 1249 getActionData(actionFs, runningJob, action, context); 1250 } 1251 else { 1252 String errorReason; 1253 if (actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) { 1254 Properties props = PropertiesUtils.stringToProperties(actionData 1255 .get(LauncherMapper.ACTION_DATA_ERROR_PROPS)); 1256 String errorCode = props.getProperty("error.code"); 1257 if ("0".equals(errorCode)) { 1258 errorCode = "JA018"; 1259 } 1260 if ("-1".equals(errorCode)) { 1261 errorCode = "JA019"; 1262 } 1263 errorReason = props.getProperty("error.reason"); 1264 LOG.warn("Launcher ERROR, reason: {0}", errorReason); 1265 String exMsg = props.getProperty("exception.message"); 1266 String errorInfo = (exMsg != null) ? 
exMsg : errorReason; 1267 context.setErrorInfo(errorCode, errorInfo); 1268 String exStackTrace = props.getProperty("exception.stacktrace"); 1269 if (exMsg != null) { 1270 LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace); 1271 } 1272 } 1273 else { 1274 errorReason = XLog.format("LauncherMapper died, check Hadoop LOG for job [{0}:{1}]", action 1275 .getTrackerUri(), action.getExternalId()); 1276 LOG.warn(errorReason); 1277 } 1278 context.setExecutionData(FAILED_KILLED, null); 1279 } 1280 } 1281 else { 1282 context.setExternalStatus("RUNNING"); 1283 LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]", 1284 runningJob.getID()); 1285 } 1286 } 1287 else { 1288 context.setExternalStatus("RUNNING"); 1289 LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]", 1290 runningJob.getID()); 1291 } 1292 } 1293 catch (Exception ex) { 1294 LOG.warn("Exception in check(). Message[{0}]", ex.getMessage(), ex); 1295 exception = true; 1296 throw convertException(ex); 1297 } 1298 finally { 1299 if (jobClient != null) { 1300 try { 1301 jobClient.close(); 1302 } 1303 catch (Exception e) { 1304 if (exception) { 1305 LOG.error("JobClient error: ", e); 1306 } 1307 else { 1308 throw convertException(e); 1309 } 1310 } 1311 } 1312 } 1313 } 1314 1315 /** 1316 * Get the output data of an action. Subclasses should override this method 1317 * to get action specific output data. 
1318 * 1319 * @param actionFs the FileSystem object 1320 * @param runningJob the runningJob 1321 * @param action the Workflow action 1322 * @param context executor context 1323 * 1324 */ 1325 protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) 1326 throws HadoopAccessorException, JDOMException, IOException, URISyntaxException { 1327 } 1328 1329 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException { 1330 Element eConf = XmlUtils.parseXml(action.getConf()); 1331 Namespace ns = eConf.getNamespace(); 1332 Element captureOutput = eConf.getChild("capture-output", ns); 1333 return captureOutput != null; 1334 } 1335 1336 @Override 1337 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 1338 JobClient jobClient = null; 1339 boolean exception = false; 1340 try { 1341 Element actionXml = XmlUtils.parseXml(action.getConf()); 1342 JobConf jobConf = createBaseHadoopConf(context, actionXml); 1343 jobClient = createJobClient(context, jobConf); 1344 RunningJob runningJob = getRunningJob(context, action, jobClient); 1345 if (runningJob != null) { 1346 runningJob.killJob(); 1347 } 1348 context.setExternalStatus(KILLED); 1349 context.setExecutionData(KILLED, null); 1350 } 1351 catch (Exception ex) { 1352 exception = true; 1353 throw convertException(ex); 1354 } 1355 finally { 1356 try { 1357 FileSystem actionFs = context.getAppFileSystem(); 1358 cleanUpActionDir(actionFs, context); 1359 if (jobClient != null) { 1360 jobClient.close(); 1361 } 1362 } 1363 catch (Exception ex) { 1364 if (exception) { 1365 LOG.error("Error: ", ex); 1366 } 1367 else { 1368 throw convertException(ex); 1369 } 1370 } 1371 } 1372 } 1373 1374 private static Set<String> FINAL_STATUS = new HashSet<String>(); 1375 1376 static { 1377 FINAL_STATUS.add(SUCCEEDED); 1378 FINAL_STATUS.add(KILLED); 1379 FINAL_STATUS.add(FAILED); 1380 FINAL_STATUS.add(FAILED_KILLED); 1381 } 1382 1383 @Override 1384 
public boolean isCompleted(String externalStatus) {
        return FINAL_STATUS.contains(externalStatus);
    }

    /**
     * Return the sharelib names for the action.
     * <p/>
     * If <code>NULL</code> or empty, it means that the action does not use the action
     * sharelib.
     * <p/>
     * If a non-empty string, i.e. <code>foo</code>, it means the action uses the
     * action sharelib sub-directory <code>foo</code> and all JARs in the sharelib
     * <code>foo</code> directory will be in the action classpath. Multiple sharelib
     * sub-directories can be specified as a comma separated list.
     * <p/>
     * The resolution is done using the following precedence order:
     * <ul>
     * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the action configuration</li>
     * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the job configuration</li>
     * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the oozie configuration</li>
     * <li>Action Executor <code>getDefaultShareLibName()</code> method</li>
     * </ul>
     *
     * @param context executor context.
     * @param actionXml the action XML fragment.
     * @param conf action configuration.
     * @return the action sharelib names.
     */
    protected String[] getShareLibNames(Context context, Element actionXml, Configuration conf) {
        String[] names = conf.getStrings(ACTION_SHARELIB_FOR + getType());
        if (names == null || names.length == 0) {
            try {
                XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
                names = jobConf.getStrings(ACTION_SHARELIB_FOR + getType());
                if (names == null || names.length == 0) {
                    names = Services.get().getConf().getStrings(ACTION_SHARELIB_FOR + getType());
                    if (names == null || names.length == 0) {
                        String name = getDefaultShareLibName(actionXml);
                        if (name != null) {
                            names = new String[] { name };
                        }
                    }
                }
            }
            catch (IOException ex) {
                // The workflow conf was already parsed once by this point; re-parsing cannot fail.
                throw new RuntimeException("It cannot happen, " + ex.toString(), ex);
            }
        }
        return names;
    }

    private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for.";

    /**
     * Returns the default sharelib name for the action if any.
     *
     * @param actionXml the action XML fragment.
     * @return the sharelib name for the action, <code>NULL</code> if none.
     */
    protected String getDefaultShareLibName(Element actionXml) {
        return null;
    }

    /**
     * Sets some data for the action on completion
     *
     * @param context executor context
     * @param actionFs the FileSystem object
     */
    protected void setActionCompletionData(Context context, FileSystem actionFs) throws IOException,
            HadoopAccessorException, URISyntaxException {
    }

    /**
     * Add job-info properties to both the launcher and the action configuration when job-info
     * collection is enabled. Failures are logged and swallowed on purpose: job info must never
     * break the action execution.
     */
    private void injectJobInfo(JobConf launcherJobConf, Configuration actionConf, Context context, WorkflowAction action) {
        if (OozieJobInfo.isJobInfoEnabled()) {
            try {
                OozieJobInfo jobInfo = new OozieJobInfo(actionConf, context, action);
                String jobInfoStr = jobInfo.getJobInfo();
                launcherJobConf.set(OozieJobInfo.JOB_INFO_KEY, jobInfoStr + "launcher=true");
                actionConf.set(OozieJobInfo.JOB_INFO_KEY, jobInfoStr + "launcher=false");
            }
            catch (Exception e) {
                // Just job info, should not impact the execution.
                LOG.error("Error while populating job info", e);
            }
        }
    }
}