This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import java.io.BufferedReader;
021 import java.io.File;
022 import java.io.FileNotFoundException;
023 import java.io.IOException;
024 import java.io.InputStream;
025 import java.io.InputStreamReader;
026 import java.io.StringReader;
027 import java.net.ConnectException;
028 import java.net.URI;
029 import java.net.URISyntaxException;
030 import java.net.UnknownHostException;
031 import java.util.ArrayList;
032 import java.util.HashMap;
033 import java.util.HashSet;
034 import java.util.List;
035 import java.util.Map;
036 import java.util.Properties;
037 import java.util.Set;
038 import java.util.Map.Entry;
039
040 import org.apache.hadoop.conf.Configuration;
041 import org.apache.hadoop.filecache.DistributedCache;
042 import org.apache.hadoop.fs.FileStatus;
043 import org.apache.hadoop.fs.FileSystem;
044 import org.apache.hadoop.fs.Path;
045 import org.apache.hadoop.fs.permission.AccessControlException;
046 import org.apache.hadoop.io.Text;
047 import org.apache.hadoop.mapred.JobClient;
048 import org.apache.hadoop.mapred.JobConf;
049 import org.apache.hadoop.mapred.JobID;
050 import org.apache.hadoop.mapred.RunningJob;
051 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
052 import org.apache.hadoop.util.DiskChecker;
053 import org.apache.oozie.WorkflowActionBean;
054 import org.apache.oozie.WorkflowJobBean;
055 import org.apache.oozie.action.ActionExecutor;
056 import org.apache.oozie.action.ActionExecutorException;
057 import org.apache.oozie.client.OozieClient;
058 import org.apache.oozie.client.WorkflowAction;
059 import org.apache.oozie.service.HadoopAccessorException;
060 import org.apache.oozie.service.HadoopAccessorService;
061 import org.apache.oozie.service.Services;
062 import org.apache.oozie.service.WorkflowAppService;
063 import org.apache.oozie.servlet.CallbackServlet;
064 import org.apache.oozie.util.IOUtils;
065 import org.apache.oozie.util.PropertiesUtils;
066 import org.apache.oozie.util.XConfiguration;
067 import org.apache.oozie.util.XLog;
068 import org.apache.oozie.util.XmlUtils;
069 import org.jdom.Element;
070 import org.jdom.JDOMException;
071 import org.jdom.Namespace;
072 import org.apache.hadoop.security.token.Token;
073 import org.apache.hadoop.security.token.TokenIdentifier;
074
075 public class JavaActionExecutor extends ActionExecutor {
076
// Hadoop property names that Oozie sets itself on every action configuration.
private static final String HADOOP_USER = "user.name";
private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
private static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
private static final String HADOOP_NAME_NODE = "fs.default.name";

// Name of the share-lib sub-directory that is added to every action.
public static final String OOZIE_COMMON_LIBDIR = "oozie";

// Oozie configuration key (and its default) bounding the size of external action stats.
public static final String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE;

// Properties that may not appear in user-supplied configurations
// (enforced by checkForDisallowedProps).
private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();

// Limits read from the Oozie configuration in initActionType().
private static int maxActionOutputLen;
private static int maxExternalStatsSize;

// External status values.
private static final String SUCCEEDED = "SUCCEEDED";
private static final String KILLED = "KILLED";
private static final String FAILED = "FAILED";
private static final String FAILED_KILLED = "FAILED/KILLED";
private static final String RUNNING = "RUNNING";

// Logger bound to the concrete executor class.
protected XLog log = XLog.getLog(getClass());

static {
    DISALLOWED_PROPERTIES.add(HADOOP_USER);
    DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
    DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2);
    DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE);
}
101
/**
 * Creates an executor registered under the action type "java".
 */
public JavaActionExecutor() {
    this("java");
}

/**
 * Creates an executor registered under the given action type.
 *
 * @param type action type name passed to the parent executor
 */
protected JavaActionExecutor(String type) {
    super(type);
}
109
110 protected String getLauncherJarName() {
111 return getType() + "-launcher.jar";
112 }
113
114 protected List<Class> getLauncherClasses() {
115 List<Class> classes = new ArrayList<Class>();
116 classes.add(LauncherMapper.class);
117 classes.add(LauncherSecurityManager.class);
118 classes.add(LauncherException.class);
119 classes.add(LauncherMainException.class);
120 classes.add(FileSystemActions.class);
121 classes.add(PrepareActionsDriver.class);
122 classes.add(ActionStats.class);
123 classes.add(ActionType.class);
124 return classes;
125 }
126
127 @Override
128 public void initActionType() {
129 super.initActionType();
130 maxActionOutputLen = getOozieConf()
131 .getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
132 // TODO: Remove the below config get in a subsequent release..
133 // This other irrelevant property is only used to
134 // preserve backwards compatibility cause of a typo.
135 // See OOZIE-4.
136 getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN,
137 2 * 1024));
138 //Get the limit for the maximum allowed size of action stats
139 maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT);
140 maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
141 try {
142 List<Class> classes = getLauncherClasses();
143 Class[] launcherClasses = classes.toArray(new Class[classes.size()]);
144 IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses);
145
146 registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001");
147 registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT,
148 "JA002");
149 registerError(DiskChecker.DiskOutOfSpaceException.class.getName(),
150 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003");
151 registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(),
152 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
153 registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
154 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
155 registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA006");
156 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
157 registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
158 registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
159 }
160 catch (IOException ex) {
161 throw new RuntimeException(ex);
162 }
163 }
164
165 /**
166 * Get the maximum allowed size of stats
167 *
168 * @return maximum size of stats
169 */
170 public static int getMaxExternalStatsSize() {
171 return maxExternalStatsSize;
172 }
173
174 void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException {
175 for (String prop : DISALLOWED_PROPERTIES) {
176 if (conf.get(prop) != null) {
177 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010",
178 "Property [{0}] not allowed in action [{1}] configuration", prop, confName);
179 }
180 }
181 }
182
183 public JobConf createBaseHadoopConf(Context context, Element actionXml) {
184 Namespace ns = actionXml.getNamespace();
185 String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
186 String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
187 JobConf conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
188 conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
189 conf.set(HADOOP_JOB_TRACKER, jobTracker);
190 conf.set(HADOOP_JOB_TRACKER_2, jobTracker);
191 conf.set(HADOOP_NAME_NODE, nameNode);
192 conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
193 return conf;
194 }
195
196 private void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) {
197 for (Map.Entry<String, String> entry : srcConf) {
198 if (entry.getKey().startsWith("oozie.launcher.")) {
199 String name = entry.getKey().substring("oozie.launcher.".length());
200 String value = entry.getValue();
201 // setting original KEY
202 launcherConf.set(entry.getKey(), value);
203 // setting un-prefixed key (to allow Hadoop job config
204 // for the launcher job
205 launcherConf.set(name, value);
206 }
207 }
208 }
209
210 Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
211 throws ActionExecutorException {
212 try {
213 Namespace ns = actionXml.getNamespace();
214 Element e = actionXml.getChild("configuration", ns);
215 if (e != null) {
216 String strConf = XmlUtils.prettyPrint(e).toString();
217 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
218
219 XConfiguration launcherConf = new XConfiguration();
220 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
221 XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType());
222 injectLauncherProperties(actionDefaultConf, launcherConf);
223 injectLauncherProperties(inlineConf, launcherConf);
224 checkForDisallowedProps(launcherConf, "launcher configuration");
225 XConfiguration.copy(launcherConf, conf);
226 }
227 return conf;
228 }
229 catch (IOException ex) {
230 throw convertException(ex);
231 }
232 }
233
234 protected FileSystem getActionFileSystem(Context context, WorkflowAction action) throws ActionExecutorException {
235 try {
236 Element actionXml = XmlUtils.parseXml(action.getConf());
237 return getActionFileSystem(context, actionXml);
238 }
239 catch (JDOMException ex) {
240 throw convertException(ex);
241 }
242 }
243
244 protected FileSystem getActionFileSystem(Context context, Element actionXml) throws ActionExecutorException {
245 try {
246 return context.getAppFileSystem();
247 }
248 catch (Exception ex) {
249 throw convertException(ex);
250 }
251 }
252
253 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
254 throws ActionExecutorException {
255 try {
256 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
257 XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType());
258 XConfiguration.injectDefaults(actionDefaults, actionConf);
259 Namespace ns = actionXml.getNamespace();
260 Element e = actionXml.getChild("job-xml", ns);
261 if (e != null) {
262 String jobXml = e.getTextTrim();
263 Path path = new Path(appPath, jobXml);
264 FileSystem fs = getActionFileSystem(context, actionXml);
265 Configuration jobXmlConf = new XConfiguration(fs.open(path));
266 checkForDisallowedProps(jobXmlConf, "job-xml");
267 XConfiguration.copy(jobXmlConf, actionConf);
268 }
269 e = actionXml.getChild("configuration", ns);
270 if (e != null) {
271 String strConf = XmlUtils.prettyPrint(e).toString();
272 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
273 checkForDisallowedProps(inlineConf, "inline configuration");
274 XConfiguration.copy(inlineConf, actionConf);
275 }
276 return actionConf;
277 }
278 catch (IOException ex) {
279 throw convertException(ex);
280 }
281 }
282
283 Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive)
284 throws ActionExecutorException {
285 Path path = null;
286 try {
287 if (filePath.startsWith("/")) {
288 path = new Path(filePath);
289 }
290 else {
291 path = new Path(appPath, filePath);
292 }
293 URI uri = new URI(path.toUri().getPath());
294 if (archive) {
295 DistributedCache.addCacheArchive(uri, conf);
296 }
297 else {
298 String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
299 if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files
300 uri = new Path(path.toString() + "#" + fileName).toUri();
301 uri = new URI(uri.getPath());
302 DistributedCache.addCacheFile(uri, conf);
303 }
304 else if (fileName.endsWith(".jar")) { // .jar files
305 if (!fileName.contains("#")) {
306 path = new Path(uri.toString());
307
308 String user = conf.get("user.name");
309 String group = conf.get("group.name");
310 Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, path, conf);
311 }
312 else {
313 DistributedCache.addCacheFile(uri, conf);
314 }
315 }
316 else { // regular files
317 if (!fileName.contains("#")) {
318 uri = new Path(path.toString() + "#" + fileName).toUri();
319 uri = new URI(uri.getPath());
320 }
321 DistributedCache.addCacheFile(uri, conf);
322 }
323 }
324 DistributedCache.createSymlink(conf);
325 return conf;
326 }
327 catch (Exception ex) {
328 XLog.getLog(getClass()).debug(
329 "Errors when add to DistributedCache. Path=" + path + ", archive=" + archive + ", conf="
330 + XmlUtils.prettyPrint(conf).toString());
331 throw convertException(ex);
332 }
333 }
334
335 String getOozieLauncherJar(Context context) throws ActionExecutorException {
336 try {
337 return new Path(context.getActionDir(), getLauncherJarName()).toString();
338 }
339 catch (Exception ex) {
340 throw convertException(ex);
341 }
342 }
343
344 public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
345 try {
346 Path actionDir = context.getActionDir();
347 Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp");
348 if (!actionFs.exists(actionDir)) {
349 try {
350 actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path(
351 tempActionDir, getLauncherJarName()));
352 actionFs.rename(tempActionDir, actionDir);
353 }
354 catch (IOException ex) {
355 actionFs.delete(tempActionDir, true);
356 actionFs.delete(actionDir, true);
357 throw ex;
358 }
359 }
360 }
361 catch (Exception ex) {
362 throw convertException(ex);
363 }
364 }
365
366 void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
367 try {
368 Path actionDir = context.getActionDir();
369 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
370 && actionFs.exists(actionDir)) {
371 actionFs.delete(actionDir, true);
372 }
373 }
374 catch (Exception ex) {
375 throw convertException(ex);
376 }
377 }
378
379 protected void addShareLib(Path appPath, Configuration conf, String actionShareLibPostfix)
380 throws ActionExecutorException {
381 if (actionShareLibPostfix != null) {
382 try {
383 Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath();
384 if (systemLibPath != null) {
385 Path actionLibPath = new Path(systemLibPath, actionShareLibPostfix);
386 String user = conf.get("user.name");
387 String group = conf.get("group.name");
388 FileSystem fs =
389 Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
390 if (fs.exists(actionLibPath)) {
391 FileStatus[] files = fs.listStatus(actionLibPath);
392 for (FileStatus file : files) {
393 addToCache(conf, appPath, file.getPath().toUri().getPath(), false);
394 }
395 }
396 }
397 }
398 catch (HadoopAccessorException ex){
399 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
400 ex.getErrorCode().toString(), ex.getMessage());
401 }
402 catch (IOException ex){
403 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
404 "It should never happen", ex.getMessage());
405 }
406 }
407 }
408
409 @SuppressWarnings("unchecked")
410 void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf)
411 throws ActionExecutorException {
412 Configuration proto = context.getProtoActionConf();
413
414 // launcher JAR
415 addToCache(conf, appPath, getOozieLauncherJar(context), false);
416
417 // Workflow lib/
418 String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
419 if (paths != null) {
420 for (String path : paths) {
421 addToCache(conf, appPath, path, false);
422 }
423 }
424
425 // files and archives defined in the action
426 for (Element eProp : (List<Element>) actionXml.getChildren()) {
427 if (eProp.getName().equals("file")) {
428 String path = eProp.getTextTrim();
429 addToCache(conf, appPath, path, false);
430 }
431 else {
432 if (eProp.getName().equals("archive")) {
433 String path = eProp.getTextTrim();
434 addToCache(conf, appPath, path, true);
435 }
436 }
437 }
438
439 addAllShareLibs(appPath, conf, context, actionXml);
440 }
441
442 // Adds action specific share libs and common share libs
443 private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml)
444 throws ActionExecutorException {
445 // Add action specific share libs
446 addActionShareLib(appPath, conf, context, actionXml);
447 // Add common sharelibs for Oozie
448 addShareLib(appPath, conf, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
449 }
450
451 private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml) throws ActionExecutorException {
452 XConfiguration wfJobConf = null;
453 try {
454 wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
455 }
456 catch (IOException ioe) {
457 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
458 ioe.getMessage());
459 }
460 // Action sharelibs are only added if user has specified to use system libpath
461 if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
462 // add action specific sharelibs
463 addShareLib(appPath, conf, getShareLibPostFix(context, actionXml));
464 }
465 }
466
467
468 protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
469 Namespace ns = actionXml.getNamespace();
470 Element e = actionXml.getChild("main-class", ns);
471 return e.getTextTrim();
472 }
473
// Queue property of the action job and its launcher-specific override.
private static final String QUEUE_NAME = "mapred.job.queue.name";
private static final String OOZIE_LAUNCHER_QUEUE_NAME = "oozie.launcher.mapred.job.queue.name";

// Action properties that createLauncherConf() propagates to the launcher job.
private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>();

static {
    SPECIAL_PROPERTIES.add(QUEUE_NAME);
    SPECIAL_PROPERTIES.add("mapreduce.jobtracker.kerberos.principal");
    SPECIAL_PROPERTIES.add("dfs.namenode.kerberos.principal");
}
484
485 @SuppressWarnings("unchecked")
486 JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
487 throws ActionExecutorException {
488 try {
489
490 // app path could be a file
491 Path appPathRoot = new Path(context.getWorkflow().getAppPath());
492 if (actionFs.isFile(appPathRoot)) {
493 appPathRoot = appPathRoot.getParent();
494 }
495
496 // launcher job configuration
497 JobConf launcherJobConf = createBaseHadoopConf(context, actionXml);
498 setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);
499
500 setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
501 String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow()
502 .getAppName(), action.getName(), context.getWorkflow().getId());
503 launcherJobConf.setJobName(jobName);
504
505 String jobId = context.getWorkflow().getId();
506 String actionId = action.getId();
507 Path actionDir = context.getActionDir();
508 String recoveryId = context.getRecoveryId();
509
510 // Getting the prepare XML from the action XML
511 Namespace ns = actionXml.getNamespace();
512 Element prepareElement = actionXml.getChild("prepare", ns);
513 String prepareXML = "";
514 if (prepareElement != null) {
515 if (prepareElement.getChildren().size() > 0) {
516 prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
517 }
518 }
519 LauncherMapper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
520 prepareXML);
521
522 LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
523
524 LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
525 LauncherMapper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
526
527 List<Element> list = actionXml.getChildren("arg", ns);
528 String[] args = new String[list.size()];
529 for (int i = 0; i < list.size(); i++) {
530 args[i] = list.get(i).getTextTrim();
531 }
532 LauncherMapper.setupMainArguments(launcherJobConf, args);
533
534 Element opt = actionXml.getChild("java-opts", ns);
535 if (opt != null) {
536 String opts = launcherJobConf.get("mapred.child.java.opts", "");
537 opts = opts + " " + opt.getTextTrim();
538 opts = opts.trim();
539 launcherJobConf.set("mapred.child.java.opts", opts);
540 }
541
542 // properties from action that are needed by the launcher (QUEUE NAME)
543 // maybe we should add queue to the WF schema, below job-tracker
544 for (String name : SPECIAL_PROPERTIES) {
545 String value = actionConf.get(name);
546 if (value != null) {
547 if (!name.equals(QUEUE_NAME) ||
548 (name.equals(QUEUE_NAME) && launcherJobConf.get(OOZIE_LAUNCHER_QUEUE_NAME) == null)) {
549 launcherJobConf.set(name, value);
550 }
551 }
552 }
553
554 // to disable cancelation of delegation token on launcher job end
555 launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
556
557 if (context.getWorkflow().getAcl() != null) {
558 // setting the group owning the Oozie job to allow anybody in that
559 // group to kill the jobs.
560 launcherJobConf.set("mapreduce.job.acl-modify-job", context.getWorkflow().getAcl());
561 }
562
563 return launcherJobConf;
564 }
565 catch (Exception ex) {
566 throw convertException(ex);
567 }
568 }
569
570 private void injectCallback(Context context, Configuration conf) {
571 String callback = context.getCallbackUrl("$jobStatus");
572 if (conf.get("job.end.notification.url") != null) {
573 XLog.getLog(getClass()).warn("Overriding the action job end notification URI");
574 }
575 conf.set("job.end.notification.url", callback);
576 }
577
578 void injectActionCallback(Context context, Configuration actionConf) {
579 injectCallback(context, actionConf);
580 }
581
582 void injectLauncherCallback(Context context, Configuration launcherConf) {
583 injectCallback(context, launcherConf);
584 }
585
586 public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
587 JobClient jobClient = null;
588 boolean exception = false;
589 try {
590 Path appPathRoot = new Path(context.getWorkflow().getAppPath());
591
592 // app path could be a file
593 if (actionFs.isFile(appPathRoot)) {
594 appPathRoot = appPathRoot.getParent();
595 }
596
597 Element actionXml = XmlUtils.parseXml(action.getConf());
598
599 // action job configuration
600 Configuration actionConf = createBaseHadoopConf(context, actionXml);
601 setupActionConf(actionConf, context, actionXml, appPathRoot);
602 XLog.getLog(getClass()).debug("Setting LibFilesArchives ");
603 setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
604 String jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow()
605 .getAppName(), action.getName(), context.getWorkflow().getId());
606 actionConf.set("mapred.job.name", jobName);
607 injectActionCallback(context, actionConf);
608
609 if (context.getWorkflow().getAcl() != null) {
610 // setting the group owning the Oozie job to allow anybody in that
611 // group to kill the jobs.
612 actionConf.set("mapreduce.job.acl-modify-job", context.getWorkflow().getAcl());
613 }
614
615 // Setting the credential properties in launcher conf
616 HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
617 action, actionConf);
618
619 // Adding if action need to set more credential tokens
620 JobConf credentialsConf = new JobConf(false);
621 XConfiguration.copy(actionConf, credentialsConf);
622 setCredentialTokens(credentialsConf, context, action, credentialsProperties);
623
624 // insert conf to action conf from credentialsConf
625 for (Entry<String, String> entry : credentialsConf) {
626 if (actionConf.get(entry.getKey()) == null) {
627 actionConf.set(entry.getKey(), entry.getValue());
628 }
629 }
630
631 JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
632 injectLauncherCallback(context, launcherJobConf);
633 XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId());
634 jobClient = createJobClient(context, launcherJobConf);
635 String launcherId = LauncherMapper.getRecoveryId(launcherJobConf, context.getActionDir(), context
636 .getRecoveryId());
637 boolean alreadyRunning = launcherId != null;
638 RunningJob runningJob;
639
640 // if user-retry is on, always submit new launcher
641 boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
642
643 if (alreadyRunning && !isUserRetry) {
644 runningJob = jobClient.getJob(JobID.forName(launcherId));
645 if (runningJob == null) {
646 String jobTracker = launcherJobConf.get("mapred.job.tracker");
647 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
648 "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
649 }
650 }
651 else {
652 XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId());
653
654 // setting up propagation of the delegation token.
655 Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("mr token"));
656 launcherJobConf.getCredentials().addToken(new Text("mr token"), mrdt);
657
658 // insert credentials tokens to launcher job conf if needed
659 if (needInjectCredentials()) {
660 for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) {
661 log.debug("ADDING TOKEN: " + tk.getKind().toString());
662 launcherJobConf.getCredentials().addToken(tk.getKind(), tk);
663 }
664 }
665 else {
666 log.info("No need to inject credentials.");
667 }
668 runningJob = jobClient.submitJob(launcherJobConf);
669 if (runningJob == null) {
670 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
671 "Error submitting launcher for action [{0}]", action.getId());
672 }
673 launcherId = runningJob.getID().toString();
674 XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId);
675 }
676
677 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
678 String consoleUrl = runningJob.getTrackingURL();
679 context.setStartData(launcherId, jobTracker, consoleUrl);
680 }
681 catch (Exception ex) {
682 exception = true;
683 throw convertException(ex);
684 }
685 finally {
686 if (jobClient != null) {
687 try {
688 jobClient.close();
689 }
690 catch (Exception e) {
691 if (exception) {
692 log.error("JobClient error: ", e);
693 }
694 else {
695 throw convertException(e);
696 }
697 }
698 }
699 }
700 }
701
702 private boolean needInjectCredentials() {
703 boolean methodExists = true;
704
705 Class klass;
706 try {
707 klass = Class.forName("org.apache.hadoop.mapred.JobConf");
708 klass.getMethod("getCredentials");
709 }
710 catch (ClassNotFoundException ex) {
711 methodExists = false;
712 }
713 catch (NoSuchMethodException ex) {
714 methodExists = false;
715 }
716
717 return methodExists;
718 }
719
720 protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
721 WorkflowAction action, Configuration actionConf) throws Exception {
722 HashMap<String, CredentialsProperties> credPropertiesMap = null;
723 if (context != null && action != null) {
724 credPropertiesMap = getActionCredentialsProperties(context, action);
725 if (credPropertiesMap != null) {
726 for (String key : credPropertiesMap.keySet()) {
727 CredentialsProperties prop = credPropertiesMap.get(key);
728 if (prop != null) {
729 log.debug("Credential Properties set for action : " + action.getId());
730 for (String property : prop.getProperties().keySet()) {
731 actionConf.set(property, prop.getProperties().get(property));
732 log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'");
733 }
734 }
735 }
736 }
737 else {
738 log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
739 }
740 }
741 else {
742 log.warn("context or action is null");
743 }
744 return credPropertiesMap;
745 }
746
747 protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
748 HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {
749
750 if (context != null && action != null && credPropertiesMap != null) {
751 for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
752 String credName = entry.getKey();
753 CredentialsProperties credProps = entry.getValue();
754 if (credProps != null) {
755 CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
756 Credentials credentialObject = credProvider.createCredentialObject();
757 if (credentialObject != null) {
758 credentialObject.addtoJobConf(jobconf, credProps, context);
759 log.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
760 }
761 else {
762 log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
763 }
764 }
765 else {
766 log.warn("Could not find credentials properties for: " + credName);
767 }
768 }
769 }
770
771 }
772
773 protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
774 WorkflowAction action) throws Exception {
775 HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();
776 if (context != null && action != null) {
777 String credsInAction = action.getCred();
778 log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId());
779 String[] credNames = credsInAction.split(",");
780 for (String credName : credNames) {
781 CredentialsProperties credProps = getCredProperties(context, credName);
782 props.put(credName, credProps);
783 }
784 }
785 else {
786 log.warn("context or action is null");
787 }
788 return props;
789 }
790
791 @SuppressWarnings("unchecked")
792 protected CredentialsProperties getCredProperties(Context context, String credName)
793 throws Exception {
794 CredentialsProperties credProp = null;
795 String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
796 Element elementJob = XmlUtils.parseXml(workflowXml);
797 Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
798 if (credentials != null) {
799 for (Element credential : (List<Element>) credentials.getChildren("credential", credentials
800 .getNamespace())) {
801 String name = credential.getAttributeValue("name");
802 String type = credential.getAttributeValue("type");
803 log.debug("getCredProperties: Name: " + name + ", Type: " + type);
804 if (name.equalsIgnoreCase(credName)) {
805 credProp = new CredentialsProperties(name, type);
806 for (Element property : (List<Element>) credential.getChildren("property", credential
807 .getNamespace())) {
808 credProp.getProperties().put(property.getChildText("name", property.getNamespace()),
809 property.getChildText("value", property.getNamespace()));
810 log.debug("getCredProperties: Properties name :'"
811 + property.getChildText("name", property.getNamespace()) + "', Value : '"
812 + property.getChildText("value", property.getNamespace()) + "'");
813 }
814 }
815 }
816 }
817 else {
818 log.warn("credentials is null for the action");
819 }
820 return credProp;
821 }
822
823 @Override
824 public void start(Context context, WorkflowAction action) throws ActionExecutorException {
825 try {
826 XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System");
827 FileSystem actionFs = getActionFileSystem(context, action);
828 XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir());
829 prepareActionDir(actionFs, context);
830 XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action ");
831 submitLauncher(actionFs, context, action);
832 XLog.getLog(getClass()).debug("Action submit completed. Performing check ");
833 check(context, action);
834 XLog.getLog(getClass()).debug("Action check is done after submission");
835 }
836 catch (Exception ex) {
837 throw convertException(ex);
838 }
839 }
840
841 @Override
842 public void end(Context context, WorkflowAction action) throws ActionExecutorException {
843 try {
844 String externalStatus = action.getExternalStatus();
845 WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK
846 : WorkflowAction.Status.ERROR;
847 context.setEndData(status, getActionSignal(status));
848 }
849 catch (Exception ex) {
850 throw convertException(ex);
851 }
852 finally {
853 try {
854 FileSystem actionFs = getActionFileSystem(context, action);
855 cleanUpActionDir(actionFs, context);
856 }
857 catch (Exception ex) {
858 throw convertException(ex);
859 }
860 }
861 }
862
863 /**
864 * Create job client object
865 *
866 * @param context
867 * @param jobConf
868 * @return
869 * @throws HadoopAccessorException
870 */
871 protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
872 String user = context.getWorkflow().getUser();
873 String group = context.getWorkflow().getGroup();
874 return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
875 }
876
877 @Override
878 public void check(Context context, WorkflowAction action) throws ActionExecutorException {
879 JobClient jobClient = null;
880 boolean exception = false;
881 try {
882 Element actionXml = XmlUtils.parseXml(action.getConf());
883 FileSystem actionFs = getActionFileSystem(context, actionXml);
884 JobConf jobConf = createBaseHadoopConf(context, actionXml);
885 jobClient = createJobClient(context, jobConf);
886 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
887 if (runningJob == null) {
888 context.setExternalStatus(FAILED);
889 context.setExecutionData(FAILED, null);
890 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
891 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", action
892 .getExternalId(), action.getId());
893 }
894 if (runningJob.isComplete()) {
895 Path actionDir = context.getActionDir();
896
897 String user = context.getWorkflow().getUser();
898 String group = context.getWorkflow().getGroup();
899 if (LauncherMapper.hasIdSwap(runningJob, user, group, actionDir)) {
900 String launcherId = action.getExternalId();
901 Path idSwapPath = LauncherMapper.getIdSwapPath(context.getActionDir());
902 InputStream is = actionFs.open(idSwapPath);
903 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
904 Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
905 reader.close();
906 String newId = props.getProperty("id");
907 runningJob = jobClient.getJob(JobID.forName(newId));
908 if (runningJob == null) {
909 context.setExternalStatus(FAILED);
910 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
911 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId,
912 action.getId());
913 }
914
915 context.setStartData(newId, action.getTrackerUri(), runningJob.getTrackingURL());
916 XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
917 newId);
918 }
919 if (runningJob.isComplete()) {
920 XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]",
921 action.getExternalId());
922 if (runningJob.isSuccessful() && LauncherMapper.isMainSuccessful(runningJob)) {
923 getActionData(actionFs, runningJob, action, context);
924 XLog.getLog(getClass()).info(XLog.STD, "action produced output");
925 }
926 else {
927 XLog log = XLog.getLog(getClass());
928 String errorReason;
929 Path actionError = LauncherMapper.getErrorPath(context.getActionDir());
930 if (actionFs.exists(actionError)) {
931 InputStream is = actionFs.open(actionError);
932 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
933 Properties props = PropertiesUtils.readProperties(reader, -1);
934 reader.close();
935 String errorCode = props.getProperty("error.code");
936 if (errorCode.equals("0")) {
937 errorCode = "JA018";
938 }
939 if (errorCode.equals("-1")) {
940 errorCode = "JA019";
941 }
942 errorReason = props.getProperty("error.reason");
943 log.warn("Launcher ERROR, reason: {0}", errorReason);
944 String exMsg = props.getProperty("exception.message");
945 String errorInfo = (exMsg != null) ? exMsg : errorReason;
946 context.setErrorInfo(errorCode, errorInfo);
947 String exStackTrace = props.getProperty("exception.stacktrace");
948 if (exMsg != null) {
949 log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
950 }
951 }
952 else {
953 errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action
954 .getTrackerUri(), action.getExternalId());
955 log.warn(errorReason);
956 }
957 context.setExecutionData(FAILED_KILLED, null);
958 }
959 }
960 else {
961 context.setExternalStatus(RUNNING);
962 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
963 action.getExternalId(), action.getExternalStatus());
964 }
965 }
966 else {
967 context.setExternalStatus(RUNNING);
968 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
969 action.getExternalId(), action.getExternalStatus());
970 }
971 }
972 catch (Exception ex) {
973 XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
974 exception = true;
975 throw convertException(ex);
976 }
977 finally {
978 if (jobClient != null) {
979 try {
980 jobClient.close();
981 }
982 catch (Exception e) {
983 if (exception) {
984 log.error("JobClient error: ", e);
985 }
986 else {
987 throw convertException(e);
988 }
989 }
990 }
991 }
992 }
993
994 /**
995 * Get the output data of an action. Subclasses should override this method
996 * to get action specific output data.
997 *
998 * @param actionFs the FileSystem object
999 * @param runningJob the runningJob
1000 * @param action the Workflow action
1001 * @param context executor context
1002 *
1003 */
1004 protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
1005 throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
1006 Properties props = null;
1007 if (getCaptureOutput(action)) {
1008 props = new Properties();
1009 if (LauncherMapper.hasOutputData(runningJob)) {
1010 Path actionOutput = LauncherMapper.getOutputDataPath(context.getActionDir());
1011 InputStream is = actionFs.open(actionOutput);
1012 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1013 props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
1014 reader.close();
1015 }
1016 }
1017 context.setExecutionData(SUCCEEDED, props);
1018 }
1019
1020 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
1021 Element eConf = XmlUtils.parseXml(action.getConf());
1022 Namespace ns = eConf.getNamespace();
1023 Element captureOutput = eConf.getChild("capture-output", ns);
1024 return captureOutput != null;
1025 }
1026
1027 @Override
1028 public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
1029 JobClient jobClient = null;
1030 boolean exception = false;
1031 try {
1032 Element actionXml = XmlUtils.parseXml(action.getConf());
1033 JobConf jobConf = createBaseHadoopConf(context, actionXml);
1034 jobClient = createJobClient(context, jobConf);
1035 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
1036 if (runningJob != null) {
1037 runningJob.killJob();
1038 }
1039 context.setExternalStatus(KILLED);
1040 context.setExecutionData(KILLED, null);
1041 }
1042 catch (Exception ex) {
1043 exception = true;
1044 throw convertException(ex);
1045 }
1046 finally {
1047 try {
1048 FileSystem actionFs = getActionFileSystem(context, action);
1049 cleanUpActionDir(actionFs, context);
1050 if (jobClient != null) {
1051 jobClient.close();
1052 }
1053 }
1054 catch (Exception ex) {
1055 if (exception) {
1056 log.error("Error: ", ex);
1057 }
1058 else {
1059 throw convertException(ex);
1060 }
1061 }
1062 }
1063 }
1064
1065 private static Set<String> FINAL_STATUS = new HashSet<String>();
1066
1067 static {
1068 FINAL_STATUS.add(SUCCEEDED);
1069 FINAL_STATUS.add(KILLED);
1070 FINAL_STATUS.add(FAILED);
1071 FINAL_STATUS.add(FAILED_KILLED);
1072 }
1073
1074 @Override
1075 public boolean isCompleted(String externalStatus) {
1076 return FINAL_STATUS.contains(externalStatus);
1077 }
1078
1079 /**
1080 * Return the sharelib postfix for the action.
1081 * <p/>
1082 * If <code>NULL</code> or emtpy, it means that the action does not use the action
1083 * sharelib.
1084 * <p/>
1085 * If a non-empty string, i.e. <code>foo</code>, it means the action uses the
1086 * action sharelib subdirectory <code>foo</code> and all JARs in the <code>foo</code>
1087 * directory will be in the action classpath.
1088 *
1089 * @param context executor context.
1090 * @param actionXml the action XML.
1091 * @return the action sharelib post fix, this implementation returns <code>NULL</code>.
1092 */
1093 protected String getShareLibPostFix(Context context, Element actionXml) {
1094 return null;
1095 }
1096
1097 }