001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.BufferedReader;
021    import java.io.File;
022    import java.io.FileNotFoundException;
023    import java.io.IOException;
024    import java.io.InputStream;
025    import java.io.InputStreamReader;
026    import java.io.StringReader;
027    import java.net.ConnectException;
028    import java.net.URI;
029    import java.net.URISyntaxException;
030    import java.net.UnknownHostException;
031    import java.util.ArrayList;
032    import java.util.HashMap;
033    import java.util.HashSet;
034    import java.util.List;
035    import java.util.Map;
036    import java.util.Properties;
037    import java.util.Set;
038    import java.util.Map.Entry;
039    
040    import org.apache.hadoop.conf.Configuration;
041    import org.apache.hadoop.filecache.DistributedCache;
042    import org.apache.hadoop.fs.FileStatus;
043    import org.apache.hadoop.fs.FileSystem;
044    import org.apache.hadoop.fs.Path;
045    import org.apache.hadoop.fs.permission.AccessControlException;
046    import org.apache.hadoop.io.Text;
047    import org.apache.hadoop.mapred.JobClient;
048    import org.apache.hadoop.mapred.JobConf;
049    import org.apache.hadoop.mapred.JobID;
050    import org.apache.hadoop.mapred.RunningJob;
051    import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
052    import org.apache.hadoop.util.DiskChecker;
053    import org.apache.oozie.WorkflowActionBean;
054    import org.apache.oozie.WorkflowJobBean;
055    import org.apache.oozie.action.ActionExecutor;
056    import org.apache.oozie.action.ActionExecutorException;
057    import org.apache.oozie.client.OozieClient;
058    import org.apache.oozie.client.WorkflowAction;
059    import org.apache.oozie.service.HadoopAccessorException;
060    import org.apache.oozie.service.HadoopAccessorService;
061    import org.apache.oozie.service.Services;
062    import org.apache.oozie.service.WorkflowAppService;
063    import org.apache.oozie.servlet.CallbackServlet;
064    import org.apache.oozie.util.IOUtils;
065    import org.apache.oozie.util.PropertiesUtils;
066    import org.apache.oozie.util.XConfiguration;
067    import org.apache.oozie.util.XLog;
068    import org.apache.oozie.util.XmlUtils;
069    import org.jdom.Element;
070    import org.jdom.JDOMException;
071    import org.jdom.Namespace;
072    import org.apache.hadoop.security.token.Token;
073    import org.apache.hadoop.security.token.TokenIdentifier;
074    
075    public class JavaActionExecutor extends ActionExecutor {
076    
077        private static final String HADOOP_USER = "user.name";
078        private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
079        private static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
080        private static final String HADOOP_NAME_NODE = "fs.default.name";
081        public static final String OOZIE_COMMON_LIBDIR = "oozie";
082        public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE;
083        private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
084        public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
085        private static int maxActionOutputLen;
086        private static int maxExternalStatsSize;
087    
088        private static final String SUCCEEDED = "SUCCEEDED";
089        private static final String KILLED = "KILLED";
090        private static final String FAILED = "FAILED";
091        private static final String FAILED_KILLED = "FAILED/KILLED";
092        private static final String RUNNING = "RUNNING";
093        protected XLog log = XLog.getLog(getClass());
094    
095        static {
096            DISALLOWED_PROPERTIES.add(HADOOP_USER);
097            DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
098            DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE);
099            DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2);
100        }
101    
102        public JavaActionExecutor() {
103            this("java");
104        }
105    
106        protected JavaActionExecutor(String type) {
107            super(type);
108        }
109    
110        protected String getLauncherJarName() {
111            return getType() + "-launcher.jar";
112        }
113    
114        protected List<Class> getLauncherClasses() {
115            List<Class> classes = new ArrayList<Class>();
116            classes.add(LauncherMapper.class);
117            classes.add(LauncherSecurityManager.class);
118            classes.add(LauncherException.class);
119            classes.add(LauncherMainException.class);
120            classes.add(FileSystemActions.class);
121            classes.add(PrepareActionsDriver.class);
122            classes.add(ActionStats.class);
123            classes.add(ActionType.class);
124            return classes;
125        }
126    
127        @Override
128        public void initActionType() {
129            super.initActionType();
130            maxActionOutputLen = getOozieConf()
131              .getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
132              // TODO: Remove the below config get in a subsequent release..
133              // This other irrelevant property is only used to
134              // preserve backwards compatibility cause of a typo.
135              // See OOZIE-4.
136              getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN,
137                2 * 1024));
138            //Get the limit for the maximum allowed size of action stats
139            maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT);
140            maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
141            try {
142                List<Class> classes = getLauncherClasses();
143                Class[] launcherClasses = classes.toArray(new Class[classes.size()]);
144                IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses);
145    
146                registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001");
147                registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT,
148                        "JA002");
149                registerError(DiskChecker.DiskOutOfSpaceException.class.getName(),
150                        ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003");
151                registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(),
152                        ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
153                registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
154                        ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
155                registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA006");
156                registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
157                registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
158                registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
159            }
160            catch (IOException ex) {
161                throw new RuntimeException(ex);
162            }
163        }
164    
165        /**
166         * Get the maximum allowed size of stats
167         *
168         * @return maximum size of stats
169         */
170        public static int getMaxExternalStatsSize() {
171            return maxExternalStatsSize;
172        }
173    
174        void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException {
175            for (String prop : DISALLOWED_PROPERTIES) {
176                if (conf.get(prop) != null) {
177                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010",
178                            "Property [{0}] not allowed in action [{1}] configuration", prop, confName);
179                }
180            }
181        }
182    
183        public JobConf createBaseHadoopConf(Context context, Element actionXml) {
184            Namespace ns = actionXml.getNamespace();
185            String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
186            String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
187            JobConf conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
188            conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
189            conf.set(HADOOP_JOB_TRACKER, jobTracker);
190            conf.set(HADOOP_JOB_TRACKER_2, jobTracker);
191            conf.set(HADOOP_NAME_NODE, nameNode);
192            conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
193            return conf;
194        }
195    
196        private void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) {
197            for (Map.Entry<String, String> entry : srcConf) {
198                if (entry.getKey().startsWith("oozie.launcher.")) {
199                    String name = entry.getKey().substring("oozie.launcher.".length());
200                    String value = entry.getValue();
201                    // setting original KEY
202                    launcherConf.set(entry.getKey(), value);
203                    // setting un-prefixed key (to allow Hadoop job config
204                    // for the launcher job
205                    launcherConf.set(name, value);
206                }
207            }
208        }
209    
210        Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
211                throws ActionExecutorException {
212            try {
213                Namespace ns = actionXml.getNamespace();
214                Element e = actionXml.getChild("configuration", ns);
215                if (e != null) {
216                    String strConf = XmlUtils.prettyPrint(e).toString();
217                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
218    
219                    XConfiguration launcherConf = new XConfiguration();
220                    HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
221                    XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType());
222                    injectLauncherProperties(actionDefaultConf, launcherConf);
223                    injectLauncherProperties(inlineConf, launcherConf);
224                    checkForDisallowedProps(launcherConf, "launcher configuration");
225                    XConfiguration.copy(launcherConf, conf);
226                }
227                return conf;
228            }
229            catch (IOException ex) {
230                throw convertException(ex);
231            }
232        }
233    
234        protected FileSystem getActionFileSystem(Context context, WorkflowAction action) throws ActionExecutorException {
235            try {
236                Element actionXml = XmlUtils.parseXml(action.getConf());
237                return getActionFileSystem(context, actionXml);
238            }
239            catch (JDOMException ex) {
240                throw convertException(ex);
241            }
242        }
243    
244        protected FileSystem getActionFileSystem(Context context, Element actionXml) throws ActionExecutorException {
245            try {
246                return context.getAppFileSystem();
247            }
248            catch (Exception ex) {
249                throw convertException(ex);
250            }
251        }
252    
253        Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
254                throws ActionExecutorException {
255            try {
256                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
257                XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType());
258                XConfiguration.injectDefaults(actionDefaults, actionConf);
259                Namespace ns = actionXml.getNamespace();
260                Element e = actionXml.getChild("job-xml", ns);
261                if (e != null) {
262                    String jobXml = e.getTextTrim();
263                    Path path = new Path(appPath, jobXml);
264                    FileSystem fs = getActionFileSystem(context, actionXml);
265                    Configuration jobXmlConf = new XConfiguration(fs.open(path));
266                    checkForDisallowedProps(jobXmlConf, "job-xml");
267                    XConfiguration.copy(jobXmlConf, actionConf);
268                }
269                e = actionXml.getChild("configuration", ns);
270                if (e != null) {
271                    String strConf = XmlUtils.prettyPrint(e).toString();
272                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
273                    checkForDisallowedProps(inlineConf, "inline configuration");
274                    XConfiguration.copy(inlineConf, actionConf);
275                }
276                return actionConf;
277            }
278            catch (IOException ex) {
279                throw convertException(ex);
280            }
281        }
282    
283        Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive)
284                throws ActionExecutorException {
285            Path path = null;
286            try {
287                if (filePath.startsWith("/")) {
288                    path = new Path(filePath);
289                }
290                else {
291                    path = new Path(appPath, filePath);
292                }
293                URI uri = new URI(path.toUri().getPath());
294                if (archive) {
295                    DistributedCache.addCacheArchive(uri, conf);
296                }
297                else {
298                    String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
299                    if (fileName.endsWith(".so") || fileName.contains(".so.")) {  // .so files
300                        uri = new Path(path.toString() + "#" + fileName).toUri();
301                        uri = new URI(uri.getPath());
302                        DistributedCache.addCacheFile(uri, conf);
303                    }
304                    else if (fileName.endsWith(".jar")) { // .jar files
305                        if (!fileName.contains("#")) {
306                            path = new Path(uri.toString());
307    
308                            String user = conf.get("user.name");
309                            String group = conf.get("group.name");
310                            Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, path, conf);
311                        }
312                        else {
313                            DistributedCache.addCacheFile(uri, conf);
314                        }
315                    }
316                    else { // regular files
317                        if (!fileName.contains("#")) {
318                            uri = new Path(path.toString() + "#" + fileName).toUri();
319                            uri = new URI(uri.getPath());
320                        }
321                        DistributedCache.addCacheFile(uri, conf);
322                    }
323                }
324                DistributedCache.createSymlink(conf);
325                return conf;
326            }
327            catch (Exception ex) {
328                XLog.getLog(getClass()).debug(
329                        "Errors when add to DistributedCache. Path=" + path + ", archive=" + archive + ", conf="
330                                + XmlUtils.prettyPrint(conf).toString());
331                throw convertException(ex);
332            }
333        }
334    
335        String getOozieLauncherJar(Context context) throws ActionExecutorException {
336            try {
337                return new Path(context.getActionDir(), getLauncherJarName()).toString();
338            }
339            catch (Exception ex) {
340                throw convertException(ex);
341            }
342        }
343    
344        public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
345            try {
346                Path actionDir = context.getActionDir();
347                Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp");
348                if (!actionFs.exists(actionDir)) {
349                    try {
350                        actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path(
351                                tempActionDir, getLauncherJarName()));
352                        actionFs.rename(tempActionDir, actionDir);
353                    }
354                    catch (IOException ex) {
355                        actionFs.delete(tempActionDir, true);
356                        actionFs.delete(actionDir, true);
357                        throw ex;
358                    }
359                }
360            }
361            catch (Exception ex) {
362                throw convertException(ex);
363            }
364        }
365    
366        void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
367            try {
368                Path actionDir = context.getActionDir();
369                if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
370                        && actionFs.exists(actionDir)) {
371                    actionFs.delete(actionDir, true);
372                }
373            }
374            catch (Exception ex) {
375                throw convertException(ex);
376            }
377        }
378    
379        protected void addShareLib(Path appPath, Configuration conf, String actionShareLibPostfix)
380        throws ActionExecutorException {
381            if (actionShareLibPostfix != null) {
382                try {
383                    Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath();
384                    if (systemLibPath != null) {
385                        Path actionLibPath = new Path(systemLibPath, actionShareLibPostfix);
386                        String user = conf.get("user.name");
387                        String group = conf.get("group.name");
388                        FileSystem fs =
389                            Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
390                        if (fs.exists(actionLibPath)) {
391                            FileStatus[] files = fs.listStatus(actionLibPath);
392                            for (FileStatus file : files) {
393                                addToCache(conf, appPath, file.getPath().toUri().getPath(), false);
394                            }
395                        }
396                    }
397                }
398                catch (HadoopAccessorException ex){
399                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
400                            ex.getErrorCode().toString(), ex.getMessage());
401                }
402                catch (IOException ex){
403                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
404                            "It should never happen", ex.getMessage());
405                }
406            }
407        }
408    
409        @SuppressWarnings("unchecked")
410        void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf)
411                throws ActionExecutorException {
412            Configuration proto = context.getProtoActionConf();
413    
414            // launcher JAR
415            addToCache(conf, appPath, getOozieLauncherJar(context), false);
416    
417            // Workflow lib/
418            String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
419            if (paths != null) {
420                for (String path : paths) {
421                    addToCache(conf, appPath, path, false);
422                }
423            }
424    
425            // files and archives defined in the action
426            for (Element eProp : (List<Element>) actionXml.getChildren()) {
427                if (eProp.getName().equals("file")) {
428                    String path = eProp.getTextTrim();
429                    addToCache(conf, appPath, path, false);
430                }
431                else {
432                    if (eProp.getName().equals("archive")) {
433                        String path = eProp.getTextTrim();
434                        addToCache(conf, appPath, path, true);
435                    }
436                }
437            }
438    
439            addAllShareLibs(appPath, conf, context, actionXml);
440            }
441    
442        // Adds action specific share libs and common share libs
443        private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml)
444                throws ActionExecutorException {
445            // Add action specific share libs
446            addActionShareLib(appPath, conf, context, actionXml);
447            // Add common sharelibs for Oozie
448            addShareLib(appPath, conf, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
449        }
450    
451        private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml) throws ActionExecutorException {
452            XConfiguration wfJobConf = null;
453            try {
454                wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
455            }
456            catch (IOException ioe) {
457                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
458                        ioe.getMessage());
459            }
460            // Action sharelibs are only added if user has specified to use system libpath
461            if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
462                // add action specific sharelibs
463                addShareLib(appPath, conf, getShareLibPostFix(context, actionXml));
464            }
465        }
466    
467    
468        protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
469            Namespace ns = actionXml.getNamespace();
470            Element e = actionXml.getChild("main-class", ns);
471            return e.getTextTrim();
472        }
473    
474        private static final String QUEUE_NAME = "mapred.job.queue.name";
475        private static final String OOZIE_LAUNCHER_QUEUE_NAME = "oozie.launcher.mapred.job.queue.name";
476    
477        private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>();
478    
479        static {
480            SPECIAL_PROPERTIES.add(QUEUE_NAME);
481            SPECIAL_PROPERTIES.add("mapreduce.jobtracker.kerberos.principal");
482            SPECIAL_PROPERTIES.add("dfs.namenode.kerberos.principal");
483        }
484    
485        @SuppressWarnings("unchecked")
486        JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
487                throws ActionExecutorException {
488            try {
489    
490                // app path could be a file
491                Path appPathRoot = new Path(context.getWorkflow().getAppPath());
492                if (actionFs.isFile(appPathRoot)) {
493                    appPathRoot = appPathRoot.getParent();
494                }
495    
496                // launcher job configuration
497                JobConf launcherJobConf = createBaseHadoopConf(context, actionXml);
498                setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);
499    
500                setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
501                String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow()
502                        .getAppName(), action.getName(), context.getWorkflow().getId());
503                launcherJobConf.setJobName(jobName);
504    
505                String jobId = context.getWorkflow().getId();
506                String actionId = action.getId();
507                Path actionDir = context.getActionDir();
508                String recoveryId = context.getRecoveryId();
509    
510                // Getting the prepare XML from the action XML
511                Namespace ns = actionXml.getNamespace();
512                Element prepareElement = actionXml.getChild("prepare", ns);
513                String prepareXML = "";
514                if (prepareElement != null) {
515                    if (prepareElement.getChildren().size() > 0) {
516                        prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
517                    }
518                }
519                LauncherMapper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
520                        prepareXML);
521    
522                LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
523    
524                LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
525                LauncherMapper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
526    
527                List<Element> list = actionXml.getChildren("arg", ns);
528                String[] args = new String[list.size()];
529                for (int i = 0; i < list.size(); i++) {
530                    args[i] = list.get(i).getTextTrim();
531                }
532                LauncherMapper.setupMainArguments(launcherJobConf, args);
533    
534                Element opt = actionXml.getChild("java-opts", ns);
535                if (opt != null) {
536                    String opts = launcherJobConf.get("mapred.child.java.opts", "");
537                    opts = opts + " " + opt.getTextTrim();
538                    opts = opts.trim();
539                    launcherJobConf.set("mapred.child.java.opts", opts);
540                }
541    
542                // properties from action that are needed by the launcher (QUEUE NAME)
543                // maybe we should add queue to the WF schema, below job-tracker
544                for (String name : SPECIAL_PROPERTIES) {
545                    String value = actionConf.get(name);
546                    if (value != null) {
547                        if (!name.equals(QUEUE_NAME) ||
548                            (name.equals(QUEUE_NAME) && launcherJobConf.get(OOZIE_LAUNCHER_QUEUE_NAME) == null)) {
549                            launcherJobConf.set(name, value);
550                        }
551                    }
552                }
553    
554                // to disable cancelation of delegation token on launcher job end
555                launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
556    
557                if (context.getWorkflow().getAcl() != null) {
558                    // setting the group owning the Oozie job to allow anybody in that
559                    // group to kill the jobs.
560                    launcherJobConf.set("mapreduce.job.acl-modify-job", context.getWorkflow().getAcl());
561                }
562    
563                return launcherJobConf;
564            }
565            catch (Exception ex) {
566                throw convertException(ex);
567            }
568        }
569    
570        private void injectCallback(Context context, Configuration conf) {
571            String callback = context.getCallbackUrl("$jobStatus");
572            if (conf.get("job.end.notification.url") != null) {
573                XLog.getLog(getClass()).warn("Overriding the action job end notification URI");
574            }
575            conf.set("job.end.notification.url", callback);
576        }
577    
578        void injectActionCallback(Context context, Configuration actionConf) {
579            injectCallback(context, actionConf);
580        }
581    
582        void injectLauncherCallback(Context context, Configuration launcherConf) {
583            injectCallback(context, launcherConf);
584        }
585    
586        public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
587            JobClient jobClient = null;
588            boolean exception = false;
589            try {
590                Path appPathRoot = new Path(context.getWorkflow().getAppPath());
591    
592                // app path could be a file
593                if (actionFs.isFile(appPathRoot)) {
594                    appPathRoot = appPathRoot.getParent();
595                }
596    
597                Element actionXml = XmlUtils.parseXml(action.getConf());
598    
599                // action job configuration
600                Configuration actionConf = createBaseHadoopConf(context, actionXml);
601                setupActionConf(actionConf, context, actionXml, appPathRoot);
602                XLog.getLog(getClass()).debug("Setting LibFilesArchives ");
603                setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
604                String jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow()
605                        .getAppName(), action.getName(), context.getWorkflow().getId());
606                actionConf.set("mapred.job.name", jobName);
607                injectActionCallback(context, actionConf);
608    
609                if (context.getWorkflow().getAcl() != null) {
610                    // setting the group owning the Oozie job to allow anybody in that
611                    // group to kill the jobs.
612                    actionConf.set("mapreduce.job.acl-modify-job", context.getWorkflow().getAcl());
613                }
614    
615                // Setting the credential properties in launcher conf
616                HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
617                        action, actionConf);
618    
619                // Adding if action need to set more credential tokens
620                JobConf credentialsConf = new JobConf(false);
621                XConfiguration.copy(actionConf, credentialsConf);
622                setCredentialTokens(credentialsConf, context, action, credentialsProperties);
623    
624                // insert conf to action conf from credentialsConf
625                for (Entry<String, String> entry : credentialsConf) {
626                    if (actionConf.get(entry.getKey()) == null) {
627                        actionConf.set(entry.getKey(), entry.getValue());
628                    }
629                }
630    
631                JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
632                injectLauncherCallback(context, launcherJobConf);
633                XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId());
634                jobClient = createJobClient(context, launcherJobConf);
635                String launcherId = LauncherMapper.getRecoveryId(launcherJobConf, context.getActionDir(), context
636                        .getRecoveryId());
637                boolean alreadyRunning = launcherId != null;
638                RunningJob runningJob;
639    
640                // if user-retry is on, always submit new launcher
641                boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
642    
643                if (alreadyRunning && !isUserRetry) {
644                    runningJob = jobClient.getJob(JobID.forName(launcherId));
645                    if (runningJob == null) {
646                        String jobTracker = launcherJobConf.get("mapred.job.tracker");
647                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
648                                "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
649                    }
650                }
651                else {
652                    XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId());
653    
654                    // setting up propagation of the delegation token.
655                    Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("mr token"));
656                    launcherJobConf.getCredentials().addToken(new Text("mr token"), mrdt);
657    
658                    // insert credentials tokens to launcher job conf if needed
659                    if (needInjectCredentials()) {
660                        for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) {
661                            log.debug("ADDING TOKEN: " + tk.getKind().toString());
662                            launcherJobConf.getCredentials().addToken(tk.getKind(), tk);
663                        }
664                    }
665                    else {
666                        log.info("No need to inject credentials.");
667                    }
668                    runningJob = jobClient.submitJob(launcherJobConf);
669                    if (runningJob == null) {
670                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
671                                "Error submitting launcher for action [{0}]", action.getId());
672                    }
673                    launcherId = runningJob.getID().toString();
674                    XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId);
675                }
676    
677                String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
678                String consoleUrl = runningJob.getTrackingURL();
679                context.setStartData(launcherId, jobTracker, consoleUrl);
680            }
681            catch (Exception ex) {
682                exception = true;
683                throw convertException(ex);
684            }
685            finally {
686                if (jobClient != null) {
687                    try {
688                        jobClient.close();
689                    }
690                    catch (Exception e) {
691                        if (exception) {
692                            log.error("JobClient error: ", e);
693                        }
694                        else {
695                            throw convertException(e);
696                        }
697                    }
698                }
699            }
700        }
701    
702        private boolean needInjectCredentials() {
703            boolean methodExists = true;
704    
705            Class klass;
706            try {
707                klass = Class.forName("org.apache.hadoop.mapred.JobConf");
708                klass.getMethod("getCredentials");
709            }
710            catch (ClassNotFoundException ex) {
711                methodExists = false;
712            }
713            catch (NoSuchMethodException ex) {
714                methodExists = false;
715            }
716    
717            return methodExists;
718        }
719    
720        protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
721                WorkflowAction action, Configuration actionConf) throws Exception {
722            HashMap<String, CredentialsProperties> credPropertiesMap = null;
723            if (context != null && action != null) {
724                credPropertiesMap = getActionCredentialsProperties(context, action);
725                if (credPropertiesMap != null) {
726                    for (String key : credPropertiesMap.keySet()) {
727                        CredentialsProperties prop = credPropertiesMap.get(key);
728                        if (prop != null) {
729                            log.debug("Credential Properties set for action : " + action.getId());
730                            for (String property : prop.getProperties().keySet()) {
731                                actionConf.set(property, prop.getProperties().get(property));
732                                log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'");
733                            }
734                        }
735                    }
736                }
737                else {
738                    log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
739                }
740            }
741            else {
742                log.warn("context or action is null");
743            }
744            return credPropertiesMap;
745        }
746    
747        protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
748                HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {
749    
750            if (context != null && action != null && credPropertiesMap != null) {
751                for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
752                    String credName = entry.getKey();
753                    CredentialsProperties credProps = entry.getValue();
754                    if (credProps != null) {
755                        CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
756                        Credentials credentialObject = credProvider.createCredentialObject();
757                        if (credentialObject != null) {
758                            credentialObject.addtoJobConf(jobconf, credProps, context);
759                            log.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
760                        }
761                        else {
762                            log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
763                        }
764                    }
765                    else {
766                        log.warn("Could not find credentials properties for: " + credName);
767                    }
768                }
769            }
770    
771        }
772    
773        protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
774                WorkflowAction action) throws Exception {
775            HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();
776            if (context != null && action != null) {
777                String credsInAction = action.getCred();
778                log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId());
779                String[] credNames = credsInAction.split(",");
780                for (String credName : credNames) {
781                    CredentialsProperties credProps = getCredProperties(context, credName);
782                    props.put(credName, credProps);
783                }
784            }
785            else {
786                log.warn("context or action is null");
787            }
788            return props;
789        }
790    
791        @SuppressWarnings("unchecked")
792        protected CredentialsProperties getCredProperties(Context context, String credName)
793                throws Exception {
794            CredentialsProperties credProp = null;
795            String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
796            Element elementJob = XmlUtils.parseXml(workflowXml);
797            Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
798            if (credentials != null) {
799                for (Element credential : (List<Element>) credentials.getChildren("credential", credentials
800                        .getNamespace())) {
801                    String name = credential.getAttributeValue("name");
802                    String type = credential.getAttributeValue("type");
803                    log.debug("getCredProperties: Name: " + name + ", Type: " + type);
804                    if (name.equalsIgnoreCase(credName)) {
805                        credProp = new CredentialsProperties(name, type);
806                        for (Element property : (List<Element>) credential.getChildren("property", credential
807                                .getNamespace())) {
808                            credProp.getProperties().put(property.getChildText("name", property.getNamespace()),
809                                    property.getChildText("value", property.getNamespace()));
810                            log.debug("getCredProperties: Properties name :'"
811                                    + property.getChildText("name", property.getNamespace()) + "', Value : '"
812                                    + property.getChildText("value", property.getNamespace()) + "'");
813                        }
814                    }
815                }
816            }
817            else {
818                log.warn("credentials is null for the action");
819            }
820            return credProp;
821        }
822    
823        @Override
824        public void start(Context context, WorkflowAction action) throws ActionExecutorException {
825            try {
826                XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System");
827                FileSystem actionFs = getActionFileSystem(context, action);
828                XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir());
829                prepareActionDir(actionFs, context);
830                XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action ");
831                submitLauncher(actionFs, context, action);
832                XLog.getLog(getClass()).debug("Action submit completed. Performing check ");
833                check(context, action);
834                XLog.getLog(getClass()).debug("Action check is done after submission");
835            }
836            catch (Exception ex) {
837                throw convertException(ex);
838            }
839        }
840    
841        @Override
842        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
843            try {
844                String externalStatus = action.getExternalStatus();
845                WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK
846                        : WorkflowAction.Status.ERROR;
847                context.setEndData(status, getActionSignal(status));
848            }
849            catch (Exception ex) {
850                throw convertException(ex);
851            }
852            finally {
853                try {
854                    FileSystem actionFs = getActionFileSystem(context, action);
855                    cleanUpActionDir(actionFs, context);
856                }
857                catch (Exception ex) {
858                    throw convertException(ex);
859                }
860            }
861        }
862    
863        /**
864         * Create job client object
865         *
866         * @param context
867         * @param jobConf
868         * @return
869         * @throws HadoopAccessorException
870         */
871        protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
872            String user = context.getWorkflow().getUser();
873            String group = context.getWorkflow().getGroup();
874            return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
875        }
876    
877        @Override
878        public void check(Context context, WorkflowAction action) throws ActionExecutorException {
879            JobClient jobClient = null;
880            boolean exception = false;
881            try {
882                Element actionXml = XmlUtils.parseXml(action.getConf());
883                FileSystem actionFs = getActionFileSystem(context, actionXml);
884                JobConf jobConf = createBaseHadoopConf(context, actionXml);
885                jobClient = createJobClient(context, jobConf);
886                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
887                if (runningJob == null) {
888                    context.setExternalStatus(FAILED);
889                    context.setExecutionData(FAILED, null);
890                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
891                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", action
892                                    .getExternalId(), action.getId());
893                }
894                if (runningJob.isComplete()) {
895                    Path actionDir = context.getActionDir();
896    
897                    String user = context.getWorkflow().getUser();
898                    String group = context.getWorkflow().getGroup();
899                    if (LauncherMapper.hasIdSwap(runningJob, user, group, actionDir)) {
900                        String launcherId = action.getExternalId();
901                        Path idSwapPath = LauncherMapper.getIdSwapPath(context.getActionDir());
902                        InputStream is = actionFs.open(idSwapPath);
903                        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
904                        Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
905                        reader.close();
906                        String newId = props.getProperty("id");
907                        runningJob = jobClient.getJob(JobID.forName(newId));
908                        if (runningJob == null) {
909                            context.setExternalStatus(FAILED);
910                            throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
911                                    "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", newId,
912                                    action.getId());
913                        }
914    
915                        context.setStartData(newId, action.getTrackerUri(), runningJob.getTrackingURL());
916                        XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
917                                newId);
918                    }
919                    if (runningJob.isComplete()) {
920                        XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]",
921                                action.getExternalId());
922                        if (runningJob.isSuccessful() && LauncherMapper.isMainSuccessful(runningJob)) {
923                            getActionData(actionFs, runningJob, action, context);
924                            XLog.getLog(getClass()).info(XLog.STD, "action produced output");
925                        }
926                        else {
927                            XLog log = XLog.getLog(getClass());
928                            String errorReason;
929                            Path actionError = LauncherMapper.getErrorPath(context.getActionDir());
930                            if (actionFs.exists(actionError)) {
931                                InputStream is = actionFs.open(actionError);
932                                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
933                                Properties props = PropertiesUtils.readProperties(reader, -1);
934                                reader.close();
935                                String errorCode = props.getProperty("error.code");
936                                if (errorCode.equals("0")) {
937                                    errorCode = "JA018";
938                                }
939                                if (errorCode.equals("-1")) {
940                                    errorCode = "JA019";
941                                }
942                                errorReason = props.getProperty("error.reason");
943                                log.warn("Launcher ERROR, reason: {0}", errorReason);
944                                String exMsg = props.getProperty("exception.message");
945                                String errorInfo = (exMsg != null) ? exMsg : errorReason;
946                                context.setErrorInfo(errorCode, errorInfo);
947                                String exStackTrace = props.getProperty("exception.stacktrace");
948                                if (exMsg != null) {
949                                    log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
950                                }
951                            }
952                            else {
953                                errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action
954                                        .getTrackerUri(), action.getExternalId());
955                                log.warn(errorReason);
956                            }
957                            context.setExecutionData(FAILED_KILLED, null);
958                        }
959                    }
960                    else {
961                        context.setExternalStatus(RUNNING);
962                        XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
963                                action.getExternalId(), action.getExternalStatus());
964                    }
965                }
966                else {
967                    context.setExternalStatus(RUNNING);
968                    XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
969                            action.getExternalId(), action.getExternalStatus());
970                }
971            }
972            catch (Exception ex) {
973                XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
974                exception = true;
975                throw convertException(ex);
976            }
977            finally {
978                if (jobClient != null) {
979                    try {
980                        jobClient.close();
981                    }
982                    catch (Exception e) {
983                        if (exception) {
984                            log.error("JobClient error: ", e);
985                        }
986                        else {
987                            throw convertException(e);
988                        }
989                    }
990                }
991            }
992        }
993    
994        /**
995         * Get the output data of an action. Subclasses should override this method
996         * to get action specific output data.
997         *
998         * @param actionFs the FileSystem object
999         * @param runningJob the runningJob
1000         * @param action the Workflow action
1001         * @param context executor context
1002         *
1003         */
1004        protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
1005                throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
1006            Properties props = null;
1007            if (getCaptureOutput(action)) {
1008                props = new Properties();
1009                if (LauncherMapper.hasOutputData(runningJob)) {
1010                    Path actionOutput = LauncherMapper.getOutputDataPath(context.getActionDir());
1011                    InputStream is = actionFs.open(actionOutput);
1012                    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1013                    props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
1014                    reader.close();
1015                }
1016            }
1017            context.setExecutionData(SUCCEEDED, props);
1018        }
1019    
1020        protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
1021            Element eConf = XmlUtils.parseXml(action.getConf());
1022            Namespace ns = eConf.getNamespace();
1023            Element captureOutput = eConf.getChild("capture-output", ns);
1024            return captureOutput != null;
1025        }
1026    
1027        @Override
1028        public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
1029            JobClient jobClient = null;
1030            boolean exception = false;
1031            try {
1032                Element actionXml = XmlUtils.parseXml(action.getConf());
1033                JobConf jobConf = createBaseHadoopConf(context, actionXml);
1034                jobClient = createJobClient(context, jobConf);
1035                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
1036                if (runningJob != null) {
1037                    runningJob.killJob();
1038                }
1039                context.setExternalStatus(KILLED);
1040                context.setExecutionData(KILLED, null);
1041            }
1042            catch (Exception ex) {
1043                exception = true;
1044                throw convertException(ex);
1045            }
1046            finally {
1047                try {
1048                    FileSystem actionFs = getActionFileSystem(context, action);
1049                    cleanUpActionDir(actionFs, context);
1050                    if (jobClient != null) {
1051                        jobClient.close();
1052                    }
1053                }
1054                catch (Exception ex) {
1055                    if (exception) {
1056                        log.error("Error: ", ex);
1057                    }
1058                    else {
1059                        throw convertException(ex);
1060                    }
1061                }
1062            }
1063        }
1064    
1065        private static Set<String> FINAL_STATUS = new HashSet<String>();
1066    
1067        static {
1068            FINAL_STATUS.add(SUCCEEDED);
1069            FINAL_STATUS.add(KILLED);
1070            FINAL_STATUS.add(FAILED);
1071            FINAL_STATUS.add(FAILED_KILLED);
1072        }
1073    
1074        @Override
1075        public boolean isCompleted(String externalStatus) {
1076            return FINAL_STATUS.contains(externalStatus);
1077        }
1078    
1079        /**
1080         * Return the sharelib postfix for the action.
1081         * <p/>
1082         * If <code>NULL</code> or emtpy, it means that the action does not use the action
1083         * sharelib.
1084         * <p/>
1085         * If a non-empty string, i.e. <code>foo</code>, it means the action uses the
1086         * action sharelib subdirectory <code>foo</code> and all JARs in the <code>foo</code>
1087         * directory will be in the action classpath.
1088         *
1089         * @param context executor context.
1090         * @param actionXml the action XML.
1091         * @return the action sharelib post fix, this implementation returns <code>NULL</code>.
1092         */
1093        protected String getShareLibPostFix(Context context, Element actionXml) {
1094            return null;
1095        }
1096    
1097    }