001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.action.hadoop;
019
020import java.io.ByteArrayOutputStream;
021import java.io.File;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.PrintStream;
025import java.io.StringReader;
026import java.net.ConnectException;
027import java.net.URI;
028import java.net.URISyntaxException;
029import java.net.UnknownHostException;
030import java.util.ArrayList;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Map;
036import java.util.Properties;
037import java.util.Set;
038import java.util.Map.Entry;
039import java.util.regex.Matcher;
040import java.util.regex.Pattern;
041
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.filecache.DistributedCache;
044import org.apache.hadoop.fs.FileStatus;
045import org.apache.hadoop.fs.FileSystem;
046import org.apache.hadoop.fs.Path;
047import org.apache.hadoop.fs.permission.AccessControlException;
048import org.apache.hadoop.io.Text;
049import org.apache.hadoop.mapred.JobClient;
050import org.apache.hadoop.mapred.JobConf;
051import org.apache.hadoop.mapred.JobID;
052import org.apache.hadoop.mapred.RunningJob;
053import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
054import org.apache.hadoop.util.DiskChecker;
055import org.apache.oozie.WorkflowActionBean;
056import org.apache.oozie.WorkflowJobBean;
057import org.apache.oozie.action.ActionExecutor;
058import org.apache.oozie.action.ActionExecutorException;
059import org.apache.oozie.client.OozieClient;
060import org.apache.oozie.client.WorkflowAction;
061import org.apache.oozie.service.HadoopAccessorException;
062import org.apache.oozie.service.HadoopAccessorService;
063import org.apache.oozie.service.Services;
064import org.apache.oozie.service.ShareLibService;
065import org.apache.oozie.service.URIHandlerService;
066import org.apache.oozie.service.WorkflowAppService;
067import org.apache.oozie.servlet.CallbackServlet;
068import org.apache.oozie.util.ELEvaluator;
069import org.apache.oozie.util.JobUtils;
070import org.apache.oozie.util.LogUtils;
071import org.apache.oozie.util.PropertiesUtils;
072import org.apache.oozie.util.XConfiguration;
073import org.apache.oozie.util.XLog;
074import org.apache.oozie.util.XmlUtils;
075import org.apache.oozie.util.ELEvaluationException;
076import org.jdom.Element;
077import org.jdom.JDOMException;
078import org.jdom.Namespace;
079import org.apache.hadoop.security.token.Token;
080import org.apache.hadoop.security.token.TokenIdentifier;
081
082public class JavaActionExecutor extends ActionExecutor {
083
084    private static final String HADOOP_USER = "user.name";
085    public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
086    public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
087    public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
088    public static final String HADOOP_NAME_NODE = "fs.default.name";
089    private static final String HADOOP_JOB_NAME = "mapred.job.name";
090    public static final String OOZIE_COMMON_LIBDIR = "oozie";
091    public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE;
092    private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
093    public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
094    public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
095    public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
096    public static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable";
097    public static final String HADOOP_MAP_MEMORY_MB = "mapreduce.map.memory.mb";
098    public static final String HADOOP_CHILD_JAVA_OPTS = "mapred.child.java.opts";
099    public static final String HADOOP_MAP_JAVA_OPTS = "mapreduce.map.java.opts";
100    public static final String HADOOP_CHILD_JAVA_ENV = "mapred.child.env";
101    public static final String HADOOP_MAP_JAVA_ENV = "mapreduce.map.env";
102    public static final String YARN_AM_RESOURCE_MB = "yarn.app.mapreduce.am.resource.mb";
103    public static final String YARN_AM_COMMAND_OPTS = "yarn.app.mapreduce.am.command-opts";
104    public static final String YARN_AM_ENV = "yarn.app.mapreduce.am.env";
105    private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain";
106    public static final int YARN_MEMORY_MB_MIN = 512;
107    private static int maxActionOutputLen;
108    private static int maxExternalStatsSize;
109    private static int maxFSGlobMax;
110    private static final String SUCCEEDED = "SUCCEEDED";
111    private static final String KILLED = "KILLED";
112    private static final String FAILED = "FAILED";
113    private static final String FAILED_KILLED = "FAILED/KILLED";
114    protected XLog LOG = XLog.getLog(getClass());
115    private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
116
117    static {
118        DISALLOWED_PROPERTIES.add(HADOOP_USER);
119        DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
120        DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE);
121        DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2);
122        DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM);
123    }
124
125    public JavaActionExecutor() {
126        this("java");
127    }
128
129    protected JavaActionExecutor(String type) {
130        super(type);
131        requiresNNJT = true;
132    }
133
134    public static List<Class> getCommonLauncherClasses() {
135        List<Class> classes = new ArrayList<Class>();
136        classes.add(LauncherMapper.class);
137        classes.add(OozieLauncherInputFormat.class);
138        classes.add(LauncherMainHadoopUtils.class);
139        classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
140        return classes;
141    }
142
143    public List<Class> getLauncherClasses() {
144       List<Class> classes = new ArrayList<Class>();
145        try {
146            classes.add(Class.forName(JAVA_MAIN_CLASS_NAME));
147        }
148        catch (ClassNotFoundException e) {
149            throw new RuntimeException("Class not found", e);
150        }
151        return classes;
152    }
153
154    @Override
155    public void initActionType() {
156        super.initActionType();
157        maxActionOutputLen = getOozieConf()
158          .getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
159          // TODO: Remove the below config get in a subsequent release..
160          // This other irrelevant property is only used to
161          // preserve backwards compatibility cause of a typo.
162                  // See OOZIE-4.
163          getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN,
164            2 * 1024));
165        //Get the limit for the maximum allowed size of action stats
166        maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT);
167        maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
168        //Get the limit for the maximum number of globbed files/dirs for FS operation
169        maxFSGlobMax = getOozieConf().getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, LauncherMapper.GLOB_MAX_DEFAULT);
170
171        registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001");
172        registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT,
173                "JA002");
174        registerError(DiskChecker.DiskOutOfSpaceException.class.getName(),
175                ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003");
176        registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(),
177                ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
178        registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
179                ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
180        registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "  JA006");
181        registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
182        registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
183        registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
184    }
185
186
187    /**
188     * Get the maximum allowed size of stats
189     *
190     * @return maximum size of stats
191     */
192    public static int getMaxExternalStatsSize() {
193        return maxExternalStatsSize;
194    }
195
196    static void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException {
197        for (String prop : DISALLOWED_PROPERTIES) {
198            if (conf.get(prop) != null) {
199                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010",
200                        "Property [{0}] not allowed in action [{1}] configuration", prop, confName);
201            }
202        }
203    }
204
205    public JobConf createBaseHadoopConf(Context context, Element actionXml) {
206        Namespace ns = actionXml.getNamespace();
207        String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
208        String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
209        JobConf conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
210        conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
211        conf.set(HADOOP_JOB_TRACKER, jobTracker);
212        conf.set(HADOOP_JOB_TRACKER_2, jobTracker);
213        conf.set(HADOOP_YARN_RM, jobTracker);
214        conf.set(HADOOP_NAME_NODE, nameNode);
215        conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
216        return conf;
217    }
218
219    private void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) {
220        for (Map.Entry<String, String> entry : srcConf) {
221            if (entry.getKey().startsWith("oozie.launcher.")) {
222                String name = entry.getKey().substring("oozie.launcher.".length());
223                String value = entry.getValue();
224                // setting original KEY
225                launcherConf.set(entry.getKey(), value);
226                // setting un-prefixed key (to allow Hadoop job config
227                // for the launcher job
228                launcherConf.set(name, value);
229            }
230        }
231    }
232
233    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
234            throws ActionExecutorException {
235        try {
236            Namespace ns = actionXml.getNamespace();
237            Element e = actionXml.getChild("configuration", ns);
238            if (e != null) {
239                String strConf = XmlUtils.prettyPrint(e).toString();
240                XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
241
242                XConfiguration launcherConf = new XConfiguration();
243                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
244                XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType());
245                injectLauncherProperties(actionDefaultConf, launcherConf);
246                injectLauncherProperties(inlineConf, launcherConf);
247                injectLauncherUseUberMode(launcherConf);
248                checkForDisallowedProps(launcherConf, "launcher configuration");
249                XConfiguration.copy(launcherConf, conf);
250            } else {
251                XConfiguration launcherConf = new XConfiguration();
252                injectLauncherUseUberMode(launcherConf);
253                XConfiguration.copy(launcherConf, conf);
254            }
255            return conf;
256        }
257        catch (IOException ex) {
258            throw convertException(ex);
259        }
260    }
261
262    void injectLauncherUseUberMode(Configuration launcherConf) {
263        // Set Uber Mode for the launcher (YARN only, ignored by MR1) if not set by action conf and not disabled in oozie-site
264        if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
265            if (getOozieConf().getBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable", false)) {
266                launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
267            }
268        }
269    }
270
271    void updateConfForUberMode(Configuration launcherConf) {
272
273        // child.env
274        boolean hasConflictEnv = false;
275        String launcherMapEnv = launcherConf.get(HADOOP_MAP_JAVA_ENV);
276        if (launcherMapEnv == null) {
277            launcherMapEnv = launcherConf.get(HADOOP_CHILD_JAVA_ENV);
278        }
279        String amEnv = launcherConf.get(YARN_AM_ENV);
280        StringBuffer envStr = new StringBuffer();
281        HashMap<String, List<String>> amEnvMap = null;
282        HashMap<String, List<String>> launcherMapEnvMap = null;
283        if (amEnv != null) {
284            envStr.append(amEnv);
285            amEnvMap = populateEnvMap(amEnv);
286        }
287        if (launcherMapEnv != null) {
288            launcherMapEnvMap = populateEnvMap(launcherMapEnv);
289            if (amEnvMap != null) {
290                Iterator<String> envKeyItr = launcherMapEnvMap.keySet().iterator();
291                while (envKeyItr.hasNext()) {
292                    String envKey = envKeyItr.next();
293                    if (amEnvMap.containsKey(envKey)) {
294                        List<String> amValList = amEnvMap.get(envKey);
295                        List<String> launcherValList = launcherMapEnvMap.get(envKey);
296                        Iterator<String> valItr = launcherValList.iterator();
297                        while (valItr.hasNext()) {
298                            String val = valItr.next();
299                            if (!amValList.contains(val)) {
300                                hasConflictEnv = true;
301                                break;
302                            }
303                            else {
304                                valItr.remove();
305                            }
306                        }
307                        if (launcherValList.isEmpty()) {
308                            envKeyItr.remove();
309                        }
310                    }
311                }
312            }
313        }
314        if (hasConflictEnv) {
315            launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
316        }
317        else {
318            if (launcherMapEnvMap != null) {
319                for (String key : launcherMapEnvMap.keySet()) {
320                    List<String> launcherValList = launcherMapEnvMap.get(key);
321                    for (String val : launcherValList) {
322                        if (envStr.length() > 0) {
323                            envStr.append(",");
324                        }
325                        envStr.append(key).append("=").append(val);
326                    }
327                }
328            }
329
330            launcherConf.set(YARN_AM_ENV, envStr.toString());
331
332            // memory.mb
333            int launcherMapMemoryMB = launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 1536);
334            int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536);
335            // YARN_MEMORY_MB_MIN to provide buffer.
336            // suppose launcher map aggressively use high memory, need some
337            // headroom for AM
338            int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + YARN_MEMORY_MB_MIN;
339            // limit to 4096 in case of 32 bit
340            if (launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 4096) {
341                memoryMB = 4096;
342            }
343            launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB);
344
345            // We already made mapred.child.java.opts and
346            // mapreduce.map.java.opts equal, so just start with one of them
347            String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS, "");
348            String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS);
349            StringBuilder optsStr = new StringBuilder();
350            int heapSizeForMap = extractHeapSizeMB(launcherMapOpts);
351            int heapSizeForAm = extractHeapSizeMB(amChildOpts);
352            int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + YARN_MEMORY_MB_MIN;
353            // limit to 3584 in case of 32 bit
354            if (heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 3584) {
355                heapSize = 3584;
356            }
357            if (amChildOpts != null) {
358                optsStr.append(amChildOpts);
359            }
360            optsStr.append(" ").append(launcherMapOpts.trim());
361            if (heapSize > 0) {
362                // append calculated total heap size to the end
363                optsStr.append(" ").append("-Xmx").append(heapSize).append("m");
364            }
365            launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString().trim());
366        }
367    }
368
369    private HashMap<String, List<String>> populateEnvMap(String input) {
370        HashMap<String, List<String>> envMaps = new HashMap<String, List<String>>();
371        String[] envEntries = input.split(",");
372        for (String envEntry : envEntries) {
373            String[] envKeyVal = envEntry.split("=");
374            String envKey = envKeyVal[0].trim();
375            List<String> valList = envMaps.get(envKey);
376            if (valList == null) {
377                valList = new ArrayList<String>();
378            }
379            valList.add(envKeyVal[1].trim());
380            envMaps.put(envKey, valList);
381        }
382        return envMaps;
383    }
384
385    public int extractHeapSizeMB(String input) {
386        int ret = 0;
387        if(input == null || input.equals(""))
388            return ret;
389        Matcher m = heapPattern.matcher(input);
390        String heapStr = null;
391        String heapNum = null;
392        // Grabs the last match which takes effect (in case that multiple Xmx options specified)
393        while (m.find()) {
394            heapStr = m.group(1);
395            heapNum = m.group(2);
396        }
397        if (heapStr != null) {
398            // when Xmx specified in Gigabyte
399            if(heapStr.endsWith("g") || heapStr.endsWith("G")) {
400                ret = Integer.parseInt(heapNum) * 1024;
401            } else {
402                ret = Integer.parseInt(heapNum);
403            }
404        }
405        return ret;
406    }
407
408    public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf)
409            throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException {
410        Namespace ns = element.getNamespace();
411        Iterator<Element> it = element.getChildren("job-xml", ns).iterator();
412        HashMap<String, FileSystem> filesystemsMap = new HashMap<String, FileSystem>();
413        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
414        while (it.hasNext()) {
415            Element e = it.next();
416            String jobXml = e.getTextTrim();
417            Path pathSpecified = new Path(jobXml);
418            Path path = pathSpecified.isAbsolute() ? pathSpecified : new Path(appPath, jobXml);
419            FileSystem fs;
420            if (filesystemsMap.containsKey(path.toUri().getAuthority())) {
421              fs = filesystemsMap.get(path.toUri().getAuthority());
422            }
423            else {
424              if (path.toUri().getAuthority() != null) {
425                fs = has.createFileSystem(context.getWorkflow().getUser(), path.toUri(),
426                        has.createJobConf(path.toUri().getAuthority()));
427              }
428              else {
429                fs = context.getAppFileSystem();
430              }
431              filesystemsMap.put(path.toUri().getAuthority(), fs);
432            }
433            Configuration jobXmlConf = new XConfiguration(fs.open(path));
434            try {
435                String jobXmlConfString = XmlUtils.prettyPrint(jobXmlConf).toString();
436                jobXmlConfString = XmlUtils.removeComments(jobXmlConfString);
437                jobXmlConfString = context.getELEvaluator().evaluate(jobXmlConfString, String.class);
438                jobXmlConf = new XConfiguration(new StringReader(jobXmlConfString));
439            }
440            catch (ELEvaluationException ex) {
441                throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, "EL_EVAL_ERROR", ex
442                        .getMessage(), ex);
443            }
444            catch (Exception ex) {
445                context.setErrorInfo("EL_ERROR", ex.getMessage());
446            }
447            checkForDisallowedProps(jobXmlConf, "job-xml");
448            XConfiguration.copy(jobXmlConf, conf);
449        }
450        Element e = element.getChild("configuration", ns);
451        if (e != null) {
452            String strConf = XmlUtils.prettyPrint(e).toString();
453            XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
454            checkForDisallowedProps(inlineConf, "inline configuration");
455            XConfiguration.copy(inlineConf, conf);
456        }
457    }
458
459    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
460            throws ActionExecutorException {
461        try {
462            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
463            XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType());
464            XConfiguration.injectDefaults(actionDefaults, actionConf);
465
466            has.checkSupportedFilesystem(appPath.toUri());
467
468            // Set the Java Main Class for the Java action to give to the Java launcher
469            setJavaMain(actionConf, actionXml);
470
471            parseJobXmlAndConfiguration(context, actionXml, appPath, actionConf);
472            return actionConf;
473        }
474        catch (IOException ex) {
475            throw convertException(ex);
476        }
477        catch (HadoopAccessorException ex) {
478            throw convertException(ex);
479        }
480        catch (URISyntaxException ex) {
481            throw convertException(ex);
482        }
483    }
484
485    Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive)
486            throws ActionExecutorException {
487
488        URI uri = null;
489        try {
490            uri = new URI(filePath);
491            URI baseUri = appPath.toUri();
492            if (uri.getScheme() == null) {
493                String resolvedPath = uri.getPath();
494                if (!resolvedPath.startsWith("/")) {
495                    resolvedPath = baseUri.getPath() + "/" + resolvedPath;
496                }
497                uri = new URI(baseUri.getScheme(), baseUri.getAuthority(), resolvedPath, uri.getQuery(), uri.getFragment());
498            }
499            if (archive) {
500                DistributedCache.addCacheArchive(uri.normalize(), conf);
501            }
502            else {
503                String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
504                if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files
505                    uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName);
506                    DistributedCache.addCacheFile(uri.normalize(), conf);
507                }
508                else if (fileName.endsWith(".jar")) { // .jar files
509                    if (!fileName.contains("#")) {
510                        String user = conf.get("user.name");
511                        Path pathToAdd = new Path(uri.normalize());
512                        Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, pathToAdd, conf);
513                    }
514                    else {
515                        DistributedCache.addCacheFile(uri.normalize(), conf);
516                    }
517                }
518                else { // regular files
519                    if (!fileName.contains("#")) {
520                        uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName);
521                    }
522                    DistributedCache.addCacheFile(uri.normalize(), conf);
523                }
524            }
525            DistributedCache.createSymlink(conf);
526            return conf;
527        }
528        catch (Exception ex) {
529            LOG.debug(
530                    "Errors when add to DistributedCache. Path=" + uri.toString() + ", archive=" + archive + ", conf="
531                            + XmlUtils.prettyPrint(conf).toString());
532            throw convertException(ex);
533        }
534    }
535
536    public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
537        try {
538            Path actionDir = context.getActionDir();
539            Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp");
540            if (!actionFs.exists(actionDir)) {
541                try {
542                    actionFs.mkdirs(tempActionDir);
543                    actionFs.rename(tempActionDir, actionDir);
544                }
545                catch (IOException ex) {
546                    actionFs.delete(tempActionDir, true);
547                    actionFs.delete(actionDir, true);
548                    throw ex;
549                }
550            }
551        }
552        catch (Exception ex) {
553            throw convertException(ex);
554        }
555    }
556
557    void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
558        try {
559            Path actionDir = context.getActionDir();
560            if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
561                    && actionFs.exists(actionDir)) {
562                actionFs.delete(actionDir, true);
563            }
564        }
565        catch (Exception ex) {
566            throw convertException(ex);
567        }
568    }
569
570    protected void addShareLib(Configuration conf, String[] actionShareLibNames)
571            throws ActionExecutorException {
572        if (actionShareLibNames != null) {
573            try {
574                ShareLibService shareLibService = Services.get().get(ShareLibService.class);
575                FileSystem fs = shareLibService.getFileSystem();
576                if (fs != null) {
577                  for (String actionShareLibName : actionShareLibNames) {
578                        List<Path> listOfPaths = shareLibService.getShareLibJars(actionShareLibName);
579                        if (listOfPaths != null && !listOfPaths.isEmpty()) {
580
581                            for (Path actionLibPath : listOfPaths) {
582                                JobUtils.addFileToClassPath(actionLibPath, conf, fs);
583                                DistributedCache.createSymlink(conf);
584                            }
585                        }
586                    }
587                }
588            }
589            catch (IOException ex) {
590                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
591                        ex.getMessage());
592            }
593        }
594    }
595
596    protected void addSystemShareLibForAction(Configuration conf) throws ActionExecutorException {
597        ShareLibService shareLibService = Services.get().get(ShareLibService.class);
598        // ShareLibService is null for test cases
599        if (shareLibService != null) {
600            try {
601                List<Path> listOfPaths = shareLibService.getSystemLibJars(JavaActionExecutor.OOZIE_COMMON_LIBDIR);
602                if (listOfPaths == null || listOfPaths.isEmpty()) {
603                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "EJ001",
604                            "Could not locate Oozie sharelib");
605                }
606                FileSystem fs = listOfPaths.get(0).getFileSystem(conf);
607                for (Path actionLibPath : listOfPaths) {
608                    JobUtils.addFileToClassPath(actionLibPath, conf, fs);
609                    DistributedCache.createSymlink(conf);
610                }
611                listOfPaths = shareLibService.getSystemLibJars(getType());
612                if (listOfPaths != null) {
613                    for (Path actionLibPath : listOfPaths) {
614                        JobUtils.addFileToClassPath(actionLibPath, conf, fs);
615                        DistributedCache.createSymlink(conf);
616                    }
617                }
618            }
619            catch (IOException ex) {
620                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
621                        ex.getMessage());
622            }
623        }
624    }
625
626    protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException {
627        String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath");
628        if (actionLibsStrArr != null) {
629            try {
630                for (String actionLibsStr : actionLibsStrArr) {
631                    actionLibsStr = actionLibsStr.trim();
632                    if (actionLibsStr.length() > 0)
633                    {
634                        Path actionLibsPath = new Path(actionLibsStr);
635                        String user = conf.get("user.name");
636                        FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
637                        if (fs.exists(actionLibsPath)) {
638                            FileStatus[] files = fs.listStatus(actionLibsPath);
639                            for (FileStatus file : files) {
640                                addToCache(conf, appPath, file.getPath().toUri().getPath(), false);
641                            }
642                        }
643                    }
644                }
645            }
646            catch (HadoopAccessorException ex){
647                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
648                        ex.getErrorCode().toString(), ex.getMessage());
649            }
650            catch (IOException ex){
651                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
652                        "It should never happen", ex.getMessage());
653            }
654        }
655    }
656
657    @SuppressWarnings("unchecked")
658    public void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf)
659            throws ActionExecutorException {
660        Configuration proto = context.getProtoActionConf();
661
662        // Workflow lib/
663        String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
664        if (paths != null) {
665            for (String path : paths) {
666                addToCache(conf, appPath, path, false);
667            }
668        }
669
670        // Action libs
671        addActionLibs(appPath, conf);
672
673        // files and archives defined in the action
674        for (Element eProp : (List<Element>) actionXml.getChildren()) {
675            if (eProp.getName().equals("file")) {
676                String[] filePaths = eProp.getTextTrim().split(",");
677                for (String path : filePaths) {
678                    addToCache(conf, appPath, path.trim(), false);
679                }
680            }
681            else if (eProp.getName().equals("archive")) {
682                String[] archivePaths = eProp.getTextTrim().split(",");
683                for (String path : archivePaths){
684                    addToCache(conf, appPath, path.trim(), true);
685                }
686            }
687        }
688
689        addAllShareLibs(appPath, conf, context, actionXml);
690        }
691
692    // Adds action specific share libs and common share libs
693    private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml)
694            throws ActionExecutorException {
695        // Add action specific share libs
696        addActionShareLib(appPath, conf, context, actionXml);
697        // Add common sharelibs for Oozie and launcher jars
698        addSystemShareLibForAction(conf);
699    }
700
701    private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml)
702            throws ActionExecutorException {
703        XConfiguration wfJobConf = null;
704        try {
705            wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
706        }
707        catch (IOException ioe) {
708            throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
709                    ioe.getMessage());
710        }
711        // Action sharelibs are only added if user has specified to use system libpath
712        if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
713            // add action specific sharelibs
714            addShareLib(conf, getShareLibNames(context, actionXml, conf));
715        }
716    }
717
718
719    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
720        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName());
721    }
722
723    private void setJavaMain(Configuration actionConf, Element actionXml) {
724        Namespace ns = actionXml.getNamespace();
725        Element e = actionXml.getChild("main-class", ns);
726        if (e != null) {
727            actionConf.set(JavaMain.JAVA_MAIN_CLASS, e.getTextTrim());
728        }
729    }
730
731    private static final String QUEUE_NAME = "mapred.job.queue.name";
732
733    private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>();
734
735    static {
736        SPECIAL_PROPERTIES.add(QUEUE_NAME);
737        SPECIAL_PROPERTIES.add(ACL_VIEW_JOB);
738        SPECIAL_PROPERTIES.add(ACL_MODIFY_JOB);
739    }
740
741    @SuppressWarnings("unchecked")
742    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
743            throws ActionExecutorException {
744        try {
745
746            // app path could be a file
747            Path appPathRoot = new Path(context.getWorkflow().getAppPath());
748            if (actionFs.isFile(appPathRoot)) {
749                appPathRoot = appPathRoot.getParent();
750            }
751
752            // launcher job configuration
753            JobConf launcherJobConf = createBaseHadoopConf(context, actionXml);
754            setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);
755
756            // Properties for when a launcher job's AM gets restarted
757            LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, action.getId());
758
759            String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType());
760            if (actionShareLibProperty != null) {
761                launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty);
762            }
763            setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
764
765            String jobName = launcherJobConf.get(HADOOP_JOB_NAME);
766            if (jobName == null || jobName.isEmpty()) {
767                jobName = XLog.format(
768                        "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
769                        context.getWorkflow().getAppName(), action.getName(),
770                        context.getWorkflow().getId());
771            launcherJobConf.setJobName(jobName);
772            }
773
774            String jobId = context.getWorkflow().getId();
775            String actionId = action.getId();
776            Path actionDir = context.getActionDir();
777            String recoveryId = context.getRecoveryId();
778
779            // Getting the prepare XML from the action XML
780            Namespace ns = actionXml.getNamespace();
781            Element prepareElement = actionXml.getChild("prepare", ns);
782            String prepareXML = "";
783            if (prepareElement != null) {
784                if (prepareElement.getChildren().size() > 0) {
785                    prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
786                }
787            }
788            LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
789                    prepareXML);
790
791            // Set the launcher Main Class
792            LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
793            LauncherMapperHelper.setupLauncherURIHandlerConf(launcherJobConf);
794            LauncherMapperHelper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
795            LauncherMapperHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
796            if (getOozieConf().get(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX) != null) {
797                LauncherMapperHelper.setupMaxFSGlob(launcherJobConf, maxFSGlobMax);
798            }
799
800            List<Element> list = actionXml.getChildren("arg", ns);
801            String[] args = new String[list.size()];
802            for (int i = 0; i < list.size(); i++) {
803                args[i] = list.get(i).getTextTrim();
804            }
805            LauncherMapperHelper.setupMainArguments(launcherJobConf, args);
806
807            // Make mapred.child.java.opts and mapreduce.map.java.opts equal, but give values from the latter priority; also append
808            // <java-opt> and <java-opts> and give those highest priority
809            StringBuilder opts = new StringBuilder(launcherJobConf.get(HADOOP_CHILD_JAVA_OPTS, ""));
810            if (launcherJobConf.get(HADOOP_MAP_JAVA_OPTS) != null) {
811                opts.append(" ").append(launcherJobConf.get(HADOOP_MAP_JAVA_OPTS));
812            }
813            List<Element> javaopts = actionXml.getChildren("java-opt", ns);
814            for (Element opt: javaopts) {
815                opts.append(" ").append(opt.getTextTrim());
816            }
817            Element opt = actionXml.getChild("java-opts", ns);
818            if (opt != null) {
819                opts.append(" ").append(opt.getTextTrim());
820            }
821            launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim());
822            launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, opts.toString().trim());
823
824            // setting for uber mode
825            if (launcherJobConf.getBoolean(HADOOP_YARN_UBER_MODE, false)) {
826                updateConfForUberMode(launcherJobConf);
827            }
828
829            // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs)
830            // maybe we should add queue to the WF schema, below job-tracker
831            actionConfToLauncherConf(actionConf, launcherJobConf);
832
833            // to disable cancelation of delegation token on launcher job end
834            launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
835
836            return launcherJobConf;
837        }
838        catch (Exception ex) {
839            throw convertException(ex);
840        }
841    }
842
843    private void injectCallback(Context context, Configuration conf) {
844        String callback = context.getCallbackUrl("$jobStatus");
845        if (conf.get("job.end.notification.url") != null) {
846            LOG.warn("Overriding the action job end notification URI");
847        }
848        conf.set("job.end.notification.url", callback);
849    }
850
851    void injectActionCallback(Context context, Configuration actionConf) {
852        injectCallback(context, actionConf);
853    }
854
855    void injectLauncherCallback(Context context, Configuration launcherConf) {
856        injectCallback(context, launcherConf);
857    }
858
859    private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) {
860        for (String name : SPECIAL_PROPERTIES) {
861            if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) {
862                launcherConf.set(name, actionConf.get(name));
863            }
864        }
865    }
866
867    public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
868        JobClient jobClient = null;
869        boolean exception = false;
870        try {
871            Path appPathRoot = new Path(context.getWorkflow().getAppPath());
872
873            // app path could be a file
874            if (actionFs.isFile(appPathRoot)) {
875                appPathRoot = appPathRoot.getParent();
876            }
877
878            Element actionXml = XmlUtils.parseXml(action.getConf());
879
880            // action job configuration
881            Configuration actionConf = createBaseHadoopConf(context, actionXml);
882            setupActionConf(actionConf, context, actionXml, appPathRoot);
883            LOG.debug("Setting LibFilesArchives ");
884            setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
885
886            String jobName = actionConf.get(HADOOP_JOB_NAME);
887            if (jobName == null || jobName.isEmpty()) {
888                jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}",
889                        getType(), context.getWorkflow().getAppName(),
890                        action.getName(), context.getWorkflow().getId());
891                actionConf.set(HADOOP_JOB_NAME, jobName);
892            }
893
894            injectActionCallback(context, actionConf);
895
896            if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) {
897                // ONLY in the case where user has not given the
898                // modify-job ACL specifically
899                if (context.getWorkflow().getAcl() != null) {
900                    // setting the group owning the Oozie job to allow anybody in that
901                    // group to modify the jobs.
902                    actionConf.set(ACL_MODIFY_JOB, context.getWorkflow().getAcl());
903                }
904            }
905
906            // Setting the credential properties in launcher conf
907            HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
908                    action, actionConf);
909
910            // Adding if action need to set more credential tokens
911            JobConf credentialsConf = new JobConf(false);
912            XConfiguration.copy(actionConf, credentialsConf);
913            setCredentialTokens(credentialsConf, context, action, credentialsProperties);
914
915            // insert conf to action conf from credentialsConf
916            for (Entry<String, String> entry : credentialsConf) {
917                if (actionConf.get(entry.getKey()) == null) {
918                    actionConf.set(entry.getKey(), entry.getValue());
919                }
920            }
921
922            JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
923
924            injectJobInfo(launcherJobConf, actionConf, context, action);
925            injectLauncherCallback(context, launcherJobConf);
926            LOG.debug("Creating Job Client for action " + action.getId());
927            jobClient = createJobClient(context, launcherJobConf);
928            String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
929                    .getRecoveryId());
930            boolean alreadyRunning = launcherId != null;
931            RunningJob runningJob;
932
933            // if user-retry is on, always submit new launcher
934            boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
935
936            if (alreadyRunning && !isUserRetry) {
937                runningJob = jobClient.getJob(JobID.forName(launcherId));
938                if (runningJob == null) {
939                    String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
940                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
941                            "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
942                }
943            }
944            else {
945                LOG.debug("Submitting the job through Job Client for action " + action.getId());
946
947                // setting up propagation of the delegation token.
948                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
949                Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(has
950                        .getMRDelegationTokenRenewer(launcherJobConf));
951                launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
952
953                // insert credentials tokens to launcher job conf if needed
954                if (needInjectCredentials()) {
955                    for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) {
956                        Text fauxAlias = new Text(tk.getKind() + "_" + tk.getService());
957                        LOG.debug("ADDING TOKEN: " + fauxAlias);
958                        launcherJobConf.getCredentials().addToken(fauxAlias, tk);
959                    }
960                }
961                else {
962                    LOG.info("No need to inject credentials.");
963                }
964                runningJob = jobClient.submitJob(launcherJobConf);
965                if (runningJob == null) {
966                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
967                            "Error submitting launcher for action [{0}]", action.getId());
968                }
969                launcherId = runningJob.getID().toString();
970                LOG.debug("After submission get the launcherId " + launcherId);
971            }
972
973            String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
974            String consoleUrl = runningJob.getTrackingURL();
975            context.setStartData(launcherId, jobTracker, consoleUrl);
976        }
977        catch (Exception ex) {
978            exception = true;
979            throw convertException(ex);
980        }
981        finally {
982            if (jobClient != null) {
983                try {
984                    jobClient.close();
985                }
986                catch (Exception e) {
987                    if (exception) {
988                        LOG.error("JobClient error: ", e);
989                    }
990                    else {
991                        throw convertException(e);
992                    }
993                }
994            }
995        }
996    }
997    private boolean needInjectCredentials() {
998        boolean methodExists = true;
999
1000        Class klass;
1001        try {
1002            klass = Class.forName("org.apache.hadoop.mapred.JobConf");
1003            klass.getMethod("getCredentials");
1004        }
1005        catch (ClassNotFoundException ex) {
1006            methodExists = false;
1007        }
1008        catch (NoSuchMethodException ex) {
1009            methodExists = false;
1010        }
1011
1012        return methodExists;
1013    }
1014
1015    protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
1016            WorkflowAction action, Configuration actionConf) throws Exception {
1017        HashMap<String, CredentialsProperties> credPropertiesMap = null;
1018        if (context != null && action != null) {
1019            credPropertiesMap = getActionCredentialsProperties(context, action);
1020            if (credPropertiesMap != null) {
1021                for (String key : credPropertiesMap.keySet()) {
1022                    CredentialsProperties prop = credPropertiesMap.get(key);
1023                    if (prop != null) {
1024                        LOG.debug("Credential Properties set for action : " + action.getId());
1025                        for (String property : prop.getProperties().keySet()) {
1026                            actionConf.set(property, prop.getProperties().get(property));
1027                            LOG.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'");
1028                        }
1029                    }
1030                }
1031            }
1032            else {
1033                LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
1034            }
1035        }
1036        else {
1037            LOG.warn("context or action is null");
1038        }
1039        return credPropertiesMap;
1040    }
1041
1042    protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
1043            HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {
1044
1045        if (context != null && action != null && credPropertiesMap != null) {
1046            for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
1047                String credName = entry.getKey();
1048                CredentialsProperties credProps = entry.getValue();
1049                if (credProps != null) {
1050                    CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
1051                    Credentials credentialObject = credProvider.createCredentialObject();
1052                    if (credentialObject != null) {
1053                        credentialObject.addtoJobConf(jobconf, credProps, context);
1054                        LOG.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
1055                    }
1056                    else {
1057                        LOG.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
1058                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA020",
1059                            "Could not load credentials of type [{0}] with name [{1}]]; perhaps it was not defined"
1060                                + " in oozie-site.xml?", credProps.getType(), credName);
1061                    }
1062                }
1063            }
1064        }
1065
1066    }
1067
1068    protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
1069            WorkflowAction action) throws Exception {
1070        HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();
1071        if (context != null && action != null) {
1072            String credsInAction = action.getCred();
1073            LOG.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId());
1074            String[] credNames = credsInAction.split(",");
1075            for (String credName : credNames) {
1076                CredentialsProperties credProps = getCredProperties(context, credName);
1077                props.put(credName, credProps);
1078            }
1079        }
1080        else {
1081            LOG.warn("context or action is null");
1082        }
1083        return props;
1084    }
1085
1086    @SuppressWarnings("unchecked")
1087    protected CredentialsProperties getCredProperties(Context context, String credName)
1088            throws Exception {
1089        CredentialsProperties credProp = null;
1090        String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
1091        XConfiguration wfjobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
1092        Element elementJob = XmlUtils.parseXml(workflowXml);
1093        Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
1094        if (credentials != null) {
1095            for (Element credential : (List<Element>) credentials.getChildren("credential", credentials.getNamespace())) {
1096                String name = credential.getAttributeValue("name");
1097                String type = credential.getAttributeValue("type");
1098                LOG.debug("getCredProperties: Name: " + name + ", Type: " + type);
1099                if (name.equalsIgnoreCase(credName)) {
1100                    credProp = new CredentialsProperties(name, type);
1101                    for (Element property : (List<Element>) credential.getChildren("property",
1102                            credential.getNamespace())) {
1103                        String propertyName = property.getChildText("name", property.getNamespace());
1104                        String propertyValue = property.getChildText("value", property.getNamespace());
1105                        ELEvaluator eval = new ELEvaluator();
1106                        for (Map.Entry<String, String> entry : wfjobConf) {
1107                            eval.setVariable(entry.getKey(), entry.getValue().trim());
1108                        }
1109                        propertyName = eval.evaluate(propertyName, String.class);
1110                        propertyValue = eval.evaluate(propertyValue, String.class);
1111
1112                        credProp.getProperties().put(propertyName, propertyValue);
1113                        LOG.debug("getCredProperties: Properties name :'" + propertyName + "', Value : '"
1114                                + propertyValue + "'");
1115                    }
1116                }
1117            }
1118        } else {
1119            LOG.debug("credentials is null for the action");
1120        }
1121        return credProp;
1122    }
1123
1124    @Override
1125    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
1126        LOG = XLog.resetPrefix(LOG);
1127        LogUtils.setLogInfo(action, new XLog.Info());
1128        try {
1129            LOG.debug("Starting action " + action.getId() + " getting Action File System");
1130            FileSystem actionFs = context.getAppFileSystem();
1131            LOG.debug("Preparing action Dir through copying " + context.getActionDir());
1132            prepareActionDir(actionFs, context);
1133            LOG.debug("Action Dir is ready. Submitting the action ");
1134            submitLauncher(actionFs, context, action);
1135            LOG.debug("Action submit completed. Performing check ");
1136            check(context, action);
1137            LOG.debug("Action check is done after submission");
1138        }
1139        catch (Exception ex) {
1140            throw convertException(ex);
1141        }
1142    }
1143
1144    @Override
1145    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
1146        try {
1147            String externalStatus = action.getExternalStatus();
1148            WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK
1149                    : WorkflowAction.Status.ERROR;
1150            context.setEndData(status, getActionSignal(status));
1151        }
1152        catch (Exception ex) {
1153            throw convertException(ex);
1154        }
1155        finally {
1156            try {
1157                FileSystem actionFs = context.getAppFileSystem();
1158                cleanUpActionDir(actionFs, context);
1159            }
1160            catch (Exception ex) {
1161                throw convertException(ex);
1162            }
1163        }
1164    }
1165
1166    /**
1167     * Create job client object
1168     *
1169     * @param context
1170     * @param jobConf
1171     * @return JobClient
1172     * @throws HadoopAccessorException
1173     */
1174    protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
1175        String user = context.getWorkflow().getUser();
1176        String group = context.getWorkflow().getGroup();
1177        return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
1178    }
1179
1180    protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
1181        RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
1182        return runningJob;
1183    }
1184
1185    @Override
1186    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
1187        JobClient jobClient = null;
1188        boolean exception = false;
1189        LOG = XLog.resetPrefix(LOG);
1190        LogUtils.setLogInfo(action, new XLog.Info());
1191        try {
1192            Element actionXml = XmlUtils.parseXml(action.getConf());
1193            FileSystem actionFs = context.getAppFileSystem();
1194            JobConf jobConf = createBaseHadoopConf(context, actionXml);
1195            jobClient = createJobClient(context, jobConf);
1196            RunningJob runningJob = getRunningJob(context, action, jobClient);
1197            if (runningJob == null) {
1198                context.setExecutionData(FAILED, null);
1199                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
1200                        "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", action
1201                                .getExternalId(), action.getId());
1202            }
1203            if (runningJob.isComplete()) {
1204                Path actionDir = context.getActionDir();
1205                String newId = null;
1206                // load sequence file into object
1207                Map<String, String> actionData = LauncherMapperHelper.getActionData(actionFs, actionDir, jobConf);
1208                if (actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID)) {
1209                    newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID);
1210                    String launcherId = action.getExternalId();
1211                    runningJob = jobClient.getJob(JobID.forName(newId));
1212                    if (runningJob == null) {
1213                        context.setExternalStatus(FAILED);
1214                        throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
1215                                "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", newId,
1216                                action.getId());
1217                    }
1218                    context.setExternalChildIDs(newId);
1219                    LOG.info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
1220                            newId);
1221                }
1222                else {
1223                    String externalIDs = actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
1224                    if (externalIDs != null) {
1225                        context.setExternalChildIDs(externalIDs);
1226                        LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
1227                    }
1228                }
1229                if (runningJob.isComplete()) {
1230                    // fetching action output and stats for the Map-Reduce action.
1231                    if (newId != null) {
1232                        actionData = LauncherMapperHelper.getActionData(actionFs, context.getActionDir(), jobConf);
1233                    }
1234                    LOG.info(XLog.STD, "action completed, external ID [{0}]",
1235                            action.getExternalId());
1236                    if (LauncherMapperHelper.isMainSuccessful(runningJob)) {
1237                        if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) {
1238                            context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
1239                                    .get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS)));
1240                            LOG.info(XLog.STD, "action produced output");
1241                        }
1242                        else {
1243                            context.setExecutionData(SUCCEEDED, null);
1244                        }
1245                        if (LauncherMapperHelper.hasStatsData(actionData)) {
1246                            context.setExecutionStats(actionData.get(LauncherMapper.ACTION_DATA_STATS));
1247                            LOG.info(XLog.STD, "action produced stats");
1248                        }
1249                        getActionData(actionFs, runningJob, action, context);
1250                    }
1251                    else {
1252                        String errorReason;
1253                        if (actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) {
1254                            Properties props = PropertiesUtils.stringToProperties(actionData
1255                                    .get(LauncherMapper.ACTION_DATA_ERROR_PROPS));
1256                            String errorCode = props.getProperty("error.code");
1257                            if ("0".equals(errorCode)) {
1258                                errorCode = "JA018";
1259                            }
1260                            if ("-1".equals(errorCode)) {
1261                                errorCode = "JA019";
1262                            }
1263                            errorReason = props.getProperty("error.reason");
1264                            LOG.warn("Launcher ERROR, reason: {0}", errorReason);
1265                            String exMsg = props.getProperty("exception.message");
1266                            String errorInfo = (exMsg != null) ? exMsg : errorReason;
1267                            context.setErrorInfo(errorCode, errorInfo);
1268                            String exStackTrace = props.getProperty("exception.stacktrace");
1269                            if (exMsg != null) {
1270                                LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
1271                            }
1272                        }
1273                        else {
1274                            errorReason = XLog.format("LauncherMapper died, check Hadoop LOG for job [{0}:{1}]", action
1275                                    .getTrackerUri(), action.getExternalId());
1276                            LOG.warn(errorReason);
1277                        }
1278                        context.setExecutionData(FAILED_KILLED, null);
1279                    }
1280                }
1281                else {
1282                    context.setExternalStatus("RUNNING");
1283                    LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
1284                            runningJob.getID());
1285                }
1286            }
1287            else {
1288                context.setExternalStatus("RUNNING");
1289                LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
1290                        runningJob.getID());
1291            }
1292        }
1293        catch (Exception ex) {
1294            LOG.warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
1295            exception = true;
1296            throw convertException(ex);
1297        }
1298        finally {
1299            if (jobClient != null) {
1300                try {
1301                    jobClient.close();
1302                }
1303                catch (Exception e) {
1304                    if (exception) {
1305                        LOG.error("JobClient error: ", e);
1306                    }
1307                    else {
1308                        throw convertException(e);
1309                    }
1310                }
1311            }
1312        }
1313    }
1314
1315    /**
1316     * Get the output data of an action. Subclasses should override this method
1317     * to get action specific output data.
1318     *
1319     * @param actionFs the FileSystem object
1320     * @param runningJob the runningJob
1321     * @param action the Workflow action
1322     * @param context executor context
1323     *
1324     */
1325    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
1326            throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
1327    }
1328
1329    protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
1330        Element eConf = XmlUtils.parseXml(action.getConf());
1331        Namespace ns = eConf.getNamespace();
1332        Element captureOutput = eConf.getChild("capture-output", ns);
1333        return captureOutput != null;
1334    }
1335
1336    @Override
1337    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
1338        JobClient jobClient = null;
1339        boolean exception = false;
1340        try {
1341            Element actionXml = XmlUtils.parseXml(action.getConf());
1342            JobConf jobConf = createBaseHadoopConf(context, actionXml);
1343            jobClient = createJobClient(context, jobConf);
1344            RunningJob runningJob = getRunningJob(context, action, jobClient);
1345            if (runningJob != null) {
1346                runningJob.killJob();
1347            }
1348            context.setExternalStatus(KILLED);
1349            context.setExecutionData(KILLED, null);
1350        }
1351        catch (Exception ex) {
1352            exception = true;
1353            throw convertException(ex);
1354        }
1355        finally {
1356            try {
1357                FileSystem actionFs = context.getAppFileSystem();
1358                cleanUpActionDir(actionFs, context);
1359                if (jobClient != null) {
1360                    jobClient.close();
1361                }
1362            }
1363            catch (Exception ex) {
1364                if (exception) {
1365                    LOG.error("Error: ", ex);
1366                }
1367                else {
1368                    throw convertException(ex);
1369                }
1370            }
1371        }
1372    }
1373
1374    private static Set<String> FINAL_STATUS = new HashSet<String>();
1375
1376    static {
1377        FINAL_STATUS.add(SUCCEEDED);
1378        FINAL_STATUS.add(KILLED);
1379        FINAL_STATUS.add(FAILED);
1380        FINAL_STATUS.add(FAILED_KILLED);
1381    }
1382
1383    @Override
1384    public boolean isCompleted(String externalStatus) {
1385        return FINAL_STATUS.contains(externalStatus);
1386    }
1387
1388
1389    /**
1390     * Return the sharelib names for the action.
1391     * <p/>
1392     * If <code>NULL</code> or empty, it means that the action does not use the action
1393     * sharelib.
1394     * <p/>
1395     * If a non-empty string, i.e. <code>foo</code>, it means the action uses the
1396     * action sharelib sub-directory <code>foo</code> and all JARs in the sharelib
1397     * <code>foo</code> directory will be in the action classpath. Multiple sharelib
1398     * sub-directories can be specified as a comma separated list.
1399     * <p/>
1400     * The resolution is done using the following precedence order:
1401     * <ul>
1402     *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the action configuration</li>
1403     *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the job configuration</li>
1404     *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the oozie configuration</li>
1405     *     <li>Action Executor <code>getDefaultShareLibName()</code> method</li>
1406     * </ul>
1407     *
1408     *
1409     * @param context executor context.
1410     * @param actionXml
1411     * @param conf action configuration.
1412     * @return the action sharelib names.
1413     */
1414    protected String[] getShareLibNames(Context context, Element actionXml, Configuration conf) {
1415        String[] names = conf.getStrings(ACTION_SHARELIB_FOR + getType());
1416        if (names == null || names.length == 0) {
1417            try {
1418                XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
1419                names = jobConf.getStrings(ACTION_SHARELIB_FOR + getType());
1420                if (names == null || names.length == 0) {
1421                    names = Services.get().getConf().getStrings(ACTION_SHARELIB_FOR + getType());
1422                    if (names == null || names.length == 0) {
1423                        String name = getDefaultShareLibName(actionXml);
1424                        if (name != null) {
1425                            names = new String[] { name };
1426                        }
1427                    }
1428                }
1429            }
1430            catch (IOException ex) {
1431                throw new RuntimeException("It cannot happen, " + ex.toString(), ex);
1432            }
1433        }
1434        return names;
1435    }
1436
1437    private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for.";
1438
1439
1440    /**
1441     * Returns the default sharelib name for the action if any.
1442     *
1443     * @param actionXml the action XML fragment.
1444     * @return the sharelib name for the action, <code>NULL</code> if none.
1445     */
1446    protected String getDefaultShareLibName(Element actionXml) {
1447        return null;
1448    }
1449
1450    /**
1451     * Sets some data for the action on completion
1452     *
1453     * @param context executor context
1454     * @param actionFs the FileSystem object
1455     */
1456    protected void setActionCompletionData(Context context, FileSystem actionFs) throws IOException,
1457            HadoopAccessorException, URISyntaxException {
1458    }
1459
1460    private void injectJobInfo(JobConf launcherJobConf, Configuration actionConf, Context context, WorkflowAction action) {
1461        if (OozieJobInfo.isJobInfoEnabled()) {
1462            try {
1463                OozieJobInfo jobInfo = new OozieJobInfo(actionConf, context, action);
1464                String jobInfoStr = jobInfo.getJobInfo();
1465                launcherJobConf.set(OozieJobInfo.JOB_INFO_KEY, jobInfoStr + "launcher=true");
1466                actionConf.set(OozieJobInfo.JOB_INFO_KEY, jobInfoStr + "launcher=false");
1467            }
1468            catch (Exception e) {
1469                // Just job info, should not impact the execution.
1470                LOG.error("Error while populating job info", e);
1471            }
1472        }
1473    }
1474}