001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import java.io.BufferedReader; 021 import java.io.File; 022 import java.io.FileNotFoundException; 023 import java.io.IOException; 024 import java.io.InputStream; 025 import java.io.InputStreamReader; 026 import java.io.StringReader; 027 import java.net.ConnectException; 028 import java.net.URI; 029 import java.net.UnknownHostException; 030 import java.util.ArrayList; 031 import java.util.HashMap; 032 import java.util.HashSet; 033 import java.util.List; 034 import java.util.Map; 035 import java.util.Properties; 036 import java.util.Set; 037 import java.util.Map.Entry; 038 039 import org.apache.hadoop.conf.Configuration; 040 import org.apache.hadoop.filecache.DistributedCache; 041 import org.apache.hadoop.fs.FileSystem; 042 import org.apache.hadoop.fs.Path; 043 import org.apache.hadoop.fs.permission.AccessControlException; 044 import org.apache.hadoop.mapred.JobClient; 045 import org.apache.hadoop.mapred.JobConf; 046 import org.apache.hadoop.mapred.JobID; 047 import org.apache.hadoop.mapred.RunningJob; 048 import org.apache.hadoop.util.DiskChecker; 049 import org.apache.oozie.WorkflowActionBean; 050 import org.apache.oozie.WorkflowJobBean; 051 import org.apache.oozie.action.ActionExecutor; 052 import org.apache.oozie.action.ActionExecutorException; 053 import org.apache.oozie.client.OozieClient; 054 import org.apache.oozie.client.WorkflowAction; 055 import org.apache.oozie.service.HadoopAccessorException; 056 import org.apache.oozie.service.HadoopAccessorService; 057 import org.apache.oozie.service.Services; 058 import org.apache.oozie.service.WorkflowAppService; 059 import org.apache.oozie.servlet.CallbackServlet; 060 import org.apache.oozie.util.IOUtils; 061 import org.apache.oozie.util.PropertiesUtils; 062 import org.apache.oozie.util.XConfiguration; 063 import org.apache.oozie.util.XLog; 064 import org.apache.oozie.util.XmlUtils; 065 import org.jdom.Element; 066 import org.jdom.JDOMException; 067 import org.jdom.Namespace; 068 import org.apache.hadoop.security.token.Token; 069 import org.apache.hadoop.security.token.TokenIdentifier; 070 071 public class JavaActionExecutor extends ActionExecutor { 072 073 private static final String HADOOP_USER = "user.name"; 074 private static final String HADOOP_UGI = "hadoop.job.ugi"; 075 private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker"; 076 private static final String HADOOP_NAME_NODE = "fs.default.name"; 077 078 private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>(); 079 080 private static int maxActionOutputLen; 081 082 private static final String SUCCEEDED = "SUCCEEDED"; 083 private static final String KILLED = "KILLED"; 084 private static final String FAILED = "FAILED"; 085 private static final String FAILED_KILLED = "FAILED/KILLED"; 086 private static final String RUNNING = "RUNNING"; 087 private XLog log = XLog.getLog(getClass()); 088 089 static { 090 DISALLOWED_PROPERTIES.add(HADOOP_USER); 091 DISALLOWED_PROPERTIES.add(HADOOP_UGI); 092 DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER); 093 DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE); 094 DISALLOWED_PROPERTIES.add(WorkflowAppService.HADOOP_JT_KERBEROS_NAME); 095 DISALLOWED_PROPERTIES.add(WorkflowAppService.HADOOP_NN_KERBEROS_NAME); 096 } 097 098 public JavaActionExecutor() { 099 this("java"); 100 } 101 102 protected JavaActionExecutor(String type) { 103 super(type); 104 } 105 106 protected String getLauncherJarName() { 107 return getType() + "-launcher.jar"; 108 } 109 110 protected List<Class> getLauncherClasses() { 111 List<Class> classes = new ArrayList<Class>(); 112 classes.add(LauncherMapper.class); 113 classes.add(LauncherSecurityManager.class); 114 classes.add(LauncherException.class); 115 classes.add(LauncherMainException.class); 116 return classes; 117 } 118 119 @Override 120 public void initActionType() { 121 super.initActionType(); 122 maxActionOutputLen = getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 2 * 1024); 123 try { 124 List<Class> classes = getLauncherClasses(); 125 Class[] launcherClasses = classes.toArray(new Class[classes.size()]); 126 IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses); 127 128 registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001"); 129 registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT, 130 "JA002"); 131 registerError(DiskChecker.DiskOutOfSpaceException.class.getName(), 132 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003"); 133 registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(), 134 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004"); 135 registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(), 136 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005"); 137 registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA006"); 138 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007"); 139 registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008"); 140 registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009"); 141 } 142 catch (IOException ex) { 143 throw new RuntimeException(ex); 144 } 145 } 146 147 void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException { 148 for (String prop : DISALLOWED_PROPERTIES) { 149 if (conf.get(prop) != null) { 150 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010", 151 "Property [{0}] not allowed in action [{1}] configuration", prop, confName); 152 } 153 } 154 } 155 156 public Configuration createBaseHadoopConf(Context context, Element actionXml) { 157 Configuration conf = new XConfiguration(); 158 conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER)); 159 conf.set(HADOOP_UGI, context.getProtoActionConf().get(WorkflowAppService.HADOOP_UGI)); 160 if (context.getProtoActionConf().get(WorkflowAppService.HADOOP_JT_KERBEROS_NAME) != null) { 161 conf.set(WorkflowAppService.HADOOP_JT_KERBEROS_NAME, context.getProtoActionConf().get( 162 WorkflowAppService.HADOOP_JT_KERBEROS_NAME)); 163 } 164 if (context.getProtoActionConf().get(WorkflowAppService.HADOOP_NN_KERBEROS_NAME) != null) { 165 conf.set(WorkflowAppService.HADOOP_NN_KERBEROS_NAME, context.getProtoActionConf().get( 166 WorkflowAppService.HADOOP_NN_KERBEROS_NAME)); 167 } 168 conf.set(OozieClient.GROUP_NAME, context.getProtoActionConf().get(OozieClient.GROUP_NAME)); 169 Namespace ns = actionXml.getNamespace(); 170 String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim(); 171 String nameNode = actionXml.getChild("name-node", ns).getTextTrim(); 172 conf.set(HADOOP_JOB_TRACKER, jobTracker); 173 conf.set(HADOOP_NAME_NODE, nameNode); 174 conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true"); 175 return conf; 176 } 177 178 Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) 179 throws ActionExecutorException { 180 try { 181 Namespace ns = actionXml.getNamespace(); 182 Element e = actionXml.getChild("configuration", ns); 183 if (e != null) { 184 String strConf = XmlUtils.prettyPrint(e).toString(); 185 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); 186 187 XConfiguration launcherConf = new XConfiguration(); 188 for (Map.Entry<String, String> entry : inlineConf) { 189 if (entry.getKey().startsWith("oozie.launcher.")) { 190 String name = entry.getKey().substring("oozie.launcher.".length()); 191 String value = entry.getValue(); 192 // setting original KEY 193 launcherConf.set(entry.getKey(), value); 194 // setting un-prefixed key (to allow Hadoop job config 195 // for the launcher job 196 launcherConf.set(name, value); 197 } 198 } 199 checkForDisallowedProps(launcherConf, "inline launcher configuration"); 200 XConfiguration.copy(launcherConf, conf); 201 } 202 return conf; 203 } 204 catch (IOException ex) { 205 throw convertException(ex); 206 } 207 } 208 209 protected FileSystem getActionFileSystem(Context context, WorkflowAction action) throws ActionExecutorException { 210 try { 211 Element actionXml = XmlUtils.parseXml(action.getConf()); 212 return getActionFileSystem(context, actionXml); 213 } 214 catch (JDOMException ex) { 215 throw convertException(ex); 216 } 217 } 218 219 protected FileSystem getActionFileSystem(Context context, Element actionXml) throws ActionExecutorException { 220 try { 221 return context.getAppFileSystem(); 222 } 223 catch (Exception ex) { 224 throw convertException(ex); 225 } 226 } 227 228 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) 229 throws ActionExecutorException { 230 try { 231 Namespace ns = actionXml.getNamespace(); 232 Element e = actionXml.getChild("job-xml", ns); 233 if (e != null) { 234 String jobXml = e.getTextTrim(); 235 Path path = new Path(appPath, jobXml); 236 FileSystem fs = getActionFileSystem(context, actionXml); 237 Configuration jobXmlConf = new XConfiguration(fs.open(path)); 238 checkForDisallowedProps(jobXmlConf, "job-xml"); 239 XConfiguration.copy(jobXmlConf, actionConf); 240 } 241 e = actionXml.getChild("configuration", ns); 242 if (e != null) { 243 String strConf = XmlUtils.prettyPrint(e).toString(); 244 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); 245 checkForDisallowedProps(inlineConf, "inline configuration"); 246 XConfiguration.copy(inlineConf, actionConf); 247 } 248 return actionConf; 249 } 250 catch (IOException ex) { 251 throw convertException(ex); 252 } 253 } 254 255 Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive) 256 throws ActionExecutorException { 257 Path path = null; 258 try { 259 if (filePath.startsWith("/")) { 260 path = new Path(filePath); 261 } 262 else { 263 path = new Path(appPath, filePath); 264 } 265 URI uri = new URI(path.toUri().getPath()); 266 if (archive) { 267 DistributedCache.addCacheArchive(uri, conf); 268 } 269 else { 270 String fileName = filePath.substring(filePath.lastIndexOf("/") + 1); 271 if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files 272 uri = new Path(path.toString() + "#" + fileName).toUri(); 273 uri = new URI(uri.getPath()); 274 DistributedCache.addCacheFile(uri, conf); 275 } 276 else if (fileName.endsWith(".jar")) { // .jar files 277 if (!fileName.contains("#")) { 278 path = new Path(uri.toString()); 279 280 String user = conf.get("user.name"); 281 String group = conf.get("group.name"); 282 Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, group, path, conf); 283 } 284 else { 285 DistributedCache.addCacheFile(uri, conf); 286 } 287 } 288 else { // regular files 289 if (!fileName.contains("#")) { 290 uri = new Path(path.toString() + "#" + fileName).toUri(); 291 uri = new URI(uri.getPath()); 292 } 293 DistributedCache.addCacheFile(uri, conf); 294 } 295 } 296 DistributedCache.createSymlink(conf); 297 return conf; 298 } 299 catch (Exception ex) { 300 XLog.getLog(getClass()).debug( 301 "Errors when add to DistributedCache. Path=" + path + ", archive=" + archive + ", conf=" 302 + XmlUtils.prettyPrint(conf).toString()); 303 throw convertException(ex); 304 } 305 } 306 307 String getOozieLauncherJar(Context context) throws ActionExecutorException { 308 try { 309 return new Path(context.getActionDir(), getLauncherJarName()).toString(); 310 } 311 catch (Exception ex) { 312 throw convertException(ex); 313 } 314 } 315 316 public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { 317 try { 318 Path actionDir = context.getActionDir(); 319 Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp"); 320 if (!actionFs.exists(actionDir)) { 321 try { 322 actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path( 323 tempActionDir, getLauncherJarName())); 324 actionFs.rename(tempActionDir, actionDir); 325 } 326 catch (IOException ex) { 327 actionFs.delete(tempActionDir, true); 328 actionFs.delete(actionDir, true); 329 throw ex; 330 } 331 } 332 } 333 catch (Exception ex) { 334 throw convertException(ex); 335 } 336 } 337 338 void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException { 339 try { 340 Path actionDir = context.getActionDir(); 341 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false) 342 && actionFs.exists(actionDir)) { 343 actionFs.delete(actionDir, true); 344 } 345 } 346 catch (Exception ex) { 347 throw convertException(ex); 348 } 349 } 350 351 @SuppressWarnings("unchecked") 352 void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf) 353 throws ActionExecutorException { 354 Configuration proto = context.getProtoActionConf(); 355 356 addToCache(conf, appPath, getOozieLauncherJar(context), false); 357 358 String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST); 359 if (paths != null) { 360 for (String path : paths) { 361 addToCache(conf, appPath, path, false); 362 } 363 } 364 365 for (Element eProp : (List<Element>) actionXml.getChildren()) { 366 if (eProp.getName().equals("file")) { 367 String path = eProp.getTextTrim(); 368 addToCache(conf, appPath, path, false); 369 } 370 else { 371 if (eProp.getName().equals("archive")) { 372 String path = eProp.getTextTrim(); 373 addToCache(conf, appPath, path, true); 374 } 375 } 376 } 377 } 378 379 protected String getLauncherMain(Configuration launcherConf, Element actionXml) { 380 Namespace ns = actionXml.getNamespace(); 381 Element e = actionXml.getChild("main-class", ns); 382 return e.getTextTrim(); 383 } 384 385 private static final String QUEUE_NAME = "mapred.job.queue.name"; 386 private static final String OOZIE_LAUNCHER_QUEUE_NAME = "oozie.launcher.mapred.job.queue.name"; 387 388 private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>(); 389 390 static { 391 SPECIAL_PROPERTIES.add(QUEUE_NAME); 392 SPECIAL_PROPERTIES.add("mapreduce.jobtracker.kerberos.principal"); 393 SPECIAL_PROPERTIES.add("dfs.namenode.kerberos.principal"); 394 } 395 396 @SuppressWarnings("unchecked") 397 JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf) 398 throws ActionExecutorException { 399 try { 400 401 // app path could be a file 402 Path appPathRoot = new Path(context.getWorkflow().getAppPath()); 403 if (actionFs.isFile(appPathRoot)) { 404 appPathRoot = appPathRoot.getParent(); 405 } 406 407 // launcher job configuration 408 Configuration launcherConf = createBaseHadoopConf(context, actionXml); 409 setupLauncherConf(launcherConf, actionXml, appPathRoot, context); 410 411 // we are doing init+copy because if not we are getting 'hdfs' 412 // scheme not known 413 // its seems that new JobConf(Conf) does not load defaults, it 414 // assumes parameter Conf does. 415 JobConf launcherJobConf = new JobConf(); 416 XConfiguration.copy(launcherConf, launcherJobConf); 417 setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf); 418 String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow() 419 .getAppName(), action.getName(), context.getWorkflow().getId()); 420 launcherJobConf.setJobName(jobName); 421 422 String jobId = context.getWorkflow().getId(); 423 String actionId = action.getId(); 424 Path actionDir = context.getActionDir(); 425 String recoveryId = context.getRecoveryId(); 426 427 LauncherMapper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf); 428 429 LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherConf, actionXml)); 430 431 LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen); 432 433 Namespace ns = actionXml.getNamespace(); 434 List<Element> list = actionXml.getChildren("arg", ns); 435 String[] args = new String[list.size()]; 436 for (int i = 0; i < list.size(); i++) { 437 args[i] = list.get(i).getTextTrim(); 438 } 439 LauncherMapper.setupMainArguments(launcherJobConf, args); 440 441 Element opt = actionXml.getChild("java-opts", ns); 442 if (opt != null) { 443 String opts = launcherConf.get("mapred.child.java.opts", ""); 444 opts = opts + " " + opt.getTextTrim(); 445 opts = opts.trim(); 446 launcherJobConf.set("mapred.child.java.opts", opts); 447 } 448 449 // properties from action that are needed by the launcher (QUEUE NAME) 450 // maybe we should add queue to the WF schema, below job-tracker 451 for (String name : SPECIAL_PROPERTIES) { 452 String value = actionConf.get(name); 453 if (value != null) { 454 if (!name.equals(QUEUE_NAME) || 455 (name.equals(QUEUE_NAME) && launcherJobConf.get(OOZIE_LAUNCHER_QUEUE_NAME) == null)) { 456 launcherJobConf.set(name, value); 457 } 458 } 459 } 460 461 // to disable cancelation of delegation token on launcher job end 462 launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false); 463 464 // setting the group owning the Oozie job to allow anybody in that 465 // group to kill the jobs. 466 launcherJobConf.set("mapreduce.job.acl-modify-job", context.getWorkflow().getGroup()); 467 468 return launcherJobConf; 469 } 470 catch (Exception ex) { 471 throw convertException(ex); 472 } 473 } 474 475 private void injectCallback(Context context, Configuration conf) { 476 String callback = context.getCallbackUrl("$jobStatus"); 477 if (conf.get("job.end.notification.url") != null) { 478 XLog.getLog(getClass()).warn("Overriding the action job end notification URI"); 479 } 480 conf.set("job.end.notification.url", callback); 481 } 482 483 void injectActionCallback(Context context, Configuration actionConf) { 484 injectCallback(context, actionConf); 485 } 486 487 void injectLauncherCallback(Context context, Configuration launcherConf) { 488 injectCallback(context, launcherConf); 489 } 490 491 public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException { 492 JobClient jobClient = null; 493 boolean exception = false; 494 try { 495 Path appPathRoot = new Path(context.getWorkflow().getAppPath()); 496 497 // app path could be a file 498 if (actionFs.isFile(appPathRoot)) { 499 appPathRoot = appPathRoot.getParent(); 500 } 501 502 Element actionXml = XmlUtils.parseXml(action.getConf()); 503 504 // action job configuration 505 Configuration actionConf = createBaseHadoopConf(context, actionXml); 506 setupActionConf(actionConf, context, actionXml, appPathRoot); 507 XLog.getLog(getClass()).debug("Setting LibFilesArchives "); 508 setLibFilesArchives(context, actionXml, appPathRoot, actionConf); 509 String jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow() 510 .getAppName(), action.getName(), context.getWorkflow().getId()); 511 actionConf.set("mapred.job.name", jobName); 512 injectActionCallback(context, actionConf); 513 514 // setting the group owning the Oozie job to allow anybody in that 515 // group to kill the jobs. 516 actionConf.set("mapreduce.job.acl-modify-job", context.getWorkflow().getGroup()); 517 518 // Setting the credential properties in launcher conf 519 HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context, 520 action, actionConf); 521 522 // Adding if action need to set more credential tokens 523 JobConf credentialsConf = new JobConf(false); 524 XConfiguration.copy(actionConf, credentialsConf); 525 setCredentialTokens(credentialsConf, context, action, credentialsProperties); 526 527 // insert conf to action conf from credentialsConf 528 for (Entry<String, String> entry : credentialsConf) { 529 if (actionConf.get(entry.getKey()) == null) { 530 actionConf.set(entry.getKey(), entry.getValue()); 531 } 532 } 533 534 JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); 535 injectLauncherCallback(context, launcherJobConf); 536 XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId()); 537 jobClient = createJobClient(context, launcherJobConf); 538 String launcherId = LauncherMapper.getRecoveryId(launcherJobConf, context.getActionDir(), context 539 .getRecoveryId()); 540 boolean alreadyRunning = launcherId != null; 541 RunningJob runningJob; 542 543 // if user-retry is on, always submit new launcher 544 boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry(); 545 546 if (alreadyRunning && !isUserRetry) { 547 runningJob = jobClient.getJob(JobID.forName(launcherId)); 548 if (runningJob == null) { 549 String jobTracker = launcherJobConf.get("mapred.job.tracker"); 550 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", 551 "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker); 552 } 553 } 554 else { 555 prepare(context, actionXml); 556 XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId()); 557 558 // setting up propagation of the delegation token. 559 AuthHelper.get().set(jobClient, launcherJobConf); 560 log.debug(WorkflowAppService.HADOOP_JT_KERBEROS_NAME + " = " 561 + launcherJobConf.get(WorkflowAppService.HADOOP_JT_KERBEROS_NAME)); 562 log.debug(WorkflowAppService.HADOOP_NN_KERBEROS_NAME + " = " 563 + launcherJobConf.get(WorkflowAppService.HADOOP_NN_KERBEROS_NAME)); 564 565 // insert credentials tokens to launcher job conf if needed 566 if (needInjectCredentials()) { 567 for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) { 568 log.debug("ADDING TOKEN: " + tk.getKind().toString()); 569 launcherJobConf.getCredentials().addToken(tk.getKind(), tk); 570 } 571 } 572 else { 573 log.info("No need to inject credentials."); 574 } 575 runningJob = jobClient.submitJob(launcherJobConf); 576 if (runningJob == null) { 577 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", 578 "Error submitting launcher for action [{0}]", action.getId()); 579 } 580 launcherId = runningJob.getID().toString(); 581 XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId); 582 } 583 584 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER); 585 String consoleUrl = runningJob.getTrackingURL(); 586 context.setStartData(launcherId, jobTracker, consoleUrl); 587 } 588 catch (Exception ex) { 589 exception = true; 590 throw convertException(ex); 591 } 592 finally { 593 if (jobClient != null) { 594 try { 595 jobClient.close(); 596 } 597 catch (Exception e) { 598 if (exception) { 599 log.error("JobClient error: ", e); 600 } 601 else { 602 throw convertException(e); 603 } 604 } 605 } 606 } 607 } 608 609 private boolean needInjectCredentials() { 610 boolean methodExists = true; 611 612 Class klass; 613 try { 614 klass = Class.forName("org.apache.hadoop.mapred.JobConf"); 615 klass.getMethod("getCredentials"); 616 } 617 catch (ClassNotFoundException ex) { 618 methodExists = false; 619 } 620 catch (NoSuchMethodException ex) { 621 methodExists = false; 622 } 623 624 return methodExists; 625 } 626 627 protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context, 628 WorkflowAction action, Configuration actionConf) throws Exception { 629 HashMap<String, CredentialsProperties> credPropertiesMap = null; 630 if (context != null && action != null) { 631 credPropertiesMap = getActionCredentialsProperties(context, action); 632 if (credPropertiesMap != null) { 633 for (String key : credPropertiesMap.keySet()) { 634 CredentialsProperties prop = credPropertiesMap.get(key); 635 if (prop != null) { 636 log.debug("Credential Properties set for action : " + action.getId()); 637 for (String property : prop.getProperties().keySet()) { 638 actionConf.set(property, prop.getProperties().get(property)); 639 log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'"); 640 } 641 } 642 } 643 } 644 else { 645 log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred()); 646 } 647 } 648 else { 649 log.warn("context or action is null"); 650 } 651 return credPropertiesMap; 652 } 653 654 protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action, 655 HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception { 656 657 if (context != null && action != null && credPropertiesMap != null) { 658 for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { 659 String credName = entry.getKey(); 660 CredentialsProperties credProps = entry.getValue(); 661 if (credProps != null) { 662 CredentialsProvider credProvider = new CredentialsProvider(credProps.getType()); 663 Credentials credentialObject = credProvider.createCredentialObject(); 664 if (credentialObject != null) { 665 credentialObject.addtoJobConf(jobconf, credProps, context); 666 log.debug("Retrieved Credential '" + credName + "' for action " + action.getId()); 667 } 668 else { 669 log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType()); 670 } 671 } 672 else { 673 log.warn("Could not find credentials properties for: " + credName); 674 } 675 } 676 } 677 678 } 679 680 protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context, 681 WorkflowAction action) throws Exception { 682 HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>(); 683 if (context != null && action != null) { 684 String credsInAction = action.getCred(); 685 log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId()); 686 String[] credNames = credsInAction.split(","); 687 for (String credName : credNames) { 688 CredentialsProperties credProps = getCredProperties(context, credName); 689 props.put(credName, credProps); 690 } 691 } 692 else { 693 log.warn("context or action is null"); 694 } 695 return props; 696 } 697 698 @SuppressWarnings("unchecked") 699 protected CredentialsProperties getCredProperties(Context context, String credName) 700 throws Exception { 701 CredentialsProperties credProp = null; 702 String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition(); 703 Element elementJob = XmlUtils.parseXml(workflowXml); 704 Element credentials = elementJob.getChild("credentials", elementJob.getNamespace()); 705 if (credentials != null) { 706 for (Element credential : (List<Element>) credentials.getChildren("credential", credentials 707 .getNamespace())) { 708 String name = credential.getAttributeValue("name"); 709 String type = credential.getAttributeValue("type"); 710 log.debug("getCredProperties: Name: " + name + ", Type: " + type); 711 if (name.equalsIgnoreCase(credName)) { 712 credProp = new CredentialsProperties(name, type); 713 for (Element property : (List<Element>) credential.getChildren("property", credential 714 .getNamespace())) { 715 credProp.getProperties().put(property.getChildText("name", property.getNamespace()), 716 property.getChildText("value", property.getNamespace())); 717 log.debug("getCredProperties: Properties name :'" 718 + property.getChildText("name", property.getNamespace()) + "', Value : '" 719 + property.getChildText("value", property.getNamespace()) + "'"); 720 } 721 } 722 } 723 } 724 else { 725 log.warn("credentials is null for the action"); 726 } 727 return credProp; 728 } 729 730 void prepare(Context context, Element actionXml) throws ActionExecutorException { 731 Namespace ns = actionXml.getNamespace(); 732 Element prepare = actionXml.getChild("prepare", ns); 733 if (prepare != null) { 734 XLog.getLog(getClass()).debug("Preparing the action with FileSystem operation"); 735 FsActionExecutor fsAe = new FsActionExecutor(); 736 fsAe.doOperations(context, prepare); 737 XLog.getLog(getClass()).debug("FS Operation is completed"); 738 } 739 } 740 741 @Override 742 public void start(Context context, WorkflowAction action) throws ActionExecutorException { 743 try { 744 XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System"); 745 FileSystem actionFs = getActionFileSystem(context, action); 746 XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir()); 747 prepareActionDir(actionFs, context); 748 XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action "); 749 submitLauncher(actionFs, context, action); 750 XLog.getLog(getClass()).debug("Action submit completed. Performing check "); 751 check(context, action); 752 XLog.getLog(getClass()).debug("Action check is done after submission"); 753 } 754 catch (Exception ex) { 755 throw convertException(ex); 756 } 757 } 758 759 @Override 760 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 761 try { 762 String externalStatus = action.getExternalStatus(); 763 WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK 764 : WorkflowAction.Status.ERROR; 765 context.setEndData(status, getActionSignal(status)); 766 } 767 catch (Exception ex) { 768 throw convertException(ex); 769 } 770 finally { 771 try { 772 FileSystem actionFs = getActionFileSystem(context, action); 773 cleanUpActionDir(actionFs, context); 774 } 775 catch (Exception ex) { 776 throw convertException(ex); 777 } 778 } 779 } 780 781 /** 782 * Create job client object 783 * 784 * @param context 785 * @param jobConf 786 * @return 787 * @throws HadoopAccessorException 788 */ 789 protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException { 790 String user = context.getWorkflow().getUser(); 791 String group = context.getWorkflow().getGroup(); 792 return Services.get().get(HadoopAccessorService.class).createJobClient(user, group, jobConf); 793 } 794 795 @Override 796 public void check(Context context, WorkflowAction action) throws ActionExecutorException { 797 JobClient jobClient = null; 798 boolean exception = false; 799 try { 800 Element actionXml = XmlUtils.parseXml(action.getConf()); 801 FileSystem actionFs = getActionFileSystem(context, actionXml); 802 Configuration conf = createBaseHadoopConf(context, actionXml); 803 JobConf jobConf = new JobConf(); 804 XConfiguration.copy(conf, jobConf); 805 jobClient = createJobClient(context, jobConf); 806 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId())); 807 if (runningJob == null) { 808 context.setExternalStatus(FAILED); 809 context.setExecutionData(FAILED, null); 810 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", 811 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", action 812 .getExternalId(), action.getId()); 813 } 814 if (runningJob.isComplete()) { 815 Path actionDir = context.getActionDir(); 816 817 String user = context.getWorkflow().getUser(); 818 String group = context.getWorkflow().getGroup(); 819 if (LauncherMapper.hasIdSwap(runningJob, user, group, actionDir)) { 820 String launcherId = action.getExternalId(); 821 Path idSwapPath = LauncherMapper.getIdSwapPath(context.getActionDir()); 822 InputStream is = actionFs.open(idSwapPath); 823 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 824 Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen); 825 reader.close(); 826 String newId = props.getProperty("id"); 827 runningJob = jobClient.getJob(JobID.forName(newId)); 828 if (runningJob == null) { 829 context.setExternalStatus(FAILED); 830 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", 831 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId, 832 action.getId()); 833 } 834 835 context.setStartData(newId, action.getTrackerUri(), runningJob.getTrackingURL()); 836 XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId, 837 newId); 838 } 839 if (runningJob.isComplete()) { 840 XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]", 841 action.getExternalId()); 842 if (runningJob.isSuccessful() && LauncherMapper.isMainSuccessful(runningJob)) { 843 Properties props = null; 844 if (getCaptureOutput(action)) { 845 props = new Properties(); 846 if (LauncherMapper.hasOutputData(runningJob)) { 847 Path actionOutput = LauncherMapper.getOutputDataPath(context.getActionDir()); 848 InputStream is = actionFs.open(actionOutput); 849 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 850 props = PropertiesUtils.readProperties(reader, maxActionOutputLen); 851 reader.close(); 852 } 853 } 854 context.setExecutionData(SUCCEEDED, props); 855 XLog.getLog(getClass()).info(XLog.STD, "action produced output"); 856 } 857 else { 858 XLog log = XLog.getLog(getClass()); 859 String errorReason; 860 Path actionError = LauncherMapper.getErrorPath(context.getActionDir()); 861 if (actionFs.exists(actionError)) { 862 InputStream is = actionFs.open(actionError); 863 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 864 Properties props = PropertiesUtils.readProperties(reader, -1); 865 reader.close(); 866 String errorCode = props.getProperty("error.code"); 867 if (errorCode.equals("0")) { 868 errorCode = "JA018"; 869 } 870 if (errorCode.equals("-1")) { 871 errorCode = "JA019"; 872 } 873 errorReason = props.getProperty("error.reason"); 874 log.warn("Launcher ERROR, reason: {0}", errorReason); 875 String exMsg = props.getProperty("exception.message"); 876 String errorInfo = (exMsg != null) ? exMsg : errorReason; 877 context.setErrorInfo(errorCode, errorInfo); 878 String exStackTrace = props.getProperty("exception.stacktrace"); 879 if (exMsg != null) { 880 log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace); 881 } 882 } 883 else { 884 errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action 885 .getTrackerUri(), action.getExternalId()); 886 log.warn(errorReason); 887 } 888 context.setExecutionData(FAILED_KILLED, null); 889 } 890 } 891 else { 892 context.setExternalStatus(RUNNING); 893 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]", 894 action.getExternalId(), action.getExternalStatus()); 895 } 896 } 897 else { 898 context.setExternalStatus(RUNNING); 899 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]", 900 action.getExternalId(), action.getExternalStatus()); 901 } 902 } 903 catch (Exception ex) { 904 XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex); 905 exception = true; 906 throw convertException(ex); 907 } 908 finally { 909 if (jobClient != null) { 910 try { 911 jobClient.close(); 912 } 913 catch (Exception e) { 914 if (exception) { 915 log.error("JobClient error: ", e); 916 } 917 else { 918 throw convertException(e); 919 } 920 } 921 } 922 } 923 } 924 925 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException { 926 Element eConf = XmlUtils.parseXml(action.getConf()); 927 Namespace ns = eConf.getNamespace(); 928 Element captureOutput = eConf.getChild("capture-output", ns); 929 return captureOutput != null; 930 } 931 932 @Override 933 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 934 JobClient jobClient = null; 935 boolean exception = false; 936 try { 937 Element actionXml = XmlUtils.parseXml(action.getConf()); 938 Configuration conf = createBaseHadoopConf(context, actionXml); 939 JobConf jobConf = new JobConf(); 940 XConfiguration.copy(conf, jobConf); 941 jobClient = createJobClient(context, jobConf); 942 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId())); 943 if (runningJob != null) { 944 runningJob.killJob(); 945 } 946 context.setExternalStatus(KILLED); 947 context.setExecutionData(KILLED, null); 948 } 949 catch (Exception ex) { 950 exception = true; 951 throw convertException(ex); 952 } 953 finally { 954 try { 955 FileSystem actionFs = getActionFileSystem(context, action); 956 cleanUpActionDir(actionFs, context); 957 if (jobClient != null) { 958 jobClient.close(); 959 } 960 } 961 catch (Exception ex) { 962 if (exception) { 963 log.error("Error: ", ex); 964 } 965 else { 966 throw convertException(ex); 967 } 968 } 969 } 970 } 971 972 private static Set<String> FINAL_STATUS = new HashSet<String>(); 973 974 static { 975 FINAL_STATUS.add(SUCCEEDED); 976 FINAL_STATUS.add(KILLED); 977 FINAL_STATUS.add(FAILED); 978 FINAL_STATUS.add(FAILED_KILLED); 979 } 980 981 @Override 982 public boolean isCompleted(String externalStatus) { 983 return FINAL_STATUS.contains(externalStatus); 984 } 985 986 }