001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import java.io.BufferedReader; 021 import java.io.File; 022 import java.io.FileNotFoundException; 023 import java.io.IOException; 024 import java.io.InputStream; 025 import java.io.InputStreamReader; 026 import java.io.StringReader; 027 import java.net.ConnectException; 028 import java.net.URI; 029 import java.net.URISyntaxException; 030 import java.net.UnknownHostException; 031 import java.util.ArrayList; 032 import java.util.HashMap; 033 import java.util.HashSet; 034 import java.util.List; 035 import java.util.Map; 036 import java.util.Properties; 037 import java.util.Set; 038 import java.util.Map.Entry; 039 040 import org.apache.hadoop.conf.Configuration; 041 import org.apache.hadoop.filecache.DistributedCache; 042 import org.apache.hadoop.fs.FileStatus; 043 import org.apache.hadoop.fs.FileSystem; 044 import org.apache.hadoop.fs.Path; 045 import org.apache.hadoop.fs.permission.AccessControlException; 046 import org.apache.hadoop.io.Text; 047 import org.apache.hadoop.mapred.JobClient; 048 import org.apache.hadoop.mapred.JobConf; 049 import org.apache.hadoop.mapred.JobID; 050 
import org.apache.hadoop.mapred.RunningJob; 051 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; 052 import org.apache.hadoop.util.DiskChecker; 053 import org.apache.oozie.WorkflowActionBean; 054 import org.apache.oozie.WorkflowJobBean; 055 import org.apache.oozie.action.ActionExecutor; 056 import org.apache.oozie.action.ActionExecutorException; 057 import org.apache.oozie.client.OozieClient; 058 import org.apache.oozie.client.WorkflowAction; 059 import org.apache.oozie.service.HadoopAccessorException; 060 import org.apache.oozie.service.HadoopAccessorService; 061 import org.apache.oozie.service.Services; 062 import org.apache.oozie.service.WorkflowAppService; 063 import org.apache.oozie.servlet.CallbackServlet; 064 import org.apache.oozie.util.IOUtils; 065 import org.apache.oozie.util.PropertiesUtils; 066 import org.apache.oozie.util.XConfiguration; 067 import org.apache.oozie.util.XLog; 068 import org.apache.oozie.util.XmlUtils; 069 import org.jdom.Element; 070 import org.jdom.JDOMException; 071 import org.jdom.Namespace; 072 import org.apache.hadoop.security.token.Token; 073 import org.apache.hadoop.security.token.TokenIdentifier; 074 075 public class JavaActionExecutor extends ActionExecutor { 076 077 private static final String HADOOP_USER = "user.name"; 078 private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker"; 079 private static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address"; 080 private static final String HADOOP_NAME_NODE = "fs.default.name"; 081 public static final String OOZIE_COMMON_LIBDIR = "oozie"; 082 public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE; 083 private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>(); 084 public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size"; 085 private static int maxActionOutputLen; 086 private static int maxExternalStatsSize; 087 088 private static final String 
SUCCEEDED = "SUCCEEDED"; 089 private static final String KILLED = "KILLED"; 090 private static final String FAILED = "FAILED"; 091 private static final String FAILED_KILLED = "FAILED/KILLED"; 092 private static final String RUNNING = "RUNNING"; 093 protected XLog log = XLog.getLog(getClass()); 094 095 static { 096 DISALLOWED_PROPERTIES.add(HADOOP_USER); 097 DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER); 098 DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE); 099 DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2); 100 } 101 102 public JavaActionExecutor() { 103 this("java"); 104 } 105 106 protected JavaActionExecutor(String type) { 107 super(type); 108 } 109 110 protected String getLauncherJarName() { 111 return getType() + "-launcher.jar"; 112 } 113 114 protected List<Class> getLauncherClasses() { 115 List<Class> classes = new ArrayList<Class>(); 116 classes.add(LauncherMapper.class); 117 classes.add(LauncherSecurityManager.class); 118 classes.add(LauncherException.class); 119 classes.add(LauncherMainException.class); 120 classes.add(FileSystemActions.class); 121 classes.add(PrepareActionsDriver.class); 122 classes.add(ActionStats.class); 123 classes.add(ActionType.class); 124 return classes; 125 } 126 127 @Override 128 public void initActionType() { 129 super.initActionType(); 130 maxActionOutputLen = getOozieConf() 131 .getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 132 // TODO: Remove the below config get in a subsequent release.. 133 // This other irrelevant property is only used to 134 // preserve backwards compatibility cause of a typo. 135 // See OOZIE-4. 136 getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 137 2 * 1024)); 138 //Get the limit for the maximum allowed size of action stats 139 maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT); 140 maxExternalStatsSize = (maxExternalStatsSize == -1) ? 
Integer.MAX_VALUE : maxExternalStatsSize; 141 try { 142 List<Class> classes = getLauncherClasses(); 143 Class[] launcherClasses = classes.toArray(new Class[classes.size()]); 144 IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses); 145 146 registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001"); 147 registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT, 148 "JA002"); 149 registerError(DiskChecker.DiskOutOfSpaceException.class.getName(), 150 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003"); 151 registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(), 152 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004"); 153 registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(), 154 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005"); 155 registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA006"); 156 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007"); 157 registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008"); 158 registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009"); 159 } 160 catch (IOException ex) { 161 throw new RuntimeException(ex); 162 } 163 } 164 165 /** 166 * Get the maximum allowed size of stats 167 * 168 * @return maximum size of stats 169 */ 170 public static int getMaxExternalStatsSize() { 171 return maxExternalStatsSize; 172 } 173 174 void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException { 175 for (String prop : DISALLOWED_PROPERTIES) { 176 if (conf.get(prop) != null) { 177 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010", 178 "Property [{0}] not allowed in action [{1}] configuration", prop, confName); 
179 } 180 } 181 } 182 183 public JobConf createBaseHadoopConf(Context context, Element actionXml) { 184 Namespace ns = actionXml.getNamespace(); 185 String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim(); 186 String nameNode = actionXml.getChild("name-node", ns).getTextTrim(); 187 JobConf conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); 188 conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER)); 189 conf.set(HADOOP_JOB_TRACKER, jobTracker); 190 conf.set(HADOOP_JOB_TRACKER_2, jobTracker); 191 conf.set(HADOOP_NAME_NODE, nameNode); 192 conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true"); 193 return conf; 194 } 195 196 private void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) { 197 for (Map.Entry<String, String> entry : srcConf) { 198 if (entry.getKey().startsWith("oozie.launcher.")) { 199 String name = entry.getKey().substring("oozie.launcher.".length()); 200 String value = entry.getValue(); 201 // setting original KEY 202 launcherConf.set(entry.getKey(), value); 203 // setting un-prefixed key (to allow Hadoop job config 204 // for the launcher job 205 launcherConf.set(name, value); 206 } 207 } 208 } 209 210 Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) 211 throws ActionExecutorException { 212 try { 213 Namespace ns = actionXml.getNamespace(); 214 Element e = actionXml.getChild("configuration", ns); 215 if (e != null) { 216 String strConf = XmlUtils.prettyPrint(e).toString(); 217 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); 218 219 XConfiguration launcherConf = new XConfiguration(); 220 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 221 XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType()); 222 injectLauncherProperties(actionDefaultConf, launcherConf); 223 
injectLauncherProperties(inlineConf, launcherConf); 224 checkForDisallowedProps(launcherConf, "launcher configuration"); 225 XConfiguration.copy(launcherConf, conf); 226 } 227 return conf; 228 } 229 catch (IOException ex) { 230 throw convertException(ex); 231 } 232 } 233 234 protected FileSystem getActionFileSystem(Context context, WorkflowAction action) throws ActionExecutorException { 235 try { 236 Element actionXml = XmlUtils.parseXml(action.getConf()); 237 return getActionFileSystem(context, actionXml); 238 } 239 catch (JDOMException ex) { 240 throw convertException(ex); 241 } 242 } 243 244 protected FileSystem getActionFileSystem(Context context, Element actionXml) throws ActionExecutorException { 245 try { 246 return context.getAppFileSystem(); 247 } 248 catch (Exception ex) { 249 throw convertException(ex); 250 } 251 } 252 253 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) 254 throws ActionExecutorException { 255 try { 256 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 257 XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType()); 258 XConfiguration.injectDefaults(actionDefaults, actionConf); 259 Namespace ns = actionXml.getNamespace(); 260 Element e = actionXml.getChild("job-xml", ns); 261 if (e != null) { 262 String jobXml = e.getTextTrim(); 263 Path path = new Path(appPath, jobXml); 264 FileSystem fs = getActionFileSystem(context, actionXml); 265 Configuration jobXmlConf = new XConfiguration(fs.open(path)); 266 checkForDisallowedProps(jobXmlConf, "job-xml"); 267 XConfiguration.copy(jobXmlConf, actionConf); 268 } 269 e = actionXml.getChild("configuration", ns); 270 if (e != null) { 271 String strConf = XmlUtils.prettyPrint(e).toString(); 272 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); 273 checkForDisallowedProps(inlineConf, "inline configuration"); 274 XConfiguration.copy(inlineConf, 
actionConf); 275 } 276 return actionConf; 277 } 278 catch (IOException ex) { 279 throw convertException(ex); 280 } 281 } 282 283 Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive) 284 throws ActionExecutorException { 285 Path path = null; 286 try { 287 if (filePath.startsWith("/")) { 288 path = new Path(filePath); 289 } 290 else { 291 path = new Path(appPath, filePath); 292 } 293 URI uri = new URI(path.toUri().getPath()); 294 if (archive) { 295 DistributedCache.addCacheArchive(uri, conf); 296 } 297 else { 298 String fileName = filePath.substring(filePath.lastIndexOf("/") + 1); 299 if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files 300 uri = new Path(path.toString() + "#" + fileName).toUri(); 301 uri = new URI(uri.getPath()); 302 DistributedCache.addCacheFile(uri, conf); 303 } 304 else if (fileName.endsWith(".jar")) { // .jar files 305 if (!fileName.contains("#")) { 306 path = new Path(uri.toString()); 307 308 String user = conf.get("user.name"); 309 String group = conf.get("group.name"); 310 Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, path, conf); 311 } 312 else { 313 DistributedCache.addCacheFile(uri, conf); 314 } 315 } 316 else { // regular files 317 if (!fileName.contains("#")) { 318 uri = new Path(path.toString() + "#" + fileName).toUri(); 319 uri = new URI(uri.getPath()); 320 } 321 DistributedCache.addCacheFile(uri, conf); 322 } 323 } 324 DistributedCache.createSymlink(conf); 325 return conf; 326 } 327 catch (Exception ex) { 328 XLog.getLog(getClass()).debug( 329 "Errors when add to DistributedCache. 
Path=" + path + ", archive=" + archive + ", conf=" 330 + XmlUtils.prettyPrint(conf).toString()); 331 throw convertException(ex); 332 } 333 } 334 335 String getOozieLauncherJar(Context context) throws ActionExecutorException { 336 try { 337 return new Path(context.getActionDir(), getLauncherJarName()).toString(); 338 } 339 catch (Exception ex) { 340 throw convertException(ex); 341 } 342 } 343 344 public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { 345 try { 346 Path actionDir = context.getActionDir(); 347 Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp"); 348 if (!actionFs.exists(actionDir)) { 349 try { 350 actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path( 351 tempActionDir, getLauncherJarName())); 352 actionFs.rename(tempActionDir, actionDir); 353 } 354 catch (IOException ex) { 355 actionFs.delete(tempActionDir, true); 356 actionFs.delete(actionDir, true); 357 throw ex; 358 } 359 } 360 } 361 catch (Exception ex) { 362 throw convertException(ex); 363 } 364 } 365 366 void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { 367 try { 368 Path actionDir = context.getActionDir(); 369 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false) 370 && actionFs.exists(actionDir)) { 371 actionFs.delete(actionDir, true); 372 } 373 } 374 catch (Exception ex) { 375 throw convertException(ex); 376 } 377 } 378 379 protected void addShareLib(Path appPath, Configuration conf, String actionShareLibPostfix) 380 throws ActionExecutorException { 381 if (actionShareLibPostfix != null) { 382 try { 383 Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath(); 384 if (systemLibPath != null) { 385 Path actionLibPath = new Path(systemLibPath, actionShareLibPostfix); 386 String user = conf.get("user.name"); 387 String group = conf.get("group.name"); 388 FileSystem fs = 389 
Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf); 390 if (fs.exists(actionLibPath)) { 391 FileStatus[] files = fs.listStatus(actionLibPath); 392 for (FileStatus file : files) { 393 addToCache(conf, appPath, file.getPath().toUri().getPath(), false); 394 } 395 } 396 } 397 } 398 catch (HadoopAccessorException ex){ 399 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 400 ex.getErrorCode().toString(), ex.getMessage()); 401 } 402 catch (IOException ex){ 403 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, 404 "It should never happen", ex.getMessage()); 405 } 406 } 407 } 408 409 @SuppressWarnings("unchecked") 410 void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf) 411 throws ActionExecutorException { 412 Configuration proto = context.getProtoActionConf(); 413 414 // launcher JAR 415 addToCache(conf, appPath, getOozieLauncherJar(context), false); 416 417 // Workflow lib/ 418 String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST); 419 if (paths != null) { 420 for (String path : paths) { 421 addToCache(conf, appPath, path, false); 422 } 423 } 424 425 // files and archives defined in the action 426 for (Element eProp : (List<Element>) actionXml.getChildren()) { 427 if (eProp.getName().equals("file")) { 428 String path = eProp.getTextTrim(); 429 addToCache(conf, appPath, path, false); 430 } 431 else { 432 if (eProp.getName().equals("archive")) { 433 String path = eProp.getTextTrim(); 434 addToCache(conf, appPath, path, true); 435 } 436 } 437 } 438 439 addAllShareLibs(appPath, conf, context, actionXml); 440 } 441 442 // Adds action specific share libs and common share libs 443 private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml) 444 throws ActionExecutorException { 445 // Add action specific share libs 446 addActionShareLib(appPath, conf, context, actionXml); 447 // Add common 
sharelibs for Oozie 448 addShareLib(appPath, conf, JavaActionExecutor.OOZIE_COMMON_LIBDIR); 449 } 450 451 private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml) throws ActionExecutorException { 452 XConfiguration wfJobConf = null; 453 try { 454 wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); 455 } 456 catch (IOException ioe) { 457 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen", 458 ioe.getMessage()); 459 } 460 // Action sharelibs are only added if user has specified to use system libpath 461 if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) { 462 // add action specific sharelibs 463 addShareLib(appPath, conf, getShareLibPostFix(context, actionXml)); 464 } 465 } 466 467 468 protected String getLauncherMain(Configuration launcherConf, Element actionXml) { 469 Namespace ns = actionXml.getNamespace(); 470 Element e = actionXml.getChild("main-class", ns); 471 return e.getTextTrim(); 472 } 473 474 private static final String QUEUE_NAME = "mapred.job.queue.name"; 475 private static final String OOZIE_LAUNCHER_QUEUE_NAME = "oozie.launcher.mapred.job.queue.name"; 476 477 private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>(); 478 479 static { 480 SPECIAL_PROPERTIES.add(QUEUE_NAME); 481 SPECIAL_PROPERTIES.add("mapreduce.jobtracker.kerberos.principal"); 482 SPECIAL_PROPERTIES.add("dfs.namenode.kerberos.principal"); 483 } 484 485 @SuppressWarnings("unchecked") 486 JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf) 487 throws ActionExecutorException { 488 try { 489 490 // app path could be a file 491 Path appPathRoot = new Path(context.getWorkflow().getAppPath()); 492 if (actionFs.isFile(appPathRoot)) { 493 appPathRoot = appPathRoot.getParent(); 494 } 495 496 // launcher job configuration 497 JobConf launcherJobConf = 
createBaseHadoopConf(context, actionXml); 498 setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context); 499 500 setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf); 501 String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow() 502 .getAppName(), action.getName(), context.getWorkflow().getId()); 503 launcherJobConf.setJobName(jobName); 504 505 String jobId = context.getWorkflow().getId(); 506 String actionId = action.getId(); 507 Path actionDir = context.getActionDir(); 508 String recoveryId = context.getRecoveryId(); 509 510 // Getting the prepare XML from the action XML 511 Namespace ns = actionXml.getNamespace(); 512 Element prepareElement = actionXml.getChild("prepare", ns); 513 String prepareXML = ""; 514 if (prepareElement != null) { 515 if (prepareElement.getChildren().size() > 0) { 516 prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim(); 517 } 518 } 519 LauncherMapper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf, 520 prepareXML); 521 522 LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml)); 523 524 LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen); 525 LauncherMapper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize); 526 527 List<Element> list = actionXml.getChildren("arg", ns); 528 String[] args = new String[list.size()]; 529 for (int i = 0; i < list.size(); i++) { 530 args[i] = list.get(i).getTextTrim(); 531 } 532 LauncherMapper.setupMainArguments(launcherJobConf, args); 533 534 Element opt = actionXml.getChild("java-opts", ns); 535 if (opt != null) { 536 String opts = launcherJobConf.get("mapred.child.java.opts", ""); 537 opts = opts + " " + opt.getTextTrim(); 538 opts = opts.trim(); 539 launcherJobConf.set("mapred.child.java.opts", opts); 540 } 541 542 // properties from action that are needed by the launcher (QUEUE NAME) 543 // maybe we should add 
queue to the WF schema, below job-tracker 544 for (String name : SPECIAL_PROPERTIES) { 545 String value = actionConf.get(name); 546 if (value != null) { 547 if (!name.equals(QUEUE_NAME) || 548 (name.equals(QUEUE_NAME) && launcherJobConf.get(OOZIE_LAUNCHER_QUEUE_NAME) == null)) { 549 launcherJobConf.set(name, value); 550 } 551 } 552 } 553 554 // to disable cancelation of delegation token on launcher job end 555 launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false); 556 557 if (context.getWorkflow().getAcl() != null) { 558 // setting the group owning the Oozie job to allow anybody in that 559 // group to kill the jobs. 560 launcherJobConf.set("mapreduce.job.acl-modify-job", context.getWorkflow().getAcl()); 561 } 562 563 return launcherJobConf; 564 } 565 catch (Exception ex) { 566 throw convertException(ex); 567 } 568 } 569 570 private void injectCallback(Context context, Configuration conf) { 571 String callback = context.getCallbackUrl("$jobStatus"); 572 if (conf.get("job.end.notification.url") != null) { 573 XLog.getLog(getClass()).warn("Overriding the action job end notification URI"); 574 } 575 conf.set("job.end.notification.url", callback); 576 } 577 578 void injectActionCallback(Context context, Configuration actionConf) { 579 injectCallback(context, actionConf); 580 } 581 582 void injectLauncherCallback(Context context, Configuration launcherConf) { 583 injectCallback(context, launcherConf); 584 } 585 586 public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException { 587 JobClient jobClient = null; 588 boolean exception = false; 589 try { 590 Path appPathRoot = new Path(context.getWorkflow().getAppPath()); 591 592 // app path could be a file 593 if (actionFs.isFile(appPathRoot)) { 594 appPathRoot = appPathRoot.getParent(); 595 } 596 597 Element actionXml = XmlUtils.parseXml(action.getConf()); 598 599 // action job configuration 600 Configuration actionConf = 
createBaseHadoopConf(context, actionXml); 601 setupActionConf(actionConf, context, actionXml, appPathRoot); 602 XLog.getLog(getClass()).debug("Setting LibFilesArchives "); 603 setLibFilesArchives(context, actionXml, appPathRoot, actionConf); 604 String jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow() 605 .getAppName(), action.getName(), context.getWorkflow().getId()); 606 actionConf.set("mapred.job.name", jobName); 607 injectActionCallback(context, actionConf); 608 609 if (context.getWorkflow().getAcl() != null) { 610 // setting the group owning the Oozie job to allow anybody in that 611 // group to kill the jobs. 612 actionConf.set("mapreduce.job.acl-modify-job", context.getWorkflow().getAcl()); 613 } 614 615 // Setting the credential properties in launcher conf 616 HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context, 617 action, actionConf); 618 619 // Adding if action need to set more credential tokens 620 JobConf credentialsConf = new JobConf(false); 621 XConfiguration.copy(actionConf, credentialsConf); 622 setCredentialTokens(credentialsConf, context, action, credentialsProperties); 623 624 // insert conf to action conf from credentialsConf 625 for (Entry<String, String> entry : credentialsConf) { 626 if (actionConf.get(entry.getKey()) == null) { 627 actionConf.set(entry.getKey(), entry.getValue()); 628 } 629 } 630 631 JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); 632 injectLauncherCallback(context, launcherJobConf); 633 XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId()); 634 jobClient = createJobClient(context, launcherJobConf); 635 String launcherId = LauncherMapper.getRecoveryId(launcherJobConf, context.getActionDir(), context 636 .getRecoveryId()); 637 boolean alreadyRunning = launcherId != null; 638 RunningJob runningJob; 639 640 // if user-retry is on, always submit new 
launcher 641 boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry(); 642 643 if (alreadyRunning && !isUserRetry) { 644 runningJob = jobClient.getJob(JobID.forName(launcherId)); 645 if (runningJob == null) { 646 String jobTracker = launcherJobConf.get("mapred.job.tracker"); 647 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", 648 "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker); 649 } 650 } 651 else { 652 XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId()); 653 654 // setting up propagation of the delegation token. 655 Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("mr token")); 656 launcherJobConf.getCredentials().addToken(new Text("mr token"), mrdt); 657 658 // insert credentials tokens to launcher job conf if needed 659 if (needInjectCredentials()) { 660 for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) { 661 log.debug("ADDING TOKEN: " + tk.getKind().toString()); 662 launcherJobConf.getCredentials().addToken(tk.getKind(), tk); 663 } 664 } 665 else { 666 log.info("No need to inject credentials."); 667 } 668 runningJob = jobClient.submitJob(launcherJobConf); 669 if (runningJob == null) { 670 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", 671 "Error submitting launcher for action [{0}]", action.getId()); 672 } 673 launcherId = runningJob.getID().toString(); 674 XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId); 675 } 676 677 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER); 678 String consoleUrl = runningJob.getTrackingURL(); 679 context.setStartData(launcherId, jobTracker, consoleUrl); 680 } 681 catch (Exception ex) { 682 exception = true; 683 throw convertException(ex); 684 } 685 finally { 686 if (jobClient != null) { 687 try { 688 jobClient.close(); 689 } 690 catch (Exception e) { 691 if (exception) { 692 
log.error("JobClient error: ", e); 693 } 694 else { 695 throw convertException(e); 696 } 697 } 698 } 699 } 700 } 701 702 private boolean needInjectCredentials() { 703 boolean methodExists = true; 704 705 Class klass; 706 try { 707 klass = Class.forName("org.apache.hadoop.mapred.JobConf"); 708 klass.getMethod("getCredentials"); 709 } 710 catch (ClassNotFoundException ex) { 711 methodExists = false; 712 } 713 catch (NoSuchMethodException ex) { 714 methodExists = false; 715 } 716 717 return methodExists; 718 } 719 720 protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context, 721 WorkflowAction action, Configuration actionConf) throws Exception { 722 HashMap<String, CredentialsProperties> credPropertiesMap = null; 723 if (context != null && action != null) { 724 credPropertiesMap = getActionCredentialsProperties(context, action); 725 if (credPropertiesMap != null) { 726 for (String key : credPropertiesMap.keySet()) { 727 CredentialsProperties prop = credPropertiesMap.get(key); 728 if (prop != null) { 729 log.debug("Credential Properties set for action : " + action.getId()); 730 for (String property : prop.getProperties().keySet()) { 731 actionConf.set(property, prop.getProperties().get(property)); 732 log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'"); 733 } 734 } 735 } 736 } 737 else { 738 log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred()); 739 } 740 } 741 else { 742 log.warn("context or action is null"); 743 } 744 return credPropertiesMap; 745 } 746 747 protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action, 748 HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception { 749 750 if (context != null && action != null && credPropertiesMap != null) { 751 for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { 752 String credName = entry.getKey(); 753 
CredentialsProperties credProps = entry.getValue();
                if (credProps != null) {
                    CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
                    Credentials credentialObject = credProvider.createCredentialObject();
                    if (credentialObject != null) {
                        credentialObject.addtoJobConf(jobconf, credProps, context);
                        log.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
                    }
                    else {
                        log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
                    }
                }
                else {
                    log.warn("Could not find credentials properties for: " + credName);
                }
            }
        }

    }

    /**
     * Builds a map from every credential name listed in the action's {@code cred} attribute
     * (comma separated) to the properties declared for it in the workflow definition.
     * <p/>
     * A name with no matching {@code <credential>} element maps to {@code null}, because that is
     * what {@link #getCredProperties} returns for it.
     *
     * @param context executor context, may be {@code null} (an empty map is returned).
     * @param action workflow action, may be {@code null} (an empty map is returned).
     * @return a mutable map, never {@code null}; empty when the action declares no credentials.
     * @throws Exception if the workflow definition cannot be read or parsed.
     */
    protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
            WorkflowAction action) throws Exception {
        HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();
        if (context != null && action != null) {
            String credsInAction = action.getCred();
            log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId());
            // An action without a cred attribute has nothing to resolve; previously this threw an NPE.
            if (credsInAction != null) {
                String[] credNames = credsInAction.split(",");
                for (String credName : credNames) {
                    CredentialsProperties credProps = getCredProperties(context, credName);
                    props.put(credName, credProps);
                }
            }
        }
        else {
            log.warn("context or action is null");
        }
        return props;
    }

    /**
     * Looks up the {@code <credential>} element named {@code credName} (case-insensitive) inside the
     * {@code <credentials>} section of the workflow definition and returns its type and properties.
     * <p/>
     * If several elements match, the last one wins. Elements lacking a {@code name} attribute are skipped.
     *
     * @param context executor context; its workflow is assumed to be a {@link WorkflowJobBean}.
     * @param credName credential name to look up.
     * @return the credential properties, or {@code null} if no element matches or the workflow has
     *         no {@code <credentials>} section.
     * @throws Exception if the workflow definition cannot be parsed.
     */
    @SuppressWarnings("unchecked")
    protected CredentialsProperties getCredProperties(Context context, String credName)
            throws Exception {
        CredentialsProperties credProp = null;
        String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
        Element elementJob = XmlUtils.parseXml(workflowXml);
        Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
        if (credentials != null) {
            for (Element credential : (List<Element>) credentials.getChildren("credential", credentials
                    .getNamespace())) {
                String name = credential.getAttributeValue("name");
                String type = credential.getAttributeValue("type");
                log.debug("getCredProperties: Name: " + name + ", Type: " + type);
                // Guard against a <credential> element without a name attribute (was an NPE).
                if (name != null && name.equalsIgnoreCase(credName)) {
                    credProp = new CredentialsProperties(name, type);
                    for (Element property : (List<Element>) credential.getChildren("property", credential
                            .getNamespace())) {
                        String propName = property.getChildText("name", property.getNamespace());
                        String propValue = property.getChildText("value", property.getNamespace());
                        credProp.getProperties().put(propName, propValue);
                        // NOTE(review): property values are written to the debug log; confirm none are secrets.
                        log.debug("getCredProperties: Properties name :'" + propName + "', Value : '" + propValue
                                + "'");
                    }
                }
            }
        }
        else {
            log.warn("credentials is null for the action");
        }
        return credProp;
    }

    /**
     * Starts the action: prepares the action directory, submits the launcher job and performs an
     * initial status check.
     *
     * @param context executor context.
     * @param action workflow action to start.
     * @throws ActionExecutorException if any step fails (converted via {@code convertException}).
     */
    @Override
    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
        try {
            XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System");
            FileSystem actionFs = getActionFileSystem(context, action);
            XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir());
            prepareActionDir(actionFs, context);
            XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action ");
            submitLauncher(actionFs, context, action);
            XLog.getLog(getClass()).debug("Action submit completed. Performing check ");
            check(context, action);
            XLog.getLog(getClass()).debug("Action check is done after submission");
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    /**
     * Ends the action: sets end data to {@code OK} when the external status is {@code SUCCEEDED}
     * and {@code ERROR} otherwise (including a {@code null} status), then cleans up the action directory.
     * <p/>
     * If ending the action failed, a cleanup failure is only logged so that it does not hide the
     * original error; otherwise a cleanup failure is thrown.
     *
     * @param context executor context.
     * @param action workflow action to end.
     * @throws ActionExecutorException if ending the action or cleaning up fails.
     */
    @Override
    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
        boolean exception = false;
        try {
            String externalStatus = action.getExternalStatus();
            // Constant-first comparison: a missing external status is treated as an error, not an NPE.
            WorkflowAction.Status status = SUCCEEDED.equals(externalStatus) ? WorkflowAction.Status.OK
                    : WorkflowAction.Status.ERROR;
            context.setEndData(status, getActionSignal(status));
        }
        catch (Exception ex) {
            exception = true;
            throw convertException(ex);
        }
        finally {
            try {
                FileSystem actionFs = getActionFileSystem(context, action);
                cleanUpActionDir(actionFs, context);
            }
            catch (Exception ex) {
                if (exception) {
                    // Do not replace the exception already propagating out of the try block.
                    log.error("Error: ", ex);
                }
                else {
                    throw convertException(ex);
                }
            }
        }
    }

    /**
     * Creates a job client for the workflow's user through the {@link HadoopAccessorService}.
     *
     * @param context executor context, supplies the workflow user.
     * @param jobConf job configuration the client is created with.
     * @return the job client.
     * @throws HadoopAccessorException if the accessor service cannot create the client.
     */
    protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
        String user = context.getWorkflow().getUser();
        return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
    }

    /**
     * Checks the status of the launcher job and, once complete, of the job it swapped to (if any).
     * <p/>
     * On success, the action output is collected through {@link #getActionData}. On failure, the error
     * information written by the launcher is recorded on the context. While the job is still running,
     * the external status is set to {@code RUNNING}.
     *
     * @param context executor context.
     * @param action workflow action to check.
     * @throws ActionExecutorException if the job is unknown or the check fails.
     */
    @Override
    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
        JobClient jobClient = null;
        boolean exception = false;
        try {
            Element actionXml = XmlUtils.parseXml(action.getConf());
            FileSystem actionFs = getActionFileSystem(context, actionXml);
            JobConf jobConf = createBaseHadoopConf(context, actionXml);
            jobClient = createJobClient(context, jobConf);
            RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
            if (runningJob == null) {
                context.setExternalStatus(FAILED);
                context.setExecutionData(FAILED, null);
                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
                        "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", action
                                .getExternalId(), action.getId());
            }
            if (runningJob.isComplete()) {
                Path actionDir = context.getActionDir();

                String user = context.getWorkflow().getUser();
                String group = context.getWorkflow().getGroup();
                if (LauncherMapper.hasIdSwap(runningJob, user, group, actionDir)) {
                    String launcherId = action.getExternalId();
                    Path idSwapPath = LauncherMapper.getIdSwapPath(context.getActionDir());
                    Properties props = readLauncherProperties(actionFs, idSwapPath, maxActionOutputLen);
                    String newId = props.getProperty("id");
                    runningJob = jobClient.getJob(JobID.forName(newId));
                    if (runningJob == null) {
                        context.setExternalStatus(FAILED);
                        throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
                                "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId,
                                action.getId());
                    }

                    context.setStartData(newId, action.getTrackerUri(), runningJob.getTrackingURL());
                    XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
                            newId);
                }
                // Re-checked because runningJob may now refer to the swapped-in job.
                if (runningJob.isComplete()) {
                    XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]",
                            action.getExternalId());
                    if (runningJob.isSuccessful() && LauncherMapper.isMainSuccessful(runningJob)) {
                        getActionData(actionFs, runningJob, action, context);
                        XLog.getLog(getClass()).info(XLog.STD, "action produced output");
                    }
                    else {
                        XLog log = XLog.getLog(getClass());
                        String errorReason;
                        Path actionError = LauncherMapper.getErrorPath(context.getActionDir());
                        if (actionFs.exists(actionError)) {
                            Properties props = readLauncherProperties(actionFs, actionError, -1);
                            String errorCode = props.getProperty("error.code");
                            if (errorCode.equals("0")) {
                                errorCode = "JA018";
                            }
                            if (errorCode.equals("-1")) {
                                errorCode = "JA019";
                            }
                            errorReason = props.getProperty("error.reason");
                            log.warn("Launcher ERROR, reason: {0}", errorReason);
                            String exMsg = props.getProperty("exception.message");
                            String errorInfo = (exMsg != null) ? exMsg : errorReason;
                            context.setErrorInfo(errorCode, errorInfo);
                            String exStackTrace = props.getProperty("exception.stacktrace");
                            if (exMsg != null) {
                                log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
                            }
                        }
                        else {
                            errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action
                                    .getTrackerUri(), action.getExternalId());
                            log.warn(errorReason);
                        }
                        context.setExecutionData(FAILED_KILLED, null);
                    }
                }
                else {
                    context.setExternalStatus(RUNNING);
                    XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
                            action.getExternalId(), action.getExternalStatus());
                }
            }
            else {
                context.setExternalStatus(RUNNING);
                XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
                        action.getExternalId(), action.getExternalStatus());
            }
        }
        catch (Exception ex) {
            XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
            exception = true;
            throw convertException(ex);
        }
        finally {
            if (jobClient != null) {
                try {
                    jobClient.close();
                }
                catch (Exception e) {
                    if (exception) {
                        // Keep the original failure; only log the close failure.
                        log.error("JobClient error: ", e);
                    }
                    else {
                        throw convertException(e);
                    }
                }
            }
        }
    }

    /**
     * Get the output data of an action. Subclasses should override this method
     * to get action specific output data.
     * <p/>
     * This implementation records {@code SUCCEEDED} as execution data. When the action has a
     * {@code capture-output} element, it also attaches the properties written by the launcher
     * (empty properties if the launcher wrote none). Otherwise, it attaches {@code null}.
     *
     * @param actionFs the FileSystem object
     * @param runningJob the runningJob
     * @param action the Workflow action
     * @param context executor context
     * @throws IOException if the output data file cannot be read
     */
    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
            throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
        Properties props = null;
        if (getCaptureOutput(action)) {
            props = new Properties();
            if (LauncherMapper.hasOutputData(runningJob)) {
                Path actionOutput = LauncherMapper.getOutputDataPath(context.getActionDir());
                props = readLauncherProperties(actionFs, actionOutput, maxActionOutputLen);
            }
        }
        context.setExecutionData(SUCCEEDED, props);
    }

    /**
     * Reads a properties file written by the launcher. The reader is always closed, even when
     * parsing fails.
     *
     * @param fs file system holding the file.
     * @param path path of the properties file.
     * @param maxLen length limit passed unchanged to {@code PropertiesUtils.readProperties}.
     * @return the parsed properties.
     * @throws IOException if the file cannot be opened, read or parsed.
     */
    private static Properties readLauncherProperties(FileSystem fs, Path path, int maxLen) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
        try {
            return PropertiesUtils.readProperties(reader, maxLen);
        }
        finally {
            reader.close();
        }
    }

    /**
     * Tells whether the action configuration contains a {@code capture-output} element.
     *
     * @param action workflow action whose configuration XML is inspected.
     * @return {@code true} if {@code capture-output} is present.
     * @throws JDOMException if the action configuration cannot be parsed.
     */
    protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
        Element eConf = XmlUtils.parseXml(action.getConf());
        Namespace ns = eConf.getNamespace();
        Element captureOutput = eConf.getChild("capture-output", ns);
        return captureOutput != null;
    }

    /**
     * Kills the Hadoop job (if it still exists), marks the action {@code KILLED} and cleans up the
     * action directory.
     * <p/>
     * The job client is closed even when directory cleanup fails. If the kill itself failed,
     * cleanup/close failures are only logged so the original error is not hidden.
     *
     * @param context executor context.
     * @param action workflow action to kill.
     * @throws ActionExecutorException if killing, cleanup or closing the client fails.
     */
    @Override
    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
        JobClient jobClient = null;
        boolean exception = false;
        try {
            Element actionXml = XmlUtils.parseXml(action.getConf());
            JobConf jobConf = createBaseHadoopConf(context, actionXml);
            jobClient = createJobClient(context, jobConf);
            RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
            if (runningJob != null) {
                runningJob.killJob();
            }
            context.setExternalStatus(KILLED);
            context.setExecutionData(KILLED, null);
        }
        catch (Exception ex) {
            exception = true;
            throw convertException(ex);
        }
        finally {
            Exception cleanupError = null;
            try {
                FileSystem actionFs = getActionFileSystem(context, action);
                cleanUpActionDir(actionFs, context);
            }
            catch (Exception ex) {
                cleanupError = ex;
            }
            // Close independently of the cleanup outcome so the client is never leaked.
            if (jobClient != null) {
                try {
                    jobClient.close();
                }
                catch (Exception ex) {
                    if (cleanupError == null) {
                        cleanupError = ex;
                    }
                    else {
                        log.error("Error: ", ex);
                    }
                }
            }
            if (cleanupError != null) {
                if (exception) {
                    log.error("Error: ", cleanupError);
                }
                else {
                    throw convertException(cleanupError);
                }
            }
        }
    }

    // External statuses after which the action no longer needs to be checked.
    private static final Set<String> FINAL_STATUS = new HashSet<String>();

    static {
        FINAL_STATUS.add(SUCCEEDED);
        FINAL_STATUS.add(KILLED);
        FINAL_STATUS.add(FAILED);
        FINAL_STATUS.add(FAILED_KILLED);
    }

    /**
     * Tells whether the given external status is final.
     *
     * @param externalStatus external status to test.
     * @return {@code true} for {@code SUCCEEDED}, {@code KILLED}, {@code FAILED} and {@code FAILED_KILLED}.
     */
    @Override
    public boolean isCompleted(String externalStatus) {
        return FINAL_STATUS.contains(externalStatus);
    }

    /**
     * Return the sharelib postfix for the action.
     * <p/>
     * If <code>NULL</code> or empty, it means that the action does not use the action
     * sharelib.
     * <p/>
     * If a non-empty string, i.e. <code>foo</code>, it means the action uses the
     * action sharelib subdirectory <code>foo</code> and all JARs in the <code>foo</code>
     * directory will be in the action classpath.
     *
     * @param context executor context.
     * @param actionXml the action XML.
     * @return the action sharelib postfix, this implementation returns <code>NULL</code>.
     */
    protected String getShareLibPostFix(Context context, Element actionXml) {
        return null;
    }

}