001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.BufferedReader;
021    import java.io.ByteArrayOutputStream;
022    import java.io.File;
023    import java.io.FileNotFoundException;
024    import java.io.IOException;
025    import java.io.InputStream;
026    import java.io.InputStreamReader;
027    import java.io.PrintStream;
028    import java.io.StringReader;
029    import java.net.ConnectException;
030    import java.net.URI;
031    import java.net.URISyntaxException;
032    import java.net.UnknownHostException;
033    import java.util.ArrayList;
034    import java.util.HashMap;
035    import java.util.HashSet;
036    import java.util.Iterator;
037    import java.util.List;
038    import java.util.Map;
039    import java.util.Properties;
040    import java.util.Set;
041    import java.util.Map.Entry;
042    
043    import org.apache.hadoop.conf.Configuration;
044    import org.apache.hadoop.filecache.DistributedCache;
045    import org.apache.hadoop.fs.FileStatus;
046    import org.apache.hadoop.fs.FileSystem;
047    import org.apache.hadoop.fs.Path;
048    import org.apache.hadoop.fs.permission.AccessControlException;
049    import org.apache.hadoop.mapred.JobClient;
050    import org.apache.hadoop.mapred.JobConf;
051    import org.apache.hadoop.mapred.JobID;
052    import org.apache.hadoop.mapred.RunningJob;
053    import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
054    import org.apache.hadoop.util.DiskChecker;
055    import org.apache.oozie.WorkflowActionBean;
056    import org.apache.oozie.WorkflowJobBean;
057    import org.apache.oozie.action.ActionExecutor;
058    import org.apache.oozie.action.ActionExecutorException;
059    import org.apache.oozie.client.OozieClient;
060    import org.apache.oozie.client.WorkflowAction;
061    import org.apache.oozie.service.HadoopAccessorException;
062    import org.apache.oozie.service.HadoopAccessorService;
063    import org.apache.oozie.service.Services;
064    import org.apache.oozie.service.WorkflowAppService;
065    import org.apache.oozie.servlet.CallbackServlet;
066    import org.apache.oozie.util.ELEvaluator;
067    import org.apache.oozie.util.IOUtils;
068    import org.apache.oozie.util.PropertiesUtils;
069    import org.apache.oozie.util.XConfiguration;
070    import org.apache.oozie.util.XLog;
071    import org.apache.oozie.util.XmlUtils;
072    import org.jdom.Element;
073    import org.jdom.JDOMException;
074    import org.jdom.Namespace;
075    import org.apache.hadoop.security.token.Token;
076    import org.apache.hadoop.security.token.TokenIdentifier;
077    
078    public class JavaActionExecutor extends ActionExecutor {
079    
080        private static final String HADOOP_USER = "user.name";
081        private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
082        private static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
083        private static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
084        private static final String HADOOP_NAME_NODE = "fs.default.name";
085        public static final String OOZIE_COMMON_LIBDIR = "oozie";
086        public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE;
087        private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
088        public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
089        public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
090        public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
091        private static int maxActionOutputLen;
092        private static int maxExternalStatsSize;
093    
094        private static final String SUCCEEDED = "SUCCEEDED";
095        private static final String KILLED = "KILLED";
096        private static final String FAILED = "FAILED";
097        private static final String FAILED_KILLED = "FAILED/KILLED";
098        private static final String RUNNING = "RUNNING";
099        protected XLog log = XLog.getLog(getClass());
100    
101        static {
102            DISALLOWED_PROPERTIES.add(HADOOP_USER);
103            DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
104            DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE);
105            DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2);
106            DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM);
107        }
108    
109        public JavaActionExecutor() {
110            this("java");
111            requiresNNJT = true;
112        }
113    
114        protected JavaActionExecutor(String type) {
115            super(type);
116            requiresNNJT = true;
117        }
118    
119        protected String getLauncherJarName() {
120            return getType() + "-launcher.jar";
121        }
122    
123        protected List<Class> getLauncherClasses() {
124            List<Class> classes = new ArrayList<Class>();
125            classes.add(LauncherMapper.class);
126            classes.add(LauncherSecurityManager.class);
127            classes.add(LauncherException.class);
128            classes.add(LauncherMainException.class);
129            classes.add(FileSystemActions.class);
130            classes.add(PrepareActionsDriver.class);
131            classes.add(ActionStats.class);
132            classes.add(ActionType.class);
133            return classes;
134        }
135    
136        @Override
137        public void initActionType() {
138            XLog log = XLog.getLog(getClass());
139            super.initActionType();
140            maxActionOutputLen = getOozieConf()
141              .getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
142              // TODO: Remove the below config get in a subsequent release..
143              // This other irrelevant property is only used to
144              // preserve backwards compatibility cause of a typo.
145              // See OOZIE-4.
146              getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN,
147                2 * 1024));
148            //Get the limit for the maximum allowed size of action stats
149            maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT);
150            maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
151            try {
152                List<Class> classes = getLauncherClasses();
153                Class[] launcherClasses = classes.toArray(new Class[classes.size()]);
154                IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses);
155    
156                registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001");
157                registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT,
158                        "JA002");
159                registerError(DiskChecker.DiskOutOfSpaceException.class.getName(),
160                        ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003");
161                registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(),
162                        ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
163                registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
164                        ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
165                registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "  JA006");
166                registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
167                registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
168                registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
169            }
170            catch (IOException ex) {
171                throw new RuntimeException(ex);
172            }
173            catch (java.lang.NoClassDefFoundError err) {
174                ByteArrayOutputStream baos = new ByteArrayOutputStream();
175                err.printStackTrace(new PrintStream(baos));
176                log.warn(baos.toString());
177            }
178        }
179    
180        /**
181         * Get the maximum allowed size of stats
182         *
183         * @return maximum size of stats
184         */
185        public static int getMaxExternalStatsSize() {
186            return maxExternalStatsSize;
187        }
188    
189        static void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException {
190            for (String prop : DISALLOWED_PROPERTIES) {
191                if (conf.get(prop) != null) {
192                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010",
193                            "Property [{0}] not allowed in action [{1}] configuration", prop, confName);
194                }
195            }
196        }
197    
198        public JobConf createBaseHadoopConf(Context context, Element actionXml) {
199            Namespace ns = actionXml.getNamespace();
200            String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
201            String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
202            JobConf conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
203            conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
204            conf.set(HADOOP_JOB_TRACKER, jobTracker);
205            conf.set(HADOOP_JOB_TRACKER_2, jobTracker);
206            conf.set(HADOOP_YARN_RM, jobTracker);
207            conf.set(HADOOP_NAME_NODE, nameNode);
208            conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
209            return conf;
210        }
211    
212        private void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) {
213            for (Map.Entry<String, String> entry : srcConf) {
214                if (entry.getKey().startsWith("oozie.launcher.")) {
215                    String name = entry.getKey().substring("oozie.launcher.".length());
216                    String value = entry.getValue();
217                    // setting original KEY
218                    launcherConf.set(entry.getKey(), value);
219                    // setting un-prefixed key (to allow Hadoop job config
220                    // for the launcher job
221                    launcherConf.set(name, value);
222                }
223            }
224        }
225    
226        Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
227                throws ActionExecutorException {
228            try {
229                Namespace ns = actionXml.getNamespace();
230                Element e = actionXml.getChild("configuration", ns);
231                if (e != null) {
232                    String strConf = XmlUtils.prettyPrint(e).toString();
233                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
234    
235                    XConfiguration launcherConf = new XConfiguration();
236                    HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
237                    XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType());
238                    injectLauncherProperties(actionDefaultConf, launcherConf);
239                    injectLauncherProperties(inlineConf, launcherConf);
240                    checkForDisallowedProps(launcherConf, "launcher configuration");
241                    XConfiguration.copy(launcherConf, conf);
242                }
243                return conf;
244            }
245            catch (IOException ex) {
246                throw convertException(ex);
247            }
248        }
249    
250        public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf)
251                throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException {
252            Namespace ns = element.getNamespace();
253            Iterator<Element> it = element.getChildren("job-xml", ns).iterator();
254            while (it.hasNext()) {
255                Element e = it.next();
256                String jobXml = e.getTextTrim();
257                Path path = new Path(appPath, jobXml);
258                FileSystem fs = context.getAppFileSystem();
259                Configuration jobXmlConf = new XConfiguration(fs.open(path));
260                checkForDisallowedProps(jobXmlConf, "job-xml");
261                XConfiguration.copy(jobXmlConf, conf);
262            }
263            Element e = element.getChild("configuration", ns);
264            if (e != null) {
265                String strConf = XmlUtils.prettyPrint(e).toString();
266                XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
267                checkForDisallowedProps(inlineConf, "inline configuration");
268                XConfiguration.copy(inlineConf, conf);
269            }
270        }
271    
272        Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
273                throws ActionExecutorException {
274            try {
275                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
276                XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType());
277                XConfiguration.injectDefaults(actionDefaults, actionConf);
278    
279                has.checkSupportedFilesystem(appPath.toUri());
280    
281                parseJobXmlAndConfiguration(context, actionXml, appPath, actionConf);
282                return actionConf;
283            }
284            catch (IOException ex) {
285                throw convertException(ex);
286            }
287            catch (HadoopAccessorException ex) {
288                throw convertException(ex);
289            }
290            catch (URISyntaxException ex) {
291                throw convertException(ex);
292            }
293        }
294    
295        Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive)
296                throws ActionExecutorException {
297            Path path = null;
298            try {
299                if (filePath.startsWith("/")) {
300                    path = new Path(filePath);
301                }
302                else {
303                    path = new Path(appPath, filePath);
304                }
305                URI uri = new URI(path.toUri().getPath());
306                if (archive) {
307                    DistributedCache.addCacheArchive(uri, conf);
308                }
309                else {
310                    String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
311                    if (fileName.endsWith(".so") || fileName.contains(".so.")) {  // .so files
312                        uri = new Path(path.toString() + "#" + fileName).toUri();
313                        uri = new URI(uri.getPath());
314                        DistributedCache.addCacheFile(uri, conf);
315                    }
316                    else if (fileName.endsWith(".jar")) { // .jar files
317                        if (!fileName.contains("#")) {
318                            path = new Path(uri.toString());
319    
320                            String user = conf.get("user.name");
321                            Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, path, conf);
322                        }
323                        else {
324                            DistributedCache.addCacheFile(uri, conf);
325                        }
326                    }
327                    else { // regular files
328                        if (!fileName.contains("#")) {
329                            uri = new Path(path.toString() + "#" + fileName).toUri();
330                            uri = new URI(uri.getPath());
331                        }
332                        DistributedCache.addCacheFile(uri, conf);
333                    }
334                }
335                DistributedCache.createSymlink(conf);
336                return conf;
337            }
338            catch (Exception ex) {
339                XLog.getLog(getClass()).debug(
340                        "Errors when add to DistributedCache. Path=" + path + ", archive=" + archive + ", conf="
341                                + XmlUtils.prettyPrint(conf).toString());
342                throw convertException(ex);
343            }
344        }
345    
346        String getOozieLauncherJar(Context context) throws ActionExecutorException {
347            try {
348                return new Path(context.getActionDir(), getLauncherJarName()).toString();
349            }
350            catch (Exception ex) {
351                throw convertException(ex);
352            }
353        }
354    
355        public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
356            try {
357                Path actionDir = context.getActionDir();
358                Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp");
359                if (!actionFs.exists(actionDir)) {
360                    try {
361                        actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path(
362                                tempActionDir, getLauncherJarName()));
363                        actionFs.rename(tempActionDir, actionDir);
364                    }
365                    catch (IOException ex) {
366                        actionFs.delete(tempActionDir, true);
367                        actionFs.delete(actionDir, true);
368                        throw ex;
369                    }
370                }
371            }
372            catch (Exception ex) {
373                throw convertException(ex);
374            }
375        }
376    
377        void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
378            try {
379                Path actionDir = context.getActionDir();
380                if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
381                        && actionFs.exists(actionDir)) {
382                    actionFs.delete(actionDir, true);
383                }
384            }
385            catch (Exception ex) {
386                throw convertException(ex);
387            }
388        }
389    
390        protected void addShareLib(Path appPath, Configuration conf, String actionShareLibName)
391        throws ActionExecutorException {
392            if (actionShareLibName != null) {
393                try {
394                    Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath();
395                    if (systemLibPath != null) {
396                        Path actionLibPath = new Path(systemLibPath, actionShareLibName);
397                        String user = conf.get("user.name");
398                        FileSystem fs =
399                            Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
400                        if (fs.exists(actionLibPath)) {
401                            FileStatus[] files = fs.listStatus(actionLibPath);
402                            for (FileStatus file : files) {
403                                addToCache(conf, appPath, file.getPath().toUri().getPath(), false);
404                            }
405                        }
406                    }
407                }
408                catch (HadoopAccessorException ex){
409                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
410                            ex.getErrorCode().toString(), ex.getMessage());
411                }
412                catch (IOException ex){
413                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
414                            "It should never happen", ex.getMessage());
415                }
416            }
417        }
418    
419        protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException {
420            String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath");
421            if (actionLibsStrArr != null) {
422                try {
423                    for (String actionLibsStr : actionLibsStrArr) {
424                        actionLibsStr = actionLibsStr.trim();
425                        if (actionLibsStr.length() > 0)
426                        {
427                            Path actionLibsPath = new Path(actionLibsStr);
428                            String user = conf.get("user.name");
429                            FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
430                            if (fs.exists(actionLibsPath)) {
431                                FileStatus[] files = fs.listStatus(actionLibsPath);
432                                for (FileStatus file : files) {
433                                    addToCache(conf, appPath, file.getPath().toUri().getPath(), false);
434                                }
435                            }
436                        }
437                    }
438                }
439                catch (HadoopAccessorException ex){
440                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
441                            ex.getErrorCode().toString(), ex.getMessage());
442                }
443                catch (IOException ex){
444                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
445                            "It should never happen", ex.getMessage());
446                }
447            }
448        }
449    
450        @SuppressWarnings("unchecked")
451        void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf)
452                throws ActionExecutorException {
453            Configuration proto = context.getProtoActionConf();
454    
455            // launcher JAR
456            addToCache(conf, appPath, getOozieLauncherJar(context), false);
457    
458            // Workflow lib/
459            String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
460            if (paths != null) {
461                for (String path : paths) {
462                    addToCache(conf, appPath, path, false);
463                }
464            }
465    
466            // Action libs
467            addActionLibs(appPath, conf);
468    
469            // files and archives defined in the action
470            for (Element eProp : (List<Element>) actionXml.getChildren()) {
471                if (eProp.getName().equals("file")) {
472                    String path = eProp.getTextTrim();
473                    addToCache(conf, appPath, path, false);
474                }
475                else {
476                    if (eProp.getName().equals("archive")) {
477                        String path = eProp.getTextTrim();
478                        addToCache(conf, appPath, path, true);
479                    }
480                }
481            }
482    
483            addAllShareLibs(appPath, conf, context, actionXml);
484            }
485    
486        // Adds action specific share libs and common share libs
487        private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml)
488                throws ActionExecutorException {
489            // Add action specific share libs
490            addActionShareLib(appPath, conf, context, actionXml);
491            // Add common sharelibs for Oozie
492            addShareLib(appPath, conf, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
493        }
494    
495        private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml) throws ActionExecutorException {
496            XConfiguration wfJobConf = null;
497            try {
498                wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
499            }
500            catch (IOException ioe) {
501                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
502                        ioe.getMessage());
503            }
504            // Action sharelibs are only added if user has specified to use system libpath
505            if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
506                // add action specific sharelibs
507                addShareLib(appPath, conf, getShareLibName(context, actionXml, conf));
508            }
509        }
510    
511    
512        protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
513            Namespace ns = actionXml.getNamespace();
514            Element e = actionXml.getChild("main-class", ns);
515            return e.getTextTrim();
516        }
517    
518        private static final String QUEUE_NAME = "mapred.job.queue.name";
519    
520        private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>();
521    
522        static {
523            SPECIAL_PROPERTIES.add(QUEUE_NAME);
524            SPECIAL_PROPERTIES.add(ACL_VIEW_JOB);
525            SPECIAL_PROPERTIES.add(ACL_MODIFY_JOB);
526        }
527    
528        @SuppressWarnings("unchecked")
529        JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
530                throws ActionExecutorException {
531            try {
532    
533                // app path could be a file
534                Path appPathRoot = new Path(context.getWorkflow().getAppPath());
535                if (actionFs.isFile(appPathRoot)) {
536                    appPathRoot = appPathRoot.getParent();
537                }
538    
539                // launcher job configuration
540                JobConf launcherJobConf = createBaseHadoopConf(context, actionXml);
541                setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);
542    
543                String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType());
544                if (actionShareLibProperty != null) {
545                    launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty);
546                }
547                setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
548                String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow()
549                        .getAppName(), action.getName(), context.getWorkflow().getId());
550                launcherJobConf.setJobName(jobName);
551    
552                String jobId = context.getWorkflow().getId();
553                String actionId = action.getId();
554                Path actionDir = context.getActionDir();
555                String recoveryId = context.getRecoveryId();
556    
557                // Getting the prepare XML from the action XML
558                Namespace ns = actionXml.getNamespace();
559                Element prepareElement = actionXml.getChild("prepare", ns);
560                String prepareXML = "";
561                if (prepareElement != null) {
562                    if (prepareElement.getChildren().size() > 0) {
563                        prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
564                    }
565                }
566                LauncherMapper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
567                        prepareXML);
568    
569                LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
570                LauncherMapper.setupSupportedFileSystems(
571                    launcherJobConf, Services.get().getConf().get(HadoopAccessorService.SUPPORTED_FILESYSTEMS));
572                LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
573                LauncherMapper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
574    
575                List<Element> list = actionXml.getChildren("arg", ns);
576                String[] args = new String[list.size()];
577                for (int i = 0; i < list.size(); i++) {
578                    args[i] = list.get(i).getTextTrim();
579                }
580                LauncherMapper.setupMainArguments(launcherJobConf, args);
581    
582                List<Element> javaopts = actionXml.getChildren("java-opt", ns);
583                for (Element opt: javaopts) {
584                    String opts = launcherJobConf.get("mapred.child.java.opts", "");
585                    opts = opts + " " + opt.getTextTrim();
586                    opts = opts.trim();
587                    launcherJobConf.set("mapred.child.java.opts", opts);
588                }
589    
590                Element opt = actionXml.getChild("java-opts", ns);
591                if (opt != null) {
592                    String opts = launcherJobConf.get("mapred.child.java.opts", "");
593                    opts = opts + " " + opt.getTextTrim();
594                    opts = opts.trim();
595                    launcherJobConf.set("mapred.child.java.opts", opts);
596                }
597    
598                // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs)
599                // maybe we should add queue to the WF schema, below job-tracker
600                actionConfToLauncherConf(actionConf, launcherJobConf);
601    
602                // to disable cancelation of delegation token on launcher job end
603                launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
604    
605                return launcherJobConf;
606            }
607            catch (Exception ex) {
608                throw convertException(ex);
609            }
610        }
611    
612        private void injectCallback(Context context, Configuration conf) {
613            String callback = context.getCallbackUrl("$jobStatus");
614            if (conf.get("job.end.notification.url") != null) {
615                XLog.getLog(getClass()).warn("Overriding the action job end notification URI");
616            }
617            conf.set("job.end.notification.url", callback);
618        }
619    
620        void injectActionCallback(Context context, Configuration actionConf) {
621            injectCallback(context, actionConf);
622        }
623    
624        void injectLauncherCallback(Context context, Configuration launcherConf) {
625            injectCallback(context, launcherConf);
626        }
627    
628        private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) {
629            for (String name : SPECIAL_PROPERTIES) {
630                if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) {
631                    launcherConf.set(name, actionConf.get(name));
632                }
633            }
634        }
635    
636        public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
637            JobClient jobClient = null;
638            boolean exception = false;
639            try {
640                Path appPathRoot = new Path(context.getWorkflow().getAppPath());
641    
642                // app path could be a file
643                if (actionFs.isFile(appPathRoot)) {
644                    appPathRoot = appPathRoot.getParent();
645                }
646    
647                Element actionXml = XmlUtils.parseXml(action.getConf());
648    
649                // action job configuration
650                Configuration actionConf = createBaseHadoopConf(context, actionXml);
651                setupActionConf(actionConf, context, actionXml, appPathRoot);
652                XLog.getLog(getClass()).debug("Setting LibFilesArchives ");
653                setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
654                String jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow()
655                        .getAppName(), action.getName(), context.getWorkflow().getId());
656                actionConf.set("mapred.job.name", jobName);
657                injectActionCallback(context, actionConf);
658    
659                if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) {
660                    // ONLY in the case where user has not given the
661                    // modify-job ACL specifically
662                    if (context.getWorkflow().getAcl() != null) {
663                        // setting the group owning the Oozie job to allow anybody in that
664                        // group to modify the jobs.
665                        actionConf.set(ACL_MODIFY_JOB, context.getWorkflow().getAcl());
666                    }
667                }
668    
669                // Setting the credential properties in launcher conf
670                HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
671                        action, actionConf);
672    
673                // Adding if action need to set more credential tokens
674                JobConf credentialsConf = new JobConf(false);
675                XConfiguration.copy(actionConf, credentialsConf);
676                setCredentialTokens(credentialsConf, context, action, credentialsProperties);
677    
678                // insert conf to action conf from credentialsConf
679                for (Entry<String, String> entry : credentialsConf) {
680                    if (actionConf.get(entry.getKey()) == null) {
681                        actionConf.set(entry.getKey(), entry.getValue());
682                    }
683                }
684    
685                JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
686                injectLauncherCallback(context, launcherJobConf);
687                XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId());
688                jobClient = createJobClient(context, launcherJobConf);
689                String launcherId = LauncherMapper.getRecoveryId(launcherJobConf, context.getActionDir(), context
690                        .getRecoveryId());
691                boolean alreadyRunning = launcherId != null;
692                RunningJob runningJob;
693    
694                // if user-retry is on, always submit new launcher
695                boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
696    
697                if (alreadyRunning && !isUserRetry) {
698                    runningJob = jobClient.getJob(JobID.forName(launcherId));
699                    if (runningJob == null) {
700                        String jobTracker = launcherJobConf.get("mapred.job.tracker");
701                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
702                                "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
703                    }
704                }
705                else {
706                    XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId());
707    
708                    // setting up propagation of the delegation token.
709                    Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(HadoopAccessorService
710                            .getMRDelegationTokenRenewer(launcherJobConf));
711                    launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
712    
713                    // insert credentials tokens to launcher job conf if needed
714                    if (needInjectCredentials()) {
715                        for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) {
716                            log.debug("ADDING TOKEN: " + tk.getKind().toString());
717                            launcherJobConf.getCredentials().addToken(tk.getKind(), tk);
718                        }
719                    }
720                    else {
721                        log.info("No need to inject credentials.");
722                    }
723                    runningJob = jobClient.submitJob(launcherJobConf);
724                    if (runningJob == null) {
725                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
726                                "Error submitting launcher for action [{0}]", action.getId());
727                    }
728                    launcherId = runningJob.getID().toString();
729                    XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId);
730                }
731    
732                String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
733                String consoleUrl = runningJob.getTrackingURL();
734                context.setStartData(launcherId, jobTracker, consoleUrl);
735            }
736            catch (Exception ex) {
737                exception = true;
738                throw convertException(ex);
739            }
740            finally {
741                if (jobClient != null) {
742                    try {
743                        jobClient.close();
744                    }
745                    catch (Exception e) {
746                        if (exception) {
747                            log.error("JobClient error: ", e);
748                        }
749                        else {
750                            throw convertException(e);
751                        }
752                    }
753                }
754            }
755        }
756    
757        private boolean needInjectCredentials() {
758            boolean methodExists = true;
759    
760            Class klass;
761            try {
762                klass = Class.forName("org.apache.hadoop.mapred.JobConf");
763                klass.getMethod("getCredentials");
764            }
765            catch (ClassNotFoundException ex) {
766                methodExists = false;
767            }
768            catch (NoSuchMethodException ex) {
769                methodExists = false;
770            }
771    
772            return methodExists;
773        }
774    
775        protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
776                WorkflowAction action, Configuration actionConf) throws Exception {
777            HashMap<String, CredentialsProperties> credPropertiesMap = null;
778            if (context != null && action != null) {
779                credPropertiesMap = getActionCredentialsProperties(context, action);
780                if (credPropertiesMap != null) {
781                    for (String key : credPropertiesMap.keySet()) {
782                        CredentialsProperties prop = credPropertiesMap.get(key);
783                        if (prop != null) {
784                            log.debug("Credential Properties set for action : " + action.getId());
785                            for (String property : prop.getProperties().keySet()) {
786                                actionConf.set(property, prop.getProperties().get(property));
787                                log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'");
788                            }
789                        }
790                    }
791                }
792                else {
793                    log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
794                }
795            }
796            else {
797                log.warn("context or action is null");
798            }
799            return credPropertiesMap;
800        }
801    
802        protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
803                HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {
804    
805            if (context != null && action != null && credPropertiesMap != null) {
806                for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
807                    String credName = entry.getKey();
808                    CredentialsProperties credProps = entry.getValue();
809                    if (credProps != null) {
810                        CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
811                        Credentials credentialObject = credProvider.createCredentialObject();
812                        if (credentialObject != null) {
813                            credentialObject.addtoJobConf(jobconf, credProps, context);
814                            log.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
815                        }
816                        else {
817                            log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
818                        }
819                    }
820                }
821            }
822    
823        }
824    
825        protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
826                WorkflowAction action) throws Exception {
827            HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();
828            if (context != null && action != null) {
829                String credsInAction = action.getCred();
830                log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId());
831                String[] credNames = credsInAction.split(",");
832                for (String credName : credNames) {
833                    CredentialsProperties credProps = getCredProperties(context, credName);
834                    props.put(credName, credProps);
835                }
836            }
837            else {
838                log.warn("context or action is null");
839            }
840            return props;
841        }
842    
843        @SuppressWarnings("unchecked")
844        protected CredentialsProperties getCredProperties(Context context, String credName)
845                throws Exception {
846            CredentialsProperties credProp = null;
847            String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
848            XConfiguration wfjobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
849            Element elementJob = XmlUtils.parseXml(workflowXml);
850            Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
851            if (credentials != null) {
852                for (Element credential : (List<Element>) credentials.getChildren("credential", credentials.getNamespace())) {
853                    String name = credential.getAttributeValue("name");
854                    String type = credential.getAttributeValue("type");
855                    log.debug("getCredProperties: Name: " + name + ", Type: " + type);
856                    if (name.equalsIgnoreCase(credName)) {
857                        credProp = new CredentialsProperties(name, type);
858                        for (Element property : (List<Element>) credential.getChildren("property",
859                                credential.getNamespace())) {
860                            String propertyName = property.getChildText("name", property.getNamespace());
861                            String propertyValue = property.getChildText("value", property.getNamespace());
862                            ELEvaluator eval = new ELEvaluator();
863                            for (Map.Entry<String, String> entry : wfjobConf) {
864                                eval.setVariable(entry.getKey(), entry.getValue().trim());
865                            }
866                            propertyName = eval.evaluate(propertyName, String.class);
867                            propertyValue = eval.evaluate(propertyValue, String.class);
868    
869                            credProp.getProperties().put(propertyName, propertyValue);
870                            log.debug("getCredProperties: Properties name :'" + propertyName + "', Value : '"
871                                    + propertyValue + "'");
872                        }
873                    }
874                }
875            } else {
876                log.warn("credentials is null for the action");
877            }
878            return credProp;
879        }
880    
881        @Override
882        public void start(Context context, WorkflowAction action) throws ActionExecutorException {
883            try {
884                XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System");
885                FileSystem actionFs = context.getAppFileSystem();
886                XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir());
887                prepareActionDir(actionFs, context);
888                XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action ");
889                submitLauncher(actionFs, context, action);
890                XLog.getLog(getClass()).debug("Action submit completed. Performing check ");
891                check(context, action);
892                XLog.getLog(getClass()).debug("Action check is done after submission");
893            }
894            catch (Exception ex) {
895                throw convertException(ex);
896            }
897        }
898    
899        @Override
900        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
901            try {
902                String externalStatus = action.getExternalStatus();
903                WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK
904                        : WorkflowAction.Status.ERROR;
905                context.setEndData(status, getActionSignal(status));
906            }
907            catch (Exception ex) {
908                throw convertException(ex);
909            }
910            finally {
911                try {
912                    FileSystem actionFs = context.getAppFileSystem();
913                    cleanUpActionDir(actionFs, context);
914                }
915                catch (Exception ex) {
916                    throw convertException(ex);
917                }
918            }
919        }
920    
921        /**
922         * Create job client object
923         *
924         * @param context
925         * @param jobConf
926         * @return
927         * @throws HadoopAccessorException
928         */
929        protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
930            String user = context.getWorkflow().getUser();
931            String group = context.getWorkflow().getGroup();
932            return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
933        }
934    
935        @Override
936        public void check(Context context, WorkflowAction action) throws ActionExecutorException {
937            JobClient jobClient = null;
938            boolean exception = false;
939            try {
940                Element actionXml = XmlUtils.parseXml(action.getConf());
941                FileSystem actionFs = context.getAppFileSystem();
942                JobConf jobConf = createBaseHadoopConf(context, actionXml);
943                jobClient = createJobClient(context, jobConf);
944                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
945                if (runningJob == null) {
946                    context.setExternalStatus(FAILED);
947                    context.setExecutionData(FAILED, null);
948                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
949                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", action
950                                    .getExternalId(), action.getId());
951                }
952                if (runningJob.isComplete()) {
953                    Path actionDir = context.getActionDir();
954    
955                    String user = context.getWorkflow().getUser();
956                    String group = context.getWorkflow().getGroup();
957                    if (LauncherMapper.hasIdSwap(runningJob, user, group, actionDir)) {
958                        String launcherId = action.getExternalId();
959                        Path idSwapPath = LauncherMapper.getIdSwapPath(context.getActionDir());
960                        InputStream is = actionFs.open(idSwapPath);
961                        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
962                        Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
963                        reader.close();
964                        String newId = props.getProperty("id");
965                        runningJob = jobClient.getJob(JobID.forName(newId));
966                        if (runningJob == null) {
967                            context.setExternalStatus(FAILED);
968                            throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
969                                    "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", newId,
970                                    action.getId());
971                        }
972    
973                        context.setStartData(newId, action.getTrackerUri(), runningJob.getTrackingURL());
974                        XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
975                                newId);
976                    }
977                    if (runningJob.isComplete()) {
978                        XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]",
979                                action.getExternalId());
980                        if (runningJob.isSuccessful() && LauncherMapper.isMainSuccessful(runningJob)) {
981                            getActionData(actionFs, runningJob, action, context);
982                            XLog.getLog(getClass()).info(XLog.STD, "action produced output");
983                        }
984                        else {
985                            XLog log = XLog.getLog(getClass());
986                            String errorReason;
987                            Path actionError = LauncherMapper.getErrorPath(context.getActionDir());
988                            if (actionFs.exists(actionError)) {
989                                InputStream is = actionFs.open(actionError);
990                                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
991                                Properties props = PropertiesUtils.readProperties(reader, -1);
992                                reader.close();
993                                String errorCode = props.getProperty("error.code");
994                                if (errorCode.equals("0")) {
995                                    errorCode = "JA018";
996                                }
997                                if (errorCode.equals("-1")) {
998                                    errorCode = "JA019";
999                                }
1000                                errorReason = props.getProperty("error.reason");
1001                                log.warn("Launcher ERROR, reason: {0}", errorReason);
1002                                String exMsg = props.getProperty("exception.message");
1003                                String errorInfo = (exMsg != null) ? exMsg : errorReason;
1004                                context.setErrorInfo(errorCode, errorInfo);
1005                                String exStackTrace = props.getProperty("exception.stacktrace");
1006                                if (exMsg != null) {
1007                                    log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
1008                                }
1009                            }
1010                            else {
1011                                errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action
1012                                        .getTrackerUri(), action.getExternalId());
1013                                log.warn(errorReason);
1014                            }
1015                            context.setExecutionData(FAILED_KILLED, null);
1016                        }
1017                    }
1018                    else {
1019                        context.setExternalStatus(RUNNING);
1020                        XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
1021                                action.getExternalId(), action.getExternalStatus());
1022                    }
1023                }
1024                else {
1025                    context.setExternalStatus(RUNNING);
1026                    XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
1027                            action.getExternalId(), action.getExternalStatus());
1028                }
1029            }
1030            catch (Exception ex) {
1031                XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
1032                exception = true;
1033                throw convertException(ex);
1034            }
1035            finally {
1036                if (jobClient != null) {
1037                    try {
1038                        jobClient.close();
1039                    }
1040                    catch (Exception e) {
1041                        if (exception) {
1042                            log.error("JobClient error: ", e);
1043                        }
1044                        else {
1045                            throw convertException(e);
1046                        }
1047                    }
1048                }
1049            }
1050        }
1051    
1052        /**
1053         * Get the output data of an action. Subclasses should override this method
1054         * to get action specific output data.
1055         *
1056         * @param actionFs the FileSystem object
1057         * @param runningJob the runningJob
1058         * @param action the Workflow action
1059         * @param context executor context
1060         *
1061         */
1062        protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
1063                throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
1064            Properties props = null;
1065            if (getCaptureOutput(action)) {
1066                props = new Properties();
1067                if (LauncherMapper.hasOutputData(runningJob)) {
1068                    Path actionOutput = LauncherMapper.getOutputDataPath(context.getActionDir());
1069                    InputStream is = actionFs.open(actionOutput);
1070                    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1071                    props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
1072                    reader.close();
1073                }
1074            }
1075            context.setExecutionData(SUCCEEDED, props);
1076        }
1077    
1078        protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
1079            Element eConf = XmlUtils.parseXml(action.getConf());
1080            Namespace ns = eConf.getNamespace();
1081            Element captureOutput = eConf.getChild("capture-output", ns);
1082            return captureOutput != null;
1083        }
1084    
1085        @Override
1086        public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
1087            JobClient jobClient = null;
1088            boolean exception = false;
1089            try {
1090                Element actionXml = XmlUtils.parseXml(action.getConf());
1091                JobConf jobConf = createBaseHadoopConf(context, actionXml);
1092                jobClient = createJobClient(context, jobConf);
1093                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
1094                if (runningJob != null) {
1095                    runningJob.killJob();
1096                }
1097                context.setExternalStatus(KILLED);
1098                context.setExecutionData(KILLED, null);
1099            }
1100            catch (Exception ex) {
1101                exception = true;
1102                throw convertException(ex);
1103            }
1104            finally {
1105                try {
1106                    FileSystem actionFs = context.getAppFileSystem();
1107                    cleanUpActionDir(actionFs, context);
1108                    if (jobClient != null) {
1109                        jobClient.close();
1110                    }
1111                }
1112                catch (Exception ex) {
1113                    if (exception) {
1114                        log.error("Error: ", ex);
1115                    }
1116                    else {
1117                        throw convertException(ex);
1118                    }
1119                }
1120            }
1121        }
1122    
1123        private static Set<String> FINAL_STATUS = new HashSet<String>();
1124    
1125        static {
1126            FINAL_STATUS.add(SUCCEEDED);
1127            FINAL_STATUS.add(KILLED);
1128            FINAL_STATUS.add(FAILED);
1129            FINAL_STATUS.add(FAILED_KILLED);
1130        }
1131    
1132        @Override
1133        public boolean isCompleted(String externalStatus) {
1134            return FINAL_STATUS.contains(externalStatus);
1135        }
1136    
1137    
1138        /**
1139         * Return the sharelib name for the action.
1140         * <p/>
1141         * If <code>NULL</code> or emtpy, it means that the action does not use the action
1142         * sharelib.
1143         * <p/>
1144         * If a non-empty string, i.e. <code>foo</code>, it means the action uses the
1145         * action sharelib subdirectory <code>foo</code> and all JARs in the sharelib
1146         * <code>foo</code> directory will be in the action classpath.
1147         * <p/>
1148         * The resolution is done using the following precedence order:
1149         * <ul>
1150         *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the action configuration</li>
1151         *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the job configuration</li>
1152         *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the oozie configuration</li>
1153         *     <li>Action Executor <code>getDefaultShareLibName()</code> method</li>
1154         * </ul>
1155         *
1156         *
1157         * @param context executor context.
1158         * @param actionXml
1159         *@param conf action configuration.  @return the action sharelib name.
1160         */
1161        protected String getShareLibName(Context context, Element actionXml, Configuration conf) {
1162            String name = conf.get(ACTION_SHARELIB_FOR + getType());
1163            if (name == null) {
1164                try {
1165                    XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
1166                    name = jobConf.get(ACTION_SHARELIB_FOR + getType());
1167                    if (name == null) {
1168                        name = Services.get().getConf().get(ACTION_SHARELIB_FOR + getType());
1169                        if (name == null) {
1170                            name = getDefaultShareLibName(actionXml);
1171                        }
1172                    }
1173                }
1174                catch (IOException ex) {
1175                    throw new RuntimeException("It cannot happen, " + ex.toString(), ex);
1176                }
1177            }
1178            return name;
1179        }
1180    
1181        private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for.";
1182    
1183    
1184        /**
1185         * Returns the default sharelib name for the action if any.
1186         *
1187         * @param actionXml the action XML fragment.
1188         * @return the sharelib name for the action, <code>NULL</code> if none.
1189         */
1190        protected String getDefaultShareLibName(Element actionXml) {
1191            return null;
1192        }
1193    }