001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.action.hadoop;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.StringReader;
024import java.net.ConnectException;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.net.UnknownHostException;
028import java.security.PrivilegedExceptionAction;
029import java.text.MessageFormat;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.Map.Entry;
038import java.util.Properties;
039import java.util.Set;
040import java.util.regex.Matcher;
041import java.util.regex.Pattern;
042
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.filecache.DistributedCache;
045import org.apache.hadoop.fs.FileStatus;
046import org.apache.hadoop.fs.FileSystem;
047import org.apache.hadoop.fs.Path;
048import org.apache.hadoop.fs.permission.AccessControlException;
049import org.apache.oozie.hadoop.utils.HadoopShims;
050import org.apache.hadoop.io.Text;
051import org.apache.hadoop.mapred.JobClient;
052import org.apache.hadoop.mapred.JobConf;
053import org.apache.hadoop.mapred.JobID;
054import org.apache.hadoop.mapred.RunningJob;
055import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
056import org.apache.hadoop.security.UserGroupInformation;
057import org.apache.hadoop.security.token.Token;
058import org.apache.hadoop.security.token.TokenIdentifier;
059import org.apache.hadoop.util.DiskChecker;
060import org.apache.oozie.WorkflowActionBean;
061import org.apache.oozie.WorkflowJobBean;
062import org.apache.oozie.action.ActionExecutor;
063import org.apache.oozie.action.ActionExecutorException;
064import org.apache.oozie.client.OozieClient;
065import org.apache.oozie.client.WorkflowAction;
066import org.apache.oozie.client.WorkflowJob;
067import org.apache.oozie.command.coord.CoordActionStartXCommand;
068import org.apache.oozie.service.ConfigurationService;
069import org.apache.oozie.service.HadoopAccessorException;
070import org.apache.oozie.service.HadoopAccessorService;
071import org.apache.oozie.service.Services;
072import org.apache.oozie.service.ShareLibService;
073import org.apache.oozie.service.URIHandlerService;
074import org.apache.oozie.service.UserGroupInformationService;
075import org.apache.oozie.service.WorkflowAppService;
076import org.apache.oozie.util.ELEvaluationException;
077import org.apache.oozie.util.ELEvaluator;
078import org.apache.oozie.util.JobUtils;
079import org.apache.oozie.util.LogUtils;
080import org.apache.oozie.util.PropertiesUtils;
081import org.apache.oozie.util.XConfiguration;
082import org.apache.oozie.util.XLog;
083import org.apache.oozie.util.XmlUtils;
084import org.jdom.Element;
085import org.jdom.JDOMException;
086import org.jdom.Namespace;
087
088
089public class JavaActionExecutor extends ActionExecutor {
090
091    protected static final String HADOOP_USER = "user.name";
092    public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
093    public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
094    public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
095    public static final String HADOOP_NAME_NODE = "fs.default.name";
096    private static final String HADOOP_JOB_NAME = "mapred.job.name";
097    public static final String OOZIE_COMMON_LIBDIR = "oozie";
098    private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
099    public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
100    public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
101    public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
102    public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = "yarn.timeline-service.enabled";
103    public static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable";
104    public static final String HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART = "oozie.action.launcher.am.restart.kill.childjobs";
105    public static final String HADOOP_MAP_MEMORY_MB = "mapreduce.map.memory.mb";
106    public static final String HADOOP_CHILD_JAVA_OPTS = "mapred.child.java.opts";
107    public static final String HADOOP_MAP_JAVA_OPTS = "mapreduce.map.java.opts";
108    public static final String HADOOP_REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts";
109    public static final String HADOOP_CHILD_JAVA_ENV = "mapred.child.env";
110    public static final String HADOOP_MAP_JAVA_ENV = "mapreduce.map.env";
111    public static final String YARN_AM_RESOURCE_MB = "yarn.app.mapreduce.am.resource.mb";
112    public static final String YARN_AM_COMMAND_OPTS = "yarn.app.mapreduce.am.command-opts";
113    public static final String YARN_AM_ENV = "yarn.app.mapreduce.am.env";
114    private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain";
115    public static final int YARN_MEMORY_MB_MIN = 512;
116    private static int maxActionOutputLen;
117    private static int maxExternalStatsSize;
118    private static int maxFSGlobMax;
119    private static final String SUCCEEDED = "SUCCEEDED";
120    private static final String KILLED = "KILLED";
121    private static final String FAILED = "FAILED";
122    private static final String FAILED_KILLED = "FAILED/KILLED";
123    protected XLog LOG = XLog.getLog(getClass());
124    private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
125    private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir=";
126    public static final String CONF_HADOOP_YARN_UBER_MODE = "oozie.action.launcher." + HADOOP_YARN_UBER_MODE;
127    public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader";
128    public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first";
129    public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip";
130
131    public XConfiguration workflowConf = null;
132
133    static {
134        DISALLOWED_PROPERTIES.add(HADOOP_USER);
135        DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
136        DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE);
137        DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2);
138        DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM);
139    }
140
141    public JavaActionExecutor() {
142        this("java");
143    }
144
145    protected JavaActionExecutor(String type) {
146        super(type);
147    }
148
149    public static List<Class> getCommonLauncherClasses() {
150        List<Class> classes = new ArrayList<Class>();
151        classes.add(LauncherMapper.class);
152        classes.add(OozieLauncherInputFormat.class);
153        classes.add(OozieLauncherOutputFormat.class);
154        classes.add(OozieLauncherOutputCommitter.class);
155        classes.add(LauncherMainHadoopUtils.class);
156        classes.add(HadoopShims.class);
157        classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
158        return classes;
159    }
160
161    public List<Class> getLauncherClasses() {
162       List<Class> classes = new ArrayList<Class>();
163        try {
164            classes.add(Class.forName(JAVA_MAIN_CLASS_NAME));
165        }
166        catch (ClassNotFoundException e) {
167            throw new RuntimeException("Class not found", e);
168        }
169        return classes;
170    }
171
172    @Override
173    public void initActionType() {
174        super.initActionType();
175        maxActionOutputLen = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA);
176        //Get the limit for the maximum allowed size of action stats
177        maxExternalStatsSize = ConfigurationService.getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE);
178        maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
179        //Get the limit for the maximum number of globbed files/dirs for FS operation
180        maxFSGlobMax = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX);
181
182        registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001");
183        registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT,
184                "JA002");
185        registerError(DiskChecker.DiskOutOfSpaceException.class.getName(),
186                ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003");
187        registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(),
188                ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
189        registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
190                ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
191        registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "  JA006");
192        registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
193        registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
194        registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
195    }
196
197
198    /**
199     * Get the maximum allowed size of stats
200     *
201     * @return maximum size of stats
202     */
203    public static int getMaxExternalStatsSize() {
204        return maxExternalStatsSize;
205    }
206
207    static void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException {
208        for (String prop : DISALLOWED_PROPERTIES) {
209            if (conf.get(prop) != null) {
210                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010",
211                        "Property [{0}] not allowed in action [{1}] configuration", prop, confName);
212            }
213        }
214    }
215
216    public JobConf createBaseHadoopConf(Context context, Element actionXml) {
217        return createBaseHadoopConf(context, actionXml, true);
218    }
219
220    protected JobConf createBaseHadoopConf(Context context, Element actionXml, boolean loadResources) {
221        Namespace ns = actionXml.getNamespace();
222        String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
223        String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
224        JobConf conf = null;
225        if (loadResources) {
226            conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
227        }
228        else {
229            conf = new JobConf(false);
230        }
231        conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
232        conf.set(HADOOP_JOB_TRACKER, jobTracker);
233        conf.set(HADOOP_JOB_TRACKER_2, jobTracker);
234        conf.set(HADOOP_YARN_RM, jobTracker);
235        conf.set(HADOOP_NAME_NODE, nameNode);
236        conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
237        return conf;
238    }
239
240    protected JobConf loadHadoopDefaultResources(Context context, Element actionXml) {
241        return createBaseHadoopConf(context, actionXml);
242    }
243
244    private static void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) {
245        for (Map.Entry<String, String> entry : srcConf) {
246            if (entry.getKey().startsWith("oozie.launcher.")) {
247                String name = entry.getKey().substring("oozie.launcher.".length());
248                String value = entry.getValue();
249                // setting original KEY
250                launcherConf.set(entry.getKey(), value);
251                // setting un-prefixed key (to allow Hadoop job config
252                // for the launcher job
253                launcherConf.set(name, value);
254            }
255        }
256    }
257
258    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
259            throws ActionExecutorException {
260        try {
261            Namespace ns = actionXml.getNamespace();
262            XConfiguration launcherConf = new XConfiguration();
263            // Inject action defaults for launcher
264            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
265            XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType());
266            injectLauncherProperties(actionDefaultConf, launcherConf);
267            // Inject <job-xml> and <configuration> for launcher
268            try {
269                parseJobXmlAndConfiguration(context, actionXml, appPath, launcherConf, true);
270            } catch (HadoopAccessorException ex) {
271                throw convertException(ex);
272            } catch (URISyntaxException ex) {
273                throw convertException(ex);
274            }
275            // Inject use uber mode for launcher
276            injectLauncherUseUberMode(launcherConf);
277            XConfiguration.copy(launcherConf, conf);
278            checkForDisallowedProps(launcherConf, "launcher configuration");
279            // Inject config-class for launcher to use for action
280            Element e = actionXml.getChild("config-class", ns);
281            if (e != null) {
282                conf.set(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim());
283            }
284            return conf;
285        }
286        catch (IOException ex) {
287            throw convertException(ex);
288        }
289    }
290
291    void injectLauncherUseUberMode(Configuration launcherConf) {
292        // Set Uber Mode for the launcher (YARN only, ignored by MR1)
293        // Priority:
294        // 1. action's <configuration>
295        // 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable
296        // 3. oozie.action.launcher.mapreduce.job.ubertask.enable
297        if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
298            if (ConfigurationService.get("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE).length() > 0) {
299                if (ConfigurationService.getBoolean("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE)) {
300                    launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
301                }
302            } else {
303                if (ConfigurationService.getBoolean("oozie.action.launcher." + HADOOP_YARN_UBER_MODE)) {
304                    launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
305                }
306            }
307        }
308    }
309
310    void injectLauncherTimelineServiceEnabled(Configuration launcherConf, Configuration actionConf) {
311        // Getting delegation token for ATS. If tez-site.xml is present in distributed cache, turn on timeline service.
312        if (actionConf.get("oozie.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED) == null
313                && ConfigurationService.getBoolean("oozie.action.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED)) {
314            String cacheFiles = launcherConf.get("mapred.cache.files");
315            if (cacheFiles != null && cacheFiles.contains("tez-site.xml")) {
316                launcherConf.setBoolean(HADOOP_YARN_TIMELINE_SERVICE_ENABLED, true);
317            }
318        }
319    }
320
321    void updateConfForUberMode(Configuration launcherConf) {
322
323        // child.env
324        boolean hasConflictEnv = false;
325        String launcherMapEnv = launcherConf.get(HADOOP_MAP_JAVA_ENV);
326        if (launcherMapEnv == null) {
327            launcherMapEnv = launcherConf.get(HADOOP_CHILD_JAVA_ENV);
328        }
329        String amEnv = launcherConf.get(YARN_AM_ENV);
330        StringBuffer envStr = new StringBuffer();
331        HashMap<String, List<String>> amEnvMap = null;
332        HashMap<String, List<String>> launcherMapEnvMap = null;
333        if (amEnv != null) {
334            envStr.append(amEnv);
335            amEnvMap = populateEnvMap(amEnv);
336        }
337        if (launcherMapEnv != null) {
338            launcherMapEnvMap = populateEnvMap(launcherMapEnv);
339            if (amEnvMap != null) {
340                Iterator<String> envKeyItr = launcherMapEnvMap.keySet().iterator();
341                while (envKeyItr.hasNext()) {
342                    String envKey = envKeyItr.next();
343                    if (amEnvMap.containsKey(envKey)) {
344                        List<String> amValList = amEnvMap.get(envKey);
345                        List<String> launcherValList = launcherMapEnvMap.get(envKey);
346                        Iterator<String> valItr = launcherValList.iterator();
347                        while (valItr.hasNext()) {
348                            String val = valItr.next();
349                            if (!amValList.contains(val)) {
350                                hasConflictEnv = true;
351                                break;
352                            }
353                            else {
354                                valItr.remove();
355                            }
356                        }
357                        if (launcherValList.isEmpty()) {
358                            envKeyItr.remove();
359                        }
360                    }
361                }
362            }
363        }
364        if (hasConflictEnv) {
365            launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
366        }
367        else {
368            if (launcherMapEnvMap != null) {
369                for (String key : launcherMapEnvMap.keySet()) {
370                    List<String> launcherValList = launcherMapEnvMap.get(key);
371                    for (String val : launcherValList) {
372                        if (envStr.length() > 0) {
373                            envStr.append(",");
374                        }
375                        envStr.append(key).append("=").append(val);
376                    }
377                }
378            }
379
380            launcherConf.set(YARN_AM_ENV, envStr.toString());
381
382            // memory.mb
383            int launcherMapMemoryMB = launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 1536);
384            int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536);
385            // YARN_MEMORY_MB_MIN to provide buffer.
386            // suppose launcher map aggressively use high memory, need some
387            // headroom for AM
388            int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + YARN_MEMORY_MB_MIN;
389            // limit to 4096 in case of 32 bit
390            if (launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 4096) {
391                memoryMB = 4096;
392            }
393            launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB);
394
395            // We already made mapred.child.java.opts and
396            // mapreduce.map.java.opts equal, so just start with one of them
397            String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS, "");
398            String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS);
399            StringBuilder optsStr = new StringBuilder();
400            int heapSizeForMap = extractHeapSizeMB(launcherMapOpts);
401            int heapSizeForAm = extractHeapSizeMB(amChildOpts);
402            int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + YARN_MEMORY_MB_MIN;
403            // limit to 3584 in case of 32 bit
404            if (heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 3584) {
405                heapSize = 3584;
406            }
407            if (amChildOpts != null) {
408                optsStr.append(amChildOpts);
409            }
410            optsStr.append(" ").append(launcherMapOpts.trim());
411            if (heapSize > 0) {
412                // append calculated total heap size to the end
413                optsStr.append(" ").append("-Xmx").append(heapSize).append("m");
414            }
415            launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString().trim());
416        }
417    }
418
419    void updateConfForJavaTmpDir(Configuration conf) {
420        String amChildOpts = conf.get(YARN_AM_COMMAND_OPTS);
421        String oozieJavaTmpDirSetting = "-Djava.io.tmpdir=./tmp";
422        if (amChildOpts != null && !amChildOpts.contains(JAVA_TMP_DIR_SETTINGS)) {
423            conf.set(YARN_AM_COMMAND_OPTS, amChildOpts + " " + oozieJavaTmpDirSetting);
424        }
425    }
426
427    private HashMap<String, List<String>> populateEnvMap(String input) {
428        HashMap<String, List<String>> envMaps = new HashMap<String, List<String>>();
429        String[] envEntries = input.split(",");
430        for (String envEntry : envEntries) {
431            String[] envKeyVal = envEntry.split("=");
432            String envKey = envKeyVal[0].trim();
433            List<String> valList = envMaps.get(envKey);
434            if (valList == null) {
435                valList = new ArrayList<String>();
436            }
437            valList.add(envKeyVal[1].trim());
438            envMaps.put(envKey, valList);
439        }
440        return envMaps;
441    }
442
443    public int extractHeapSizeMB(String input) {
444        int ret = 0;
445        if(input == null || input.equals(""))
446            return ret;
447        Matcher m = heapPattern.matcher(input);
448        String heapStr = null;
449        String heapNum = null;
450        // Grabs the last match which takes effect (in case that multiple Xmx options specified)
451        while (m.find()) {
452            heapStr = m.group(1);
453            heapNum = m.group(2);
454        }
455        if (heapStr != null) {
456            // when Xmx specified in Gigabyte
457            if(heapStr.endsWith("g") || heapStr.endsWith("G")) {
458                ret = Integer.parseInt(heapNum) * 1024;
459            } else {
460                ret = Integer.parseInt(heapNum);
461            }
462        }
463        return ret;
464    }
465
466    public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf)
467            throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException {
468        parseJobXmlAndConfiguration(context, element, appPath, conf, false);
469    }
470
471    public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf,
472            boolean isLauncher) throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException {
473        Namespace ns = element.getNamespace();
474        Iterator<Element> it = element.getChildren("job-xml", ns).iterator();
475        HashMap<String, FileSystem> filesystemsMap = new HashMap<String, FileSystem>();
476        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
477        while (it.hasNext()) {
478            Element e = it.next();
479            String jobXml = e.getTextTrim();
480            Path pathSpecified = new Path(jobXml);
481            Path path = pathSpecified.isAbsolute() ? pathSpecified : new Path(appPath, jobXml);
482            FileSystem fs;
483            if (filesystemsMap.containsKey(path.toUri().getAuthority())) {
484              fs = filesystemsMap.get(path.toUri().getAuthority());
485            }
486            else {
487              if (path.toUri().getAuthority() != null) {
488                fs = has.createFileSystem(context.getWorkflow().getUser(), path.toUri(),
489                        has.createJobConf(path.toUri().getAuthority()));
490              }
491              else {
492                fs = context.getAppFileSystem();
493              }
494              filesystemsMap.put(path.toUri().getAuthority(), fs);
495            }
496            Configuration jobXmlConf = new XConfiguration(fs.open(path));
497            try {
498                String jobXmlConfString = XmlUtils.prettyPrint(jobXmlConf).toString();
499                jobXmlConfString = XmlUtils.removeComments(jobXmlConfString);
500                jobXmlConfString = context.getELEvaluator().evaluate(jobXmlConfString, String.class);
501                jobXmlConf = new XConfiguration(new StringReader(jobXmlConfString));
502            }
503            catch (ELEvaluationException ex) {
504                throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, "EL_EVAL_ERROR", ex
505                        .getMessage(), ex);
506            }
507            catch (Exception ex) {
508                context.setErrorInfo("EL_ERROR", ex.getMessage());
509            }
510            checkForDisallowedProps(jobXmlConf, "job-xml");
511            if (isLauncher) {
512                injectLauncherProperties(jobXmlConf, conf);
513            } else {
514                XConfiguration.copy(jobXmlConf, conf);
515            }
516        }
517        Element e = element.getChild("configuration", ns);
518        if (e != null) {
519            String strConf = XmlUtils.prettyPrint(e).toString();
520            XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
521            checkForDisallowedProps(inlineConf, "inline configuration");
522            if (isLauncher) {
523                injectLauncherProperties(inlineConf, conf);
524            } else {
525                XConfiguration.copy(inlineConf, conf);
526            }
527        }
528    }
529
530    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
531            throws ActionExecutorException {
532        try {
533            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
534            XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType());
535            XConfiguration.injectDefaults(actionDefaults, actionConf);
536            has.checkSupportedFilesystem(appPath.toUri());
537
538            // Set the Java Main Class for the Java action to give to the Java launcher
539            setJavaMain(actionConf, actionXml);
540
541            parseJobXmlAndConfiguration(context, actionXml, appPath, actionConf);
542
543            // set cancel.delegation.token in actionConf that child job doesn't cancel delegation token
544            actionConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
545            updateConfForJavaTmpDir(actionConf);
546            setRootLoggerLevel(actionConf);
547            return actionConf;
548        }
549        catch (IOException ex) {
550            throw convertException(ex);
551        }
552        catch (HadoopAccessorException ex) {
553            throw convertException(ex);
554        }
555        catch (URISyntaxException ex) {
556            throw convertException(ex);
557        }
558    }
559
560    /**
561     * Set root log level property in actionConf
562     * @param actionConf
563     */
564    void setRootLoggerLevel(Configuration actionConf) {
565        String oozieActionTypeRootLogger = "oozie.action." + getType() + LauncherMapper.ROOT_LOGGER_LEVEL;
566        String oozieActionRootLogger = "oozie.action." + LauncherMapper.ROOT_LOGGER_LEVEL;
567
568        // check if root log level has already mentioned in action configuration
569        String rootLogLevel = actionConf.get(oozieActionTypeRootLogger, actionConf.get(oozieActionRootLogger));
570        if (rootLogLevel != null) {
571            // root log level is mentioned in action configuration
572            return;
573        }
574
575        // set the root log level which is mentioned in oozie default
576        rootLogLevel = ConfigurationService.get(oozieActionTypeRootLogger);
577        if (rootLogLevel != null && rootLogLevel.length() > 0) {
578            actionConf.set(oozieActionRootLogger, rootLogLevel);
579        }
580        else {
581            rootLogLevel = ConfigurationService.get(oozieActionRootLogger);
582            if (rootLogLevel != null && rootLogLevel.length() > 0) {
583                actionConf.set(oozieActionRootLogger, rootLogLevel);
584            }
585        }
586    }
587
588    Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive)
589            throws ActionExecutorException {
590
591        URI uri = null;
592        try {
593            uri = new URI(filePath);
594            URI baseUri = appPath.toUri();
595            if (uri.getScheme() == null) {
596                String resolvedPath = uri.getPath();
597                if (!resolvedPath.startsWith("/")) {
598                    resolvedPath = baseUri.getPath() + "/" + resolvedPath;
599                }
600                uri = new URI(baseUri.getScheme(), baseUri.getAuthority(), resolvedPath, uri.getQuery(), uri.getFragment());
601            }
602            if (archive) {
603                DistributedCache.addCacheArchive(uri.normalize(), conf);
604            }
605            else {
606                String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
607                if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files
608                    uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName);
609                    DistributedCache.addCacheFile(uri.normalize(), conf);
610                }
611                else if (fileName.endsWith(".jar")) { // .jar files
612                    if (!fileName.contains("#")) {
613                        String user = conf.get("user.name");
614                        Path pathToAdd = new Path(uri.normalize());
615                        Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, pathToAdd, conf);
616                    }
617                    else {
618                        DistributedCache.addCacheFile(uri.normalize(), conf);
619                    }
620                }
621                else { // regular files
622                    if (!fileName.contains("#")) {
623                        uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName);
624                    }
625                    DistributedCache.addCacheFile(uri.normalize(), conf);
626                }
627            }
628            DistributedCache.createSymlink(conf);
629            return conf;
630        }
631        catch (Exception ex) {
632            LOG.debug(
633                    "Errors when add to DistributedCache. Path=" + uri.toString() + ", archive=" + archive + ", conf="
634                            + XmlUtils.prettyPrint(conf).toString());
635            throw convertException(ex);
636        }
637    }
638
639    public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
640        try {
641            Path actionDir = context.getActionDir();
642            Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp");
643            if (!actionFs.exists(actionDir)) {
644                try {
645                    actionFs.mkdirs(tempActionDir);
646                    actionFs.rename(tempActionDir, actionDir);
647                }
648                catch (IOException ex) {
649                    actionFs.delete(tempActionDir, true);
650                    actionFs.delete(actionDir, true);
651                    throw ex;
652                }
653            }
654        }
655        catch (Exception ex) {
656            throw convertException(ex);
657        }
658    }
659
660    void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
661        try {
662            Path actionDir = context.getActionDir();
663            if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
664                    && actionFs.exists(actionDir)) {
665                actionFs.delete(actionDir, true);
666            }
667        }
668        catch (Exception ex) {
669            throw convertException(ex);
670        }
671    }
672
673    protected void addShareLib(Configuration conf, String[] actionShareLibNames)
674            throws ActionExecutorException {
675        Set<String> confSet = new HashSet<String>(Arrays.asList(getShareLibFilesForActionConf() == null ? new String[0]
676                : getShareLibFilesForActionConf()));
677
678        Set<Path> sharelibList = new HashSet<Path>();
679
680        if (actionShareLibNames != null) {
681            try {
682                ShareLibService shareLibService = Services.get().get(ShareLibService.class);
683                FileSystem fs = shareLibService.getFileSystem();
684                if (fs != null) {
685                    for (String actionShareLibName : actionShareLibNames) {
686                        List<Path> listOfPaths = shareLibService.getShareLibJars(actionShareLibName);
687                        if (listOfPaths != null && !listOfPaths.isEmpty()) {
688                            for (Path actionLibPath : listOfPaths) {
689                                String fragmentName = new URI(actionLibPath.toString()).getFragment();
690                                String fileName = fragmentName == null ? actionLibPath.getName() : fragmentName;
691                                if (confSet.contains(fileName)) {
692                                    Configuration jobXmlConf = shareLibService.getShareLibConf(actionShareLibName,
693                                            actionLibPath);
694                                    if (jobXmlConf != null) {
695                                        checkForDisallowedProps(jobXmlConf, actionLibPath.getName());
696                                        XConfiguration.injectDefaults(jobXmlConf, conf);
697                                        LOG.trace("Adding properties of " + actionLibPath + " to job conf");
698                                    }
699                                }
700                                else {
701                                    // Filtering out duplicate jars or files
702                                    sharelibList.add(new Path(actionLibPath.toUri()) {
703                                        @Override
704                                        public int hashCode() {
705                                            return getName().hashCode();
706                                        }
707                                        @Override
708                                        public String getName() {
709                                            try {
710                                                return (new URI(toString())).getFragment() == null ? new Path(toUri()).getName()
711                                                        : (new URI(toString())).getFragment();
712                                            }
713                                            catch (URISyntaxException e) {
714                                                throw new RuntimeException(e);
715                                            }
716                                        }
717                                        @Override
718                                        public boolean equals(Object input) {
719                                            if (input == null) {
720                                                return false;
721                                            }
722                                            if (input == this) {
723                                                return true;
724                                            }
725                                            if (!(input instanceof Path)) {
726                                                return false;
727                                            }
728                                            return getName().equals(((Path) input).getName());
729                                        }
730                                    });
731                                }
732                            }
733                        }
734                    }
735                }
736                for (Path libPath : sharelibList) {
737                    addToCache(conf, libPath, libPath.toUri().getPath(), false);
738                }
739            }
740            catch (URISyntaxException ex) {
741                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "Error configuring sharelib",
742                        ex.getMessage());
743            }
744            catch (IOException ex) {
745                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
746                        ex.getMessage());
747            }
748        }
749    }
750
751    protected void addSystemShareLibForAction(Configuration conf) throws ActionExecutorException {
752        ShareLibService shareLibService = Services.get().get(ShareLibService.class);
753        // ShareLibService is null for test cases
754        if (shareLibService != null) {
755            try {
756                List<Path> listOfPaths = shareLibService.getSystemLibJars(JavaActionExecutor.OOZIE_COMMON_LIBDIR);
757                if (listOfPaths == null || listOfPaths.isEmpty()) {
758                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "EJ001",
759                            "Could not locate Oozie sharelib");
760                }
761                FileSystem fs = listOfPaths.get(0).getFileSystem(conf);
762                for (Path actionLibPath : listOfPaths) {
763                    JobUtils.addFileToClassPath(actionLibPath, conf, fs);
764                    DistributedCache.createSymlink(conf);
765                }
766                listOfPaths = shareLibService.getSystemLibJars(getType());
767                if (listOfPaths != null) {
768                    for (Path actionLibPath : listOfPaths) {
769                        JobUtils.addFileToClassPath(actionLibPath, conf, fs);
770                        DistributedCache.createSymlink(conf);
771                    }
772                }
773            }
774            catch (IOException ex) {
775                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
776                        ex.getMessage());
777            }
778        }
779    }
780
781    protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException {
782        String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath");
783        if (actionLibsStrArr != null) {
784            try {
785                for (String actionLibsStr : actionLibsStrArr) {
786                    actionLibsStr = actionLibsStr.trim();
787                    if (actionLibsStr.length() > 0)
788                    {
789                        Path actionLibsPath = new Path(actionLibsStr);
790                        String user = conf.get("user.name");
791                        FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
792                        if (fs.exists(actionLibsPath)) {
793                            FileStatus[] files = fs.listStatus(actionLibsPath);
794                            for (FileStatus file : files) {
795                                addToCache(conf, appPath, file.getPath().toUri().getPath(), false);
796                            }
797                        }
798                    }
799                }
800            }
801            catch (HadoopAccessorException ex){
802                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
803                        ex.getErrorCode().toString(), ex.getMessage());
804            }
805            catch (IOException ex){
806                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
807                        "It should never happen", ex.getMessage());
808            }
809        }
810    }
811
812    @SuppressWarnings("unchecked")
813    public void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf)
814            throws ActionExecutorException {
815        Configuration proto = context.getProtoActionConf();
816
817        // Workflow lib/
818        String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
819        if (paths != null) {
820            for (String path : paths) {
821                addToCache(conf, appPath, path, false);
822            }
823        }
824
825        // Action libs
826        addActionLibs(appPath, conf);
827
828        // files and archives defined in the action
829        for (Element eProp : (List<Element>) actionXml.getChildren()) {
830            if (eProp.getName().equals("file")) {
831                String[] filePaths = eProp.getTextTrim().split(",");
832                for (String path : filePaths) {
833                    addToCache(conf, appPath, path.trim(), false);
834                }
835            }
836            else if (eProp.getName().equals("archive")) {
837                String[] archivePaths = eProp.getTextTrim().split(",");
838                for (String path : archivePaths){
839                    addToCache(conf, appPath, path.trim(), true);
840                }
841            }
842        }
843
844        addAllShareLibs(appPath, conf, context, actionXml);
845    }
846
847    // Adds action specific share libs and common share libs
848    private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml)
849            throws ActionExecutorException {
850        // Add action specific share libs
851        addActionShareLib(appPath, conf, context, actionXml);
852        // Add common sharelibs for Oozie and launcher jars
853        addSystemShareLibForAction(conf);
854    }
855
856    private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml)
857            throws ActionExecutorException {
858        XConfiguration wfJobConf = null;
859        try {
860            wfJobConf = getWorkflowConf(context);
861        }
862        catch (IOException ioe) {
863            throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
864                    ioe.getMessage());
865        }
866        // Action sharelibs are only added if user has specified to use system libpath
867        if (conf.get(OozieClient.USE_SYSTEM_LIBPATH) == null) {
868            if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH,
869                    ConfigurationService.getBoolean(OozieClient.USE_SYSTEM_LIBPATH))) {
870                // add action specific sharelibs
871                addShareLib(conf, getShareLibNames(context, actionXml, conf));
872            }
873        }
874        else {
875            if (conf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
876                // add action specific sharelibs
877                addShareLib(conf, getShareLibNames(context, actionXml, conf));
878            }
879        }
880    }
881
882
883    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
884        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName());
885    }
886
887    private void setJavaMain(Configuration actionConf, Element actionXml) {
888        Namespace ns = actionXml.getNamespace();
889        Element e = actionXml.getChild("main-class", ns);
890        if (e != null) {
891            actionConf.set(JavaMain.JAVA_MAIN_CLASS, e.getTextTrim());
892        }
893    }
894
895    private static final String QUEUE_NAME = "mapred.job.queue.name";
896
897    private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>();
898
899    static {
900        SPECIAL_PROPERTIES.add(QUEUE_NAME);
901        SPECIAL_PROPERTIES.add(ACL_VIEW_JOB);
902        SPECIAL_PROPERTIES.add(ACL_MODIFY_JOB);
903    }
904
905    @SuppressWarnings("unchecked")
906    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
907            throws ActionExecutorException {
908        try {
909
910            // app path could be a file
911            Path appPathRoot = new Path(context.getWorkflow().getAppPath());
912            if (actionFs.isFile(appPathRoot)) {
913                appPathRoot = appPathRoot.getParent();
914            }
915
916            // launcher job configuration
917            JobConf launcherJobConf = createBaseHadoopConf(context, actionXml);
918            // cancel delegation token on a launcher job which stays alive till child job(s) finishes
919            // otherwise (in mapred action), doesn't cancel not to disturb running child job
920            launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
921            setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);
922
923            // Properties for when a launcher job's AM gets restarted
924            if (ConfigurationService.getBoolean(HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART)) {
925                // launcher time filter is required to prune the search of launcher tag.
926                // Setting coordinator action nominal time as launcher time as it child job cannot launch before nominal
927                // time. Workflow created time is good enough when workflow is running independently or workflow is
928                // rerunning from failed node.
929                long launcherTime = System.currentTimeMillis();
930                String coordActionNominalTime = context.getProtoActionConf().get(
931                        CoordActionStartXCommand.OOZIE_COORD_ACTION_NOMINAL_TIME);
932                if (coordActionNominalTime != null) {
933                    launcherTime = Long.parseLong(coordActionNominalTime);
934                }
935                else if (context.getWorkflow().getCreatedTime() != null) {
936                    launcherTime = context.getWorkflow().getCreatedTime().getTime();
937                }
938                String actionYarnTag = getActionYarnTag(getWorkflowConf(context), context.getWorkflow(), action);
939                LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, actionYarnTag, launcherTime);
940            }
941            else {
942                LOG.info(MessageFormat.format("{0} is set to false, not setting YARN restart properties",
943                        HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART));
944            }
945
946            String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType());
947            if (actionShareLibProperty != null) {
948                launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty);
949            }
950            setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
951
952            String jobName = launcherJobConf.get(HADOOP_JOB_NAME);
953            if (jobName == null || jobName.isEmpty()) {
954                jobName = XLog.format(
955                        "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
956                        context.getWorkflow().getAppName(), action.getName(),
957                        context.getWorkflow().getId());
958            launcherJobConf.setJobName(jobName);
959            }
960
961            // Inject Oozie job information if enabled.
962            injectJobInfo(launcherJobConf, actionConf, context, action);
963
964            injectLauncherCallback(context, launcherJobConf);
965
966            String jobId = context.getWorkflow().getId();
967            String actionId = action.getId();
968            Path actionDir = context.getActionDir();
969            String recoveryId = context.getRecoveryId();
970
971            // Getting the prepare XML from the action XML
972            Namespace ns = actionXml.getNamespace();
973            Element prepareElement = actionXml.getChild("prepare", ns);
974            String prepareXML = "";
975            if (prepareElement != null) {
976                if (prepareElement.getChildren().size() > 0) {
977                    prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
978                }
979            }
980            LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
981                    prepareXML);
982
983            // Set the launcher Main Class
984            LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
985            LauncherMapperHelper.setupLauncherURIHandlerConf(launcherJobConf);
986            LauncherMapperHelper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
987            LauncherMapperHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
988            LauncherMapperHelper.setupMaxFSGlob(launcherJobConf, maxFSGlobMax);
989
990            List<Element> list = actionXml.getChildren("arg", ns);
991            String[] args = new String[list.size()];
992            for (int i = 0; i < list.size(); i++) {
993                args[i] = list.get(i).getTextTrim();
994            }
995            LauncherMapperHelper.setupMainArguments(launcherJobConf, args);
996
997            // Make mapred.child.java.opts and mapreduce.map.java.opts equal, but give values from the latter priority; also append
998            // <java-opt> and <java-opts> and give those highest priority
999            StringBuilder opts = new StringBuilder(launcherJobConf.get(HADOOP_CHILD_JAVA_OPTS, ""));
1000            if (launcherJobConf.get(HADOOP_MAP_JAVA_OPTS) != null) {
1001                opts.append(" ").append(launcherJobConf.get(HADOOP_MAP_JAVA_OPTS));
1002            }
1003            List<Element> javaopts = actionXml.getChildren("java-opt", ns);
1004            for (Element opt: javaopts) {
1005                opts.append(" ").append(opt.getTextTrim());
1006            }
1007            Element opt = actionXml.getChild("java-opts", ns);
1008            if (opt != null) {
1009                opts.append(" ").append(opt.getTextTrim());
1010            }
1011            launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim());
1012            launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, opts.toString().trim());
1013
1014            // setting for uber mode
1015            if (launcherJobConf.getBoolean(HADOOP_YARN_UBER_MODE, false)) {
1016                if (checkPropertiesToDisableUber(launcherJobConf)) {
1017                    launcherJobConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
1018                }
1019                else {
1020                    updateConfForUberMode(launcherJobConf);
1021                }
1022            }
1023            updateConfForJavaTmpDir(launcherJobConf);
1024            injectLauncherTimelineServiceEnabled(launcherJobConf, actionConf);
1025
1026            // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs)
1027            // maybe we should add queue to the WF schema, below job-tracker
1028            actionConfToLauncherConf(actionConf, launcherJobConf);
1029
1030            return launcherJobConf;
1031        }
1032        catch (Exception ex) {
1033            throw convertException(ex);
1034        }
1035    }
1036
1037    private boolean checkPropertiesToDisableUber(Configuration launcherConf) {
1038        boolean disable = false;
1039        if (launcherConf.getBoolean(HADOOP_JOB_CLASSLOADER, false)) {
1040            disable = true;
1041        }
1042        else if (launcherConf.getBoolean(HADOOP_USER_CLASSPATH_FIRST, false)) {
1043            disable = true;
1044        }
1045        return disable;
1046    }
1047
1048    protected void injectCallback(Context context, Configuration conf) {
1049        String callback = context.getCallbackUrl("$jobStatus");
1050        if (conf.get("job.end.notification.url") != null) {
1051            LOG.warn("Overriding the action job end notification URI");
1052        }
1053        conf.set("job.end.notification.url", callback);
1054    }
1055
1056    void injectActionCallback(Context context, Configuration actionConf) {
1057        // action callback needs to be injected only for mapreduce actions.
1058    }
1059
1060    void injectLauncherCallback(Context context, Configuration launcherConf) {
1061        injectCallback(context, launcherConf);
1062    }
1063
1064    private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) {
1065        for (String name : SPECIAL_PROPERTIES) {
1066            if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) {
1067                launcherConf.set(name, actionConf.get(name));
1068            }
1069        }
1070    }
1071
1072    public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
1073        JobClient jobClient = null;
1074        boolean exception = false;
1075        try {
1076            Path appPathRoot = new Path(context.getWorkflow().getAppPath());
1077
1078            // app path could be a file
1079            if (actionFs.isFile(appPathRoot)) {
1080                appPathRoot = appPathRoot.getParent();
1081            }
1082
1083            Element actionXml = XmlUtils.parseXml(action.getConf());
1084
1085            // action job configuration
1086            Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
1087            setupActionConf(actionConf, context, actionXml, appPathRoot);
1088            LOG.debug("Setting LibFilesArchives ");
1089            setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
1090
1091            String jobName = actionConf.get(HADOOP_JOB_NAME);
1092            if (jobName == null || jobName.isEmpty()) {
1093                jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}",
1094                        getType(), context.getWorkflow().getAppName(),
1095                        action.getName(), context.getWorkflow().getId());
1096                actionConf.set(HADOOP_JOB_NAME, jobName);
1097            }
1098
1099            injectActionCallback(context, actionConf);
1100
1101            if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) {
1102                // ONLY in the case where user has not given the
1103                // modify-job ACL specifically
1104                if (context.getWorkflow().getAcl() != null) {
1105                    // setting the group owning the Oozie job to allow anybody in that
1106                    // group to modify the jobs.
1107                    actionConf.set(ACL_MODIFY_JOB, context.getWorkflow().getAcl());
1108                }
1109            }
1110
1111            // Setting the credential properties in launcher conf
1112            JobConf credentialsConf = null;
1113            HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
1114                    action, actionConf);
1115            if (credentialsProperties != null) {
1116
1117                // Adding if action need to set more credential tokens
1118                credentialsConf = new JobConf(false);
1119                XConfiguration.copy(actionConf, credentialsConf);
1120                setCredentialTokens(credentialsConf, context, action, credentialsProperties);
1121
1122                // insert conf to action conf from credentialsConf
1123                for (Entry<String, String> entry : credentialsConf) {
1124                    if (actionConf.get(entry.getKey()) == null) {
1125                        actionConf.set(entry.getKey(), entry.getValue());
1126                    }
1127                }
1128            }
1129
1130            JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
1131
1132            LOG.debug("Creating Job Client for action " + action.getId());
1133            jobClient = createJobClient(context, launcherJobConf);
1134            String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
1135                    .getRecoveryId());
1136            boolean alreadyRunning = launcherId != null;
1137            RunningJob runningJob;
1138
1139            // if user-retry is on, always submit new launcher
1140            boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
1141
1142            if (alreadyRunning && !isUserRetry) {
1143                runningJob = jobClient.getJob(JobID.forName(launcherId));
1144                if (runningJob == null) {
1145                    String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
1146                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
1147                            "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
1148                }
1149            }
1150            else {
1151                LOG.debug("Submitting the job through Job Client for action " + action.getId());
1152
1153                // setting up propagation of the delegation token.
1154                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
1155                Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(has
1156                        .getMRDelegationTokenRenewer(launcherJobConf));
1157                launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
1158
1159                // insert credentials tokens to launcher job conf if needed
1160                if (needInjectCredentials() && credentialsConf != null) {
1161                    for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) {
1162                        Text fauxAlias = new Text(tk.getKind() + "_" + tk.getService());
1163                        LOG.debug("ADDING TOKEN: " + fauxAlias);
1164                        launcherJobConf.getCredentials().addToken(fauxAlias, tk);
1165                    }
1166                    if (credentialsConf.getCredentials().numberOfSecretKeys() > 0) {
1167                        for (Entry<String, CredentialsProperties> entry : credentialsProperties.entrySet()) {
1168                            CredentialsProperties credProps = entry.getValue();
1169                            if (credProps != null) {
1170                                Text credName = new Text(credProps.getName());
1171                                byte[] secKey = credentialsConf.getCredentials().getSecretKey(credName);
1172                                if (secKey != null) {
1173                                    LOG.debug("ADDING CREDENTIAL: " + credProps.getName());
1174                                    launcherJobConf.getCredentials().addSecretKey(credName, secKey);
1175                                }
1176                            }
1177                        }
1178                    }
1179                }
1180                else {
1181                    LOG.info("No need to inject credentials.");
1182                }
1183                runningJob = jobClient.submitJob(launcherJobConf);
1184                if (runningJob == null) {
1185                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
1186                            "Error submitting launcher for action [{0}]", action.getId());
1187                }
1188                launcherId = runningJob.getID().toString();
1189                LOG.debug("After submission get the launcherId " + launcherId);
1190            }
1191
1192            String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
1193            String consoleUrl = runningJob.getTrackingURL();
1194            context.setStartData(launcherId, jobTracker, consoleUrl);
1195        }
1196        catch (Exception ex) {
1197            exception = true;
1198            throw convertException(ex);
1199        }
1200        finally {
1201            if (jobClient != null) {
1202                try {
1203                    jobClient.close();
1204                }
1205                catch (Exception e) {
1206                    if (exception) {
1207                        LOG.error("JobClient error: ", e);
1208                    }
1209                    else {
1210                        throw convertException(e);
1211                    }
1212                }
1213            }
1214        }
1215    }
1216    private boolean needInjectCredentials() {
1217        boolean methodExists = true;
1218
1219        Class klass;
1220        try {
1221            klass = Class.forName("org.apache.hadoop.mapred.JobConf");
1222            klass.getMethod("getCredentials");
1223        }
1224        catch (ClassNotFoundException ex) {
1225            methodExists = false;
1226        }
1227        catch (NoSuchMethodException ex) {
1228            methodExists = false;
1229        }
1230
1231        return methodExists;
1232    }
1233
1234    protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
1235            WorkflowAction action, Configuration actionConf) throws Exception {
1236        HashMap<String, CredentialsProperties> credPropertiesMap = null;
1237        if (context != null && action != null) {
1238            if (!"true".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP))) {
1239                XConfiguration wfJobConf = getWorkflowConf(context);
1240                if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) ||
1241                    !wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) {
1242                    credPropertiesMap = getActionCredentialsProperties(context, action);
1243                    if (credPropertiesMap != null) {
1244                        for (String key : credPropertiesMap.keySet()) {
1245                            CredentialsProperties prop = credPropertiesMap.get(key);
1246                            if (prop != null) {
1247                                LOG.debug("Credential Properties set for action : " + action.getId());
1248                                for (String property : prop.getProperties().keySet()) {
1249                                    actionConf.set(property, prop.getProperties().get(property));
1250                                    LOG.debug("property : '" + property + "', value : '" + prop.getProperties().get(property)
1251                                            + "'");
1252                                }
1253                            }
1254                        }
1255                    } else {
1256                        LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
1257                    }
1258                } else {
1259                    LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
1260                }
1261            } else {
1262                LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
1263            }
1264        } else {
1265            LOG.warn("context or action is null");
1266        }
1267        return credPropertiesMap;
1268    }
1269
1270    protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
1271            HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {
1272
1273        if (context != null && action != null && credPropertiesMap != null) {
1274            // Make sure we're logged into Kerberos; if not, or near expiration, it will relogin
1275            CredentialsProvider.ensureKerberosLogin();
1276            for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
1277                String credName = entry.getKey();
1278                CredentialsProperties credProps = entry.getValue();
1279                if (credProps != null) {
1280                    CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
1281                    Credentials credentialObject = credProvider.createCredentialObject();
1282                    if (credentialObject != null) {
1283                        credentialObject.addtoJobConf(jobconf, credProps, context);
1284                        LOG.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
1285                    }
1286                    else {
1287                        LOG.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
1288                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA020",
1289                            "Could not load credentials of type [{0}] with name [{1}]]; perhaps it was not defined"
1290                                + " in oozie-site.xml?", credProps.getType(), credName);
1291                    }
1292                }
1293            }
1294        }
1295
1296    }
1297
1298    protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
1299            WorkflowAction action) throws Exception {
1300        HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();
1301        if (context != null && action != null) {
1302            String credsInAction = action.getCred();
1303            if (credsInAction != null) {
1304                LOG.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId());
1305                String[] credNames = credsInAction.split(",");
1306                for (String credName : credNames) {
1307                    CredentialsProperties credProps = getCredProperties(context, credName);
1308                    props.put(credName, credProps);
1309                }
1310            }
1311        }
1312        else {
1313            LOG.warn("context or action is null");
1314        }
1315        return props;
1316    }
1317
1318    @SuppressWarnings("unchecked")
1319    protected CredentialsProperties getCredProperties(Context context, String credName)
1320            throws Exception {
1321        CredentialsProperties credProp = null;
1322        String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
1323        XConfiguration wfjobConf = getWorkflowConf(context);
1324        Element elementJob = XmlUtils.parseXml(workflowXml);
1325        Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
1326        if (credentials != null) {
1327            for (Element credential : (List<Element>) credentials.getChildren("credential", credentials.getNamespace())) {
1328                String name = credential.getAttributeValue("name");
1329                String type = credential.getAttributeValue("type");
1330                LOG.debug("getCredProperties: Name: " + name + ", Type: " + type);
1331                if (name.equalsIgnoreCase(credName)) {
1332                    credProp = new CredentialsProperties(name, type);
1333                    for (Element property : (List<Element>) credential.getChildren("property",
1334                            credential.getNamespace())) {
1335                        String propertyName = property.getChildText("name", property.getNamespace());
1336                        String propertyValue = property.getChildText("value", property.getNamespace());
1337                        ELEvaluator eval = new ELEvaluator();
1338                        for (Map.Entry<String, String> entry : wfjobConf) {
1339                            eval.setVariable(entry.getKey(), entry.getValue().trim());
1340                        }
1341                        propertyName = eval.evaluate(propertyName, String.class);
1342                        propertyValue = eval.evaluate(propertyValue, String.class);
1343
1344                        credProp.getProperties().put(propertyName, propertyValue);
1345                        LOG.debug("getCredProperties: Properties name :'" + propertyName + "', Value : '"
1346                                + propertyValue + "'");
1347                    }
1348                }
1349            }
1350            if (credProp == null && credName != null) {
1351                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA021",
1352                        "Could not load credentials with name [{0}]].", credName);
1353            }
1354        } else {
1355            LOG.debug("credentials is null for the action");
1356        }
1357        return credProp;
1358    }
1359
1360    @Override
1361    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
1362        LogUtils.setLogInfo(action);
1363        try {
1364            LOG.debug("Starting action " + action.getId() + " getting Action File System");
1365            FileSystem actionFs = context.getAppFileSystem();
1366            LOG.debug("Preparing action Dir through copying " + context.getActionDir());
1367            prepareActionDir(actionFs, context);
1368            LOG.debug("Action Dir is ready. Submitting the action ");
1369            submitLauncher(actionFs, context, action);
1370            LOG.debug("Action submit completed. Performing check ");
1371            check(context, action);
1372            LOG.debug("Action check is done after submission");
1373        }
1374        catch (Exception ex) {
1375            throw convertException(ex);
1376        }
1377    }
1378
1379    @Override
1380    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
1381        try {
1382            String externalStatus = action.getExternalStatus();
1383            WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK
1384                    : WorkflowAction.Status.ERROR;
1385            context.setEndData(status, getActionSignal(status));
1386        }
1387        catch (Exception ex) {
1388            throw convertException(ex);
1389        }
1390        finally {
1391            try {
1392                FileSystem actionFs = context.getAppFileSystem();
1393                cleanUpActionDir(actionFs, context);
1394            }
1395            catch (Exception ex) {
1396                throw convertException(ex);
1397            }
1398        }
1399    }
1400
1401    /**
1402     * Create job client object
1403     *
1404     * @param context
1405     * @param jobConf
1406     * @return JobClient
1407     * @throws HadoopAccessorException
1408     */
1409    protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
1410        String user = context.getWorkflow().getUser();
1411        String group = context.getWorkflow().getGroup();
1412        return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
1413    }
1414
1415    protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
1416        RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
1417        return runningJob;
1418    }
1419
1420    /**
1421     * Useful for overriding in actions that do subsequent job runs
1422     * such as the MapReduce Action, where the launcher job is not the
1423     * actual job that then gets monitored.
1424     */
1425    protected String getActualExternalId(WorkflowAction action) {
1426        return action.getExternalId();
1427    }
1428
1429    @Override
1430    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
1431        JobClient jobClient = null;
1432        boolean exception = false;
1433        LogUtils.setLogInfo(action);
1434        try {
1435            Element actionXml = XmlUtils.parseXml(action.getConf());
1436            FileSystem actionFs = context.getAppFileSystem();
1437            JobConf jobConf = createBaseHadoopConf(context, actionXml);
1438            jobClient = createJobClient(context, jobConf);
1439            RunningJob runningJob = getRunningJob(context, action, jobClient);
1440            if (runningJob == null) {
1441                context.setExecutionData(FAILED, null);
1442                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
1443                        "Could not lookup launched hadoop Job ID [{0}] which was associated with " +
1444                        " action [{1}].  Failing this action!", getActualExternalId(action), action.getId());
1445            }
1446            if (runningJob.isComplete()) {
1447                Path actionDir = context.getActionDir();
1448                String newId = null;
1449                // load sequence file into object
1450                Map<String, String> actionData = LauncherMapperHelper.getActionData(actionFs, actionDir, jobConf);
1451                if (actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID)) {
1452                    newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID);
1453                    String launcherId = action.getExternalId();
1454                    runningJob = jobClient.getJob(JobID.forName(newId));
1455                    if (runningJob == null) {
1456                        context.setExternalStatus(FAILED);
1457                        throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
1458                                "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", newId,
1459                                action.getId());
1460                    }
1461                    context.setExternalChildIDs(newId);
1462                    LOG.info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
1463                            newId);
1464                }
1465                else {
1466                    String externalIDs = actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
1467                    if (externalIDs != null) {
1468                        context.setExternalChildIDs(externalIDs);
1469                        LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
1470                    }
1471                    else if (LauncherMapperHelper.hasOutputData(actionData)) {
1472                        // Load stored Hadoop jobs ids and promote them as external child ids
1473                        // This is for jobs launched with older release during upgrade to Oozie 4.3
1474                        Properties props = PropertiesUtils.stringToProperties(actionData
1475                                .get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS));
1476                        if (props.get(LauncherMain.HADOOP_JOBS) != null) {
1477                            externalIDs = (String) props.get(LauncherMain.HADOOP_JOBS);
1478                            context.setExternalChildIDs(externalIDs);
1479                            LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
1480                        }
1481                    }
1482                }
1483                if (runningJob.isComplete()) {
1484                    // fetching action output and stats for the Map-Reduce action.
1485                    if (newId != null) {
1486                        actionData = LauncherMapperHelper.getActionData(actionFs, context.getActionDir(), jobConf);
1487                    }
1488                    LOG.info(XLog.STD, "action completed, external ID [{0}]",
1489                            action.getExternalId());
1490                    if (LauncherMapperHelper.isMainSuccessful(runningJob)) {
1491                        if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) {
1492                            context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
1493                                    .get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS)));
1494                            LOG.info(XLog.STD, "action produced output");
1495                        }
1496                        else {
1497                            context.setExecutionData(SUCCEEDED, null);
1498                        }
1499                        if (LauncherMapperHelper.hasStatsData(actionData)) {
1500                            context.setExecutionStats(actionData.get(LauncherMapper.ACTION_DATA_STATS));
1501                            LOG.info(XLog.STD, "action produced stats");
1502                        }
1503                        getActionData(actionFs, runningJob, action, context);
1504                    }
1505                    else {
1506                        String errorReason;
1507                        if (actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) {
1508                            Properties props = PropertiesUtils.stringToProperties(actionData
1509                                    .get(LauncherMapper.ACTION_DATA_ERROR_PROPS));
1510                            String errorCode = props.getProperty("error.code");
1511                            if ("0".equals(errorCode)) {
1512                                errorCode = "JA018";
1513                            }
1514                            if ("-1".equals(errorCode)) {
1515                                errorCode = "JA019";
1516                            }
1517                            errorReason = props.getProperty("error.reason");
1518                            LOG.warn("Launcher ERROR, reason: {0}", errorReason);
1519                            String exMsg = props.getProperty("exception.message");
1520                            String errorInfo = (exMsg != null) ? exMsg : errorReason;
1521                            context.setErrorInfo(errorCode, errorInfo);
1522                            String exStackTrace = props.getProperty("exception.stacktrace");
1523                            if (exMsg != null) {
1524                                LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
1525                            }
1526                        }
1527                        else {
1528                            errorReason = XLog.format("LauncherMapper died, check Hadoop LOG for job [{0}:{1}]", action
1529                                    .getTrackerUri(), action.getExternalId());
1530                            LOG.warn(errorReason);
1531                        }
1532                        context.setExecutionData(FAILED_KILLED, null);
1533                    }
1534                }
1535                else {
1536                    context.setExternalStatus("RUNNING");
1537                    LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
1538                            runningJob.getID());
1539                }
1540            }
1541            else {
1542                context.setExternalStatus("RUNNING");
1543                LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
1544                        runningJob.getID());
1545            }
1546        }
1547        catch (Exception ex) {
1548            LOG.warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
1549            exception = true;
1550            throw convertException(ex);
1551        }
1552        finally {
1553            if (jobClient != null) {
1554                try {
1555                    jobClient.close();
1556                }
1557                catch (Exception e) {
1558                    if (exception) {
1559                        LOG.error("JobClient error: ", e);
1560                    }
1561                    else {
1562                        throw convertException(e);
1563                    }
1564                }
1565            }
1566        }
1567    }
1568
1569    /**
1570     * Get the output data of an action. Subclasses should override this method
1571     * to get action specific output data.
1572     *
1573     * @param actionFs the FileSystem object
1574     * @param runningJob the runningJob
1575     * @param action the Workflow action
1576     * @param context executor context
1577     *
1578     */
1579    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
1580            throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
1581    }
1582
1583    protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
1584        Element eConf = XmlUtils.parseXml(action.getConf());
1585        Namespace ns = eConf.getNamespace();
1586        Element captureOutput = eConf.getChild("capture-output", ns);
1587        return captureOutput != null;
1588    }
1589
1590    @Override
1591    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
1592        JobClient jobClient = null;
1593        boolean exception = false;
1594        try {
1595            Element actionXml = XmlUtils.parseXml(action.getConf());
1596            final JobConf jobConf = createBaseHadoopConf(context, actionXml);
1597            WorkflowJob wfJob = context.getWorkflow();
1598            Configuration conf = null;
1599            if ( wfJob.getConf() != null ) {
1600                conf = new XConfiguration(new StringReader(wfJob.getConf()));
1601            }
1602            String launcherTag = LauncherMapperHelper.getActionYarnTag(conf, wfJob.getParentId(), action);
1603            jobConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, LauncherMapperHelper.getTag(launcherTag));
1604            jobConf.set(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, Long.toString(action.getStartTime().getTime()));
1605            UserGroupInformation ugi = Services.get().get(UserGroupInformationService.class)
1606                    .getProxyUser(context.getWorkflow().getUser());
1607            ugi.doAs(new PrivilegedExceptionAction<Void>() {
1608                @Override
1609                public Void run() throws Exception {
1610                    LauncherMainHadoopUtils.killChildYarnJobs(jobConf);
1611                    return null;
1612                }
1613            });
1614            jobClient = createJobClient(context, jobConf);
1615            RunningJob runningJob = getRunningJob(context, action, jobClient);
1616            if (runningJob != null) {
1617                runningJob.killJob();
1618            }
1619            context.setExternalStatus(KILLED);
1620            context.setExecutionData(KILLED, null);
1621        }
1622        catch (Exception ex) {
1623            exception = true;
1624            throw convertException(ex);
1625        }
1626        finally {
1627            try {
1628                FileSystem actionFs = context.getAppFileSystem();
1629                cleanUpActionDir(actionFs, context);
1630                if (jobClient != null) {
1631                    jobClient.close();
1632                }
1633            }
1634            catch (Exception ex) {
1635                if (exception) {
1636                    LOG.error("Error: ", ex);
1637                }
1638                else {
1639                    throw convertException(ex);
1640                }
1641            }
1642        }
1643    }
1644
1645    private static Set<String> FINAL_STATUS = new HashSet<String>();
1646
1647    static {
1648        FINAL_STATUS.add(SUCCEEDED);
1649        FINAL_STATUS.add(KILLED);
1650        FINAL_STATUS.add(FAILED);
1651        FINAL_STATUS.add(FAILED_KILLED);
1652    }
1653
1654    @Override
1655    public boolean isCompleted(String externalStatus) {
1656        return FINAL_STATUS.contains(externalStatus);
1657    }
1658
1659
1660    /**
1661     * Return the sharelib names for the action.
1662     * <p>
1663     * If <code>NULL</code> or empty, it means that the action does not use the action
1664     * sharelib.
1665     * <p>
1666     * If a non-empty string, i.e. <code>foo</code>, it means the action uses the
1667     * action sharelib sub-directory <code>foo</code> and all JARs in the sharelib
1668     * <code>foo</code> directory will be in the action classpath. Multiple sharelib
1669     * sub-directories can be specified as a comma separated list.
1670     * <p>
1671     * The resolution is done using the following precedence order:
1672     * <ul>
1673     *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the action configuration</li>
1674     *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the job configuration</li>
1675     *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the oozie configuration</li>
1676     *     <li>Action Executor <code>getDefaultShareLibName()</code> method</li>
1677     * </ul>
1678     *
1679     *
1680     * @param context executor context.
1681     * @param actionXml
1682     * @param conf action configuration.
1683     * @return the action sharelib names.
1684     */
1685    protected String[] getShareLibNames(Context context, Element actionXml, Configuration conf) {
1686        String[] names = conf.getStrings(ACTION_SHARELIB_FOR + getType());
1687        if (names == null || names.length == 0) {
1688            try {
1689                XConfiguration jobConf = getWorkflowConf(context);
1690                names = jobConf.getStrings(ACTION_SHARELIB_FOR + getType());
1691                if (names == null || names.length == 0) {
1692                    names = Services.get().getConf().getStrings(ACTION_SHARELIB_FOR + getType());
1693                    if (names == null || names.length == 0) {
1694                        String name = getDefaultShareLibName(actionXml);
1695                        if (name != null) {
1696                            names = new String[] { name };
1697                        }
1698                    }
1699                }
1700            }
1701            catch (IOException ex) {
1702                throw new RuntimeException("It cannot happen, " + ex.toString(), ex);
1703            }
1704        }
1705        return names;
1706    }
1707
1708    private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for.";
1709
1710
1711    /**
1712     * Returns the default sharelib name for the action if any.
1713     *
1714     * @param actionXml the action XML fragment.
1715     * @return the sharelib name for the action, <code>NULL</code> if none.
1716     */
1717    protected String getDefaultShareLibName(Element actionXml) {
1718        return null;
1719    }
1720
1721    public String[] getShareLibFilesForActionConf() {
1722        return null;
1723    }
1724
1725    /**
1726     * Sets some data for the action on completion
1727     *
1728     * @param context executor context
1729     * @param actionFs the FileSystem object
1730     */
1731    protected void setActionCompletionData(Context context, FileSystem actionFs) throws IOException,
1732            HadoopAccessorException, URISyntaxException {
1733    }
1734
1735    private void injectJobInfo(JobConf launcherJobConf, Configuration actionConf, Context context, WorkflowAction action) {
1736        if (OozieJobInfo.isJobInfoEnabled()) {
1737            try {
1738                OozieJobInfo jobInfo = new OozieJobInfo(actionConf, context, action);
1739                String jobInfoStr = jobInfo.getJobInfo();
1740                launcherJobConf.set(OozieJobInfo.JOB_INFO_KEY, jobInfoStr + "launcher=true");
1741                actionConf.set(OozieJobInfo.JOB_INFO_KEY, jobInfoStr + "launcher=false");
1742            }
1743            catch (Exception e) {
1744                // Just job info, should not impact the execution.
1745                LOG.error("Error while populating job info", e);
1746            }
1747        }
1748    }
1749
1750    @Override
1751    public boolean requiresNameNodeJobTracker() {
1752        return true;
1753    }
1754
1755    @Override
1756    public boolean supportsConfigurationJobXML() {
1757        return true;
1758    }
1759
1760    private XConfiguration getWorkflowConf(Context context) throws IOException {
1761        if (workflowConf == null) {
1762            workflowConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
1763        }
1764        return workflowConf;
1765
1766    }
1767}