This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import java.io.BufferedReader;
021 import java.io.ByteArrayOutputStream;
022 import java.io.File;
023 import java.io.FileNotFoundException;
024 import java.io.IOException;
025 import java.io.InputStream;
026 import java.io.InputStreamReader;
027 import java.io.PrintStream;
028 import java.io.StringReader;
029 import java.net.ConnectException;
030 import java.net.URI;
031 import java.net.URISyntaxException;
032 import java.net.UnknownHostException;
033 import java.util.ArrayList;
034 import java.util.HashMap;
035 import java.util.HashSet;
036 import java.util.Iterator;
037 import java.util.List;
038 import java.util.Map;
039 import java.util.Properties;
040 import java.util.Set;
041 import java.util.Map.Entry;
042
043 import org.apache.hadoop.conf.Configuration;
044 import org.apache.hadoop.filecache.DistributedCache;
045 import org.apache.hadoop.fs.FileStatus;
046 import org.apache.hadoop.fs.FileSystem;
047 import org.apache.hadoop.fs.Path;
048 import org.apache.hadoop.fs.permission.AccessControlException;
049 import org.apache.hadoop.mapred.JobClient;
050 import org.apache.hadoop.mapred.JobConf;
051 import org.apache.hadoop.mapred.JobID;
052 import org.apache.hadoop.mapred.RunningJob;
053 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
054 import org.apache.hadoop.util.DiskChecker;
055 import org.apache.oozie.WorkflowActionBean;
056 import org.apache.oozie.WorkflowJobBean;
057 import org.apache.oozie.action.ActionExecutor;
058 import org.apache.oozie.action.ActionExecutorException;
059 import org.apache.oozie.client.OozieClient;
060 import org.apache.oozie.client.WorkflowAction;
061 import org.apache.oozie.service.HadoopAccessorException;
062 import org.apache.oozie.service.HadoopAccessorService;
063 import org.apache.oozie.service.Services;
064 import org.apache.oozie.service.WorkflowAppService;
065 import org.apache.oozie.servlet.CallbackServlet;
066 import org.apache.oozie.util.ELEvaluator;
067 import org.apache.oozie.util.IOUtils;
068 import org.apache.oozie.util.PropertiesUtils;
069 import org.apache.oozie.util.XConfiguration;
070 import org.apache.oozie.util.XLog;
071 import org.apache.oozie.util.XmlUtils;
072 import org.jdom.Element;
073 import org.jdom.JDOMException;
074 import org.jdom.Namespace;
075 import org.apache.hadoop.security.token.Token;
076 import org.apache.hadoop.security.token.TokenIdentifier;
077
078 public class JavaActionExecutor extends ActionExecutor {
079
// Hadoop configuration property names that this executor sets itself.
private static final String HADOOP_USER = "user.name";
public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
public static final String HADOOP_NAME_NODE = "fs.default.name";
private static final String HADOOP_JOB_NAME = "mapred.job.name";

// Job ACL property names.
public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";

// Share lib name that addAllShareLibs() always adds.
public static final String OOZIE_COMMON_LIBDIR = "oozie";

// Oozie property (and its default) limiting the size of external action stats.
public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE;

// Limits read from the Oozie configuration by initActionType().
private static int maxActionOutputLen;
private static int maxExternalStatsSize;

// Status names.
private static final String SUCCEEDED = "SUCCEEDED";
private static final String KILLED = "KILLED";
private static final String FAILED = "FAILED";
private static final String FAILED_KILLED = "FAILED/KILLED";
private static final String RUNNING = "RUNNING";

protected XLog log = XLog.getLog(getClass());

// Properties rejected by checkForDisallowedProps() when found in a user-supplied configuration.
private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();

static {
    String[] reservedKeys = {
        HADOOP_USER, HADOOP_JOB_TRACKER, HADOOP_NAME_NODE, HADOOP_JOB_TRACKER_2, HADOOP_YARN_RM
    };
    for (String reservedKey : reservedKeys) {
        DISALLOWED_PROPERTIES.add(reservedKey);
    }
}
109
/**
 * Creates an executor registered under the action type "java".
 */
public JavaActionExecutor() {
    this("java");
    this.requiresNNJT = true;
}

/**
 * Creates an executor registered under the given action type.
 *
 * @param type action type name handed to the superclass constructor
 */
protected JavaActionExecutor(String type) {
    super(type);
    this.requiresNNJT = true;
}
119
120 protected String getLauncherJarName() {
121 return getType() + "-launcher.jar";
122 }
123
124 protected List<Class> getLauncherClasses() {
125 List<Class> classes = new ArrayList<Class>();
126 classes.add(LauncherMapper.class);
127 classes.add(LauncherSecurityManager.class);
128 classes.add(LauncherException.class);
129 classes.add(LauncherMainException.class);
130 classes.add(FileSystemActions.class);
131 classes.add(PrepareActionsDriver.class);
132 classes.add(ActionStats.class);
133 classes.add(ActionType.class);
134 return classes;
135 }
136
137 @Override
138 public void initActionType() {
139 XLog log = XLog.getLog(getClass());
140 super.initActionType();
141 maxActionOutputLen = getOozieConf()
142 .getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
143 // TODO: Remove the below config get in a subsequent release..
144 // This other irrelevant property is only used to
145 // preserve backwards compatibility cause of a typo.
146 // See OOZIE-4.
147 getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN,
148 2 * 1024));
149 //Get the limit for the maximum allowed size of action stats
150 maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT);
151 maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
152 try {
153 List<Class> classes = getLauncherClasses();
154 Class[] launcherClasses = classes.toArray(new Class[classes.size()]);
155 IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses);
156
157 registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001");
158 registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT,
159 "JA002");
160 registerError(DiskChecker.DiskOutOfSpaceException.class.getName(),
161 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003");
162 registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(),
163 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
164 registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
165 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
166 registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, " JA006");
167 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
168 registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
169 registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
170 }
171 catch (IOException ex) {
172 throw new RuntimeException(ex);
173 }
174 catch (java.lang.NoClassDefFoundError err) {
175 ByteArrayOutputStream baos = new ByteArrayOutputStream();
176 err.printStackTrace(new PrintStream(baos));
177 log.warn(baos.toString());
178 }
179 }
180
181 /**
182 * Get the maximum allowed size of stats
183 *
184 * @return maximum size of stats
185 */
186 public static int getMaxExternalStatsSize() {
187 return maxExternalStatsSize;
188 }
189
190 static void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException {
191 for (String prop : DISALLOWED_PROPERTIES) {
192 if (conf.get(prop) != null) {
193 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010",
194 "Property [{0}] not allowed in action [{1}] configuration", prop, confName);
195 }
196 }
197 }
198
199 public JobConf createBaseHadoopConf(Context context, Element actionXml) {
200 Namespace ns = actionXml.getNamespace();
201 String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
202 String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
203 JobConf conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
204 conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
205 conf.set(HADOOP_JOB_TRACKER, jobTracker);
206 conf.set(HADOOP_JOB_TRACKER_2, jobTracker);
207 conf.set(HADOOP_YARN_RM, jobTracker);
208 conf.set(HADOOP_NAME_NODE, nameNode);
209 conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
210 return conf;
211 }
212
213 private void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) {
214 for (Map.Entry<String, String> entry : srcConf) {
215 if (entry.getKey().startsWith("oozie.launcher.")) {
216 String name = entry.getKey().substring("oozie.launcher.".length());
217 String value = entry.getValue();
218 // setting original KEY
219 launcherConf.set(entry.getKey(), value);
220 // setting un-prefixed key (to allow Hadoop job config
221 // for the launcher job
222 launcherConf.set(name, value);
223 }
224 }
225 }
226
227 Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
228 throws ActionExecutorException {
229 try {
230 Namespace ns = actionXml.getNamespace();
231 Element e = actionXml.getChild("configuration", ns);
232 if (e != null) {
233 String strConf = XmlUtils.prettyPrint(e).toString();
234 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
235
236 XConfiguration launcherConf = new XConfiguration();
237 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
238 XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType());
239 injectLauncherProperties(actionDefaultConf, launcherConf);
240 injectLauncherProperties(inlineConf, launcherConf);
241 checkForDisallowedProps(launcherConf, "launcher configuration");
242 XConfiguration.copy(launcherConf, conf);
243 }
244 return conf;
245 }
246 catch (IOException ex) {
247 throw convertException(ex);
248 }
249 }
250
251 public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf)
252 throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException {
253 Namespace ns = element.getNamespace();
254 Iterator<Element> it = element.getChildren("job-xml", ns).iterator();
255 while (it.hasNext()) {
256 Element e = it.next();
257 String jobXml = e.getTextTrim();
258 Path path = new Path(appPath, jobXml);
259 FileSystem fs = context.getAppFileSystem();
260 Configuration jobXmlConf = new XConfiguration(fs.open(path));
261 checkForDisallowedProps(jobXmlConf, "job-xml");
262 XConfiguration.copy(jobXmlConf, conf);
263 }
264 Element e = element.getChild("configuration", ns);
265 if (e != null) {
266 String strConf = XmlUtils.prettyPrint(e).toString();
267 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
268 checkForDisallowedProps(inlineConf, "inline configuration");
269 XConfiguration.copy(inlineConf, conf);
270 }
271 }
272
273 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
274 throws ActionExecutorException {
275 try {
276 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
277 XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType());
278 XConfiguration.injectDefaults(actionDefaults, actionConf);
279
280 has.checkSupportedFilesystem(appPath.toUri());
281
282 parseJobXmlAndConfiguration(context, actionXml, appPath, actionConf);
283 return actionConf;
284 }
285 catch (IOException ex) {
286 throw convertException(ex);
287 }
288 catch (HadoopAccessorException ex) {
289 throw convertException(ex);
290 }
291 catch (URISyntaxException ex) {
292 throw convertException(ex);
293 }
294 }
295
296 Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive)
297 throws ActionExecutorException {
298 Path path = null;
299 try {
300 if (filePath.startsWith("/")) {
301 path = new Path(filePath);
302 }
303 else {
304 path = new Path(appPath, filePath);
305 }
306 URI uri = new URI(path.toUri().getPath());
307 if (archive) {
308 DistributedCache.addCacheArchive(uri, conf);
309 }
310 else {
311 String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
312 if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files
313 uri = new Path(path.toString() + "#" + fileName).toUri();
314 uri = new URI(uri.getPath());
315 DistributedCache.addCacheFile(uri, conf);
316 }
317 else if (fileName.endsWith(".jar")) { // .jar files
318 if (!fileName.contains("#")) {
319 path = new Path(uri.toString());
320
321 String user = conf.get("user.name");
322 Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, path, conf);
323 }
324 else {
325 DistributedCache.addCacheFile(uri, conf);
326 }
327 }
328 else { // regular files
329 if (!fileName.contains("#")) {
330 uri = new Path(path.toString() + "#" + fileName).toUri();
331 uri = new URI(uri.getPath());
332 }
333 DistributedCache.addCacheFile(uri, conf);
334 }
335 }
336 DistributedCache.createSymlink(conf);
337 return conf;
338 }
339 catch (Exception ex) {
340 XLog.getLog(getClass()).debug(
341 "Errors when add to DistributedCache. Path=" + path + ", archive=" + archive + ", conf="
342 + XmlUtils.prettyPrint(conf).toString());
343 throw convertException(ex);
344 }
345 }
346
347 String getOozieLauncherJar(Context context) throws ActionExecutorException {
348 try {
349 return new Path(context.getActionDir(), getLauncherJarName()).toString();
350 }
351 catch (Exception ex) {
352 throw convertException(ex);
353 }
354 }
355
356 public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
357 try {
358 Path actionDir = context.getActionDir();
359 Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp");
360 if (!actionFs.exists(actionDir)) {
361 try {
362 actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path(
363 tempActionDir, getLauncherJarName()));
364 actionFs.rename(tempActionDir, actionDir);
365 }
366 catch (IOException ex) {
367 actionFs.delete(tempActionDir, true);
368 actionFs.delete(actionDir, true);
369 throw ex;
370 }
371 }
372 }
373 catch (Exception ex) {
374 throw convertException(ex);
375 }
376 }
377
378 void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
379 try {
380 Path actionDir = context.getActionDir();
381 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
382 && actionFs.exists(actionDir)) {
383 actionFs.delete(actionDir, true);
384 }
385 }
386 catch (Exception ex) {
387 throw convertException(ex);
388 }
389 }
390
391 protected void addShareLib(Path appPath, Configuration conf, String actionShareLibName)
392 throws ActionExecutorException {
393 if (actionShareLibName != null) {
394 try {
395 Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath();
396 if (systemLibPath != null) {
397 Path actionLibPath = new Path(systemLibPath, actionShareLibName);
398 String user = conf.get("user.name");
399 FileSystem fs;
400 // If the actionLibPath has a valid scheme and authority, then use them to determine the filesystem that the
401 // sharelib resides on; otherwise, assume it resides on the same filesystem as the appPath and use the appPath
402 // to determine the filesystem
403 if (actionLibPath.toUri().getScheme() != null && actionLibPath.toUri().getAuthority() != null) {
404 fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, actionLibPath.toUri(), conf);
405 }
406 else {
407 fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
408 }
409 if (fs.exists(actionLibPath)) {
410 FileStatus[] files = fs.listStatus(actionLibPath);
411 for (FileStatus file : files) {
412 addToCache(conf, actionLibPath, file.getPath().toUri().getPath(), false);
413 }
414 }
415 }
416 }
417 catch (HadoopAccessorException ex){
418 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
419 ex.getErrorCode().toString(), ex.getMessage());
420 }
421 catch (IOException ex){
422 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
423 "It should never happen", ex.getMessage());
424 }
425 }
426 }
427
428 protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException {
429 String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath");
430 if (actionLibsStrArr != null) {
431 try {
432 for (String actionLibsStr : actionLibsStrArr) {
433 actionLibsStr = actionLibsStr.trim();
434 if (actionLibsStr.length() > 0)
435 {
436 Path actionLibsPath = new Path(actionLibsStr);
437 String user = conf.get("user.name");
438 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
439 if (fs.exists(actionLibsPath)) {
440 FileStatus[] files = fs.listStatus(actionLibsPath);
441 for (FileStatus file : files) {
442 addToCache(conf, appPath, file.getPath().toUri().getPath(), false);
443 }
444 }
445 }
446 }
447 }
448 catch (HadoopAccessorException ex){
449 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
450 ex.getErrorCode().toString(), ex.getMessage());
451 }
452 catch (IOException ex){
453 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
454 "It should never happen", ex.getMessage());
455 }
456 }
457 }
458
459 @SuppressWarnings("unchecked")
460 void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf)
461 throws ActionExecutorException {
462 Configuration proto = context.getProtoActionConf();
463
464 // launcher JAR
465 addToCache(conf, appPath, getOozieLauncherJar(context), false);
466
467 // Workflow lib/
468 String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
469 if (paths != null) {
470 for (String path : paths) {
471 addToCache(conf, appPath, path, false);
472 }
473 }
474
475 // Action libs
476 addActionLibs(appPath, conf);
477
478 // files and archives defined in the action
479 for (Element eProp : (List<Element>) actionXml.getChildren()) {
480 if (eProp.getName().equals("file")) {
481 String[] filePaths = eProp.getTextTrim().split(",");
482 for (String path : filePaths) {
483 addToCache(conf, appPath, path.trim(), false);
484 }
485 }
486 else if (eProp.getName().equals("archive")) {
487 String[] archivePaths = eProp.getTextTrim().split(",");
488 for (String path : archivePaths){
489 addToCache(conf, appPath, path.trim(), true);
490 }
491 }
492 }
493
494 addAllShareLibs(appPath, conf, context, actionXml);
495 }
496
497 // Adds action specific share libs and common share libs
498 private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml)
499 throws ActionExecutorException {
500 // Add action specific share libs
501 addActionShareLib(appPath, conf, context, actionXml);
502 // Add common sharelibs for Oozie
503 addShareLib(appPath, conf, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
504 }
505
506 private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml)
507 throws ActionExecutorException {
508 XConfiguration wfJobConf = null;
509 try {
510 wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
511 }
512 catch (IOException ioe) {
513 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
514 ioe.getMessage());
515 }
516 // Action sharelibs are only added if user has specified to use system libpath
517 if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
518 // add action specific sharelibs
519 addShareLib(appPath, conf, getShareLibName(context, actionXml, conf));
520 }
521 }
522
523
524 protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
525 Namespace ns = actionXml.getNamespace();
526 Element e = actionXml.getChild("main-class", ns);
527 return e.getTextTrim();
528 }
529
private static final String QUEUE_NAME = "mapred.job.queue.name";

// Action properties that actionConfToLauncherConf() propagates to the launcher job.
private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>();

static {
    String[] propagated = {QUEUE_NAME, ACL_VIEW_JOB, ACL_MODIFY_JOB};
    for (String propertyName : propagated) {
        SPECIAL_PROPERTIES.add(propertyName);
    }
}
539
540 @SuppressWarnings("unchecked")
541 JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
542 throws ActionExecutorException {
543 try {
544
545 // app path could be a file
546 Path appPathRoot = new Path(context.getWorkflow().getAppPath());
547 if (actionFs.isFile(appPathRoot)) {
548 appPathRoot = appPathRoot.getParent();
549 }
550
551 // launcher job configuration
552 JobConf launcherJobConf = createBaseHadoopConf(context, actionXml);
553 setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);
554
555 String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType());
556 if (actionShareLibProperty != null) {
557 launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty);
558 }
559 setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
560
561 String jobName = launcherJobConf.get(HADOOP_JOB_NAME);
562 if (jobName == null || jobName.isEmpty()) {
563 jobName = XLog.format(
564 "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
565 context.getWorkflow().getAppName(), action.getName(),
566 context.getWorkflow().getId());
567 launcherJobConf.setJobName(jobName);
568 }
569
570 String jobId = context.getWorkflow().getId();
571 String actionId = action.getId();
572 Path actionDir = context.getActionDir();
573 String recoveryId = context.getRecoveryId();
574
575 // Getting the prepare XML from the action XML
576 Namespace ns = actionXml.getNamespace();
577 Element prepareElement = actionXml.getChild("prepare", ns);
578 String prepareXML = "";
579 if (prepareElement != null) {
580 if (prepareElement.getChildren().size() > 0) {
581 prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
582 }
583 }
584 LauncherMapper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
585 prepareXML);
586
587 LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
588 LauncherMapper.setupSupportedFileSystems(
589 launcherJobConf, Services.get().getConf().get(HadoopAccessorService.SUPPORTED_FILESYSTEMS));
590 LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
591 LauncherMapper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
592
593 List<Element> list = actionXml.getChildren("arg", ns);
594 String[] args = new String[list.size()];
595 for (int i = 0; i < list.size(); i++) {
596 args[i] = list.get(i).getTextTrim();
597 }
598 LauncherMapper.setupMainArguments(launcherJobConf, args);
599
600 List<Element> javaopts = actionXml.getChildren("java-opt", ns);
601 for (Element opt: javaopts) {
602 String opts = launcherJobConf.get("mapred.child.java.opts", "");
603 opts = opts + " " + opt.getTextTrim();
604 opts = opts.trim();
605 launcherJobConf.set("mapred.child.java.opts", opts);
606 }
607
608 Element opt = actionXml.getChild("java-opts", ns);
609 if (opt != null) {
610 String opts = launcherJobConf.get("mapred.child.java.opts", "");
611 opts = opts + " " + opt.getTextTrim();
612 opts = opts.trim();
613 launcherJobConf.set("mapred.child.java.opts", opts);
614 }
615
616 // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs)
617 // maybe we should add queue to the WF schema, below job-tracker
618 actionConfToLauncherConf(actionConf, launcherJobConf);
619
620 // to disable cancelation of delegation token on launcher job end
621 launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
622
623 return launcherJobConf;
624 }
625 catch (Exception ex) {
626 throw convertException(ex);
627 }
628 }
629
630 private void injectCallback(Context context, Configuration conf) {
631 String callback = context.getCallbackUrl("$jobStatus");
632 if (conf.get("job.end.notification.url") != null) {
633 XLog.getLog(getClass()).warn("Overriding the action job end notification URI");
634 }
635 conf.set("job.end.notification.url", callback);
636 }
637
638 void injectActionCallback(Context context, Configuration actionConf) {
639 injectCallback(context, actionConf);
640 }
641
642 void injectLauncherCallback(Context context, Configuration launcherConf) {
643 injectCallback(context, launcherConf);
644 }
645
646 private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) {
647 for (String name : SPECIAL_PROPERTIES) {
648 if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) {
649 launcherConf.set(name, actionConf.get(name));
650 }
651 }
652 }
653
654 public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
655 JobClient jobClient = null;
656 boolean exception = false;
657 try {
658 Path appPathRoot = new Path(context.getWorkflow().getAppPath());
659
660 // app path could be a file
661 if (actionFs.isFile(appPathRoot)) {
662 appPathRoot = appPathRoot.getParent();
663 }
664
665 Element actionXml = XmlUtils.parseXml(action.getConf());
666
667 // action job configuration
668 Configuration actionConf = createBaseHadoopConf(context, actionXml);
669 setupActionConf(actionConf, context, actionXml, appPathRoot);
670 XLog.getLog(getClass()).debug("Setting LibFilesArchives ");
671 setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
672
673 String jobName = actionConf.get(HADOOP_JOB_NAME);
674 if (jobName == null || jobName.isEmpty()) {
675 jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}",
676 getType(), context.getWorkflow().getAppName(),
677 action.getName(), context.getWorkflow().getId());
678 actionConf.set(HADOOP_JOB_NAME, jobName);
679 }
680
681 injectActionCallback(context, actionConf);
682
683 if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) {
684 // ONLY in the case where user has not given the
685 // modify-job ACL specifically
686 if (context.getWorkflow().getAcl() != null) {
687 // setting the group owning the Oozie job to allow anybody in that
688 // group to modify the jobs.
689 actionConf.set(ACL_MODIFY_JOB, context.getWorkflow().getAcl());
690 }
691 }
692
693 // Setting the credential properties in launcher conf
694 HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
695 action, actionConf);
696
697 // Adding if action need to set more credential tokens
698 JobConf credentialsConf = new JobConf(false);
699 XConfiguration.copy(actionConf, credentialsConf);
700 setCredentialTokens(credentialsConf, context, action, credentialsProperties);
701
702 // insert conf to action conf from credentialsConf
703 for (Entry<String, String> entry : credentialsConf) {
704 if (actionConf.get(entry.getKey()) == null) {
705 actionConf.set(entry.getKey(), entry.getValue());
706 }
707 }
708
709 JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
710 injectLauncherCallback(context, launcherJobConf);
711 XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId());
712 jobClient = createJobClient(context, launcherJobConf);
713 String launcherId = LauncherMapper.getRecoveryId(launcherJobConf, context.getActionDir(), context
714 .getRecoveryId());
715 boolean alreadyRunning = launcherId != null;
716 RunningJob runningJob;
717
718 // if user-retry is on, always submit new launcher
719 boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
720
721 if (alreadyRunning && !isUserRetry) {
722 runningJob = jobClient.getJob(JobID.forName(launcherId));
723 if (runningJob == null) {
724 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
725 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
726 "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
727 }
728 }
729 else {
730 XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId());
731
732 // setting up propagation of the delegation token.
733 Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(HadoopAccessorService
734 .getMRDelegationTokenRenewer(launcherJobConf));
735 launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
736
737 // insert credentials tokens to launcher job conf if needed
738 if (needInjectCredentials()) {
739 for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) {
740 log.debug("ADDING TOKEN: " + tk.getKind().toString());
741 launcherJobConf.getCredentials().addToken(tk.getKind(), tk);
742 }
743 }
744 else {
745 log.info("No need to inject credentials.");
746 }
747 runningJob = jobClient.submitJob(launcherJobConf);
748 if (runningJob == null) {
749 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
750 "Error submitting launcher for action [{0}]", action.getId());
751 }
752 launcherId = runningJob.getID().toString();
753 XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId);
754 }
755
756 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
757 String consoleUrl = runningJob.getTrackingURL();
758 context.setStartData(launcherId, jobTracker, consoleUrl);
759 }
760 catch (Exception ex) {
761 exception = true;
762 throw convertException(ex);
763 }
764 finally {
765 if (jobClient != null) {
766 try {
767 jobClient.close();
768 }
769 catch (Exception e) {
770 if (exception) {
771 log.error("JobClient error: ", e);
772 }
773 else {
774 throw convertException(e);
775 }
776 }
777 }
778 }
779 }
780
781 private boolean needInjectCredentials() {
782 boolean methodExists = true;
783
784 Class klass;
785 try {
786 klass = Class.forName("org.apache.hadoop.mapred.JobConf");
787 klass.getMethod("getCredentials");
788 }
789 catch (ClassNotFoundException ex) {
790 methodExists = false;
791 }
792 catch (NoSuchMethodException ex) {
793 methodExists = false;
794 }
795
796 return methodExists;
797 }
798
799 protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
800 WorkflowAction action, Configuration actionConf) throws Exception {
801 HashMap<String, CredentialsProperties> credPropertiesMap = null;
802 if (context != null && action != null) {
803 credPropertiesMap = getActionCredentialsProperties(context, action);
804 if (credPropertiesMap != null) {
805 for (String key : credPropertiesMap.keySet()) {
806 CredentialsProperties prop = credPropertiesMap.get(key);
807 if (prop != null) {
808 log.debug("Credential Properties set for action : " + action.getId());
809 for (String property : prop.getProperties().keySet()) {
810 actionConf.set(property, prop.getProperties().get(property));
811 log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'");
812 }
813 }
814 }
815 }
816 else {
817 log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
818 }
819 }
820 else {
821 log.warn("context or action is null");
822 }
823 return credPropertiesMap;
824 }
825
826 protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
827 HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {
828
829 if (context != null && action != null && credPropertiesMap != null) {
830 for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
831 String credName = entry.getKey();
832 CredentialsProperties credProps = entry.getValue();
833 if (credProps != null) {
834 CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
835 Credentials credentialObject = credProvider.createCredentialObject();
836 if (credentialObject != null) {
837 credentialObject.addtoJobConf(jobconf, credProps, context);
838 log.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
839 }
840 else {
841 log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
842 }
843 }
844 }
845 }
846
847 }
848
849 protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
850 WorkflowAction action) throws Exception {
851 HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();
852 if (context != null && action != null) {
853 String credsInAction = action.getCred();
854 log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId());
855 String[] credNames = credsInAction.split(",");
856 for (String credName : credNames) {
857 CredentialsProperties credProps = getCredProperties(context, credName);
858 props.put(credName, credProps);
859 }
860 }
861 else {
862 log.warn("context or action is null");
863 }
864 return props;
865 }
866
867 @SuppressWarnings("unchecked")
868 protected CredentialsProperties getCredProperties(Context context, String credName)
869 throws Exception {
870 CredentialsProperties credProp = null;
871 String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
872 XConfiguration wfjobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
873 Element elementJob = XmlUtils.parseXml(workflowXml);
874 Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
875 if (credentials != null) {
876 for (Element credential : (List<Element>) credentials.getChildren("credential", credentials.getNamespace())) {
877 String name = credential.getAttributeValue("name");
878 String type = credential.getAttributeValue("type");
879 log.debug("getCredProperties: Name: " + name + ", Type: " + type);
880 if (name.equalsIgnoreCase(credName)) {
881 credProp = new CredentialsProperties(name, type);
882 for (Element property : (List<Element>) credential.getChildren("property",
883 credential.getNamespace())) {
884 String propertyName = property.getChildText("name", property.getNamespace());
885 String propertyValue = property.getChildText("value", property.getNamespace());
886 ELEvaluator eval = new ELEvaluator();
887 for (Map.Entry<String, String> entry : wfjobConf) {
888 eval.setVariable(entry.getKey(), entry.getValue().trim());
889 }
890 propertyName = eval.evaluate(propertyName, String.class);
891 propertyValue = eval.evaluate(propertyValue, String.class);
892
893 credProp.getProperties().put(propertyName, propertyValue);
894 log.debug("getCredProperties: Properties name :'" + propertyName + "', Value : '"
895 + propertyValue + "'");
896 }
897 }
898 }
899 } else {
900 log.warn("credentials is null for the action");
901 }
902 return credProp;
903 }
904
905 @Override
906 public void start(Context context, WorkflowAction action) throws ActionExecutorException {
907 try {
908 XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System");
909 FileSystem actionFs = context.getAppFileSystem();
910 XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir());
911 prepareActionDir(actionFs, context);
912 XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action ");
913 submitLauncher(actionFs, context, action);
914 XLog.getLog(getClass()).debug("Action submit completed. Performing check ");
915 check(context, action);
916 XLog.getLog(getClass()).debug("Action check is done after submission");
917 }
918 catch (Exception ex) {
919 throw convertException(ex);
920 }
921 }
922
923 @Override
924 public void end(Context context, WorkflowAction action) throws ActionExecutorException {
925 try {
926 String externalStatus = action.getExternalStatus();
927 WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK
928 : WorkflowAction.Status.ERROR;
929 context.setEndData(status, getActionSignal(status));
930 }
931 catch (Exception ex) {
932 throw convertException(ex);
933 }
934 finally {
935 try {
936 FileSystem actionFs = context.getAppFileSystem();
937 cleanUpActionDir(actionFs, context);
938 }
939 catch (Exception ex) {
940 throw convertException(ex);
941 }
942 }
943 }
944
945 /**
946 * Create job client object
947 *
948 * @param context
949 * @param jobConf
950 * @return
951 * @throws HadoopAccessorException
952 */
953 protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
954 String user = context.getWorkflow().getUser();
955 String group = context.getWorkflow().getGroup();
956 return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
957 }
958
959 @Override
960 public void check(Context context, WorkflowAction action) throws ActionExecutorException {
961 JobClient jobClient = null;
962 boolean exception = false;
963 try {
964 Element actionXml = XmlUtils.parseXml(action.getConf());
965 FileSystem actionFs = context.getAppFileSystem();
966 JobConf jobConf = createBaseHadoopConf(context, actionXml);
967 jobClient = createJobClient(context, jobConf);
968 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
969 if (runningJob == null) {
970 context.setExternalStatus(FAILED);
971 context.setExecutionData(FAILED, null);
972 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
973 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", action
974 .getExternalId(), action.getId());
975 }
976 if (runningJob.isComplete()) {
977 Path actionDir = context.getActionDir();
978
979 String user = context.getWorkflow().getUser();
980 String group = context.getWorkflow().getGroup();
981 if (LauncherMapper.hasIdSwap(runningJob, user, group, actionDir)) {
982 String launcherId = action.getExternalId();
983 Path idSwapPath = LauncherMapper.getIdSwapPath(context.getActionDir());
984 InputStream is = actionFs.open(idSwapPath);
985 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
986 Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
987 reader.close();
988 String newId = props.getProperty("id");
989 runningJob = jobClient.getJob(JobID.forName(newId));
990 if (runningJob == null) {
991 context.setExternalStatus(FAILED);
992 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
993 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId,
994 action.getId());
995 }
996
997 context.setStartData(newId, action.getTrackerUri(), runningJob.getTrackingURL());
998 XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
999 newId);
1000 }
1001 if (runningJob.isComplete()) {
1002 XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]",
1003 action.getExternalId());
1004 if (runningJob.isSuccessful() && LauncherMapper.isMainSuccessful(runningJob)) {
1005 getActionData(actionFs, runningJob, action, context);
1006 XLog.getLog(getClass()).info(XLog.STD, "action produced output");
1007 }
1008 else {
1009 XLog log = XLog.getLog(getClass());
1010 String errorReason;
1011 Path actionError = LauncherMapper.getErrorPath(context.getActionDir());
1012 if (actionFs.exists(actionError)) {
1013 InputStream is = actionFs.open(actionError);
1014 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1015 Properties props = PropertiesUtils.readProperties(reader, -1);
1016 reader.close();
1017 String errorCode = props.getProperty("error.code");
1018 if (errorCode.equals("0")) {
1019 errorCode = "JA018";
1020 }
1021 if (errorCode.equals("-1")) {
1022 errorCode = "JA019";
1023 }
1024 errorReason = props.getProperty("error.reason");
1025 log.warn("Launcher ERROR, reason: {0}", errorReason);
1026 String exMsg = props.getProperty("exception.message");
1027 String errorInfo = (exMsg != null) ? exMsg : errorReason;
1028 context.setErrorInfo(errorCode, errorInfo);
1029 String exStackTrace = props.getProperty("exception.stacktrace");
1030 if (exMsg != null) {
1031 log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
1032 }
1033 }
1034 else {
1035 errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action
1036 .getTrackerUri(), action.getExternalId());
1037 log.warn(errorReason);
1038 }
1039 context.setExecutionData(FAILED_KILLED, null);
1040 setActionCompletionData(context, actionFs);
1041 }
1042 }
1043 else {
1044 context.setExternalStatus(RUNNING);
1045 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
1046 action.getExternalId(), action.getExternalStatus());
1047 }
1048 }
1049 else {
1050 context.setExternalStatus(RUNNING);
1051 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
1052 action.getExternalId(), action.getExternalStatus());
1053 }
1054 }
1055 catch (Exception ex) {
1056 XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
1057 exception = true;
1058 throw convertException(ex);
1059 }
1060 finally {
1061 if (jobClient != null) {
1062 try {
1063 jobClient.close();
1064 }
1065 catch (Exception e) {
1066 if (exception) {
1067 log.error("JobClient error: ", e);
1068 }
1069 else {
1070 throw convertException(e);
1071 }
1072 }
1073 }
1074 }
1075 }
1076
1077 /**
1078 * Get the output data of an action. Subclasses should override this method
1079 * to get action specific output data.
1080 *
1081 * @param actionFs the FileSystem object
1082 * @param runningJob the runningJob
1083 * @param action the Workflow action
1084 * @param context executor context
1085 *
1086 */
1087 protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
1088 throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
1089 Properties props = null;
1090 if (getCaptureOutput(action)) {
1091 props = new Properties();
1092 if (LauncherMapper.hasOutputData(runningJob)) {
1093 Path actionOutput = LauncherMapper.getOutputDataPath(context.getActionDir());
1094 InputStream is = actionFs.open(actionOutput);
1095 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1096 props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
1097 reader.close();
1098 }
1099 }
1100 context.setExecutionData(SUCCEEDED, props);
1101 }
1102
1103 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
1104 Element eConf = XmlUtils.parseXml(action.getConf());
1105 Namespace ns = eConf.getNamespace();
1106 Element captureOutput = eConf.getChild("capture-output", ns);
1107 return captureOutput != null;
1108 }
1109
1110 @Override
1111 public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
1112 JobClient jobClient = null;
1113 boolean exception = false;
1114 try {
1115 Element actionXml = XmlUtils.parseXml(action.getConf());
1116 JobConf jobConf = createBaseHadoopConf(context, actionXml);
1117 jobClient = createJobClient(context, jobConf);
1118 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
1119 if (runningJob != null) {
1120 runningJob.killJob();
1121 }
1122 context.setExternalStatus(KILLED);
1123 context.setExecutionData(KILLED, null);
1124 }
1125 catch (Exception ex) {
1126 exception = true;
1127 throw convertException(ex);
1128 }
1129 finally {
1130 try {
1131 FileSystem actionFs = context.getAppFileSystem();
1132 cleanUpActionDir(actionFs, context);
1133 if (jobClient != null) {
1134 jobClient.close();
1135 }
1136 }
1137 catch (Exception ex) {
1138 if (exception) {
1139 log.error("Error: ", ex);
1140 }
1141 else {
1142 throw convertException(ex);
1143 }
1144 }
1145 }
1146 }
1147
1148 private static Set<String> FINAL_STATUS = new HashSet<String>();
1149
1150 static {
1151 FINAL_STATUS.add(SUCCEEDED);
1152 FINAL_STATUS.add(KILLED);
1153 FINAL_STATUS.add(FAILED);
1154 FINAL_STATUS.add(FAILED_KILLED);
1155 }
1156
1157 @Override
1158 public boolean isCompleted(String externalStatus) {
1159 return FINAL_STATUS.contains(externalStatus);
1160 }
1161
1162
1163 /**
1164 * Return the sharelib name for the action.
1165 * <p/>
1166 * If <code>NULL</code> or emtpy, it means that the action does not use the action
1167 * sharelib.
1168 * <p/>
1169 * If a non-empty string, i.e. <code>foo</code>, it means the action uses the
1170 * action sharelib subdirectory <code>foo</code> and all JARs in the sharelib
1171 * <code>foo</code> directory will be in the action classpath.
1172 * <p/>
1173 * The resolution is done using the following precedence order:
1174 * <ul>
1175 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the action configuration</li>
1176 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the job configuration</li>
1177 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the oozie configuration</li>
1178 * <li>Action Executor <code>getDefaultShareLibName()</code> method</li>
1179 * </ul>
1180 *
1181 *
1182 * @param context executor context.
1183 * @param actionXml
1184 *@param conf action configuration. @return the action sharelib name.
1185 */
1186 protected String getShareLibName(Context context, Element actionXml, Configuration conf) {
1187 String name = conf.get(ACTION_SHARELIB_FOR + getType());
1188 if (name == null) {
1189 try {
1190 XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
1191 name = jobConf.get(ACTION_SHARELIB_FOR + getType());
1192 if (name == null) {
1193 name = Services.get().getConf().get(ACTION_SHARELIB_FOR + getType());
1194 if (name == null) {
1195 name = getDefaultShareLibName(actionXml);
1196 }
1197 }
1198 }
1199 catch (IOException ex) {
1200 throw new RuntimeException("It cannot happen, " + ex.toString(), ex);
1201 }
1202 }
1203 return name;
1204 }
1205
1206 private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for.";
1207
1208
1209 /**
1210 * Returns the default sharelib name for the action if any.
1211 *
1212 * @param actionXml the action XML fragment.
1213 * @return the sharelib name for the action, <code>NULL</code> if none.
1214 */
1215 protected String getDefaultShareLibName(Element actionXml) {
1216 return null;
1217 }
1218
1219 /**
1220 * Sets some data for the action on completion
1221 *
1222 * @param context executor context
1223 * @param actionFs the FileSystem object
1224 */
1225 protected void setActionCompletionData(Context context, FileSystem actionFs) throws IOException,
1226 HadoopAccessorException, URISyntaxException {
1227 }
1228 }