001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.BufferedReader;
021    import java.io.ByteArrayOutputStream;
022    import java.io.File;
023    import java.io.FileNotFoundException;
024    import java.io.IOException;
025    import java.io.InputStream;
026    import java.io.InputStreamReader;
027    import java.io.PrintStream;
028    import java.io.StringReader;
029    import java.net.ConnectException;
030    import java.net.URI;
031    import java.net.URISyntaxException;
032    import java.net.UnknownHostException;
033    import java.util.ArrayList;
034    import java.util.HashMap;
035    import java.util.HashSet;
036    import java.util.Iterator;
037    import java.util.List;
038    import java.util.Map;
039    import java.util.Properties;
040    import java.util.Set;
041    import java.util.Map.Entry;
042    
043    import org.apache.hadoop.conf.Configuration;
044    import org.apache.hadoop.filecache.DistributedCache;
045    import org.apache.hadoop.fs.FileStatus;
046    import org.apache.hadoop.fs.FileSystem;
047    import org.apache.hadoop.fs.Path;
048    import org.apache.hadoop.fs.permission.AccessControlException;
049    import org.apache.hadoop.io.Text;
050    import org.apache.hadoop.mapred.JobClient;
051    import org.apache.hadoop.mapred.JobConf;
052    import org.apache.hadoop.mapred.JobID;
053    import org.apache.hadoop.mapred.RunningJob;
054    import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
055    import org.apache.hadoop.util.DiskChecker;
056    import org.apache.oozie.WorkflowActionBean;
057    import org.apache.oozie.WorkflowJobBean;
058    import org.apache.oozie.action.ActionExecutor;
059    import org.apache.oozie.action.ActionExecutorException;
060    import org.apache.oozie.client.OozieClient;
061    import org.apache.oozie.client.WorkflowAction;
062    import org.apache.oozie.service.HadoopAccessorException;
063    import org.apache.oozie.service.HadoopAccessorService;
064    import org.apache.oozie.service.Services;
065    import org.apache.oozie.service.WorkflowAppService;
066    import org.apache.oozie.servlet.CallbackServlet;
067    import org.apache.oozie.util.ELEvaluator;
068    import org.apache.oozie.util.IOUtils;
069    import org.apache.oozie.util.PropertiesUtils;
070    import org.apache.oozie.util.XConfiguration;
071    import org.apache.oozie.util.XLog;
072    import org.apache.oozie.util.XmlUtils;
073    import org.jdom.Element;
074    import org.jdom.JDOMException;
075    import org.jdom.Namespace;
076    import org.apache.hadoop.security.token.Token;
077    import org.apache.hadoop.security.token.TokenIdentifier;
078    
079    public class JavaActionExecutor extends ActionExecutor {
080    
081        private static final String HADOOP_USER = "user.name";
082        private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
083        private static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
084        private static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
085        private static final String HADOOP_NAME_NODE = "fs.default.name";
086        public static final String OOZIE_COMMON_LIBDIR = "oozie";
087        public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE;
088        private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
089        public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
090        public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
091        public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
092        private static int maxActionOutputLen;
093        private static int maxExternalStatsSize;
094    
095        private static final String SUCCEEDED = "SUCCEEDED";
096        private static final String KILLED = "KILLED";
097        private static final String FAILED = "FAILED";
098        private static final String FAILED_KILLED = "FAILED/KILLED";
099        private static final String RUNNING = "RUNNING";
100        protected XLog log = XLog.getLog(getClass());
101    
102        static {
103            DISALLOWED_PROPERTIES.add(HADOOP_USER);
104            DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
105            DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE);
106            DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2);
107            DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM);
108        }
109    
110        public JavaActionExecutor() {
111            this("java");
112            requiresNNJT = true;
113        }
114    
115        protected JavaActionExecutor(String type) {
116            super(type);
117            requiresNNJT = true;
118        }
119    
120        protected String getLauncherJarName() {
121            return getType() + "-launcher.jar";
122        }
123    
124        protected List<Class> getLauncherClasses() {
125            List<Class> classes = new ArrayList<Class>();
126            classes.add(LauncherMapper.class);
127            classes.add(LauncherSecurityManager.class);
128            classes.add(LauncherException.class);
129            classes.add(LauncherMainException.class);
130            classes.add(FileSystemActions.class);
131            classes.add(PrepareActionsDriver.class);
132            classes.add(ActionStats.class);
133            classes.add(ActionType.class);
134            return classes;
135        }
136    
137        @Override
138        public void initActionType() {
139            XLog log = XLog.getLog(getClass());
140            super.initActionType();
141            maxActionOutputLen = getOozieConf()
142              .getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
143              // TODO: Remove the below config get in a subsequent release..
144              // This other irrelevant property is only used to
145              // preserve backwards compatibility cause of a typo.
146              // See OOZIE-4.
147              getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN,
148                2 * 1024));
149            //Get the limit for the maximum allowed size of action stats
150            maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT);
151            maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
152            try {
153                List<Class> classes = getLauncherClasses();
154                Class[] launcherClasses = classes.toArray(new Class[classes.size()]);
155                IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses);
156    
157                registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001");
158                registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT,
159                        "JA002");
160                registerError(DiskChecker.DiskOutOfSpaceException.class.getName(),
161                        ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003");
162                registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(),
163                        ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
164                registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
165                        ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
166                registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "  JA006");
167                registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
168                registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
169                registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
170            }
171            catch (IOException ex) {
172                throw new RuntimeException(ex);
173            }
174            catch (java.lang.NoClassDefFoundError err) {
175                ByteArrayOutputStream baos = new ByteArrayOutputStream();
176                err.printStackTrace(new PrintStream(baos));
177                log.warn(baos.toString());
178            }
179        }
180    
181        /**
182         * Get the maximum allowed size of stats
183         *
184         * @return maximum size of stats
185         */
186        public static int getMaxExternalStatsSize() {
187            return maxExternalStatsSize;
188        }
189    
190        static void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException {
191            for (String prop : DISALLOWED_PROPERTIES) {
192                if (conf.get(prop) != null) {
193                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010",
194                            "Property [{0}] not allowed in action [{1}] configuration", prop, confName);
195                }
196            }
197        }
198    
199        public JobConf createBaseHadoopConf(Context context, Element actionXml) {
200            Namespace ns = actionXml.getNamespace();
201            String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
202            String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
203            JobConf conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
204            conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
205            conf.set(HADOOP_JOB_TRACKER, jobTracker);
206            conf.set(HADOOP_JOB_TRACKER_2, jobTracker);
207            conf.set(HADOOP_YARN_RM, jobTracker);
208            conf.set(HADOOP_NAME_NODE, nameNode);
209            conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
210            return conf;
211        }
212    
213        private void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) {
214            for (Map.Entry<String, String> entry : srcConf) {
215                if (entry.getKey().startsWith("oozie.launcher.")) {
216                    String name = entry.getKey().substring("oozie.launcher.".length());
217                    String value = entry.getValue();
218                    // setting original KEY
219                    launcherConf.set(entry.getKey(), value);
220                    // setting un-prefixed key (to allow Hadoop job config
221                    // for the launcher job
222                    launcherConf.set(name, value);
223                }
224            }
225        }
226    
227        Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
228                throws ActionExecutorException {
229            try {
230                Namespace ns = actionXml.getNamespace();
231                Element e = actionXml.getChild("configuration", ns);
232                if (e != null) {
233                    String strConf = XmlUtils.prettyPrint(e).toString();
234                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
235    
236                    XConfiguration launcherConf = new XConfiguration();
237                    HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
238                    XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType());
239                    injectLauncherProperties(actionDefaultConf, launcherConf);
240                    injectLauncherProperties(inlineConf, launcherConf);
241                    checkForDisallowedProps(launcherConf, "launcher configuration");
242                    XConfiguration.copy(launcherConf, conf);
243                }
244                return conf;
245            }
246            catch (IOException ex) {
247                throw convertException(ex);
248            }
249        }
250    
251        public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf)
252                throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException {
253            Namespace ns = element.getNamespace();
254            Iterator<Element> it = element.getChildren("job-xml", ns).iterator();
255            while (it.hasNext()) {
256                Element e = it.next();
257                String jobXml = e.getTextTrim();
258                Path path = new Path(appPath, jobXml);
259                FileSystem fs = context.getAppFileSystem();
260                Configuration jobXmlConf = new XConfiguration(fs.open(path));
261                checkForDisallowedProps(jobXmlConf, "job-xml");
262                XConfiguration.copy(jobXmlConf, conf);
263            }
264            Element e = element.getChild("configuration", ns);
265            if (e != null) {
266                String strConf = XmlUtils.prettyPrint(e).toString();
267                XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
268                checkForDisallowedProps(inlineConf, "inline configuration");
269                XConfiguration.copy(inlineConf, conf);
270            }
271        }
272    
273        Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
274                throws ActionExecutorException {
275            try {
276                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
277                XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType());
278                XConfiguration.injectDefaults(actionDefaults, actionConf);
279    
280                has.checkSupportedFilesystem(appPath.toUri());
281    
282                parseJobXmlAndConfiguration(context, actionXml, appPath, actionConf);
283                return actionConf;
284            }
285            catch (IOException ex) {
286                throw convertException(ex);
287            }
288            catch (HadoopAccessorException ex) {
289                throw convertException(ex);
290            }
291            catch (URISyntaxException ex) {
292                throw convertException(ex);
293            }
294        }
295    
296        Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive)
297                throws ActionExecutorException {
298            Path path = null;
299            try {
300                if (filePath.startsWith("/")) {
301                    path = new Path(filePath);
302                }
303                else {
304                    path = new Path(appPath, filePath);
305                }
306                URI uri = new URI(path.toUri().getPath());
307                if (archive) {
308                    DistributedCache.addCacheArchive(uri, conf);
309                }
310                else {
311                    String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
312                    if (fileName.endsWith(".so") || fileName.contains(".so.")) {  // .so files
313                        uri = new Path(path.toString() + "#" + fileName).toUri();
314                        uri = new URI(uri.getPath());
315                        DistributedCache.addCacheFile(uri, conf);
316                    }
317                    else if (fileName.endsWith(".jar")) { // .jar files
318                        if (!fileName.contains("#")) {
319                            path = new Path(uri.toString());
320    
321                            String user = conf.get("user.name");
322                            Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, path, conf);
323                        }
324                        else {
325                            DistributedCache.addCacheFile(uri, conf);
326                        }
327                    }
328                    else { // regular files
329                        if (!fileName.contains("#")) {
330                            uri = new Path(path.toString() + "#" + fileName).toUri();
331                            uri = new URI(uri.getPath());
332                        }
333                        DistributedCache.addCacheFile(uri, conf);
334                    }
335                }
336                DistributedCache.createSymlink(conf);
337                return conf;
338            }
339            catch (Exception ex) {
340                XLog.getLog(getClass()).debug(
341                        "Errors when add to DistributedCache. Path=" + path + ", archive=" + archive + ", conf="
342                                + XmlUtils.prettyPrint(conf).toString());
343                throw convertException(ex);
344            }
345        }
346    
347        String getOozieLauncherJar(Context context) throws ActionExecutorException {
348            try {
349                return new Path(context.getActionDir(), getLauncherJarName()).toString();
350            }
351            catch (Exception ex) {
352                throw convertException(ex);
353            }
354        }
355    
356        public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
357            try {
358                Path actionDir = context.getActionDir();
359                Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp");
360                if (!actionFs.exists(actionDir)) {
361                    try {
362                        actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path(
363                                tempActionDir, getLauncherJarName()));
364                        actionFs.rename(tempActionDir, actionDir);
365                    }
366                    catch (IOException ex) {
367                        actionFs.delete(tempActionDir, true);
368                        actionFs.delete(actionDir, true);
369                        throw ex;
370                    }
371                }
372            }
373            catch (Exception ex) {
374                throw convertException(ex);
375            }
376        }
377    
378        void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
379            try {
380                Path actionDir = context.getActionDir();
381                if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
382                        && actionFs.exists(actionDir)) {
383                    actionFs.delete(actionDir, true);
384                }
385            }
386            catch (Exception ex) {
387                throw convertException(ex);
388            }
389        }
390    
391        protected void addShareLib(Path appPath, Configuration conf, String actionShareLibName)
392        throws ActionExecutorException {
393            if (actionShareLibName != null) {
394                try {
395                    Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath();
396                    if (systemLibPath != null) {
397                        Path actionLibPath = new Path(systemLibPath, actionShareLibName);
398                        String user = conf.get("user.name");
399                        FileSystem fs =
400                            Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
401                        if (fs.exists(actionLibPath)) {
402                            FileStatus[] files = fs.listStatus(actionLibPath);
403                            for (FileStatus file : files) {
404                                addToCache(conf, appPath, file.getPath().toUri().getPath(), false);
405                            }
406                        }
407                    }
408                }
409                catch (HadoopAccessorException ex){
410                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
411                            ex.getErrorCode().toString(), ex.getMessage());
412                }
413                catch (IOException ex){
414                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
415                            "It should never happen", ex.getMessage());
416                }
417            }
418        }
419    
420        protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException {
421            String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath");
422            if (actionLibsStrArr != null) {
423                try {
424                    for (String actionLibsStr : actionLibsStrArr) {
425                        actionLibsStr = actionLibsStr.trim();
426                        if (actionLibsStr.length() > 0)
427                        {
428                            Path actionLibsPath = new Path(actionLibsStr);
429                            String user = conf.get("user.name");
430                            FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
431                            if (fs.exists(actionLibsPath)) {
432                                FileStatus[] files = fs.listStatus(actionLibsPath);
433                                for (FileStatus file : files) {
434                                    addToCache(conf, appPath, file.getPath().toUri().getPath(), false);
435                                }
436                            }
437                        }
438                    }
439                }
440                catch (HadoopAccessorException ex){
441                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
442                            ex.getErrorCode().toString(), ex.getMessage());
443                }
444                catch (IOException ex){
445                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
446                            "It should never happen", ex.getMessage());
447                }
448            }
449        }
450    
451        @SuppressWarnings("unchecked")
452        void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf)
453                throws ActionExecutorException {
454            Configuration proto = context.getProtoActionConf();
455    
456            // launcher JAR
457            addToCache(conf, appPath, getOozieLauncherJar(context), false);
458    
459            // Workflow lib/
460            String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
461            if (paths != null) {
462                for (String path : paths) {
463                    addToCache(conf, appPath, path, false);
464                }
465            }
466    
467            // Action libs
468            addActionLibs(appPath, conf);
469    
470            // files and archives defined in the action
471            for (Element eProp : (List<Element>) actionXml.getChildren()) {
472                if (eProp.getName().equals("file")) {
473                    String path = eProp.getTextTrim();
474                    addToCache(conf, appPath, path, false);
475                }
476                else {
477                    if (eProp.getName().equals("archive")) {
478                        String path = eProp.getTextTrim();
479                        addToCache(conf, appPath, path, true);
480                    }
481                }
482            }
483    
484            addAllShareLibs(appPath, conf, context, actionXml);
485            }
486    
487        // Adds action specific share libs and common share libs
488        private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml)
489                throws ActionExecutorException {
490            // Add action specific share libs
491            addActionShareLib(appPath, conf, context, actionXml);
492            // Add common sharelibs for Oozie
493            addShareLib(appPath, conf, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
494        }
495    
496        private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml) throws ActionExecutorException {
497            XConfiguration wfJobConf = null;
498            try {
499                wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
500            }
501            catch (IOException ioe) {
502                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
503                        ioe.getMessage());
504            }
505            // Action sharelibs are only added if user has specified to use system libpath
506            if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
507                // add action specific sharelibs
508                addShareLib(appPath, conf, getShareLibName(context, actionXml, conf));
509            }
510        }
511    
512    
513        protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
514            Namespace ns = actionXml.getNamespace();
515            Element e = actionXml.getChild("main-class", ns);
516            return e.getTextTrim();
517        }
518    
519        private static final String QUEUE_NAME = "mapred.job.queue.name";
520    
521        private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>();
522    
523        static {
524            SPECIAL_PROPERTIES.add(QUEUE_NAME);
525            SPECIAL_PROPERTIES.add(ACL_VIEW_JOB);
526            SPECIAL_PROPERTIES.add(ACL_MODIFY_JOB);
527        }
528    
529        @SuppressWarnings("unchecked")
530        JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
531                throws ActionExecutorException {
532            try {
533    
534                // app path could be a file
535                Path appPathRoot = new Path(context.getWorkflow().getAppPath());
536                if (actionFs.isFile(appPathRoot)) {
537                    appPathRoot = appPathRoot.getParent();
538                }
539    
540                // launcher job configuration
541                JobConf launcherJobConf = createBaseHadoopConf(context, actionXml);
542                setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);
543    
544                String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType());
545                if (actionShareLibProperty != null) {
546                    launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty);
547                }
548                setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
549                String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow()
550                        .getAppName(), action.getName(), context.getWorkflow().getId());
551                launcherJobConf.setJobName(jobName);
552    
553                String jobId = context.getWorkflow().getId();
554                String actionId = action.getId();
555                Path actionDir = context.getActionDir();
556                String recoveryId = context.getRecoveryId();
557    
558                // Getting the prepare XML from the action XML
559                Namespace ns = actionXml.getNamespace();
560                Element prepareElement = actionXml.getChild("prepare", ns);
561                String prepareXML = "";
562                if (prepareElement != null) {
563                    if (prepareElement.getChildren().size() > 0) {
564                        prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
565                    }
566                }
567                LauncherMapper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
568                        prepareXML);
569    
570                LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
571                LauncherMapper.setupSupportedFileSystems(
572                    launcherJobConf, Services.get().getConf().get(HadoopAccessorService.SUPPORTED_FILESYSTEMS));
573                LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
574                LauncherMapper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
575    
576                List<Element> list = actionXml.getChildren("arg", ns);
577                String[] args = new String[list.size()];
578                for (int i = 0; i < list.size(); i++) {
579                    args[i] = list.get(i).getTextTrim();
580                }
581                LauncherMapper.setupMainArguments(launcherJobConf, args);
582    
583                List<Element> javaopts = actionXml.getChildren("java-opt", ns);
584                for (Element opt: javaopts) {
585                    String opts = launcherJobConf.get("mapred.child.java.opts", "");
586                    opts = opts + " " + opt.getTextTrim();
587                    opts = opts.trim();
588                    launcherJobConf.set("mapred.child.java.opts", opts);
589                }
590    
591                Element opt = actionXml.getChild("java-opts", ns);
592                if (opt != null) {
593                    String opts = launcherJobConf.get("mapred.child.java.opts", "");
594                    opts = opts + " " + opt.getTextTrim();
595                    opts = opts.trim();
596                    launcherJobConf.set("mapred.child.java.opts", opts);
597                }
598    
599                // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs)
600                // maybe we should add queue to the WF schema, below job-tracker
601                actionConfToLauncherConf(actionConf, launcherJobConf);
602    
603                // to disable cancelation of delegation token on launcher job end
604                launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
605    
606                return launcherJobConf;
607            }
608            catch (Exception ex) {
609                throw convertException(ex);
610            }
611        }
612    
613        private void injectCallback(Context context, Configuration conf) {
614            String callback = context.getCallbackUrl("$jobStatus");
615            if (conf.get("job.end.notification.url") != null) {
616                XLog.getLog(getClass()).warn("Overriding the action job end notification URI");
617            }
618            conf.set("job.end.notification.url", callback);
619        }
620    
621        void injectActionCallback(Context context, Configuration actionConf) {
622            injectCallback(context, actionConf);
623        }
624    
625        void injectLauncherCallback(Context context, Configuration launcherConf) {
626            injectCallback(context, launcherConf);
627        }
628    
629        private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) {
630            for (String name : SPECIAL_PROPERTIES) {
631                if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) {
632                    launcherConf.set(name, actionConf.get(name));
633                }
634            }
635        }
636    
637        public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
638            JobClient jobClient = null;
639            boolean exception = false;
640            try {
641                Path appPathRoot = new Path(context.getWorkflow().getAppPath());
642    
643                // app path could be a file
644                if (actionFs.isFile(appPathRoot)) {
645                    appPathRoot = appPathRoot.getParent();
646                }
647    
648                Element actionXml = XmlUtils.parseXml(action.getConf());
649    
650                // action job configuration
651                Configuration actionConf = createBaseHadoopConf(context, actionXml);
652                setupActionConf(actionConf, context, actionXml, appPathRoot);
653                XLog.getLog(getClass()).debug("Setting LibFilesArchives ");
654                setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
655                String jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow()
656                        .getAppName(), action.getName(), context.getWorkflow().getId());
657                actionConf.set("mapred.job.name", jobName);
658                injectActionCallback(context, actionConf);
659    
660                if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) {
661                    // ONLY in the case where user has not given the
662                    // modify-job ACL specifically
663                    if (context.getWorkflow().getAcl() != null) {
664                        // setting the group owning the Oozie job to allow anybody in that
665                        // group to modify the jobs.
666                        actionConf.set(ACL_MODIFY_JOB, context.getWorkflow().getAcl());
667                    }
668                }
669    
670                // Setting the credential properties in launcher conf
671                HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
672                        action, actionConf);
673    
674                // Adding if action need to set more credential tokens
675                JobConf credentialsConf = new JobConf(false);
676                XConfiguration.copy(actionConf, credentialsConf);
677                setCredentialTokens(credentialsConf, context, action, credentialsProperties);
678    
679                // insert conf to action conf from credentialsConf
680                for (Entry<String, String> entry : credentialsConf) {
681                    if (actionConf.get(entry.getKey()) == null) {
682                        actionConf.set(entry.getKey(), entry.getValue());
683                    }
684                }
685    
686                JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
687                injectLauncherCallback(context, launcherJobConf);
688                XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId());
689                jobClient = createJobClient(context, launcherJobConf);
690                String launcherId = LauncherMapper.getRecoveryId(launcherJobConf, context.getActionDir(), context
691                        .getRecoveryId());
692                boolean alreadyRunning = launcherId != null;
693                RunningJob runningJob;
694    
695                // if user-retry is on, always submit new launcher
696                boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
697    
698                if (alreadyRunning && !isUserRetry) {
699                    runningJob = jobClient.getJob(JobID.forName(launcherId));
700                    if (runningJob == null) {
701                        String jobTracker = launcherJobConf.get("mapred.job.tracker");
702                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
703                                "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
704                    }
705                }
706                else {
707                    XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId());
708    
709                    // setting up propagation of the delegation token.
710                    Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("mr token"));
711                    launcherJobConf.getCredentials().addToken(new Text("mr token"), mrdt);
712    
713                    // insert credentials tokens to launcher job conf if needed
714                    if (needInjectCredentials()) {
715                        for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) {
716                            log.debug("ADDING TOKEN: " + tk.getKind().toString());
717                            launcherJobConf.getCredentials().addToken(tk.getKind(), tk);
718                        }
719                    }
720                    else {
721                        log.info("No need to inject credentials.");
722                    }
723                    runningJob = jobClient.submitJob(launcherJobConf);
724                    if (runningJob == null) {
725                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
726                                "Error submitting launcher for action [{0}]", action.getId());
727                    }
728                    launcherId = runningJob.getID().toString();
729                    XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId);
730                }
731    
732                String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
733                String consoleUrl = runningJob.getTrackingURL();
734                context.setStartData(launcherId, jobTracker, consoleUrl);
735            }
736            catch (Exception ex) {
737                exception = true;
738                throw convertException(ex);
739            }
740            finally {
741                if (jobClient != null) {
742                    try {
743                        jobClient.close();
744                    }
745                    catch (Exception e) {
746                        if (exception) {
747                            log.error("JobClient error: ", e);
748                        }
749                        else {
750                            throw convertException(e);
751                        }
752                    }
753                }
754            }
755        }
756    
757        private boolean needInjectCredentials() {
758            boolean methodExists = true;
759    
760            Class klass;
761            try {
762                klass = Class.forName("org.apache.hadoop.mapred.JobConf");
763                klass.getMethod("getCredentials");
764            }
765            catch (ClassNotFoundException ex) {
766                methodExists = false;
767            }
768            catch (NoSuchMethodException ex) {
769                methodExists = false;
770            }
771    
772            return methodExists;
773        }
774    
775        protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
776                WorkflowAction action, Configuration actionConf) throws Exception {
777            HashMap<String, CredentialsProperties> credPropertiesMap = null;
778            if (context != null && action != null) {
779                credPropertiesMap = getActionCredentialsProperties(context, action, actionConf);
780                if (credPropertiesMap != null) {
781                    for (String key : credPropertiesMap.keySet()) {
782                        CredentialsProperties prop = credPropertiesMap.get(key);
783                        if (prop != null) {
784                            log.debug("Credential Properties set for action : " + action.getId());
785                            for (String property : prop.getProperties().keySet()) {
786                                actionConf.set(property, prop.getProperties().get(property));
787                                log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'");
788                            }
789                        }
790                    }
791                }
792                else {
793                    log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
794                }
795            }
796            else {
797                log.warn("context or action is null");
798            }
799            return credPropertiesMap;
800        }
801    
802        protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
803                HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {
804    
805            if (context != null && action != null && credPropertiesMap != null) {
806                for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
807                    String credName = entry.getKey();
808                    CredentialsProperties credProps = entry.getValue();
809                    if (credProps != null) {
810                        CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
811                        Credentials credentialObject = credProvider.createCredentialObject();
812                        if (credentialObject != null) {
813                            credentialObject.addtoJobConf(jobconf, credProps, context);
814                            log.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
815                        }
816                        else {
817                            log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
818                        }
819                    }
820                }
821            }
822    
823        }
824    
825        protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
826                WorkflowAction action, Configuration conf) throws Exception {
827            HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();
828            if (context != null && action != null) {
829                String credsInAction = action.getCred();
830                log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId());
831                String[] credNames = credsInAction.split(",");
832                for (String credName : credNames) {
833                    CredentialsProperties credProps = getCredProperties(context, credName, conf);
834                    props.put(credName, credProps);
835                }
836            }
837            else {
838                log.warn("context or action is null");
839            }
840            return props;
841        }
842    
843        @SuppressWarnings("unchecked")
844        protected CredentialsProperties getCredProperties(Context context, String credName, Configuration conf)
845                throws Exception {
846            CredentialsProperties credProp = null;
847            String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
848            Element elementJob = XmlUtils.parseXml(workflowXml);
849            Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
850            if (credentials != null) {
851                for (Element credential : (List<Element>) credentials.getChildren("credential", credentials.getNamespace())) {
852                    String name = credential.getAttributeValue("name");
853                    String type = credential.getAttributeValue("type");
854                    log.debug("getCredProperties: Name: " + name + ", Type: " + type);
855                    if (name.equalsIgnoreCase(credName)) {
856                        credProp = new CredentialsProperties(name, type);
857                        for (Element property : (List<Element>) credential.getChildren("property",
858                                credential.getNamespace())) {
859                            String propertyName = property.getChildText("name", property.getNamespace());
860                            String propertyValue = property.getChildText("value", property.getNamespace());
861                            ELEvaluator eval = new ELEvaluator();
862                            for (Map.Entry<String, String> entry : conf) {
863                                eval.setVariable(entry.getKey(), entry.getValue().trim());
864                            }
865                            propertyName = eval.evaluate(propertyName, String.class);
866                            propertyValue = eval.evaluate(propertyValue, String.class);
867    
868                            credProp.getProperties().put(propertyName, propertyValue);
869                            log.debug("getCredProperties: Properties name :'" + propertyName + "', Value : '"
870                                    + propertyValue + "'");
871                        }
872                    }
873                }
874            } else {
875                log.warn("credentials is null for the action");
876            }
877            return credProp;
878        }
879    
880        @Override
881        public void start(Context context, WorkflowAction action) throws ActionExecutorException {
882            try {
883                XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System");
884                FileSystem actionFs = context.getAppFileSystem();
885                XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir());
886                prepareActionDir(actionFs, context);
887                XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action ");
888                submitLauncher(actionFs, context, action);
889                XLog.getLog(getClass()).debug("Action submit completed. Performing check ");
890                check(context, action);
891                XLog.getLog(getClass()).debug("Action check is done after submission");
892            }
893            catch (Exception ex) {
894                throw convertException(ex);
895            }
896        }
897    
898        @Override
899        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
900            try {
901                String externalStatus = action.getExternalStatus();
902                WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK
903                        : WorkflowAction.Status.ERROR;
904                context.setEndData(status, getActionSignal(status));
905            }
906            catch (Exception ex) {
907                throw convertException(ex);
908            }
909            finally {
910                try {
911                    FileSystem actionFs = context.getAppFileSystem();
912                    cleanUpActionDir(actionFs, context);
913                }
914                catch (Exception ex) {
915                    throw convertException(ex);
916                }
917            }
918        }
919    
920        /**
921         * Create job client object
922         *
923         * @param context
924         * @param jobConf
925         * @return
926         * @throws HadoopAccessorException
927         */
928        protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
929            String user = context.getWorkflow().getUser();
930            String group = context.getWorkflow().getGroup();
931            return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
932        }
933    
934        @Override
935        public void check(Context context, WorkflowAction action) throws ActionExecutorException {
936            JobClient jobClient = null;
937            boolean exception = false;
938            try {
939                Element actionXml = XmlUtils.parseXml(action.getConf());
940                FileSystem actionFs = context.getAppFileSystem();
941                JobConf jobConf = createBaseHadoopConf(context, actionXml);
942                jobClient = createJobClient(context, jobConf);
943                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
944                if (runningJob == null) {
945                    context.setExternalStatus(FAILED);
946                    context.setExecutionData(FAILED, null);
947                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
948                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", action
949                                    .getExternalId(), action.getId());
950                }
951                if (runningJob.isComplete()) {
952                    Path actionDir = context.getActionDir();
953    
954                    String user = context.getWorkflow().getUser();
955                    String group = context.getWorkflow().getGroup();
956                    if (LauncherMapper.hasIdSwap(runningJob, user, group, actionDir)) {
957                        String launcherId = action.getExternalId();
958                        Path idSwapPath = LauncherMapper.getIdSwapPath(context.getActionDir());
959                        InputStream is = actionFs.open(idSwapPath);
960                        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
961                        Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
962                        reader.close();
963                        String newId = props.getProperty("id");
964                        runningJob = jobClient.getJob(JobID.forName(newId));
965                        if (runningJob == null) {
966                            context.setExternalStatus(FAILED);
967                            throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
968                                    "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", newId,
969                                    action.getId());
970                        }
971    
972                        context.setStartData(newId, action.getTrackerUri(), runningJob.getTrackingURL());
973                        XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
974                                newId);
975                    }
976                    if (runningJob.isComplete()) {
977                        XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]",
978                                action.getExternalId());
979                        if (runningJob.isSuccessful() && LauncherMapper.isMainSuccessful(runningJob)) {
980                            getActionData(actionFs, runningJob, action, context);
981                            XLog.getLog(getClass()).info(XLog.STD, "action produced output");
982                        }
983                        else {
984                            XLog log = XLog.getLog(getClass());
985                            String errorReason;
986                            Path actionError = LauncherMapper.getErrorPath(context.getActionDir());
987                            if (actionFs.exists(actionError)) {
988                                InputStream is = actionFs.open(actionError);
989                                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
990                                Properties props = PropertiesUtils.readProperties(reader, -1);
991                                reader.close();
992                                String errorCode = props.getProperty("error.code");
993                                if (errorCode.equals("0")) {
994                                    errorCode = "JA018";
995                                }
996                                if (errorCode.equals("-1")) {
997                                    errorCode = "JA019";
998                                }
999                                errorReason = props.getProperty("error.reason");
1000                                log.warn("Launcher ERROR, reason: {0}", errorReason);
1001                                String exMsg = props.getProperty("exception.message");
1002                                String errorInfo = (exMsg != null) ? exMsg : errorReason;
1003                                context.setErrorInfo(errorCode, errorInfo);
1004                                String exStackTrace = props.getProperty("exception.stacktrace");
1005                                if (exMsg != null) {
1006                                    log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
1007                                }
1008                            }
1009                            else {
1010                                errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action
1011                                        .getTrackerUri(), action.getExternalId());
1012                                log.warn(errorReason);
1013                            }
1014                            context.setExecutionData(FAILED_KILLED, null);
1015                        }
1016                    }
1017                    else {
1018                        context.setExternalStatus(RUNNING);
1019                        XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
1020                                action.getExternalId(), action.getExternalStatus());
1021                    }
1022                }
1023                else {
1024                    context.setExternalStatus(RUNNING);
1025                    XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
1026                            action.getExternalId(), action.getExternalStatus());
1027                }
1028            }
1029            catch (Exception ex) {
1030                XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
1031                exception = true;
1032                throw convertException(ex);
1033            }
1034            finally {
1035                if (jobClient != null) {
1036                    try {
1037                        jobClient.close();
1038                    }
1039                    catch (Exception e) {
1040                        if (exception) {
1041                            log.error("JobClient error: ", e);
1042                        }
1043                        else {
1044                            throw convertException(e);
1045                        }
1046                    }
1047                }
1048            }
1049        }
1050    
1051        /**
1052         * Get the output data of an action. Subclasses should override this method
1053         * to get action specific output data.
1054         *
1055         * @param actionFs the FileSystem object
1056         * @param runningJob the runningJob
1057         * @param action the Workflow action
1058         * @param context executor context
1059         *
1060         */
1061        protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
1062                throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
1063            Properties props = null;
1064            if (getCaptureOutput(action)) {
1065                props = new Properties();
1066                if (LauncherMapper.hasOutputData(runningJob)) {
1067                    Path actionOutput = LauncherMapper.getOutputDataPath(context.getActionDir());
1068                    InputStream is = actionFs.open(actionOutput);
1069                    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1070                    props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
1071                    reader.close();
1072                }
1073            }
1074            context.setExecutionData(SUCCEEDED, props);
1075        }
1076    
1077        protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
1078            Element eConf = XmlUtils.parseXml(action.getConf());
1079            Namespace ns = eConf.getNamespace();
1080            Element captureOutput = eConf.getChild("capture-output", ns);
1081            return captureOutput != null;
1082        }
1083    
1084        @Override
1085        public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
1086            JobClient jobClient = null;
1087            boolean exception = false;
1088            try {
1089                Element actionXml = XmlUtils.parseXml(action.getConf());
1090                JobConf jobConf = createBaseHadoopConf(context, actionXml);
1091                jobClient = createJobClient(context, jobConf);
1092                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
1093                if (runningJob != null) {
1094                    runningJob.killJob();
1095                }
1096                context.setExternalStatus(KILLED);
1097                context.setExecutionData(KILLED, null);
1098            }
1099            catch (Exception ex) {
1100                exception = true;
1101                throw convertException(ex);
1102            }
1103            finally {
1104                try {
1105                    FileSystem actionFs = context.getAppFileSystem();
1106                    cleanUpActionDir(actionFs, context);
1107                    if (jobClient != null) {
1108                        jobClient.close();
1109                    }
1110                }
1111                catch (Exception ex) {
1112                    if (exception) {
1113                        log.error("Error: ", ex);
1114                    }
1115                    else {
1116                        throw convertException(ex);
1117                    }
1118                }
1119            }
1120        }
1121    
1122        private static Set<String> FINAL_STATUS = new HashSet<String>();
1123    
1124        static {
1125            FINAL_STATUS.add(SUCCEEDED);
1126            FINAL_STATUS.add(KILLED);
1127            FINAL_STATUS.add(FAILED);
1128            FINAL_STATUS.add(FAILED_KILLED);
1129        }
1130    
1131        @Override
1132        public boolean isCompleted(String externalStatus) {
1133            return FINAL_STATUS.contains(externalStatus);
1134        }
1135    
1136    
1137        /**
1138         * Return the sharelib name for the action.
1139         * <p/>
1140         * If <code>NULL</code> or emtpy, it means that the action does not use the action
1141         * sharelib.
1142         * <p/>
1143         * If a non-empty string, i.e. <code>foo</code>, it means the action uses the
1144         * action sharelib subdirectory <code>foo</code> and all JARs in the sharelib
1145         * <code>foo</code> directory will be in the action classpath.
1146         * <p/>
1147         * The resolution is done using the following precedence order:
1148         * <ul>
1149         *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the action configuration</li>
1150         *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the job configuration</li>
1151         *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the oozie configuration</li>
1152         *     <li>Action Executor <code>getDefaultShareLibName()</code> method</li>
1153         * </ul>
1154         *
1155         *
1156         * @param context executor context.
1157         * @param actionXml
1158         *@param conf action configuration.  @return the action sharelib name.
1159         */
1160        protected String getShareLibName(Context context, Element actionXml, Configuration conf) {
1161            String name = conf.get(ACTION_SHARELIB_FOR + getType());
1162            if (name == null) {
1163                try {
1164                    XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
1165                    name = jobConf.get(ACTION_SHARELIB_FOR + getType());
1166                    if (name == null) {
1167                        name = Services.get().getConf().get(ACTION_SHARELIB_FOR + getType());
1168                        if (name == null) {
1169                            name = getDefaultShareLibName(actionXml);
1170                        }
1171                    }
1172                }
1173                catch (IOException ex) {
1174                    throw new RuntimeException("It cannot happen, " + ex.toString(), ex);
1175                }
1176            }
1177            return name;
1178        }
1179    
1180        private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for.";
1181    
1182    
1183        /**
1184         * Returns the default sharelib name for the action if any.
1185         *
1186         * @param actionXml the action XML fragment.
1187         * @return the sharelib name for the action, <code>NULL</code> if none.
1188         */
1189        protected String getDefaultShareLibName(Element actionXml) {
1190            return null;
1191        }
1192    }