001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.BufferedReader;
021    import java.io.ByteArrayOutputStream;
022    import java.io.File;
023    import java.io.FileNotFoundException;
024    import java.io.IOException;
025    import java.io.InputStream;
026    import java.io.InputStreamReader;
027    import java.io.PrintStream;
028    import java.io.StringReader;
029    import java.net.ConnectException;
030    import java.net.URI;
031    import java.net.URISyntaxException;
032    import java.net.UnknownHostException;
033    import java.util.ArrayList;
034    import java.util.HashMap;
035    import java.util.HashSet;
036    import java.util.Iterator;
037    import java.util.List;
038    import java.util.Map;
039    import java.util.Properties;
040    import java.util.Set;
041    import java.util.Map.Entry;
042    
043    import org.apache.hadoop.conf.Configuration;
044    import org.apache.hadoop.filecache.DistributedCache;
045    import org.apache.hadoop.fs.FileStatus;
046    import org.apache.hadoop.fs.FileSystem;
047    import org.apache.hadoop.fs.Path;
048    import org.apache.hadoop.fs.permission.AccessControlException;
049    import org.apache.hadoop.mapred.JobClient;
050    import org.apache.hadoop.mapred.JobConf;
051    import org.apache.hadoop.mapred.JobID;
052    import org.apache.hadoop.mapred.RunningJob;
053    import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
054    import org.apache.hadoop.util.DiskChecker;
055    import org.apache.oozie.WorkflowActionBean;
056    import org.apache.oozie.WorkflowJobBean;
057    import org.apache.oozie.action.ActionExecutor;
058    import org.apache.oozie.action.ActionExecutorException;
059    import org.apache.oozie.client.OozieClient;
060    import org.apache.oozie.client.WorkflowAction;
061    import org.apache.oozie.service.HadoopAccessorException;
062    import org.apache.oozie.service.HadoopAccessorService;
063    import org.apache.oozie.service.Services;
064    import org.apache.oozie.service.URIHandlerService;
065    import org.apache.oozie.service.WorkflowAppService;
066    import org.apache.oozie.servlet.CallbackServlet;
067    import org.apache.oozie.util.ELEvaluator;
068    import org.apache.oozie.util.IOUtils;
069    import org.apache.oozie.util.PropertiesUtils;
070    import org.apache.oozie.util.XConfiguration;
071    import org.apache.oozie.util.XLog;
072    import org.apache.oozie.util.XmlUtils;
073    import org.jdom.Element;
074    import org.jdom.JDOMException;
075    import org.jdom.Namespace;
076    import org.apache.hadoop.security.token.Token;
077    import org.apache.hadoop.security.token.TokenIdentifier;
078    
079    public class JavaActionExecutor extends ActionExecutor {
080    
081        private static final String HADOOP_USER = "user.name";
082        public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
083        public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
084        public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
085        public static final String HADOOP_NAME_NODE = "fs.default.name";
086        private static final String HADOOP_JOB_NAME = "mapred.job.name";
087        public static final String OOZIE_COMMON_LIBDIR = "oozie";
088        public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE;
089        private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
090        public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
091        public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
092        public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
093        private static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable";
094        public static final String OOZIE_ACTION_SHIP_LAUNCHER_JAR = "oozie.action.ship.launcher.jar";
095        private boolean useLauncherJar;
096        private static int maxActionOutputLen;
097        private static int maxExternalStatsSize;
098    
099        private static final String SUCCEEDED = "SUCCEEDED";
100        private static final String KILLED = "KILLED";
101        private static final String FAILED = "FAILED";
102        private static final String FAILED_KILLED = "FAILED/KILLED";
103        private static final String RUNNING = "RUNNING";
104        protected XLog log = XLog.getLog(getClass());
105    
106        static {
107            DISALLOWED_PROPERTIES.add(HADOOP_USER);
108            DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
109            DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE);
110            DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2);
111            DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM);
112        }
113    
114        public JavaActionExecutor() {
115            this("java");
116        }
117    
118        protected JavaActionExecutor(String type) {
119            super(type);
120            requiresNNJT = true;
121            useLauncherJar = getOozieConf().getBoolean(OOZIE_ACTION_SHIP_LAUNCHER_JAR, true);
122        }
123    
124        protected String getLauncherJarName() {
125            return getType() + "-launcher.jar";
126        }
127    
128        protected List<Class> getLauncherClasses() {
129            List<Class> classes = new ArrayList<Class>();
130            classes.add(LauncherMapper.class);
131            classes.add(LauncherSecurityManager.class);
132            classes.add(LauncherException.class);
133            classes.add(LauncherMainException.class);
134            classes.add(PrepareActionsDriver.class);
135            classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
136            classes.add(ActionStats.class);
137            classes.add(ActionType.class);
138            return classes;
139        }
140    
141        @Override
142        public void initActionType() {
143            XLog log = XLog.getLog(getClass());
144            super.initActionType();
145            maxActionOutputLen = getOozieConf()
146              .getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
147              // TODO: Remove the below config get in a subsequent release..
148              // This other irrelevant property is only used to
149              // preserve backwards compatibility cause of a typo.
150              // See OOZIE-4.
151              getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN,
152                2 * 1024));
153            //Get the limit for the maximum allowed size of action stats
154            maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT);
155            maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
156    
157            createLauncherJar();
158    
159            registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001");
160            registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT,
161                    "JA002");
162            registerError(DiskChecker.DiskOutOfSpaceException.class.getName(),
163                    ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003");
164            registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(),
165                    ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
166            registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
167                    ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
168            registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "  JA006");
169            registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
170            registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
171            registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
172        }
173    
174        public void createLauncherJar() {
175            if (useLauncherJar) {
176                try {
177                    List<Class> classes = getLauncherClasses();
178                    Class[] launcherClasses = classes.toArray(new Class[classes.size()]);
179                    IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses);
180                }
181                catch (IOException ex) {
182                    throw new RuntimeException(ex);
183                }
184                catch (java.lang.NoClassDefFoundError err) {
185                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
186                    err.printStackTrace(new PrintStream(baos));
187                    log.warn(baos.toString());
188                }
189             }
190        }
191    
192        /**
193         * Get the maximum allowed size of stats
194         *
195         * @return maximum size of stats
196         */
197        public static int getMaxExternalStatsSize() {
198            return maxExternalStatsSize;
199        }
200    
201        static void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException {
202            for (String prop : DISALLOWED_PROPERTIES) {
203                if (conf.get(prop) != null) {
204                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010",
205                            "Property [{0}] not allowed in action [{1}] configuration", prop, confName);
206                }
207            }
208        }
209    
210        public JobConf createBaseHadoopConf(Context context, Element actionXml) {
211            Namespace ns = actionXml.getNamespace();
212            String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
213            String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
214            JobConf conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
215            conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
216            conf.set(HADOOP_JOB_TRACKER, jobTracker);
217            conf.set(HADOOP_JOB_TRACKER_2, jobTracker);
218            conf.set(HADOOP_YARN_RM, jobTracker);
219            conf.set(HADOOP_NAME_NODE, nameNode);
220            conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
221            return conf;
222        }
223    
224        private void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) {
225            for (Map.Entry<String, String> entry : srcConf) {
226                if (entry.getKey().startsWith("oozie.launcher.")) {
227                    String name = entry.getKey().substring("oozie.launcher.".length());
228                    String value = entry.getValue();
229                    // setting original KEY
230                    launcherConf.set(entry.getKey(), value);
231                    // setting un-prefixed key (to allow Hadoop job config
232                    // for the launcher job
233                    launcherConf.set(name, value);
234                }
235            }
236        }
237    
238        Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
239                throws ActionExecutorException {
240            try {
241                Namespace ns = actionXml.getNamespace();
242                Element e = actionXml.getChild("configuration", ns);
243                if (e != null) {
244                    String strConf = XmlUtils.prettyPrint(e).toString();
245                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
246    
247                    XConfiguration launcherConf = new XConfiguration();
248                    HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
249                    XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType());
250                    injectLauncherProperties(actionDefaultConf, launcherConf);
251                    injectLauncherProperties(inlineConf, launcherConf);
252                    injectLauncherUseUberMode(launcherConf);
253                    checkForDisallowedProps(launcherConf, "launcher configuration");
254                    XConfiguration.copy(launcherConf, conf);
255                }
256                return conf;
257            }
258            catch (IOException ex) {
259                throw convertException(ex);
260            }
261        }
262    
263        void injectLauncherUseUberMode(Configuration launcherConf) {
264            // Set Uber Mode for the launcher (YARN only, ignored by MR1) if not set by action conf and not disabled in oozie-site
265            if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
266                if (getOozieConf().getBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable", false)) {
267                    launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
268                }
269            }
270        }
271    
272        public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf)
273                throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException {
274            Namespace ns = element.getNamespace();
275            Iterator<Element> it = element.getChildren("job-xml", ns).iterator();
276            while (it.hasNext()) {
277                Element e = it.next();
278                String jobXml = e.getTextTrim();
279                Path path = new Path(appPath, jobXml);
280                FileSystem fs = context.getAppFileSystem();
281                Configuration jobXmlConf = new XConfiguration(fs.open(path));
282                checkForDisallowedProps(jobXmlConf, "job-xml");
283                XConfiguration.copy(jobXmlConf, conf);
284            }
285            Element e = element.getChild("configuration", ns);
286            if (e != null) {
287                String strConf = XmlUtils.prettyPrint(e).toString();
288                XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
289                checkForDisallowedProps(inlineConf, "inline configuration");
290                XConfiguration.copy(inlineConf, conf);
291            }
292        }
293    
294        Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
295                throws ActionExecutorException {
296            try {
297                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
298                XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType());
299                XConfiguration.injectDefaults(actionDefaults, actionConf);
300    
301                has.checkSupportedFilesystem(appPath.toUri());
302    
303                parseJobXmlAndConfiguration(context, actionXml, appPath, actionConf);
304                return actionConf;
305            }
306            catch (IOException ex) {
307                throw convertException(ex);
308            }
309            catch (HadoopAccessorException ex) {
310                throw convertException(ex);
311            }
312            catch (URISyntaxException ex) {
313                throw convertException(ex);
314            }
315        }
316    
317        Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive)
318                throws ActionExecutorException {
319    
320            URI uri = null;
321            try {
322                uri = new URI(filePath);
323                URI baseUri = appPath.toUri();
324                if (uri.getScheme() == null) {
325                    String resolvedPath = uri.getPath();
326                    if (!resolvedPath.startsWith("/")) {
327                        resolvedPath = baseUri.getPath() + "/" + resolvedPath;
328                    }
329                    uri = new URI(baseUri.getScheme(), baseUri.getAuthority(), resolvedPath, uri.getQuery(), uri.getFragment());
330                }
331                if (archive) {
332                    DistributedCache.addCacheArchive(uri.normalize(), conf);
333                }
334                else {
335                    String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
336                    if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files
337                        uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName);
338                        DistributedCache.addCacheFile(uri.normalize(), conf);
339                    }
340                    else if (fileName.endsWith(".jar")) { // .jar files
341                        if (!fileName.contains("#")) {
342                            String user = conf.get("user.name");
343                            Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, new Path(uri.normalize()), conf);
344                        }
345                        else {
346                            DistributedCache.addCacheFile(uri, conf);
347                        }
348                    }
349                    else { // regular files
350                        if (!fileName.contains("#")) {
351                            uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName);
352                        }
353                        DistributedCache.addCacheFile(uri.normalize(), conf);
354                    }
355                }
356                DistributedCache.createSymlink(conf);
357                return conf;
358            }
359            catch (Exception ex) {
360                XLog.getLog(getClass()).debug(
361                        "Errors when add to DistributedCache. Path=" + uri.toString() + ", archive=" + archive + ", conf="
362                                + XmlUtils.prettyPrint(conf).toString());
363                throw convertException(ex);
364            }
365        }
366    
367        String getOozieLauncherJar(Context context) throws ActionExecutorException {
368            try {
369                return new Path(context.getActionDir(), getLauncherJarName()).toString();
370            }
371            catch (Exception ex) {
372                throw convertException(ex);
373            }
374        }
375    
376        public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
377            try {
378                Path actionDir = context.getActionDir();
379                Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp");
380                if (!actionFs.exists(actionDir)) {
381                    try {
382                        if (useLauncherJar) {
383                            actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path(
384                                    tempActionDir, getLauncherJarName()));
385                        }
386                        else {
387                            actionFs.mkdirs(tempActionDir);
388                        }
389                        actionFs.rename(tempActionDir, actionDir);
390                    }
391                    catch (IOException ex) {
392                        actionFs.delete(tempActionDir, true);
393                        actionFs.delete(actionDir, true);
394                        throw ex;
395                    }
396                }
397            }
398            catch (Exception ex) {
399                throw convertException(ex);
400            }
401        }
402    
403        void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
404            try {
405                Path actionDir = context.getActionDir();
406                if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
407                        && actionFs.exists(actionDir)) {
408                    actionFs.delete(actionDir, true);
409                }
410            }
411            catch (Exception ex) {
412                throw convertException(ex);
413            }
414        }
415    
416        protected void addShareLib(Path appPath, Configuration conf, String[] actionShareLibNames)
417                throws ActionExecutorException {
418            if (actionShareLibNames != null) {
419                for (String actionShareLibName : actionShareLibNames) {
420                    try {
421                        Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath();
422                        if (systemLibPath != null) {
423                            Path actionLibPath = new Path(systemLibPath, actionShareLibName.trim());
424                            String user = conf.get("user.name");
425                            FileSystem fs;
426                            // If the actionLibPath has a valid scheme and authority, then use them to
427                            // determine the filesystem that the sharelib resides on; otherwise, assume
428                            // it resides on the same filesystem as the appPath and use the appPath to
429                            // determine the filesystem
430                            if (actionLibPath.toUri().getScheme() != null && actionLibPath.toUri().getAuthority() != null) {
431                                fs = Services.get().get(HadoopAccessorService.class)
432                                        .createFileSystem(user, actionLibPath.toUri(), conf);
433                            }
434                            else {
435                                fs = Services.get().get(HadoopAccessorService.class)
436                                        .createFileSystem(user, appPath.toUri(), conf);
437                            }
438                            if (fs.exists(actionLibPath)) {
439                                FileStatus[] files = fs.listStatus(actionLibPath);
440                                for (FileStatus file : files) {
441                                    addToCache(conf, actionLibPath, file.getPath().toUri().getPath(), false);
442                                }
443                            }
444                        }
445                    }
446                    catch (HadoopAccessorException ex) {
447                        throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, ex.getErrorCode()
448                                .toString(), ex.getMessage());
449                    }
450                    catch (IOException ex) {
451                        throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
452                                "It should never happen", ex.getMessage());
453                    }
454                }
455            }
456        }
457    
458        protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException {
459            String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath");
460            if (actionLibsStrArr != null) {
461                try {
462                    for (String actionLibsStr : actionLibsStrArr) {
463                        actionLibsStr = actionLibsStr.trim();
464                        if (actionLibsStr.length() > 0)
465                        {
466                            Path actionLibsPath = new Path(actionLibsStr);
467                            String user = conf.get("user.name");
468                            FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
469                            if (fs.exists(actionLibsPath)) {
470                                FileStatus[] files = fs.listStatus(actionLibsPath);
471                                for (FileStatus file : files) {
472                                    addToCache(conf, appPath, file.getPath().toUri().getPath(), false);
473                                }
474                            }
475                        }
476                    }
477                }
478                catch (HadoopAccessorException ex){
479                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
480                            ex.getErrorCode().toString(), ex.getMessage());
481                }
482                catch (IOException ex){
483                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
484                            "It should never happen", ex.getMessage());
485                }
486            }
487        }
488    
489        @SuppressWarnings("unchecked")
490        void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf)
491                throws ActionExecutorException {
492            Configuration proto = context.getProtoActionConf();
493    
494            // launcher JAR
495            if (useLauncherJar) {
496                addToCache(conf, appPath, getOozieLauncherJar(context), false);
497            }
498    
499            // Workflow lib/
500            String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
501            if (paths != null) {
502                for (String path : paths) {
503                    addToCache(conf, appPath, path, false);
504                }
505            }
506    
507            // Action libs
508            addActionLibs(appPath, conf);
509    
510            // files and archives defined in the action
511            for (Element eProp : (List<Element>) actionXml.getChildren()) {
512                if (eProp.getName().equals("file")) {
513                    String[] filePaths = eProp.getTextTrim().split(",");
514                    for (String path : filePaths) {
515                        addToCache(conf, appPath, path.trim(), false);
516                    }
517                }
518                else if (eProp.getName().equals("archive")) {
519                    String[] archivePaths = eProp.getTextTrim().split(",");
520                    for (String path : archivePaths){
521                        addToCache(conf, appPath, path.trim(), true);
522                    }
523                }
524            }
525    
526            addAllShareLibs(appPath, conf, context, actionXml);
527            }
528    
529        // Adds action specific share libs and common share libs
530        private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml)
531                throws ActionExecutorException {
532            // Add action specific share libs
533            addActionShareLib(appPath, conf, context, actionXml);
534            // Add common sharelibs for Oozie
535            addShareLib(appPath, conf, new String[]{JavaActionExecutor.OOZIE_COMMON_LIBDIR});
536        }
537    
538        private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml)
539                throws ActionExecutorException {
540            XConfiguration wfJobConf = null;
541            try {
542                wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
543            }
544            catch (IOException ioe) {
545                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
546                        ioe.getMessage());
547            }
548            // Action sharelibs are only added if user has specified to use system libpath
549            if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
550                // add action specific sharelibs
551                addShareLib(appPath, conf, getShareLibNames(context, actionXml, conf));
552            }
553        }
554    
555    
556        protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
557            Namespace ns = actionXml.getNamespace();
558            Element e = actionXml.getChild("main-class", ns);
559            return e.getTextTrim();
560        }
561    
562        private static final String QUEUE_NAME = "mapred.job.queue.name";
563    
564        private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>();
565    
566        static {
567            SPECIAL_PROPERTIES.add(QUEUE_NAME);
568            SPECIAL_PROPERTIES.add(ACL_VIEW_JOB);
569            SPECIAL_PROPERTIES.add(ACL_MODIFY_JOB);
570        }
571    
572        @SuppressWarnings("unchecked")
573        JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
574                throws ActionExecutorException {
575            try {
576    
577                // app path could be a file
578                Path appPathRoot = new Path(context.getWorkflow().getAppPath());
579                if (actionFs.isFile(appPathRoot)) {
580                    appPathRoot = appPathRoot.getParent();
581                }
582    
583                // launcher job configuration
584                JobConf launcherJobConf = createBaseHadoopConf(context, actionXml);
585                setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);
586    
587                String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType());
588                if (actionShareLibProperty != null) {
589                    launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty);
590                }
591                setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
592    
593                String jobName = launcherJobConf.get(HADOOP_JOB_NAME);
594                if (jobName == null || jobName.isEmpty()) {
595                    jobName = XLog.format(
596                            "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
597                            context.getWorkflow().getAppName(), action.getName(),
598                            context.getWorkflow().getId());
599                launcherJobConf.setJobName(jobName);
600                }
601    
602                String jobId = context.getWorkflow().getId();
603                String actionId = action.getId();
604                Path actionDir = context.getActionDir();
605                String recoveryId = context.getRecoveryId();
606    
607                // Getting the prepare XML from the action XML
608                Namespace ns = actionXml.getNamespace();
609                Element prepareElement = actionXml.getChild("prepare", ns);
610                String prepareXML = "";
611                if (prepareElement != null) {
612                    if (prepareElement.getChildren().size() > 0) {
613                        prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
614                    }
615                }
616                LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
617                        prepareXML);
618    
619                LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
620                LauncherMapperHelper.setupLauncherURIHandlerConf(launcherJobConf);
621                LauncherMapperHelper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
622                LauncherMapperHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
623    
624                List<Element> list = actionXml.getChildren("arg", ns);
625                String[] args = new String[list.size()];
626                for (int i = 0; i < list.size(); i++) {
627                    args[i] = list.get(i).getTextTrim();
628                }
629                LauncherMapperHelper.setupMainArguments(launcherJobConf, args);
630    
631                List<Element> javaopts = actionXml.getChildren("java-opt", ns);
632                for (Element opt: javaopts) {
633                    String opts = launcherJobConf.get("mapred.child.java.opts", "");
634                    opts = opts + " " + opt.getTextTrim();
635                    opts = opts.trim();
636                    launcherJobConf.set("mapred.child.java.opts", opts);
637                }
638    
639                Element opt = actionXml.getChild("java-opts", ns);
640                if (opt != null) {
641                    String opts = launcherJobConf.get("mapred.child.java.opts", "");
642                    opts = opts + " " + opt.getTextTrim();
643                    opts = opts.trim();
644                    launcherJobConf.set("mapred.child.java.opts", opts);
645                }
646    
647                // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs)
648                // maybe we should add queue to the WF schema, below job-tracker
649                actionConfToLauncherConf(actionConf, launcherJobConf);
650    
651                // to disable cancelation of delegation token on launcher job end
652                launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
653    
654                return launcherJobConf;
655            }
656            catch (Exception ex) {
657                throw convertException(ex);
658            }
659        }
660    
661        private void injectCallback(Context context, Configuration conf) {
662            String callback = context.getCallbackUrl("$jobStatus");
663            if (conf.get("job.end.notification.url") != null) {
664                XLog.getLog(getClass()).warn("Overriding the action job end notification URI");
665            }
666            conf.set("job.end.notification.url", callback);
667        }
668    
669        void injectActionCallback(Context context, Configuration actionConf) {
670            injectCallback(context, actionConf);
671        }
672    
673        void injectLauncherCallback(Context context, Configuration launcherConf) {
674            injectCallback(context, launcherConf);
675        }
676    
677        private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) {
678            for (String name : SPECIAL_PROPERTIES) {
679                if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) {
680                    launcherConf.set(name, actionConf.get(name));
681                }
682            }
683        }
684    
685        public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
686            JobClient jobClient = null;
687            boolean exception = false;
688            try {
689                Path appPathRoot = new Path(context.getWorkflow().getAppPath());
690    
691                // app path could be a file
692                if (actionFs.isFile(appPathRoot)) {
693                    appPathRoot = appPathRoot.getParent();
694                }
695    
696                Element actionXml = XmlUtils.parseXml(action.getConf());
697    
698                // action job configuration
699                Configuration actionConf = createBaseHadoopConf(context, actionXml);
700                setupActionConf(actionConf, context, actionXml, appPathRoot);
701                XLog.getLog(getClass()).debug("Setting LibFilesArchives ");
702                setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
703    
704                String jobName = actionConf.get(HADOOP_JOB_NAME);
705                if (jobName == null || jobName.isEmpty()) {
706                    jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}",
707                            getType(), context.getWorkflow().getAppName(),
708                            action.getName(), context.getWorkflow().getId());
709                    actionConf.set(HADOOP_JOB_NAME, jobName);
710                }
711    
712                injectActionCallback(context, actionConf);
713    
714                if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) {
715                    // ONLY in the case where user has not given the
716                    // modify-job ACL specifically
717                    if (context.getWorkflow().getAcl() != null) {
718                        // setting the group owning the Oozie job to allow anybody in that
719                        // group to modify the jobs.
720                        actionConf.set(ACL_MODIFY_JOB, context.getWorkflow().getAcl());
721                    }
722                }
723    
724                // Setting the credential properties in launcher conf
725                HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
726                        action, actionConf);
727    
728                // Adding if action need to set more credential tokens
729                JobConf credentialsConf = new JobConf(false);
730                XConfiguration.copy(actionConf, credentialsConf);
731                setCredentialTokens(credentialsConf, context, action, credentialsProperties);
732    
733                // insert conf to action conf from credentialsConf
734                for (Entry<String, String> entry : credentialsConf) {
735                    if (actionConf.get(entry.getKey()) == null) {
736                        actionConf.set(entry.getKey(), entry.getValue());
737                    }
738                }
739    
740                JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
741                injectLauncherCallback(context, launcherJobConf);
742                XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId());
743                jobClient = createJobClient(context, launcherJobConf);
744                String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
745                        .getRecoveryId());
746                boolean alreadyRunning = launcherId != null;
747                RunningJob runningJob;
748    
749                // if user-retry is on, always submit new launcher
750                boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
751    
752                if (alreadyRunning && !isUserRetry) {
753                    runningJob = jobClient.getJob(JobID.forName(launcherId));
754                    if (runningJob == null) {
755                        String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
756                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
757                                "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
758                    }
759                }
760                else {
761                    XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId());
762    
763                    // setting up propagation of the delegation token.
764                    HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
765                    Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(has
766                            .getMRDelegationTokenRenewer(launcherJobConf));
767                    launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
768    
769                    // insert credentials tokens to launcher job conf if needed
770                    if (needInjectCredentials()) {
771                        for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) {
772                            log.debug("ADDING TOKEN: " + tk.getKind().toString());
773                            launcherJobConf.getCredentials().addToken(tk.getKind(), tk);
774                        }
775                    }
776                    else {
777                        log.info("No need to inject credentials.");
778                    }
779                    runningJob = jobClient.submitJob(launcherJobConf);
780                    if (runningJob == null) {
781                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
782                                "Error submitting launcher for action [{0}]", action.getId());
783                    }
784                    launcherId = runningJob.getID().toString();
785                    XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId);
786                }
787    
788                String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
789                String consoleUrl = runningJob.getTrackingURL();
790                context.setStartData(launcherId, jobTracker, consoleUrl);
791            }
792            catch (Exception ex) {
793                exception = true;
794                throw convertException(ex);
795            }
796            finally {
797                if (jobClient != null) {
798                    try {
799                        jobClient.close();
800                    }
801                    catch (Exception e) {
802                        if (exception) {
803                            log.error("JobClient error: ", e);
804                        }
805                        else {
806                            throw convertException(e);
807                        }
808                    }
809                }
810            }
811        }
812    
813        private boolean needInjectCredentials() {
814            boolean methodExists = true;
815    
816            Class klass;
817            try {
818                klass = Class.forName("org.apache.hadoop.mapred.JobConf");
819                klass.getMethod("getCredentials");
820            }
821            catch (ClassNotFoundException ex) {
822                methodExists = false;
823            }
824            catch (NoSuchMethodException ex) {
825                methodExists = false;
826            }
827    
828            return methodExists;
829        }
830    
831        protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
832                WorkflowAction action, Configuration actionConf) throws Exception {
833            HashMap<String, CredentialsProperties> credPropertiesMap = null;
834            if (context != null && action != null) {
835                credPropertiesMap = getActionCredentialsProperties(context, action);
836                if (credPropertiesMap != null) {
837                    for (String key : credPropertiesMap.keySet()) {
838                        CredentialsProperties prop = credPropertiesMap.get(key);
839                        if (prop != null) {
840                            log.debug("Credential Properties set for action : " + action.getId());
841                            for (String property : prop.getProperties().keySet()) {
842                                actionConf.set(property, prop.getProperties().get(property));
843                                log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'");
844                            }
845                        }
846                    }
847                }
848                else {
849                    log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
850                }
851            }
852            else {
853                log.warn("context or action is null");
854            }
855            return credPropertiesMap;
856        }
857    
858        protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
859                HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {
860    
861            if (context != null && action != null && credPropertiesMap != null) {
862                for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
863                    String credName = entry.getKey();
864                    CredentialsProperties credProps = entry.getValue();
865                    if (credProps != null) {
866                        CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
867                        Credentials credentialObject = credProvider.createCredentialObject();
868                        if (credentialObject != null) {
869                            credentialObject.addtoJobConf(jobconf, credProps, context);
870                            log.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
871                        }
872                        else {
873                            log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
874                        }
875                    }
876                }
877            }
878    
879        }
880    
881        protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
882                WorkflowAction action) throws Exception {
883            HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();
884            if (context != null && action != null) {
885                String credsInAction = action.getCred();
886                log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId());
887                String[] credNames = credsInAction.split(",");
888                for (String credName : credNames) {
889                    CredentialsProperties credProps = getCredProperties(context, credName);
890                    props.put(credName, credProps);
891                }
892            }
893            else {
894                log.warn("context or action is null");
895            }
896            return props;
897        }
898    
899        @SuppressWarnings("unchecked")
900        protected CredentialsProperties getCredProperties(Context context, String credName)
901                throws Exception {
902            CredentialsProperties credProp = null;
903            String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
904            XConfiguration wfjobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
905            Element elementJob = XmlUtils.parseXml(workflowXml);
906            Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
907            if (credentials != null) {
908                for (Element credential : (List<Element>) credentials.getChildren("credential", credentials.getNamespace())) {
909                    String name = credential.getAttributeValue("name");
910                    String type = credential.getAttributeValue("type");
911                    log.debug("getCredProperties: Name: " + name + ", Type: " + type);
912                    if (name.equalsIgnoreCase(credName)) {
913                        credProp = new CredentialsProperties(name, type);
914                        for (Element property : (List<Element>) credential.getChildren("property",
915                                credential.getNamespace())) {
916                            String propertyName = property.getChildText("name", property.getNamespace());
917                            String propertyValue = property.getChildText("value", property.getNamespace());
918                            ELEvaluator eval = new ELEvaluator();
919                            for (Map.Entry<String, String> entry : wfjobConf) {
920                                eval.setVariable(entry.getKey(), entry.getValue().trim());
921                            }
922                            propertyName = eval.evaluate(propertyName, String.class);
923                            propertyValue = eval.evaluate(propertyValue, String.class);
924    
925                            credProp.getProperties().put(propertyName, propertyValue);
926                            log.debug("getCredProperties: Properties name :'" + propertyName + "', Value : '"
927                                    + propertyValue + "'");
928                        }
929                    }
930                }
931            } else {
932                log.warn("credentials is null for the action");
933            }
934            return credProp;
935        }
936    
937        @Override
938        public void start(Context context, WorkflowAction action) throws ActionExecutorException {
939            try {
940                XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System");
941                FileSystem actionFs = context.getAppFileSystem();
942                XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir());
943                prepareActionDir(actionFs, context);
944                XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action ");
945                submitLauncher(actionFs, context, action);
946                XLog.getLog(getClass()).debug("Action submit completed. Performing check ");
947                check(context, action);
948                XLog.getLog(getClass()).debug("Action check is done after submission");
949            }
950            catch (Exception ex) {
951                throw convertException(ex);
952            }
953        }
954    
955        @Override
956        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
957            try {
958                String externalStatus = action.getExternalStatus();
959                WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK
960                        : WorkflowAction.Status.ERROR;
961                context.setEndData(status, getActionSignal(status));
962            }
963            catch (Exception ex) {
964                throw convertException(ex);
965            }
966            finally {
967                try {
968                    FileSystem actionFs = context.getAppFileSystem();
969                    cleanUpActionDir(actionFs, context);
970                }
971                catch (Exception ex) {
972                    throw convertException(ex);
973                }
974            }
975        }
976    
977        /**
978         * Create job client object
979         *
980         * @param context
981         * @param jobConf
982         * @return
983         * @throws HadoopAccessorException
984         */
985        protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
986            String user = context.getWorkflow().getUser();
987            String group = context.getWorkflow().getGroup();
988            return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
989        }
990    
991        protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
992            RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
993            return runningJob;
994        }
995    
996        @Override
997        public void check(Context context, WorkflowAction action) throws ActionExecutorException {
998            JobClient jobClient = null;
999            boolean exception = false;
1000            try {
1001                Element actionXml = XmlUtils.parseXml(action.getConf());
1002                FileSystem actionFs = context.getAppFileSystem();
1003                JobConf jobConf = createBaseHadoopConf(context, actionXml);
1004                jobClient = createJobClient(context, jobConf);
1005                RunningJob runningJob = getRunningJob(context, action, jobClient);
1006                if (runningJob == null) {
1007                    context.setExternalStatus(FAILED);
1008                    context.setExecutionData(FAILED, null);
1009                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
1010                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", action
1011                                    .getExternalId(), action.getId());
1012                }
1013                if (runningJob.isComplete()) {
1014                    Path actionDir = context.getActionDir();
1015    
1016                    String user = context.getWorkflow().getUser();
1017                    String group = context.getWorkflow().getGroup();
1018                    if (LauncherMapperHelper.hasIdSwap(runningJob, user, group, actionDir)) {
1019                        String launcherId = action.getExternalId();
1020                        Path idSwapPath = LauncherMapperHelper.getIdSwapPath(context.getActionDir());
1021                        InputStream is = actionFs.open(idSwapPath);
1022                        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1023                        Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
1024                        reader.close();
1025                        String newId = props.getProperty("id");
1026                        runningJob = jobClient.getJob(JobID.forName(newId));
1027                        if (runningJob == null) {
1028                            context.setExternalStatus(FAILED);
1029                            throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
1030                                    "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", newId,
1031                                    action.getId());
1032                        }
1033    
1034                        context.setExternalChildIDs(newId);
1035                        XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
1036                                newId);
1037                    }
1038                    if (runningJob.isComplete()) {
1039                        XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]",
1040                                action.getExternalId());
1041                        if (runningJob.isSuccessful() && LauncherMapperHelper.isMainSuccessful(runningJob)) {
1042                            getActionData(actionFs, runningJob, action, context);
1043                            XLog.getLog(getClass()).info(XLog.STD, "action produced output");
1044                        }
1045                        else {
1046                            XLog log = XLog.getLog(getClass());
1047                            String errorReason;
1048                            Path actionError = LauncherMapperHelper.getErrorPath(context.getActionDir());
1049                            if (actionFs.exists(actionError)) {
1050                                InputStream is = actionFs.open(actionError);
1051                                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1052                                Properties props = PropertiesUtils.readProperties(reader, -1);
1053                                reader.close();
1054                                String errorCode = props.getProperty("error.code");
1055                                if (errorCode.equals("0")) {
1056                                    errorCode = "JA018";
1057                                }
1058                                if (errorCode.equals("-1")) {
1059                                    errorCode = "JA019";
1060                                }
1061                                errorReason = props.getProperty("error.reason");
1062                                log.warn("Launcher ERROR, reason: {0}", errorReason);
1063                                String exMsg = props.getProperty("exception.message");
1064                                String errorInfo = (exMsg != null) ? exMsg : errorReason;
1065                                context.setErrorInfo(errorCode, errorInfo);
1066                                String exStackTrace = props.getProperty("exception.stacktrace");
1067                                if (exMsg != null) {
1068                                    log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
1069                                }
1070                            }
1071                            else {
1072                                errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action
1073                                        .getTrackerUri(), action.getExternalId());
1074                                log.warn(errorReason);
1075                            }
1076                            context.setExecutionData(FAILED_KILLED, null);
1077                            setActionCompletionData(context, actionFs);
1078                        }
1079                    }
1080                    else {
1081                        context.setExternalStatus(RUNNING);
1082                        XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
1083                                action.getExternalId(), action.getExternalStatus());
1084                    }
1085                }
1086                else {
1087                    context.setExternalStatus(RUNNING);
1088                    XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
1089                            action.getExternalId(), action.getExternalStatus());
1090                }
1091            }
1092            catch (Exception ex) {
1093                XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
1094                exception = true;
1095                throw convertException(ex);
1096            }
1097            finally {
1098                if (jobClient != null) {
1099                    try {
1100                        jobClient.close();
1101                    }
1102                    catch (Exception e) {
1103                        if (exception) {
1104                            log.error("JobClient error: ", e);
1105                        }
1106                        else {
1107                            throw convertException(e);
1108                        }
1109                    }
1110                }
1111            }
1112        }
1113    
1114        /**
1115         * Get the output data of an action. Subclasses should override this method
1116         * to get action specific output data.
1117         *
1118         * @param actionFs the FileSystem object
1119         * @param runningJob the runningJob
1120         * @param action the Workflow action
1121         * @param context executor context
1122         *
1123         */
1124        protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
1125                throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
1126            Properties props = null;
1127            if (getCaptureOutput(action)) {
1128                props = new Properties();
1129                if (LauncherMapperHelper.hasOutputData(runningJob)) {
1130                    Path actionOutput = LauncherMapperHelper.getOutputDataPath(context.getActionDir());
1131                    InputStream is = actionFs.open(actionOutput);
1132                    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1133                    props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
1134                    reader.close();
1135                }
1136            }
1137            context.setExecutionData(SUCCEEDED, props);
1138        }
1139    
1140        protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
1141            Element eConf = XmlUtils.parseXml(action.getConf());
1142            Namespace ns = eConf.getNamespace();
1143            Element captureOutput = eConf.getChild("capture-output", ns);
1144            return captureOutput != null;
1145        }
1146    
1147        @Override
1148        public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
1149            JobClient jobClient = null;
1150            boolean exception = false;
1151            try {
1152                Element actionXml = XmlUtils.parseXml(action.getConf());
1153                JobConf jobConf = createBaseHadoopConf(context, actionXml);
1154                jobClient = createJobClient(context, jobConf);
1155                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
1156                if (runningJob != null) {
1157                    runningJob.killJob();
1158                }
1159                context.setExternalStatus(KILLED);
1160                context.setExecutionData(KILLED, null);
1161            }
1162            catch (Exception ex) {
1163                exception = true;
1164                throw convertException(ex);
1165            }
1166            finally {
1167                try {
1168                    FileSystem actionFs = context.getAppFileSystem();
1169                    cleanUpActionDir(actionFs, context);
1170                    if (jobClient != null) {
1171                        jobClient.close();
1172                    }
1173                }
1174                catch (Exception ex) {
1175                    if (exception) {
1176                        log.error("Error: ", ex);
1177                    }
1178                    else {
1179                        throw convertException(ex);
1180                    }
1181                }
1182            }
1183        }
1184    
1185        private static Set<String> FINAL_STATUS = new HashSet<String>();
1186    
1187        static {
1188            FINAL_STATUS.add(SUCCEEDED);
1189            FINAL_STATUS.add(KILLED);
1190            FINAL_STATUS.add(FAILED);
1191            FINAL_STATUS.add(FAILED_KILLED);
1192        }
1193    
1194        @Override
1195        public boolean isCompleted(String externalStatus) {
1196            return FINAL_STATUS.contains(externalStatus);
1197        }
1198    
1199    
1200        /**
1201         * Return the sharelib names for the action.
1202         * <p/>
1203         * If <code>NULL</code> or empty, it means that the action does not use the action
1204         * sharelib.
1205         * <p/>
1206         * If a non-empty string, i.e. <code>foo</code>, it means the action uses the
1207         * action sharelib sub-directory <code>foo</code> and all JARs in the sharelib
1208         * <code>foo</code> directory will be in the action classpath. Multiple sharelib
1209         * sub-directories can be specified as a comma separated list.
1210         * <p/>
1211         * The resolution is done using the following precedence order:
1212         * <ul>
1213         *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the action configuration</li>
1214         *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the job configuration</li>
1215         *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the oozie configuration</li>
1216         *     <li>Action Executor <code>getDefaultShareLibName()</code> method</li>
1217         * </ul>
1218         *
1219         *
1220         * @param context executor context.
1221         * @param actionXml
1222         * @param conf action configuration.
1223         * @return the action sharelib names.
1224         */
1225        protected String[] getShareLibNames(Context context, Element actionXml, Configuration conf) {
1226            String[] names = conf.getStrings(ACTION_SHARELIB_FOR + getType());
1227            if (names == null || names.length == 0) {
1228                try {
1229                    XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
1230                    names = jobConf.getStrings(ACTION_SHARELIB_FOR + getType());
1231                    if (names == null || names.length == 0) {
1232                        names = Services.get().getConf().getStrings(ACTION_SHARELIB_FOR + getType());
1233                        if (names == null || names.length == 0) {
1234                            String name = getDefaultShareLibName(actionXml);
1235                            if (name != null) {
1236                                names = new String[] { name };
1237                            }
1238                        }
1239                    }
1240                }
1241                catch (IOException ex) {
1242                    throw new RuntimeException("It cannot happen, " + ex.toString(), ex);
1243                }
1244            }
1245            return names;
1246        }
1247    
1248        private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for.";
1249    
1250    
1251        /**
1252         * Returns the default sharelib name for the action if any.
1253         *
1254         * @param actionXml the action XML fragment.
1255         * @return the sharelib name for the action, <code>NULL</code> if none.
1256         */
1257        protected String getDefaultShareLibName(Element actionXml) {
1258            return null;
1259        }
1260    
1261        /**
1262         * Sets some data for the action on completion
1263         *
1264         * @param context executor context
1265         * @param actionFs the FileSystem object
1266         */
1267        protected void setActionCompletionData(Context context, FileSystem actionFs) throws IOException,
1268                HadoopAccessorException, URISyntaxException {
1269        }
1270    }