This project has retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import java.io.BufferedReader;
021 import java.io.File;
022 import java.io.FileNotFoundException;
023 import java.io.IOException;
024 import java.io.InputStream;
025 import java.io.InputStreamReader;
026 import java.io.StringReader;
027 import java.net.ConnectException;
028 import java.net.URI;
029 import java.net.UnknownHostException;
030 import java.util.ArrayList;
031 import java.util.HashMap;
032 import java.util.HashSet;
033 import java.util.List;
034 import java.util.Map;
035 import java.util.Properties;
036 import java.util.Set;
037 import java.util.Map.Entry;
038
039 import org.apache.hadoop.conf.Configuration;
040 import org.apache.hadoop.filecache.DistributedCache;
041 import org.apache.hadoop.fs.FileSystem;
042 import org.apache.hadoop.fs.Path;
043 import org.apache.hadoop.fs.permission.AccessControlException;
044 import org.apache.hadoop.mapred.JobClient;
045 import org.apache.hadoop.mapred.JobConf;
046 import org.apache.hadoop.mapred.JobID;
047 import org.apache.hadoop.mapred.RunningJob;
048 import org.apache.hadoop.util.DiskChecker;
049 import org.apache.oozie.WorkflowActionBean;
050 import org.apache.oozie.WorkflowJobBean;
051 import org.apache.oozie.action.ActionExecutor;
052 import org.apache.oozie.action.ActionExecutorException;
053 import org.apache.oozie.client.OozieClient;
054 import org.apache.oozie.client.WorkflowAction;
055 import org.apache.oozie.service.HadoopAccessorException;
056 import org.apache.oozie.service.HadoopAccessorService;
057 import org.apache.oozie.service.Services;
058 import org.apache.oozie.service.WorkflowAppService;
059 import org.apache.oozie.servlet.CallbackServlet;
060 import org.apache.oozie.util.IOUtils;
061 import org.apache.oozie.util.PropertiesUtils;
062 import org.apache.oozie.util.XConfiguration;
063 import org.apache.oozie.util.XLog;
064 import org.apache.oozie.util.XmlUtils;
065 import org.jdom.Element;
066 import org.jdom.JDOMException;
067 import org.jdom.Namespace;
068 import org.apache.hadoop.security.token.Token;
069 import org.apache.hadoop.security.token.TokenIdentifier;
070
071 public class JavaActionExecutor extends ActionExecutor {
072
// Hadoop configuration keys that Oozie fills in itself (see createBaseHadoopConf).
private static final String HADOOP_USER = "user.name";
private static final String HADOOP_UGI = "hadoop.job.ugi";
private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
private static final String HADOOP_NAME_NODE = "fs.default.name";

// External status values used when reporting the launcher job state to the action context.
private static final String SUCCEEDED = "SUCCEEDED";
private static final String KILLED = "KILLED";
private static final String FAILED = "FAILED";
private static final String FAILED_KILLED = "FAILED/KILLED";
private static final String RUNNING = "RUNNING";

/**
 * Properties that users may not set in job-xml, inline or launcher configuration; checked by
 * checkForDisallowedProps.
 */
private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();

static {
    String[] reserved = {
        HADOOP_USER,
        HADOOP_UGI,
        HADOOP_JOB_TRACKER,
        HADOOP_NAME_NODE,
        WorkflowAppService.HADOOP_JT_KERBEROS_NAME,
        WorkflowAppService.HADOOP_NN_KERBEROS_NAME
    };
    for (String key : reserved) {
        DISALLOWED_PROPERTIES.add(key);
    }
}

/** Maximum action output size; assigned from CallbackServlet.CONF_MAX_DATA_LEN in initActionType(). */
private static int maxActionOutputLen;

private XLog log = XLog.getLog(getClass());
097
/**
 * Creates an executor for the given action type. The constructor is protected so that subclasses can reuse
 * the launcher machinery under their own type name.
 *
 * @param type action type name passed to {@link ActionExecutor}
 */
protected JavaActionExecutor(String type) {
    super(type);
}

/**
 * Creates the executor for {@code java} actions.
 */
public JavaActionExecutor() {
    this("java");
}
105
106 protected String getLauncherJarName() {
107 return getType() + "-launcher.jar";
108 }
109
110 protected List<Class> getLauncherClasses() {
111 List<Class> classes = new ArrayList<Class>();
112 classes.add(LauncherMapper.class);
113 classes.add(LauncherSecurityManager.class);
114 classes.add(LauncherException.class);
115 classes.add(LauncherMainException.class);
116 return classes;
117 }
118
119 @Override
120 public void initActionType() {
121 super.initActionType();
122 maxActionOutputLen = getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 2 * 1024);
123 try {
124 List<Class> classes = getLauncherClasses();
125 Class[] launcherClasses = classes.toArray(new Class[classes.size()]);
126 IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses);
127
128 registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001");
129 registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT,
130 "JA002");
131 registerError(DiskChecker.DiskOutOfSpaceException.class.getName(),
132 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003");
133 registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(),
134 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
135 registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
136 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
137 registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA006");
138 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
139 registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
140 registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
141 }
142 catch (IOException ex) {
143 throw new RuntimeException(ex);
144 }
145 }
146
147 void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException {
148 for (String prop : DISALLOWED_PROPERTIES) {
149 if (conf.get(prop) != null) {
150 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010",
151 "Property [{0}] not allowed in action [{1}] configuration", prop, confName);
152 }
153 }
154 }
155
156 public Configuration createBaseHadoopConf(Context context, Element actionXml) {
157 Configuration conf = new XConfiguration();
158 conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
159 conf.set(HADOOP_UGI, context.getProtoActionConf().get(WorkflowAppService.HADOOP_UGI));
160 if (context.getProtoActionConf().get(WorkflowAppService.HADOOP_JT_KERBEROS_NAME) != null) {
161 conf.set(WorkflowAppService.HADOOP_JT_KERBEROS_NAME, context.getProtoActionConf().get(
162 WorkflowAppService.HADOOP_JT_KERBEROS_NAME));
163 }
164 if (context.getProtoActionConf().get(WorkflowAppService.HADOOP_NN_KERBEROS_NAME) != null) {
165 conf.set(WorkflowAppService.HADOOP_NN_KERBEROS_NAME, context.getProtoActionConf().get(
166 WorkflowAppService.HADOOP_NN_KERBEROS_NAME));
167 }
168 conf.set(OozieClient.GROUP_NAME, context.getProtoActionConf().get(OozieClient.GROUP_NAME));
169 Namespace ns = actionXml.getNamespace();
170 String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
171 String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
172 conf.set(HADOOP_JOB_TRACKER, jobTracker);
173 conf.set(HADOOP_NAME_NODE, nameNode);
174 conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
175 return conf;
176 }
177
178 Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
179 throws ActionExecutorException {
180 try {
181 Namespace ns = actionXml.getNamespace();
182 Element e = actionXml.getChild("configuration", ns);
183 if (e != null) {
184 String strConf = XmlUtils.prettyPrint(e).toString();
185 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
186
187 XConfiguration launcherConf = new XConfiguration();
188 for (Map.Entry<String, String> entry : inlineConf) {
189 if (entry.getKey().startsWith("oozie.launcher.")) {
190 String name = entry.getKey().substring("oozie.launcher.".length());
191 String value = entry.getValue();
192 // setting original KEY
193 launcherConf.set(entry.getKey(), value);
194 // setting un-prefixed key (to allow Hadoop job config
195 // for the launcher job
196 launcherConf.set(name, value);
197 }
198 }
199 checkForDisallowedProps(launcherConf, "inline launcher configuration");
200 XConfiguration.copy(launcherConf, conf);
201 }
202 return conf;
203 }
204 catch (IOException ex) {
205 throw convertException(ex);
206 }
207 }
208
209 protected FileSystem getActionFileSystem(Context context, WorkflowAction action) throws ActionExecutorException {
210 try {
211 Element actionXml = XmlUtils.parseXml(action.getConf());
212 return getActionFileSystem(context, actionXml);
213 }
214 catch (JDOMException ex) {
215 throw convertException(ex);
216 }
217 }
218
219 protected FileSystem getActionFileSystem(Context context, Element actionXml) throws ActionExecutorException {
220 try {
221 return context.getAppFileSystem();
222 }
223 catch (Exception ex) {
224 throw convertException(ex);
225 }
226 }
227
228 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
229 throws ActionExecutorException {
230 try {
231 Namespace ns = actionXml.getNamespace();
232 Element e = actionXml.getChild("job-xml", ns);
233 if (e != null) {
234 String jobXml = e.getTextTrim();
235 Path path = new Path(appPath, jobXml);
236 FileSystem fs = getActionFileSystem(context, actionXml);
237 Configuration jobXmlConf = new XConfiguration(fs.open(path));
238 checkForDisallowedProps(jobXmlConf, "job-xml");
239 XConfiguration.copy(jobXmlConf, actionConf);
240 }
241 e = actionXml.getChild("configuration", ns);
242 if (e != null) {
243 String strConf = XmlUtils.prettyPrint(e).toString();
244 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
245 checkForDisallowedProps(inlineConf, "inline configuration");
246 XConfiguration.copy(inlineConf, actionConf);
247 }
248 return actionConf;
249 }
250 catch (IOException ex) {
251 throw convertException(ex);
252 }
253 }
254
255 Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive)
256 throws ActionExecutorException {
257 Path path = null;
258 try {
259 if (filePath.startsWith("/")) {
260 path = new Path(filePath);
261 }
262 else {
263 path = new Path(appPath, filePath);
264 }
265 URI uri = new URI(path.toUri().getPath());
266 if (archive) {
267 DistributedCache.addCacheArchive(uri, conf);
268 }
269 else {
270 String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
271 if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files
272 uri = new Path(path.toString() + "#" + fileName).toUri();
273 uri = new URI(uri.getPath());
274 DistributedCache.addCacheFile(uri, conf);
275 }
276 else if (fileName.endsWith(".jar")) { // .jar files
277 if (!fileName.contains("#")) {
278 path = new Path(uri.toString());
279
280 String user = conf.get("user.name");
281 String group = conf.get("group.name");
282 Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, group, path, conf);
283 }
284 else {
285 DistributedCache.addCacheFile(uri, conf);
286 }
287 }
288 else { // regular files
289 if (!fileName.contains("#")) {
290 uri = new Path(path.toString() + "#" + fileName).toUri();
291 uri = new URI(uri.getPath());
292 }
293 DistributedCache.addCacheFile(uri, conf);
294 }
295 }
296 DistributedCache.createSymlink(conf);
297 return conf;
298 }
299 catch (Exception ex) {
300 XLog.getLog(getClass()).debug(
301 "Errors when add to DistributedCache. Path=" + path + ", archive=" + archive + ", conf="
302 + XmlUtils.prettyPrint(conf).toString());
303 throw convertException(ex);
304 }
305 }
306
307 String getOozieLauncherJar(Context context) throws ActionExecutorException {
308 try {
309 return new Path(context.getActionDir(), getLauncherJarName()).toString();
310 }
311 catch (Exception ex) {
312 throw convertException(ex);
313 }
314 }
315
316 public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
317 try {
318 Path actionDir = context.getActionDir();
319 Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp");
320 if (!actionFs.exists(actionDir)) {
321 try {
322 actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path(
323 tempActionDir, getLauncherJarName()));
324 actionFs.rename(tempActionDir, actionDir);
325 }
326 catch (IOException ex) {
327 actionFs.delete(tempActionDir, true);
328 actionFs.delete(actionDir, true);
329 throw ex;
330 }
331 }
332 }
333 catch (Exception ex) {
334 throw convertException(ex);
335 }
336 }
337
338 void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
339 try {
340 Path actionDir = context.getActionDir();
341 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
342 && actionFs.exists(actionDir)) {
343 actionFs.delete(actionDir, true);
344 }
345 }
346 catch (Exception ex) {
347 throw convertException(ex);
348 }
349 }
350
351 @SuppressWarnings("unchecked")
352 void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf)
353 throws ActionExecutorException {
354 Configuration proto = context.getProtoActionConf();
355
356 addToCache(conf, appPath, getOozieLauncherJar(context), false);
357
358 String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
359 if (paths != null) {
360 for (String path : paths) {
361 addToCache(conf, appPath, path, false);
362 }
363 }
364
365 for (Element eProp : (List<Element>) actionXml.getChildren()) {
366 if (eProp.getName().equals("file")) {
367 String path = eProp.getTextTrim();
368 addToCache(conf, appPath, path, false);
369 }
370 else {
371 if (eProp.getName().equals("archive")) {
372 String path = eProp.getTextTrim();
373 addToCache(conf, appPath, path, true);
374 }
375 }
376 }
377 }
378
379 protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
380 Namespace ns = actionXml.getNamespace();
381 Element e = actionXml.getChild("main-class", ns);
382 return e.getTextTrim();
383 }
384
// Job queue key; createLauncherConf copies it to the launcher unless a launcher-specific queue is set.
private static final String QUEUE_NAME = "mapred.job.queue.name";
private static final String OOZIE_LAUNCHER_QUEUE_NAME = "oozie.launcher.mapred.job.queue.name";

/** Action configuration properties that createLauncherConf propagates to the launcher job. */
private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>();

static {
    String[] propagated = {
        QUEUE_NAME,
        "mapreduce.jobtracker.kerberos.principal",
        "dfs.namenode.kerberos.principal"
    };
    for (String property : propagated) {
        SPECIAL_PROPERTIES.add(property);
    }
}
395
396 @SuppressWarnings("unchecked")
397 JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
398 throws ActionExecutorException {
399 try {
400
401 // app path could be a file
402 Path appPathRoot = new Path(context.getWorkflow().getAppPath());
403 if (actionFs.isFile(appPathRoot)) {
404 appPathRoot = appPathRoot.getParent();
405 }
406
407 // launcher job configuration
408 Configuration launcherConf = createBaseHadoopConf(context, actionXml);
409 setupLauncherConf(launcherConf, actionXml, appPathRoot, context);
410
411 // we are doing init+copy because if not we are getting 'hdfs'
412 // scheme not known
413 // its seems that new JobConf(Conf) does not load defaults, it
414 // assumes parameter Conf does.
415 JobConf launcherJobConf = new JobConf();
416 XConfiguration.copy(launcherConf, launcherJobConf);
417 setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
418 String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow()
419 .getAppName(), action.getName(), context.getWorkflow().getId());
420 launcherJobConf.setJobName(jobName);
421
422 String jobId = context.getWorkflow().getId();
423 String actionId = action.getId();
424 Path actionDir = context.getActionDir();
425 String recoveryId = context.getRecoveryId();
426
427 LauncherMapper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf);
428
429 LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherConf, actionXml));
430
431 LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
432
433 Namespace ns = actionXml.getNamespace();
434 List<Element> list = actionXml.getChildren("arg", ns);
435 String[] args = new String[list.size()];
436 for (int i = 0; i < list.size(); i++) {
437 args[i] = list.get(i).getTextTrim();
438 }
439 LauncherMapper.setupMainArguments(launcherJobConf, args);
440
441 Element opt = actionXml.getChild("java-opts", ns);
442 if (opt != null) {
443 String opts = launcherConf.get("mapred.child.java.opts", "");
444 opts = opts + " " + opt.getTextTrim();
445 opts = opts.trim();
446 launcherJobConf.set("mapred.child.java.opts", opts);
447 }
448
449 // properties from action that are needed by the launcher (QUEUE NAME)
450 // maybe we should add queue to the WF schema, below job-tracker
451 for (String name : SPECIAL_PROPERTIES) {
452 String value = actionConf.get(name);
453 if (value != null) {
454 if (!name.equals(QUEUE_NAME) ||
455 (name.equals(QUEUE_NAME) && launcherJobConf.get(OOZIE_LAUNCHER_QUEUE_NAME) == null)) {
456 launcherJobConf.set(name, value);
457 }
458 }
459 }
460
461 // to disable cancelation of delegation token on launcher job end
462 launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
463
464 // setting the group owning the Oozie job to allow anybody in that
465 // group to kill the jobs.
466 launcherJobConf.set("mapreduce.job.acl-modify-job", context.getWorkflow().getGroup());
467
468 return launcherJobConf;
469 }
470 catch (Exception ex) {
471 throw convertException(ex);
472 }
473 }
474
475 private void injectCallback(Context context, Configuration conf) {
476 String callback = context.getCallbackUrl("$jobStatus");
477 if (conf.get("job.end.notification.url") != null) {
478 XLog.getLog(getClass()).warn("Overriding the action job end notification URI");
479 }
480 conf.set("job.end.notification.url", callback);
481 }
482
483 void injectActionCallback(Context context, Configuration actionConf) {
484 injectCallback(context, actionConf);
485 }
486
487 void injectLauncherCallback(Context context, Configuration launcherConf) {
488 injectCallback(context, launcherConf);
489 }
490
491 public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
492 JobClient jobClient = null;
493 boolean exception = false;
494 try {
495 Path appPathRoot = new Path(context.getWorkflow().getAppPath());
496
497 // app path could be a file
498 if (actionFs.isFile(appPathRoot)) {
499 appPathRoot = appPathRoot.getParent();
500 }
501
502 Element actionXml = XmlUtils.parseXml(action.getConf());
503
504 // action job configuration
505 Configuration actionConf = createBaseHadoopConf(context, actionXml);
506 setupActionConf(actionConf, context, actionXml, appPathRoot);
507 XLog.getLog(getClass()).debug("Setting LibFilesArchives ");
508 setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
509 String jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow()
510 .getAppName(), action.getName(), context.getWorkflow().getId());
511 actionConf.set("mapred.job.name", jobName);
512 injectActionCallback(context, actionConf);
513
514 // setting the group owning the Oozie job to allow anybody in that
515 // group to kill the jobs.
516 actionConf.set("mapreduce.job.acl-modify-job", context.getWorkflow().getGroup());
517
518 // Setting the credential properties in launcher conf
519 HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
520 action, actionConf);
521
522 // Adding if action need to set more credential tokens
523 JobConf credentialsConf = new JobConf(false);
524 XConfiguration.copy(actionConf, credentialsConf);
525 setCredentialTokens(credentialsConf, context, action, credentialsProperties);
526
527 // insert conf to action conf from credentialsConf
528 for (Entry<String, String> entry : credentialsConf) {
529 if (actionConf.get(entry.getKey()) == null) {
530 actionConf.set(entry.getKey(), entry.getValue());
531 }
532 }
533
534 JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
535 injectLauncherCallback(context, launcherJobConf);
536 XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId());
537 jobClient = createJobClient(context, launcherJobConf);
538 String launcherId = LauncherMapper.getRecoveryId(launcherJobConf, context.getActionDir(), context
539 .getRecoveryId());
540 boolean alreadyRunning = launcherId != null;
541 RunningJob runningJob;
542
543 // if user-retry is on, always submit new launcher
544 boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
545
546 if (alreadyRunning && !isUserRetry) {
547 runningJob = jobClient.getJob(JobID.forName(launcherId));
548 if (runningJob == null) {
549 String jobTracker = launcherJobConf.get("mapred.job.tracker");
550 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
551 "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
552 }
553 }
554 else {
555 prepare(context, actionXml);
556 XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId());
557
558 // setting up propagation of the delegation token.
559 AuthHelper.get().set(jobClient, launcherJobConf);
560 log.debug(WorkflowAppService.HADOOP_JT_KERBEROS_NAME + " = "
561 + launcherJobConf.get(WorkflowAppService.HADOOP_JT_KERBEROS_NAME));
562 log.debug(WorkflowAppService.HADOOP_NN_KERBEROS_NAME + " = "
563 + launcherJobConf.get(WorkflowAppService.HADOOP_NN_KERBEROS_NAME));
564
565 // insert credentials tokens to launcher job conf if needed
566 if (needInjectCredentials()) {
567 for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) {
568 log.debug("ADDING TOKEN: " + tk.getKind().toString());
569 launcherJobConf.getCredentials().addToken(tk.getKind(), tk);
570 }
571 }
572 else {
573 log.info("No need to inject credentials.");
574 }
575 runningJob = jobClient.submitJob(launcherJobConf);
576 if (runningJob == null) {
577 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
578 "Error submitting launcher for action [{0}]", action.getId());
579 }
580 launcherId = runningJob.getID().toString();
581 XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId);
582 }
583
584 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
585 String consoleUrl = runningJob.getTrackingURL();
586 context.setStartData(launcherId, jobTracker, consoleUrl);
587 }
588 catch (Exception ex) {
589 exception = true;
590 throw convertException(ex);
591 }
592 finally {
593 if (jobClient != null) {
594 try {
595 jobClient.close();
596 }
597 catch (Exception e) {
598 if (exception) {
599 log.error("JobClient error: ", e);
600 }
601 else {
602 throw convertException(e);
603 }
604 }
605 }
606 }
607 }
608
609 private boolean needInjectCredentials() {
610 boolean methodExists = true;
611
612 Class klass;
613 try {
614 klass = Class.forName("org.apache.hadoop.mapred.JobConf");
615 klass.getMethod("getCredentials");
616 }
617 catch (ClassNotFoundException ex) {
618 methodExists = false;
619 }
620 catch (NoSuchMethodException ex) {
621 methodExists = false;
622 }
623
624 return methodExists;
625 }
626
627 protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
628 WorkflowAction action, Configuration actionConf) throws Exception {
629 HashMap<String, CredentialsProperties> credPropertiesMap = null;
630 if (context != null && action != null) {
631 credPropertiesMap = getActionCredentialsProperties(context, action);
632 if (credPropertiesMap != null) {
633 for (String key : credPropertiesMap.keySet()) {
634 CredentialsProperties prop = credPropertiesMap.get(key);
635 if (prop != null) {
636 log.debug("Credential Properties set for action : " + action.getId());
637 for (String property : prop.getProperties().keySet()) {
638 actionConf.set(property, prop.getProperties().get(property));
639 log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'");
640 }
641 }
642 }
643 }
644 else {
645 log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
646 }
647 }
648 else {
649 log.warn("context or action is null");
650 }
651 return credPropertiesMap;
652 }
653
654 protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
655 HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {
656
657 if (context != null && action != null && credPropertiesMap != null) {
658 for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
659 String credName = entry.getKey();
660 CredentialsProperties credProps = entry.getValue();
661 if (credProps != null) {
662 CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
663 Credentials credentialObject = credProvider.createCredentialObject();
664 if (credentialObject != null) {
665 credentialObject.addtoJobConf(jobconf, credProps, context);
666 log.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
667 }
668 else {
669 log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
670 }
671 }
672 else {
673 log.warn("Could not find credentials properties for: " + credName);
674 }
675 }
676 }
677
678 }
679
680 protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
681 WorkflowAction action) throws Exception {
682 HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();
683 if (context != null && action != null) {
684 String credsInAction = action.getCred();
685 log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId());
686 String[] credNames = credsInAction.split(",");
687 for (String credName : credNames) {
688 CredentialsProperties credProps = getCredProperties(context, credName);
689 props.put(credName, credProps);
690 }
691 }
692 else {
693 log.warn("context or action is null");
694 }
695 return props;
696 }
697
698 @SuppressWarnings("unchecked")
699 protected CredentialsProperties getCredProperties(Context context, String credName)
700 throws Exception {
701 CredentialsProperties credProp = null;
702 String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
703 Element elementJob = XmlUtils.parseXml(workflowXml);
704 Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
705 if (credentials != null) {
706 for (Element credential : (List<Element>) credentials.getChildren("credential", credentials
707 .getNamespace())) {
708 String name = credential.getAttributeValue("name");
709 String type = credential.getAttributeValue("type");
710 log.debug("getCredProperties: Name: " + name + ", Type: " + type);
711 if (name.equalsIgnoreCase(credName)) {
712 credProp = new CredentialsProperties(name, type);
713 for (Element property : (List<Element>) credential.getChildren("property", credential
714 .getNamespace())) {
715 credProp.getProperties().put(property.getChildText("name", property.getNamespace()),
716 property.getChildText("value", property.getNamespace()));
717 log.debug("getCredProperties: Properties name :'"
718 + property.getChildText("name", property.getNamespace()) + "', Value : '"
719 + property.getChildText("value", property.getNamespace()) + "'");
720 }
721 }
722 }
723 }
724 else {
725 log.warn("credentials is null for the action");
726 }
727 return credProp;
728 }
729
730 void prepare(Context context, Element actionXml) throws ActionExecutorException {
731 Namespace ns = actionXml.getNamespace();
732 Element prepare = actionXml.getChild("prepare", ns);
733 if (prepare != null) {
734 XLog.getLog(getClass()).debug("Preparing the action with FileSystem operation");
735 FsActionExecutor fsAe = new FsActionExecutor();
736 fsAe.doOperations(context, prepare);
737 XLog.getLog(getClass()).debug("FS Operation is completed");
738 }
739 }
740
741 @Override
742 public void start(Context context, WorkflowAction action) throws ActionExecutorException {
743 try {
744 XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System");
745 FileSystem actionFs = getActionFileSystem(context, action);
746 XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir());
747 prepareActionDir(actionFs, context);
748 XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action ");
749 submitLauncher(actionFs, context, action);
750 XLog.getLog(getClass()).debug("Action submit completed. Performing check ");
751 check(context, action);
752 XLog.getLog(getClass()).debug("Action check is done after submission");
753 }
754 catch (Exception ex) {
755 throw convertException(ex);
756 }
757 }
758
759 @Override
760 public void end(Context context, WorkflowAction action) throws ActionExecutorException {
761 try {
762 String externalStatus = action.getExternalStatus();
763 WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK
764 : WorkflowAction.Status.ERROR;
765 context.setEndData(status, getActionSignal(status));
766 }
767 catch (Exception ex) {
768 throw convertException(ex);
769 }
770 finally {
771 try {
772 FileSystem actionFs = getActionFileSystem(context, action);
773 cleanUpActionDir(actionFs, context);
774 }
775 catch (Exception ex) {
776 throw convertException(ex);
777 }
778 }
779 }
780
781 /**
782 * Create job client object
783 *
784 * @param context
785 * @param jobConf
786 * @return
787 * @throws HadoopAccessorException
788 */
789 protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
790 String user = context.getWorkflow().getUser();
791 String group = context.getWorkflow().getGroup();
792 return Services.get().get(HadoopAccessorService.class).createJobClient(user, group, jobConf);
793 }
794
795 @Override
796 public void check(Context context, WorkflowAction action) throws ActionExecutorException {
797 JobClient jobClient = null;
798 boolean exception = false;
799 try {
800 Element actionXml = XmlUtils.parseXml(action.getConf());
801 FileSystem actionFs = getActionFileSystem(context, actionXml);
802 Configuration conf = createBaseHadoopConf(context, actionXml);
803 JobConf jobConf = new JobConf();
804 XConfiguration.copy(conf, jobConf);
805 jobClient = createJobClient(context, jobConf);
806 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
807 if (runningJob == null) {
808 context.setExternalStatus(FAILED);
809 context.setExecutionData(FAILED, null);
810 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
811 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", action
812 .getExternalId(), action.getId());
813 }
814 if (runningJob.isComplete()) {
815 Path actionDir = context.getActionDir();
816
817 String user = context.getWorkflow().getUser();
818 String group = context.getWorkflow().getGroup();
819 if (LauncherMapper.hasIdSwap(runningJob, user, group, actionDir)) {
820 String launcherId = action.getExternalId();
821 Path idSwapPath = LauncherMapper.getIdSwapPath(context.getActionDir());
822 InputStream is = actionFs.open(idSwapPath);
823 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
824 Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
825 reader.close();
826 String newId = props.getProperty("id");
827 runningJob = jobClient.getJob(JobID.forName(newId));
828 if (runningJob == null) {
829 context.setExternalStatus(FAILED);
830 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
831 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId,
832 action.getId());
833 }
834
835 context.setStartData(newId, action.getTrackerUri(), runningJob.getTrackingURL());
836 XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
837 newId);
838 }
839 if (runningJob.isComplete()) {
840 XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]",
841 action.getExternalId());
842 if (runningJob.isSuccessful() && LauncherMapper.isMainSuccessful(runningJob)) {
843 Properties props = null;
844 if (getCaptureOutput(action)) {
845 props = new Properties();
846 if (LauncherMapper.hasOutputData(runningJob)) {
847 Path actionOutput = LauncherMapper.getOutputDataPath(context.getActionDir());
848 InputStream is = actionFs.open(actionOutput);
849 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
850 props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
851 reader.close();
852 }
853 }
854 context.setExecutionData(SUCCEEDED, props);
855 XLog.getLog(getClass()).info(XLog.STD, "action produced output");
856 }
857 else {
858 XLog log = XLog.getLog(getClass());
859 String errorReason;
860 Path actionError = LauncherMapper.getErrorPath(context.getActionDir());
861 if (actionFs.exists(actionError)) {
862 InputStream is = actionFs.open(actionError);
863 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
864 Properties props = PropertiesUtils.readProperties(reader, -1);
865 reader.close();
866 String errorCode = props.getProperty("error.code");
867 if (errorCode.equals("0")) {
868 errorCode = "JA018";
869 }
870 if (errorCode.equals("-1")) {
871 errorCode = "JA019";
872 }
873 errorReason = props.getProperty("error.reason");
874 log.warn("Launcher ERROR, reason: {0}", errorReason);
875 String exMsg = props.getProperty("exception.message");
876 String errorInfo = (exMsg != null) ? exMsg : errorReason;
877 context.setErrorInfo(errorCode, errorInfo);
878 String exStackTrace = props.getProperty("exception.stacktrace");
879 if (exMsg != null) {
880 log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
881 }
882 }
883 else {
884 errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action
885 .getTrackerUri(), action.getExternalId());
886 log.warn(errorReason);
887 }
888 context.setExecutionData(FAILED_KILLED, null);
889 }
890 }
891 else {
892 context.setExternalStatus(RUNNING);
893 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
894 action.getExternalId(), action.getExternalStatus());
895 }
896 }
897 else {
898 context.setExternalStatus(RUNNING);
899 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
900 action.getExternalId(), action.getExternalStatus());
901 }
902 }
903 catch (Exception ex) {
904 XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
905 exception = true;
906 throw convertException(ex);
907 }
908 finally {
909 if (jobClient != null) {
910 try {
911 jobClient.close();
912 }
913 catch (Exception e) {
914 if (exception) {
915 log.error("JobClient error: ", e);
916 }
917 else {
918 throw convertException(e);
919 }
920 }
921 }
922 }
923 }
924
925 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
926 Element eConf = XmlUtils.parseXml(action.getConf());
927 Namespace ns = eConf.getNamespace();
928 Element captureOutput = eConf.getChild("capture-output", ns);
929 return captureOutput != null;
930 }
931
932 @Override
933 public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
934 JobClient jobClient = null;
935 boolean exception = false;
936 try {
937 Element actionXml = XmlUtils.parseXml(action.getConf());
938 Configuration conf = createBaseHadoopConf(context, actionXml);
939 JobConf jobConf = new JobConf();
940 XConfiguration.copy(conf, jobConf);
941 jobClient = createJobClient(context, jobConf);
942 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
943 if (runningJob != null) {
944 runningJob.killJob();
945 }
946 context.setExternalStatus(KILLED);
947 context.setExecutionData(KILLED, null);
948 }
949 catch (Exception ex) {
950 exception = true;
951 throw convertException(ex);
952 }
953 finally {
954 try {
955 FileSystem actionFs = getActionFileSystem(context, action);
956 cleanUpActionDir(actionFs, context);
957 if (jobClient != null) {
958 jobClient.close();
959 }
960 }
961 catch (Exception ex) {
962 if (exception) {
963 log.error("Error: ", ex);
964 }
965 else {
966 throw convertException(ex);
967 }
968 }
969 }
970 }
971
972 private static Set<String> FINAL_STATUS = new HashSet<String>();
973
974 static {
975 FINAL_STATUS.add(SUCCEEDED);
976 FINAL_STATUS.add(KILLED);
977 FINAL_STATUS.add(FAILED);
978 FINAL_STATUS.add(FAILED_KILLED);
979 }
980
981 @Override
982 public boolean isCompleted(String externalStatus) {
983 return FINAL_STATUS.contains(externalStatus);
984 }
985
986 }