001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.BufferedReader;
021    import java.io.ByteArrayOutputStream;
022    import java.io.File;
023    import java.io.FileNotFoundException;
024    import java.io.IOException;
025    import java.io.InputStream;
026    import java.io.InputStreamReader;
027    import java.io.PrintStream;
028    import java.io.StringReader;
029    import java.net.ConnectException;
030    import java.net.URI;
031    import java.net.URISyntaxException;
032    import java.net.UnknownHostException;
033    import java.util.ArrayList;
034    import java.util.HashMap;
035    import java.util.HashSet;
036    import java.util.Iterator;
037    import java.util.List;
038    import java.util.Map;
039    import java.util.Properties;
040    import java.util.Set;
041    import java.util.Map.Entry;
042    
043    import org.apache.hadoop.conf.Configuration;
044    import org.apache.hadoop.filecache.DistributedCache;
045    import org.apache.hadoop.fs.FileStatus;
046    import org.apache.hadoop.fs.FileSystem;
047    import org.apache.hadoop.fs.Path;
048    import org.apache.hadoop.fs.permission.AccessControlException;
049    import org.apache.hadoop.mapred.JobClient;
050    import org.apache.hadoop.mapred.JobConf;
051    import org.apache.hadoop.mapred.JobID;
052    import org.apache.hadoop.mapred.RunningJob;
053    import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
054    import org.apache.hadoop.util.DiskChecker;
055    import org.apache.oozie.WorkflowActionBean;
056    import org.apache.oozie.WorkflowJobBean;
057    import org.apache.oozie.action.ActionExecutor;
058    import org.apache.oozie.action.ActionExecutorException;
059    import org.apache.oozie.client.OozieClient;
060    import org.apache.oozie.client.WorkflowAction;
061    import org.apache.oozie.service.HadoopAccessorException;
062    import org.apache.oozie.service.HadoopAccessorService;
063    import org.apache.oozie.service.Services;
064    import org.apache.oozie.service.WorkflowAppService;
065    import org.apache.oozie.servlet.CallbackServlet;
066    import org.apache.oozie.util.ELEvaluator;
067    import org.apache.oozie.util.IOUtils;
068    import org.apache.oozie.util.PropertiesUtils;
069    import org.apache.oozie.util.XConfiguration;
070    import org.apache.oozie.util.XLog;
071    import org.apache.oozie.util.XmlUtils;
072    import org.jdom.Element;
073    import org.jdom.JDOMException;
074    import org.jdom.Namespace;
075    import org.apache.hadoop.security.token.Token;
076    import org.apache.hadoop.security.token.TokenIdentifier;
077    
078    public class JavaActionExecutor extends ActionExecutor {
079    
080        private static final String HADOOP_USER = "user.name";
081        public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
082        public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
083        public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
084        public static final String HADOOP_NAME_NODE = "fs.default.name";
085        private static final String HADOOP_JOB_NAME = "mapred.job.name";
086        public static final String OOZIE_COMMON_LIBDIR = "oozie";
087        public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE;
088        private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
089        public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
090        public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
091        public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
092        private static int maxActionOutputLen;
093        private static int maxExternalStatsSize;
094    
095        private static final String SUCCEEDED = "SUCCEEDED";
096        private static final String KILLED = "KILLED";
097        private static final String FAILED = "FAILED";
098        private static final String FAILED_KILLED = "FAILED/KILLED";
099        private static final String RUNNING = "RUNNING";
100        protected XLog log = XLog.getLog(getClass());
101    
102        static {
103            DISALLOWED_PROPERTIES.add(HADOOP_USER);
104            DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
105            DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE);
106            DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2);
107            DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM);
108        }
109    
110        public JavaActionExecutor() {
111            this("java");
112            requiresNNJT = true;
113        }
114    
115        protected JavaActionExecutor(String type) {
116            super(type);
117            requiresNNJT = true;
118        }
119    
120        protected String getLauncherJarName() {
121            return getType() + "-launcher.jar";
122        }
123    
124        protected List<Class> getLauncherClasses() {
125            List<Class> classes = new ArrayList<Class>();
126            classes.add(LauncherMapper.class);
127            classes.add(LauncherSecurityManager.class);
128            classes.add(LauncherException.class);
129            classes.add(LauncherMainException.class);
130            classes.add(FileSystemActions.class);
131            classes.add(PrepareActionsDriver.class);
132            classes.add(ActionStats.class);
133            classes.add(ActionType.class);
134            return classes;
135        }
136    
137        @Override
138        public void initActionType() {
139            XLog log = XLog.getLog(getClass());
140            super.initActionType();
141            maxActionOutputLen = getOozieConf()
142              .getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
143              // TODO: Remove the below config get in a subsequent release..
144              // This other irrelevant property is only used to
145              // preserve backwards compatibility cause of a typo.
146              // See OOZIE-4.
147              getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN,
148                2 * 1024));
149            //Get the limit for the maximum allowed size of action stats
150            maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT);
151            maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
152            try {
153                List<Class> classes = getLauncherClasses();
154                Class[] launcherClasses = classes.toArray(new Class[classes.size()]);
155                IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses);
156    
157                registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001");
158                registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT,
159                        "JA002");
160                registerError(DiskChecker.DiskOutOfSpaceException.class.getName(),
161                        ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003");
162                registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(),
163                        ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
164                registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
165                        ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
166                registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "  JA006");
167                registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
168                registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
169                registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
170            }
171            catch (IOException ex) {
172                throw new RuntimeException(ex);
173            }
174            catch (java.lang.NoClassDefFoundError err) {
175                ByteArrayOutputStream baos = new ByteArrayOutputStream();
176                err.printStackTrace(new PrintStream(baos));
177                log.warn(baos.toString());
178            }
179        }
180    
181        /**
182         * Get the maximum allowed size of stats
183         *
184         * @return maximum size of stats
185         */
186        public static int getMaxExternalStatsSize() {
187            return maxExternalStatsSize;
188        }
189    
190        static void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException {
191            for (String prop : DISALLOWED_PROPERTIES) {
192                if (conf.get(prop) != null) {
193                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010",
194                            "Property [{0}] not allowed in action [{1}] configuration", prop, confName);
195                }
196            }
197        }
198    
199        public JobConf createBaseHadoopConf(Context context, Element actionXml) {
200            Namespace ns = actionXml.getNamespace();
201            String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
202            String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
203            JobConf conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
204            conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
205            conf.set(HADOOP_JOB_TRACKER, jobTracker);
206            conf.set(HADOOP_JOB_TRACKER_2, jobTracker);
207            conf.set(HADOOP_YARN_RM, jobTracker);
208            conf.set(HADOOP_NAME_NODE, nameNode);
209            conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
210            return conf;
211        }
212    
213        private void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) {
214            for (Map.Entry<String, String> entry : srcConf) {
215                if (entry.getKey().startsWith("oozie.launcher.")) {
216                    String name = entry.getKey().substring("oozie.launcher.".length());
217                    String value = entry.getValue();
218                    // setting original KEY
219                    launcherConf.set(entry.getKey(), value);
220                    // setting un-prefixed key (to allow Hadoop job config
221                    // for the launcher job
222                    launcherConf.set(name, value);
223                }
224            }
225        }
226    
227        Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
228                throws ActionExecutorException {
229            try {
230                Namespace ns = actionXml.getNamespace();
231                Element e = actionXml.getChild("configuration", ns);
232                if (e != null) {
233                    String strConf = XmlUtils.prettyPrint(e).toString();
234                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
235    
236                    XConfiguration launcherConf = new XConfiguration();
237                    HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
238                    XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType());
239                    injectLauncherProperties(actionDefaultConf, launcherConf);
240                    injectLauncherProperties(inlineConf, launcherConf);
241                    checkForDisallowedProps(launcherConf, "launcher configuration");
242                    XConfiguration.copy(launcherConf, conf);
243                }
244                return conf;
245            }
246            catch (IOException ex) {
247                throw convertException(ex);
248            }
249        }
250    
251        public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf)
252                throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException {
253            Namespace ns = element.getNamespace();
254            Iterator<Element> it = element.getChildren("job-xml", ns).iterator();
255            while (it.hasNext()) {
256                Element e = it.next();
257                String jobXml = e.getTextTrim();
258                Path path = new Path(appPath, jobXml);
259                FileSystem fs = context.getAppFileSystem();
260                Configuration jobXmlConf = new XConfiguration(fs.open(path));
261                checkForDisallowedProps(jobXmlConf, "job-xml");
262                XConfiguration.copy(jobXmlConf, conf);
263            }
264            Element e = element.getChild("configuration", ns);
265            if (e != null) {
266                String strConf = XmlUtils.prettyPrint(e).toString();
267                XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
268                checkForDisallowedProps(inlineConf, "inline configuration");
269                XConfiguration.copy(inlineConf, conf);
270            }
271        }
272    
273        Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
274                throws ActionExecutorException {
275            try {
276                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
277                XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType());
278                XConfiguration.injectDefaults(actionDefaults, actionConf);
279    
280                has.checkSupportedFilesystem(appPath.toUri());
281    
282                parseJobXmlAndConfiguration(context, actionXml, appPath, actionConf);
283                return actionConf;
284            }
285            catch (IOException ex) {
286                throw convertException(ex);
287            }
288            catch (HadoopAccessorException ex) {
289                throw convertException(ex);
290            }
291            catch (URISyntaxException ex) {
292                throw convertException(ex);
293            }
294        }
295    
296        Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive)
297                throws ActionExecutorException {
298            Path path = null;
299            try {
300                if (filePath.startsWith("/")) {
301                    path = new Path(filePath);
302                }
303                else {
304                    path = new Path(appPath, filePath);
305                }
306                URI uri = new URI(path.toUri().getPath());
307                if (archive) {
308                    DistributedCache.addCacheArchive(uri, conf);
309                }
310                else {
311                    String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
312                    if (fileName.endsWith(".so") || fileName.contains(".so.")) {  // .so files
313                        uri = new Path(path.toString() + "#" + fileName).toUri();
314                        uri = new URI(uri.getPath());
315                        DistributedCache.addCacheFile(uri, conf);
316                    }
317                    else if (fileName.endsWith(".jar")) { // .jar files
318                        if (!fileName.contains("#")) {
319                            path = new Path(uri.toString());
320    
321                            String user = conf.get("user.name");
322                            Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, path, conf);
323                        }
324                        else {
325                            DistributedCache.addCacheFile(uri, conf);
326                        }
327                    }
328                    else { // regular files
329                        if (!fileName.contains("#")) {
330                            uri = new Path(path.toString() + "#" + fileName).toUri();
331                            uri = new URI(uri.getPath());
332                        }
333                        DistributedCache.addCacheFile(uri, conf);
334                    }
335                }
336                DistributedCache.createSymlink(conf);
337                return conf;
338            }
339            catch (Exception ex) {
340                XLog.getLog(getClass()).debug(
341                        "Errors when add to DistributedCache. Path=" + path + ", archive=" + archive + ", conf="
342                                + XmlUtils.prettyPrint(conf).toString());
343                throw convertException(ex);
344            }
345        }
346    
347        String getOozieLauncherJar(Context context) throws ActionExecutorException {
348            try {
349                return new Path(context.getActionDir(), getLauncherJarName()).toString();
350            }
351            catch (Exception ex) {
352                throw convertException(ex);
353            }
354        }
355    
356        public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
357            try {
358                Path actionDir = context.getActionDir();
359                Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp");
360                if (!actionFs.exists(actionDir)) {
361                    try {
362                        actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path(
363                                tempActionDir, getLauncherJarName()));
364                        actionFs.rename(tempActionDir, actionDir);
365                    }
366                    catch (IOException ex) {
367                        actionFs.delete(tempActionDir, true);
368                        actionFs.delete(actionDir, true);
369                        throw ex;
370                    }
371                }
372            }
373            catch (Exception ex) {
374                throw convertException(ex);
375            }
376        }
377    
378        void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
379            try {
380                Path actionDir = context.getActionDir();
381                if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
382                        && actionFs.exists(actionDir)) {
383                    actionFs.delete(actionDir, true);
384                }
385            }
386            catch (Exception ex) {
387                throw convertException(ex);
388            }
389        }
390    
391        protected void addShareLib(Path appPath, Configuration conf, String actionShareLibName)
392        throws ActionExecutorException {
393            if (actionShareLibName != null) {
394                try {
395                    Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath();
396                    if (systemLibPath != null) {
397                        Path actionLibPath = new Path(systemLibPath, actionShareLibName);
398                        String user = conf.get("user.name");
399                        FileSystem fs;
400                        // If the actionLibPath has a valid scheme and authority, then use them to determine the filesystem that the
401                        // sharelib resides on; otherwise, assume it resides on the same filesystem as the appPath and use the appPath
402                        // to determine the filesystem
403                        if (actionLibPath.toUri().getScheme() != null && actionLibPath.toUri().getAuthority() != null) {
404                            fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, actionLibPath.toUri(), conf);
405                        }
406                        else {
407                            fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
408                        }
409                        if (fs.exists(actionLibPath)) {
410                            FileStatus[] files = fs.listStatus(actionLibPath);
411                            for (FileStatus file : files) {
412                                addToCache(conf, actionLibPath, file.getPath().toUri().getPath(), false);
413                            }
414                        }
415                    }
416                }
417                catch (HadoopAccessorException ex){
418                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
419                            ex.getErrorCode().toString(), ex.getMessage());
420                }
421                catch (IOException ex){
422                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
423                            "It should never happen", ex.getMessage());
424                }
425            }
426        }
427    
428        protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException {
429            String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath");
430            if (actionLibsStrArr != null) {
431                try {
432                    for (String actionLibsStr : actionLibsStrArr) {
433                        actionLibsStr = actionLibsStr.trim();
434                        if (actionLibsStr.length() > 0)
435                        {
436                            Path actionLibsPath = new Path(actionLibsStr);
437                            String user = conf.get("user.name");
438                            FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
439                            if (fs.exists(actionLibsPath)) {
440                                FileStatus[] files = fs.listStatus(actionLibsPath);
441                                for (FileStatus file : files) {
442                                    addToCache(conf, appPath, file.getPath().toUri().getPath(), false);
443                                }
444                            }
445                        }
446                    }
447                }
448                catch (HadoopAccessorException ex){
449                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
450                            ex.getErrorCode().toString(), ex.getMessage());
451                }
452                catch (IOException ex){
453                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
454                            "It should never happen", ex.getMessage());
455                }
456            }
457        }
458    
459        @SuppressWarnings("unchecked")
460        void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf)
461                throws ActionExecutorException {
462            Configuration proto = context.getProtoActionConf();
463    
464            // launcher JAR
465            addToCache(conf, appPath, getOozieLauncherJar(context), false);
466    
467            // Workflow lib/
468            String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
469            if (paths != null) {
470                for (String path : paths) {
471                    addToCache(conf, appPath, path, false);
472                }
473            }
474    
475            // Action libs
476            addActionLibs(appPath, conf);
477    
478            // files and archives defined in the action
479            for (Element eProp : (List<Element>) actionXml.getChildren()) {
480                if (eProp.getName().equals("file")) {
481                    String[] filePaths = eProp.getTextTrim().split(",");
482                    for (String path : filePaths) {
483                        addToCache(conf, appPath, path.trim(), false);
484                    }
485                }
486                else if (eProp.getName().equals("archive")) {
487                    String[] archivePaths = eProp.getTextTrim().split(",");
488                    for (String path : archivePaths){
489                        addToCache(conf, appPath, path.trim(), true);
490                    }
491                }
492            }
493    
494            addAllShareLibs(appPath, conf, context, actionXml);
495            }
496    
497        // Adds action specific share libs and common share libs
498        private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml)
499                throws ActionExecutorException {
500            // Add action specific share libs
501            addActionShareLib(appPath, conf, context, actionXml);
502            // Add common sharelibs for Oozie
503            addShareLib(appPath, conf, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
504        }
505    
506        private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml)
507                throws ActionExecutorException {
508            XConfiguration wfJobConf = null;
509            try {
510                wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
511            }
512            catch (IOException ioe) {
513                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
514                        ioe.getMessage());
515            }
516            // Action sharelibs are only added if user has specified to use system libpath
517            if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
518                // add action specific sharelibs
519                addShareLib(appPath, conf, getShareLibName(context, actionXml, conf));
520            }
521        }
522    
523    
524        protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
525            Namespace ns = actionXml.getNamespace();
526            Element e = actionXml.getChild("main-class", ns);
527            return e.getTextTrim();
528        }
529    
530        private static final String QUEUE_NAME = "mapred.job.queue.name";
531    
532        private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>();
533    
534        static {
535            SPECIAL_PROPERTIES.add(QUEUE_NAME);
536            SPECIAL_PROPERTIES.add(ACL_VIEW_JOB);
537            SPECIAL_PROPERTIES.add(ACL_MODIFY_JOB);
538        }
539    
540        @SuppressWarnings("unchecked")
541        JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
542                throws ActionExecutorException {
543            try {
544    
545                // app path could be a file
546                Path appPathRoot = new Path(context.getWorkflow().getAppPath());
547                if (actionFs.isFile(appPathRoot)) {
548                    appPathRoot = appPathRoot.getParent();
549                }
550    
551                // launcher job configuration
552                JobConf launcherJobConf = createBaseHadoopConf(context, actionXml);
553                setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);
554    
555                String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType());
556                if (actionShareLibProperty != null) {
557                    launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty);
558                }
559                setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
560    
561                String jobName = launcherJobConf.get(HADOOP_JOB_NAME);
562                if (jobName == null || jobName.isEmpty()) {
563                    jobName = XLog.format(
564                            "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
565                            context.getWorkflow().getAppName(), action.getName(),
566                            context.getWorkflow().getId());
567                launcherJobConf.setJobName(jobName);
568                }
569    
570                String jobId = context.getWorkflow().getId();
571                String actionId = action.getId();
572                Path actionDir = context.getActionDir();
573                String recoveryId = context.getRecoveryId();
574    
575                // Getting the prepare XML from the action XML
576                Namespace ns = actionXml.getNamespace();
577                Element prepareElement = actionXml.getChild("prepare", ns);
578                String prepareXML = "";
579                if (prepareElement != null) {
580                    if (prepareElement.getChildren().size() > 0) {
581                        prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
582                    }
583                }
584                LauncherMapper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
585                        prepareXML);
586    
587                LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
588                LauncherMapper.setupSupportedFileSystems(
589                    launcherJobConf, Services.get().getConf().get(HadoopAccessorService.SUPPORTED_FILESYSTEMS));
590                LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
591                LauncherMapper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
592    
593                List<Element> list = actionXml.getChildren("arg", ns);
594                String[] args = new String[list.size()];
595                for (int i = 0; i < list.size(); i++) {
596                    args[i] = list.get(i).getTextTrim();
597                }
598                LauncherMapper.setupMainArguments(launcherJobConf, args);
599    
600                List<Element> javaopts = actionXml.getChildren("java-opt", ns);
601                for (Element opt: javaopts) {
602                    String opts = launcherJobConf.get("mapred.child.java.opts", "");
603                    opts = opts + " " + opt.getTextTrim();
604                    opts = opts.trim();
605                    launcherJobConf.set("mapred.child.java.opts", opts);
606                }
607    
608                Element opt = actionXml.getChild("java-opts", ns);
609                if (opt != null) {
610                    String opts = launcherJobConf.get("mapred.child.java.opts", "");
611                    opts = opts + " " + opt.getTextTrim();
612                    opts = opts.trim();
613                    launcherJobConf.set("mapred.child.java.opts", opts);
614                }
615    
616                // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs)
617                // maybe we should add queue to the WF schema, below job-tracker
618                actionConfToLauncherConf(actionConf, launcherJobConf);
619    
620                // to disable cancelation of delegation token on launcher job end
621                launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
622    
623                return launcherJobConf;
624            }
625            catch (Exception ex) {
626                throw convertException(ex);
627            }
628        }
629    
630        private void injectCallback(Context context, Configuration conf) {
631            String callback = context.getCallbackUrl("$jobStatus");
632            if (conf.get("job.end.notification.url") != null) {
633                XLog.getLog(getClass()).warn("Overriding the action job end notification URI");
634            }
635            conf.set("job.end.notification.url", callback);
636        }
637    
    /**
     * Sets the Oozie job-end notification callback on the action configuration.
     * <p/>
     * Simply delegates to <code>injectCallback</code>.
     *
     * @param context executor context used to build the callback URL.
     * @param actionConf action configuration that receives the callback URL.
     */
    void injectActionCallback(Context context, Configuration actionConf) {
        injectCallback(context, actionConf);
    }
641    
    /**
     * Sets the Oozie job-end notification callback on the launcher configuration.
     * <p/>
     * Simply delegates to <code>injectCallback</code>.
     *
     * @param context executor context used to build the callback URL.
     * @param launcherConf launcher configuration that receives the callback URL.
     */
    void injectLauncherCallback(Context context, Configuration launcherConf) {
        injectCallback(context, launcherConf);
    }
645    
646        private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) {
647            for (String name : SPECIAL_PROPERTIES) {
648                if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) {
649                    launcherConf.set(name, actionConf.get(name));
650                }
651            }
652        }
653    
    /**
     * Builds the action and launcher configurations and submits the launcher job, or re-attaches
     * to a previously submitted launcher when a recovery id is found and the action is not a
     * user retry.
     * <p/>
     * On success the launcher id, the job tracker and the tracking URL are stored in the context
     * through <code>setStartData</code>.
     *
     * @param actionFs file system of the workflow application.
     * @param context executor context.
     * @param action workflow action being started.
     * @throws ActionExecutorException if anything fails; any exception is passed through
     *         <code>convertException</code>. Error code <code>JA017</code> is used when the recovered job
     *         is unknown to the job client or when the submission returns no running job.
     */
    public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
        JobClient jobClient = null;
        // remembers whether the try block failed, so a close() failure does not mask that exception
        boolean exception = false;
        try {
            Path appPathRoot = new Path(context.getWorkflow().getAppPath());

            // app path could be a file
            if (actionFs.isFile(appPathRoot)) {
                appPathRoot = appPathRoot.getParent();
            }

            Element actionXml = XmlUtils.parseXml(action.getConf());

            // action job configuration
            Configuration actionConf = createBaseHadoopConf(context, actionXml);
            setupActionConf(actionConf, context, actionXml, appPathRoot);
            XLog.getLog(getClass()).debug("Setting LibFilesArchives ");
            setLibFilesArchives(context, actionXml, appPathRoot, actionConf);

            // default the Hadoop job name when the action configuration does not provide one
            String jobName = actionConf.get(HADOOP_JOB_NAME);
            if (jobName == null || jobName.isEmpty()) {
                jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}",
                        getType(), context.getWorkflow().getAppName(),
                        action.getName(), context.getWorkflow().getId());
                actionConf.set(HADOOP_JOB_NAME, jobName);
            }

            injectActionCallback(context, actionConf);

            if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) {
                // ONLY in the case where user has not given the
                // modify-job ACL specifically
                if (context.getWorkflow().getAcl() != null) {
                    // setting the group owning the Oozie job to allow anybody in that
                    // group to modify the jobs.
                    actionConf.set(ACL_MODIFY_JOB, context.getWorkflow().getAcl());
                }
            }

            // Setting the credential properties in the action conf
            HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
                    action, actionConf);

            // Adding if action need to set more credential tokens
            // (a copy of the action conf is used so the credential objects can add their tokens to it)
            JobConf credentialsConf = new JobConf(false);
            XConfiguration.copy(actionConf, credentialsConf);
            setCredentialTokens(credentialsConf, context, action, credentialsProperties);

            // insert conf to action conf from credentialsConf
            // (only keys the action conf does not define yet; existing values are kept)
            for (Entry<String, String> entry : credentialsConf) {
                if (actionConf.get(entry.getKey()) == null) {
                    actionConf.set(entry.getKey(), entry.getValue());
                }
            }

            JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
            injectLauncherCallback(context, launcherJobConf);
            XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId());
            jobClient = createJobClient(context, launcherJobConf);
            // a non-null id from LauncherMapper.getRecoveryId(...) is treated as "launcher already submitted"
            String launcherId = LauncherMapper.getRecoveryId(launcherJobConf, context.getActionDir(), context
                    .getRecoveryId());
            boolean alreadyRunning = launcherId != null;
            RunningJob runningJob;

            // if user-retry is on, always submit new launcher
            boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();

            if (alreadyRunning && !isUserRetry) {
                // recovery path: re-attach to the launcher job recorded earlier
                runningJob = jobClient.getJob(JobID.forName(launcherId));
                if (runningJob == null) {
                    String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
                            "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
                }
            }
            else {
                XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId());

                // setting up propagation of the delegation token.
                Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(HadoopAccessorService
                        .getMRDelegationTokenRenewer(launcherJobConf));
                launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);

                // insert credentials tokens to launcher job conf if needed
                if (needInjectCredentials()) {
                    for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) {
                        log.debug("ADDING TOKEN: " + tk.getKind().toString());
                        launcherJobConf.getCredentials().addToken(tk.getKind(), tk);
                    }
                }
                else {
                    log.info("No need to inject credentials.");
                }
                runningJob = jobClient.submitJob(launcherJobConf);
                if (runningJob == null) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
                            "Error submitting launcher for action [{0}]", action.getId());
                }
                launcherId = runningJob.getID().toString();
                XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId);
            }

            // record the launcher id, job tracker and console URL as the action start data
            String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
            String consoleUrl = runningJob.getTrackingURL();
            context.setStartData(launcherId, jobTracker, consoleUrl);
        }
        catch (Exception ex) {
            exception = true;
            throw convertException(ex);
        }
        finally {
            if (jobClient != null) {
                try {
                    jobClient.close();
                }
                catch (Exception e) {
                    // only log the close failure when another exception is already propagating
                    if (exception) {
                        log.error("JobClient error: ", e);
                    }
                    else {
                        throw convertException(e);
                    }
                }
            }
        }
    }
780    
781        private boolean needInjectCredentials() {
782            boolean methodExists = true;
783    
784            Class klass;
785            try {
786                klass = Class.forName("org.apache.hadoop.mapred.JobConf");
787                klass.getMethod("getCredentials");
788            }
789            catch (ClassNotFoundException ex) {
790                methodExists = false;
791            }
792            catch (NoSuchMethodException ex) {
793                methodExists = false;
794            }
795    
796            return methodExists;
797        }
798    
799        protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
800                WorkflowAction action, Configuration actionConf) throws Exception {
801            HashMap<String, CredentialsProperties> credPropertiesMap = null;
802            if (context != null && action != null) {
803                credPropertiesMap = getActionCredentialsProperties(context, action);
804                if (credPropertiesMap != null) {
805                    for (String key : credPropertiesMap.keySet()) {
806                        CredentialsProperties prop = credPropertiesMap.get(key);
807                        if (prop != null) {
808                            log.debug("Credential Properties set for action : " + action.getId());
809                            for (String property : prop.getProperties().keySet()) {
810                                actionConf.set(property, prop.getProperties().get(property));
811                                log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'");
812                            }
813                        }
814                    }
815                }
816                else {
817                    log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
818                }
819            }
820            else {
821                log.warn("context or action is null");
822            }
823            return credPropertiesMap;
824        }
825    
826        protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
827                HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {
828    
829            if (context != null && action != null && credPropertiesMap != null) {
830                for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
831                    String credName = entry.getKey();
832                    CredentialsProperties credProps = entry.getValue();
833                    if (credProps != null) {
834                        CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
835                        Credentials credentialObject = credProvider.createCredentialObject();
836                        if (credentialObject != null) {
837                            credentialObject.addtoJobConf(jobconf, credProps, context);
838                            log.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
839                        }
840                        else {
841                            log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
842                        }
843                    }
844                }
845            }
846    
847        }
848    
849        protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
850                WorkflowAction action) throws Exception {
851            HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();
852            if (context != null && action != null) {
853                String credsInAction = action.getCred();
854                log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId());
855                String[] credNames = credsInAction.split(",");
856                for (String credName : credNames) {
857                    CredentialsProperties credProps = getCredProperties(context, credName);
858                    props.put(credName, credProps);
859                }
860            }
861            else {
862                log.warn("context or action is null");
863            }
864            return props;
865        }
866    
867        @SuppressWarnings("unchecked")
868        protected CredentialsProperties getCredProperties(Context context, String credName)
869                throws Exception {
870            CredentialsProperties credProp = null;
871            String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
872            XConfiguration wfjobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
873            Element elementJob = XmlUtils.parseXml(workflowXml);
874            Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
875            if (credentials != null) {
876                for (Element credential : (List<Element>) credentials.getChildren("credential", credentials.getNamespace())) {
877                    String name = credential.getAttributeValue("name");
878                    String type = credential.getAttributeValue("type");
879                    log.debug("getCredProperties: Name: " + name + ", Type: " + type);
880                    if (name.equalsIgnoreCase(credName)) {
881                        credProp = new CredentialsProperties(name, type);
882                        for (Element property : (List<Element>) credential.getChildren("property",
883                                credential.getNamespace())) {
884                            String propertyName = property.getChildText("name", property.getNamespace());
885                            String propertyValue = property.getChildText("value", property.getNamespace());
886                            ELEvaluator eval = new ELEvaluator();
887                            for (Map.Entry<String, String> entry : wfjobConf) {
888                                eval.setVariable(entry.getKey(), entry.getValue().trim());
889                            }
890                            propertyName = eval.evaluate(propertyName, String.class);
891                            propertyValue = eval.evaluate(propertyValue, String.class);
892    
893                            credProp.getProperties().put(propertyName, propertyValue);
894                            log.debug("getCredProperties: Properties name :'" + propertyName + "', Value : '"
895                                    + propertyValue + "'");
896                        }
897                    }
898                }
899            } else {
900                log.warn("credentials is null for the action");
901            }
902            return credProp;
903        }
904    
    /**
     * Starts the action: prepares the action directory, submits the launcher job and then runs
     * an immediate <code>check</code> on it.
     *
     * @param context executor context.
     * @param action workflow action to start.
     * @throws ActionExecutorException if any step fails; every exception goes through
     *         <code>convertException</code>.
     */
    @Override
    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
        try {
            XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System");
            FileSystem actionFs = context.getAppFileSystem();
            XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir());
            prepareActionDir(actionFs, context);
            XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action ");
            submitLauncher(actionFs, context, action);
            XLog.getLog(getClass()).debug("Action submit completed. Performing check ");
            // check right away so the external status is refreshed after submission
            check(context, action);
            XLog.getLog(getClass()).debug("Action check is done after submission");
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }
922    
923        @Override
924        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
925            try {
926                String externalStatus = action.getExternalStatus();
927                WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK
928                        : WorkflowAction.Status.ERROR;
929                context.setEndData(status, getActionSignal(status));
930            }
931            catch (Exception ex) {
932                throw convertException(ex);
933            }
934            finally {
935                try {
936                    FileSystem actionFs = context.getAppFileSystem();
937                    cleanUpActionDir(actionFs, context);
938                }
939                catch (Exception ex) {
940                    throw convertException(ex);
941                }
942            }
943        }
944    
945        /**
946         * Create job client object
947         *
948         * @param context
949         * @param jobConf
950         * @return
951         * @throws HadoopAccessorException
952         */
953        protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
954            String user = context.getWorkflow().getUser();
955            String group = context.getWorkflow().getGroup();
956            return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
957        }
958    
959        @Override
960        public void check(Context context, WorkflowAction action) throws ActionExecutorException {
961            JobClient jobClient = null;
962            boolean exception = false;
963            try {
964                Element actionXml = XmlUtils.parseXml(action.getConf());
965                FileSystem actionFs = context.getAppFileSystem();
966                JobConf jobConf = createBaseHadoopConf(context, actionXml);
967                jobClient = createJobClient(context, jobConf);
968                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
969                if (runningJob == null) {
970                    context.setExternalStatus(FAILED);
971                    context.setExecutionData(FAILED, null);
972                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
973                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", action
974                                    .getExternalId(), action.getId());
975                }
976                if (runningJob.isComplete()) {
977                    Path actionDir = context.getActionDir();
978    
979                    String user = context.getWorkflow().getUser();
980                    String group = context.getWorkflow().getGroup();
981                    if (LauncherMapper.hasIdSwap(runningJob, user, group, actionDir)) {
982                        String launcherId = action.getExternalId();
983                        Path idSwapPath = LauncherMapper.getIdSwapPath(context.getActionDir());
984                        InputStream is = actionFs.open(idSwapPath);
985                        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
986                        Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
987                        reader.close();
988                        String newId = props.getProperty("id");
989                        runningJob = jobClient.getJob(JobID.forName(newId));
990                        if (runningJob == null) {
991                            context.setExternalStatus(FAILED);
992                            throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
993                                    "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", newId,
994                                    action.getId());
995                        }
996    
997                        context.setStartData(newId, action.getTrackerUri(), runningJob.getTrackingURL());
998                        XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
999                                newId);
1000                    }
1001                    if (runningJob.isComplete()) {
1002                        XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]",
1003                                action.getExternalId());
1004                        if (runningJob.isSuccessful() && LauncherMapper.isMainSuccessful(runningJob)) {
1005                            getActionData(actionFs, runningJob, action, context);
1006                            XLog.getLog(getClass()).info(XLog.STD, "action produced output");
1007                        }
1008                        else {
1009                            XLog log = XLog.getLog(getClass());
1010                            String errorReason;
1011                            Path actionError = LauncherMapper.getErrorPath(context.getActionDir());
1012                            if (actionFs.exists(actionError)) {
1013                                InputStream is = actionFs.open(actionError);
1014                                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1015                                Properties props = PropertiesUtils.readProperties(reader, -1);
1016                                reader.close();
1017                                String errorCode = props.getProperty("error.code");
1018                                if (errorCode.equals("0")) {
1019                                    errorCode = "JA018";
1020                                }
1021                                if (errorCode.equals("-1")) {
1022                                    errorCode = "JA019";
1023                                }
1024                                errorReason = props.getProperty("error.reason");
1025                                log.warn("Launcher ERROR, reason: {0}", errorReason);
1026                                String exMsg = props.getProperty("exception.message");
1027                                String errorInfo = (exMsg != null) ? exMsg : errorReason;
1028                                context.setErrorInfo(errorCode, errorInfo);
1029                                String exStackTrace = props.getProperty("exception.stacktrace");
1030                                if (exMsg != null) {
1031                                    log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
1032                                }
1033                            }
1034                            else {
1035                                errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action
1036                                        .getTrackerUri(), action.getExternalId());
1037                                log.warn(errorReason);
1038                            }
1039                            context.setExecutionData(FAILED_KILLED, null);
1040                            setActionCompletionData(context, actionFs);
1041                        }
1042                    }
1043                    else {
1044                        context.setExternalStatus(RUNNING);
1045                        XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
1046                                action.getExternalId(), action.getExternalStatus());
1047                    }
1048                }
1049                else {
1050                    context.setExternalStatus(RUNNING);
1051                    XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
1052                            action.getExternalId(), action.getExternalStatus());
1053                }
1054            }
1055            catch (Exception ex) {
1056                XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
1057                exception = true;
1058                throw convertException(ex);
1059            }
1060            finally {
1061                if (jobClient != null) {
1062                    try {
1063                        jobClient.close();
1064                    }
1065                    catch (Exception e) {
1066                        if (exception) {
1067                            log.error("JobClient error: ", e);
1068                        }
1069                        else {
1070                            throw convertException(e);
1071                        }
1072                    }
1073                }
1074            }
1075        }
1076    
1077        /**
1078         * Get the output data of an action. Subclasses should override this method
1079         * to get action specific output data.
1080         *
1081         * @param actionFs the FileSystem object
1082         * @param runningJob the runningJob
1083         * @param action the Workflow action
1084         * @param context executor context
1085         *
1086         */
1087        protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
1088                throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
1089            Properties props = null;
1090            if (getCaptureOutput(action)) {
1091                props = new Properties();
1092                if (LauncherMapper.hasOutputData(runningJob)) {
1093                    Path actionOutput = LauncherMapper.getOutputDataPath(context.getActionDir());
1094                    InputStream is = actionFs.open(actionOutput);
1095                    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1096                    props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
1097                    reader.close();
1098                }
1099            }
1100            context.setExecutionData(SUCCEEDED, props);
1101        }
1102    
1103        protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
1104            Element eConf = XmlUtils.parseXml(action.getConf());
1105            Namespace ns = eConf.getNamespace();
1106            Element captureOutput = eConf.getChild("capture-output", ns);
1107            return captureOutput != null;
1108        }
1109    
1110        @Override
1111        public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
1112            JobClient jobClient = null;
1113            boolean exception = false;
1114            try {
1115                Element actionXml = XmlUtils.parseXml(action.getConf());
1116                JobConf jobConf = createBaseHadoopConf(context, actionXml);
1117                jobClient = createJobClient(context, jobConf);
1118                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
1119                if (runningJob != null) {
1120                    runningJob.killJob();
1121                }
1122                context.setExternalStatus(KILLED);
1123                context.setExecutionData(KILLED, null);
1124            }
1125            catch (Exception ex) {
1126                exception = true;
1127                throw convertException(ex);
1128            }
1129            finally {
1130                try {
1131                    FileSystem actionFs = context.getAppFileSystem();
1132                    cleanUpActionDir(actionFs, context);
1133                    if (jobClient != null) {
1134                        jobClient.close();
1135                    }
1136                }
1137                catch (Exception ex) {
1138                    if (exception) {
1139                        log.error("Error: ", ex);
1140                    }
1141                    else {
1142                        throw convertException(ex);
1143                    }
1144                }
1145            }
1146        }
1147    
1148        private static Set<String> FINAL_STATUS = new HashSet<String>();
1149    
1150        static {
1151            FINAL_STATUS.add(SUCCEEDED);
1152            FINAL_STATUS.add(KILLED);
1153            FINAL_STATUS.add(FAILED);
1154            FINAL_STATUS.add(FAILED_KILLED);
1155        }
1156    
    /**
     * Tells whether the given external status is a final one.
     *
     * @param externalStatus external status to test.
     * @return <code>true</code> if the status is contained in <code>FINAL_STATUS</code>,
     *         <code>false</code> otherwise.
     */
    @Override
    public boolean isCompleted(String externalStatus) {
        return FINAL_STATUS.contains(externalStatus);
    }
1161    
1162    
1163        /**
1164         * Return the sharelib name for the action.
1165         * <p/>
1166         * If <code>NULL</code> or emtpy, it means that the action does not use the action
1167         * sharelib.
1168         * <p/>
1169         * If a non-empty string, i.e. <code>foo</code>, it means the action uses the
1170         * action sharelib subdirectory <code>foo</code> and all JARs in the sharelib
1171         * <code>foo</code> directory will be in the action classpath.
1172         * <p/>
1173         * The resolution is done using the following precedence order:
1174         * <ul>
1175         *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the action configuration</li>
1176         *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the job configuration</li>
1177         *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the oozie configuration</li>
1178         *     <li>Action Executor <code>getDefaultShareLibName()</code> method</li>
1179         * </ul>
1180         *
1181         *
1182         * @param context executor context.
1183         * @param actionXml
1184         *@param conf action configuration.  @return the action sharelib name.
1185         */
1186        protected String getShareLibName(Context context, Element actionXml, Configuration conf) {
1187            String name = conf.get(ACTION_SHARELIB_FOR + getType());
1188            if (name == null) {
1189                try {
1190                    XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
1191                    name = jobConf.get(ACTION_SHARELIB_FOR + getType());
1192                    if (name == null) {
1193                        name = Services.get().getConf().get(ACTION_SHARELIB_FOR + getType());
1194                        if (name == null) {
1195                            name = getDefaultShareLibName(actionXml);
1196                        }
1197                    }
1198                }
1199                catch (IOException ex) {
1200                    throw new RuntimeException("It cannot happen, " + ex.toString(), ex);
1201                }
1202            }
1203            return name;
1204        }
1205    
    /**
     * Prefix of the configuration property holding the sharelib name of an action type; the
     * action type is appended to it, see <code>getShareLibName</code>.
     */
    private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for.";
1207    
1208    
    /**
     * Returns the default sharelib name for the action if any.
     * <p/>
     * This implementation ignores the action XML and always returns <code>NULL</code>.
     *
     * @param actionXml the action XML fragment.
     * @return the sharelib name for the action, <code>NULL</code> if none.
     */
    protected String getDefaultShareLibName(Element actionXml) {
        return null;
    }
1218    
    /**
     * Sets some data for the action on completion
     * <p/>
     * This implementation does nothing. Within this class it is called from <code>check</code>
     * after the execution data has been set to <code>FAILED_KILLED</code>.
     *
     * @param context executor context
     * @param actionFs the FileSystem object
     * @throws IOException not thrown by this implementation.
     * @throws HadoopAccessorException not thrown by this implementation.
     * @throws URISyntaxException not thrown by this implementation.
     */
    protected void setActionCompletionData(Context context, FileSystem actionFs) throws IOException,
            HadoopAccessorException, URISyntaxException {
    }
1228    }