001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.BufferedReader;
021    import java.io.File;
022    import java.io.FileNotFoundException;
023    import java.io.IOException;
024    import java.io.InputStream;
025    import java.io.InputStreamReader;
026    import java.io.StringReader;
027    import java.net.ConnectException;
028    import java.net.URI;
029    import java.net.UnknownHostException;
030    import java.util.ArrayList;
031    import java.util.HashMap;
032    import java.util.HashSet;
033    import java.util.List;
034    import java.util.Map;
035    import java.util.Properties;
036    import java.util.Set;
037    import java.util.Map.Entry;
038    
039    import org.apache.hadoop.conf.Configuration;
040    import org.apache.hadoop.filecache.DistributedCache;
041    import org.apache.hadoop.fs.FileSystem;
042    import org.apache.hadoop.fs.Path;
043    import org.apache.hadoop.fs.permission.AccessControlException;
044    import org.apache.hadoop.mapred.JobClient;
045    import org.apache.hadoop.mapred.JobConf;
046    import org.apache.hadoop.mapred.JobID;
047    import org.apache.hadoop.mapred.RunningJob;
048    import org.apache.hadoop.util.DiskChecker;
049    import org.apache.oozie.WorkflowActionBean;
050    import org.apache.oozie.WorkflowJobBean;
051    import org.apache.oozie.action.ActionExecutor;
052    import org.apache.oozie.action.ActionExecutorException;
053    import org.apache.oozie.client.OozieClient;
054    import org.apache.oozie.client.WorkflowAction;
055    import org.apache.oozie.service.HadoopAccessorException;
056    import org.apache.oozie.service.HadoopAccessorService;
057    import org.apache.oozie.service.Services;
058    import org.apache.oozie.service.WorkflowAppService;
059    import org.apache.oozie.servlet.CallbackServlet;
060    import org.apache.oozie.util.IOUtils;
061    import org.apache.oozie.util.PropertiesUtils;
062    import org.apache.oozie.util.XConfiguration;
063    import org.apache.oozie.util.XLog;
064    import org.apache.oozie.util.XmlUtils;
065    import org.jdom.Element;
066    import org.jdom.JDOMException;
067    import org.jdom.Namespace;
068    import org.apache.hadoop.security.token.Token;
069    import org.apache.hadoop.security.token.TokenIdentifier;
070    
071    public class JavaActionExecutor extends ActionExecutor {
072    
    // Hadoop configuration keys that createBaseHadoopConf() sets on behalf of the user.
    private static final String HADOOP_USER = "user.name";
    private static final String HADOOP_UGI = "hadoop.job.ugi";
    private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
    private static final String HADOOP_NAME_NODE = "fs.default.name";

    // Keys that job-xml, inline configuration and inline launcher configuration must not set
    // (enforced by checkForDisallowedProps); populated by the static initializer below.
    private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();

    // Maximum length of the action output data, read in initActionType() from
    // CallbackServlet.CONF_MAX_DATA_LEN (default 2 * 1024). Static, so shared by all instances.
    private static int maxActionOutputLen;

    // Job status strings. NOTE(review): not referenced in the visible part of this file;
    // presumably used further down — confirm.
    private static final String SUCCEEDED = "SUCCEEDED";
    private static final String KILLED = "KILLED";
    private static final String FAILED = "FAILED";
    private static final String FAILED_KILLED = "FAILED/KILLED";
    private static final String RUNNING = "RUNNING";
    private XLog log = XLog.getLog(getClass());

    // Oozie itself controls the user, UGI, JobTracker, NameNode and Kerberos principal settings,
    // so user-supplied configuration may not override them.
    static {
        DISALLOWED_PROPERTIES.add(HADOOP_USER);
        DISALLOWED_PROPERTIES.add(HADOOP_UGI);
        DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
        DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE);
        DISALLOWED_PROPERTIES.add(WorkflowAppService.HADOOP_JT_KERBEROS_NAME);
        DISALLOWED_PROPERTIES.add(WorkflowAppService.HADOOP_NN_KERBEROS_NAME);
    }
097    
    /**
     * Creates an executor for actions of type {@code java}.
     */
    public JavaActionExecutor() {
        this("java");
    }
101    
    /**
     * Creates an executor for the given action type.
     *
     * @param type action type name, passed to {@link ActionExecutor}; also used to derive the launcher jar name
     */
    protected JavaActionExecutor(String type) {
        super(type);
    }
105    
106        protected String getLauncherJarName() {
107            return getType() + "-launcher.jar";
108        }
109    
110        protected List<Class> getLauncherClasses() {
111            List<Class> classes = new ArrayList<Class>();
112            classes.add(LauncherMapper.class);
113            classes.add(LauncherSecurityManager.class);
114            classes.add(LauncherException.class);
115            classes.add(LauncherMainException.class);
116            return classes;
117        }
118    
119        @Override
120        public void initActionType() {
121            super.initActionType();
122            maxActionOutputLen = getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 2 * 1024);
123            try {
124                List<Class> classes = getLauncherClasses();
125                Class[] launcherClasses = classes.toArray(new Class[classes.size()]);
126                IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses);
127    
128                registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001");
129                registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT,
130                        "JA002");
131                registerError(DiskChecker.DiskOutOfSpaceException.class.getName(),
132                        ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003");
133                registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(),
134                        ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
135                registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
136                        ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
137                registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA006");
138                registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
139                registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
140                registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
141            }
142            catch (IOException ex) {
143                throw new RuntimeException(ex);
144            }
145        }
146    
147        void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException {
148            for (String prop : DISALLOWED_PROPERTIES) {
149                if (conf.get(prop) != null) {
150                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010",
151                            "Property [{0}] not allowed in action [{1}] configuration", prop, confName);
152                }
153            }
154        }
155    
156        public Configuration createBaseHadoopConf(Context context, Element actionXml) {
157            Configuration conf = new XConfiguration();
158            conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
159            conf.set(HADOOP_UGI, context.getProtoActionConf().get(WorkflowAppService.HADOOP_UGI));
160            if (context.getProtoActionConf().get(WorkflowAppService.HADOOP_JT_KERBEROS_NAME) != null) {
161                conf.set(WorkflowAppService.HADOOP_JT_KERBEROS_NAME, context.getProtoActionConf().get(
162                        WorkflowAppService.HADOOP_JT_KERBEROS_NAME));
163            }
164            if (context.getProtoActionConf().get(WorkflowAppService.HADOOP_NN_KERBEROS_NAME) != null) {
165                conf.set(WorkflowAppService.HADOOP_NN_KERBEROS_NAME, context.getProtoActionConf().get(
166                        WorkflowAppService.HADOOP_NN_KERBEROS_NAME));
167            }
168            conf.set(OozieClient.GROUP_NAME, context.getProtoActionConf().get(OozieClient.GROUP_NAME));
169            Namespace ns = actionXml.getNamespace();
170            String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
171            String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
172            conf.set(HADOOP_JOB_TRACKER, jobTracker);
173            conf.set(HADOOP_NAME_NODE, nameNode);
174            conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
175            return conf;
176        }
177    
178        Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
179                throws ActionExecutorException {
180            try {
181                Namespace ns = actionXml.getNamespace();
182                Element e = actionXml.getChild("configuration", ns);
183                if (e != null) {
184                    String strConf = XmlUtils.prettyPrint(e).toString();
185                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
186    
187                    XConfiguration launcherConf = new XConfiguration();
188                    for (Map.Entry<String, String> entry : inlineConf) {
189                        if (entry.getKey().startsWith("oozie.launcher.")) {
190                            String name = entry.getKey().substring("oozie.launcher.".length());
191                            String value = entry.getValue();
192                            // setting original KEY
193                            launcherConf.set(entry.getKey(), value);
194                            // setting un-prefixed key (to allow Hadoop job config
195                            // for the launcher job
196                            launcherConf.set(name, value);
197                        }
198                    }
199                    checkForDisallowedProps(launcherConf, "inline launcher configuration");
200                    XConfiguration.copy(launcherConf, conf);
201                }
202                return conf;
203            }
204            catch (IOException ex) {
205                throw convertException(ex);
206            }
207        }
208    
209        protected FileSystem getActionFileSystem(Context context, WorkflowAction action) throws ActionExecutorException {
210            try {
211                Element actionXml = XmlUtils.parseXml(action.getConf());
212                return getActionFileSystem(context, actionXml);
213            }
214            catch (JDOMException ex) {
215                throw convertException(ex);
216            }
217        }
218    
219        protected FileSystem getActionFileSystem(Context context, Element actionXml) throws ActionExecutorException {
220            try {
221                return context.getAppFileSystem();
222            }
223            catch (Exception ex) {
224                throw convertException(ex);
225            }
226        }
227    
228        Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
229                throws ActionExecutorException {
230            try {
231                Namespace ns = actionXml.getNamespace();
232                Element e = actionXml.getChild("job-xml", ns);
233                if (e != null) {
234                    String jobXml = e.getTextTrim();
235                    Path path = new Path(appPath, jobXml);
236                    FileSystem fs = getActionFileSystem(context, actionXml);
237                    Configuration jobXmlConf = new XConfiguration(fs.open(path));
238                    checkForDisallowedProps(jobXmlConf, "job-xml");
239                    XConfiguration.copy(jobXmlConf, actionConf);
240                }
241                e = actionXml.getChild("configuration", ns);
242                if (e != null) {
243                    String strConf = XmlUtils.prettyPrint(e).toString();
244                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
245                    checkForDisallowedProps(inlineConf, "inline configuration");
246                    XConfiguration.copy(inlineConf, actionConf);
247                }
248                return actionConf;
249            }
250            catch (IOException ex) {
251                throw convertException(ex);
252            }
253        }
254    
    /**
     * Registers a file or archive in the DistributedCache settings of {@code conf}.
     * <p>
     * {@code filePath} is used as-is when it starts with {@code /}; otherwise it is resolved against
     * {@code appPath}. Only the path component of the resulting URI is registered, because the scheme and
     * authority are dropped by {@code new URI(path.toUri().getPath())}. Handling by kind:
     * <ul>
     * <li>archive: added with {@code DistributedCache.addCacheArchive}.</li>
     * <li>file names ending in {@code .so} or containing {@code .so.}: added as a cache file with a
     * {@code #fileName} symlink fragment.</li>
     * <li>{@code .jar} files without {@code #}: added to the classpath through
     * {@code HadoopAccessorService.addFileToClassPath}, using the {@code user.name} and {@code group.name}
     * values from {@code conf}. With {@code #}, they are added as plain cache files.</li>
     * <li>other files: added as cache files, with a {@code #fileName} fragment appended when the name has
     * no {@code #}.</li>
     * </ul>
     * On success, symlink creation is enabled on {@code conf}.
     *
     * @param conf configuration to update; modified in place and returned
     * @param appPath base directory for relative paths
     * @param filePath file or archive path, absolute or relative to {@code appPath}
     * @param archive {@code true} to register an archive, {@code false} for a file
     * @return {@code conf}
     * @throws ActionExecutorException wrapping any failure; the path and configuration are logged at debug level
     */
    Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive)
            throws ActionExecutorException {
        Path path = null;
        try {
            if (filePath.startsWith("/")) {
                path = new Path(filePath);
            }
            else {
                path = new Path(appPath, filePath);
            }
            // keep only the path portion (no scheme/authority)
            URI uri = new URI(path.toUri().getPath());
            if (archive) {
                DistributedCache.addCacheArchive(uri, conf);
            }
            else {
                String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
                if (fileName.endsWith(".so") || fileName.contains(".so.")) {  // .so files
                    // append a '#fileName' fragment so the library is symlinked under its own name
                    uri = new Path(path.toString() + "#" + fileName).toUri();
                    uri = new URI(uri.getPath());
                    DistributedCache.addCacheFile(uri, conf);
                }
                else if (fileName.endsWith(".jar")) { // .jar files
                    if (!fileName.contains("#")) {
                        path = new Path(uri.toString());

                        String user = conf.get("user.name");
                        String group = conf.get("group.name");
                        Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, group, path, conf);
                    }
                    else {
                        DistributedCache.addCacheFile(uri, conf);
                    }
                }
                else { // regular files
                    if (!fileName.contains("#")) {
                        uri = new Path(path.toString() + "#" + fileName).toUri();
                        uri = new URI(uri.getPath());
                    }
                    DistributedCache.addCacheFile(uri, conf);
                }
            }
            DistributedCache.createSymlink(conf);
            return conf;
        }
        catch (Exception ex) {
            XLog.getLog(getClass()).debug(
                    "Errors when add to DistributedCache. Path=" + path + ", archive=" + archive + ", conf="
                            + XmlUtils.prettyPrint(conf).toString());
            throw convertException(ex);
        }
    }
306    
307        String getOozieLauncherJar(Context context) throws ActionExecutorException {
308            try {
309                return new Path(context.getActionDir(), getLauncherJarName()).toString();
310            }
311            catch (Exception ex) {
312                throw convertException(ex);
313            }
314        }
315    
316        public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
317            try {
318                Path actionDir = context.getActionDir();
319                Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp");
320                if (!actionFs.exists(actionDir)) {
321                    try {
322                        actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path(
323                                tempActionDir, getLauncherJarName()));
324                        actionFs.rename(tempActionDir, actionDir);
325                    }
326                    catch (IOException ex) {
327                        actionFs.delete(tempActionDir, true);
328                        actionFs.delete(actionDir, true);
329                        throw ex;
330                    }
331                }
332            }
333            catch (Exception ex) {
334                throw convertException(ex);
335            }
336        }
337    
338        void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
339            try {
340                Path actionDir = context.getActionDir();
341                if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
342                        && actionFs.exists(actionDir)) {
343                    actionFs.delete(actionDir, true);
344                }
345            }
346            catch (Exception ex) {
347                throw convertException(ex);
348            }
349        }
350    
351        @SuppressWarnings("unchecked")
352        void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf)
353                throws ActionExecutorException {
354            Configuration proto = context.getProtoActionConf();
355    
356            addToCache(conf, appPath, getOozieLauncherJar(context), false);
357    
358            String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
359            if (paths != null) {
360                for (String path : paths) {
361                    addToCache(conf, appPath, path, false);
362                }
363            }
364    
365            for (Element eProp : (List<Element>) actionXml.getChildren()) {
366                if (eProp.getName().equals("file")) {
367                    String path = eProp.getTextTrim();
368                    addToCache(conf, appPath, path, false);
369                }
370                else {
371                    if (eProp.getName().equals("archive")) {
372                        String path = eProp.getTextTrim();
373                        addToCache(conf, appPath, path, true);
374                    }
375                }
376            }
377        }
378    
379        protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
380            Namespace ns = actionXml.getNamespace();
381            Element e = actionXml.getChild("main-class", ns);
382            return e.getTextTrim();
383        }
384    
    // Hadoop queue name key, and its launcher-specific override set through inline launcher configuration.
    private static final String QUEUE_NAME = "mapred.job.queue.name";
    private static final String OOZIE_LAUNCHER_QUEUE_NAME = "oozie.launcher.mapred.job.queue.name";

    // Action configuration properties that createLauncherConf() copies onto the launcher job
    // (queue name and Kerberos principals).
    private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>();

    static {
        SPECIAL_PROPERTIES.add(QUEUE_NAME);
        SPECIAL_PROPERTIES.add("mapreduce.jobtracker.kerberos.principal");
        SPECIAL_PROPERTIES.add("dfs.namenode.kerberos.principal");
    }
395    
    /**
     * Builds the launcher job configuration for an action.
     * <p>
     * Starts from the base Hadoop configuration plus any inline {@code oozie.launcher.*} settings, adds
     * distributed cache entries, the job name, launcher info, main class, max output size and main arguments,
     * appends {@code java-opts}, copies the {@code SPECIAL_PROPERTIES} from the action configuration, disables
     * delegation token cancellation, and sets the job modify ACL to the workflow group.
     *
     * @param actionFs file system used to check whether the workflow app path is a file
     * @param context action execution context
     * @param action the workflow action
     * @param actionXml the action XML element
     * @param actionConf the already built action configuration
     * @return the launcher {@link JobConf}
     * @throws ActionExecutorException wrapping any failure
     */
    @SuppressWarnings("unchecked")
    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
            throws ActionExecutorException {
        try {

            // app path could be a file
            Path appPathRoot = new Path(context.getWorkflow().getAppPath());
            if (actionFs.isFile(appPathRoot)) {
                appPathRoot = appPathRoot.getParent();
            }

            // launcher job configuration
            Configuration launcherConf = createBaseHadoopConf(context, actionXml);
            setupLauncherConf(launcherConf, actionXml, appPathRoot, context);

            // we are doing init+copy because if not we are getting 'hdfs'
            // scheme not known
            // its seems that new JobConf(Conf) does not load defaults, it
            // assumes parameter Conf does.
            JobConf launcherJobConf = new JobConf();
            XConfiguration.copy(launcherConf, launcherJobConf);
            setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
            String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow()
                    .getAppName(), action.getName(), context.getWorkflow().getId());
            launcherJobConf.setJobName(jobName);

            String jobId = context.getWorkflow().getId();
            String actionId = action.getId();
            Path actionDir = context.getActionDir();
            String recoveryId = context.getRecoveryId();

            LauncherMapper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf);

            LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherConf, actionXml));

            LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);

            // every <arg> element becomes a main() argument, in document order
            Namespace ns = actionXml.getNamespace();
            List<Element> list = actionXml.getChildren("arg", ns);
            String[] args = new String[list.size()];
            for (int i = 0; i < list.size(); i++) {
                args[i] = list.get(i).getTextTrim();
            }
            LauncherMapper.setupMainArguments(launcherJobConf, args);

            // java-opts are appended to mapred.child.java.opts as read from launcherConf
            // (base + inline launcher configuration), not from the JobConf defaults
            Element opt = actionXml.getChild("java-opts", ns);
            if (opt != null) {
                String opts = launcherConf.get("mapred.child.java.opts", "");
                opts = opts + " " + opt.getTextTrim();
                opts = opts.trim();
                launcherJobConf.set("mapred.child.java.opts", opts);
            }

            // properties from action that are needed by the launcher (QUEUE NAME)
            // maybe we should add queue to the WF schema, below job-tracker
            // The queue name is copied only when no oozie.launcher.mapred.job.queue.name override exists.
            // NOTE(review): the inner name.equals(QUEUE_NAME) check is redundant.
            for (String name : SPECIAL_PROPERTIES) {
                String value = actionConf.get(name);
                if (value != null) {
                    if (!name.equals(QUEUE_NAME) ||
                        (name.equals(QUEUE_NAME) && launcherJobConf.get(OOZIE_LAUNCHER_QUEUE_NAME) == null)) {
                        launcherJobConf.set(name, value);
                    }
                }
            }

            // to disable cancelation of delegation token on launcher job end
            launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);

            // setting the group owning the Oozie job to allow anybody in that
            // group to kill the jobs.
            launcherJobConf.set("mapreduce.job.acl-modify-job", context.getWorkflow().getGroup());

            return launcherJobConf;
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }
474    
475        private void injectCallback(Context context, Configuration conf) {
476            String callback = context.getCallbackUrl("$jobStatus");
477            if (conf.get("job.end.notification.url") != null) {
478                XLog.getLog(getClass()).warn("Overriding the action job end notification URI");
479            }
480            conf.set("job.end.notification.url", callback);
481        }
482    
    /**
     * Injects the job end notification callback into the action job configuration.
     * This implementation delegates to {@code injectCallback}.
     *
     * @param context action execution context
     * @param actionConf action job configuration to update
     */
    void injectActionCallback(Context context, Configuration actionConf) {
        injectCallback(context, actionConf);
    }
486    
    /**
     * Injects the job end notification callback into the launcher job configuration.
     * This implementation delegates to {@code injectCallback}.
     *
     * @param context action execution context
     * @param launcherConf launcher job configuration to update
     */
    void injectLauncherCallback(Context context, Configuration launcherConf) {
        injectCallback(context, launcherConf);
    }
490    
    /**
     * Submits, or recovers, the launcher job for an action.
     * <p>
     * Builds the action configuration: base Hadoop settings, job-xml/inline configuration, distributed cache
     * entries, job name, job end callback, modify ACL and credential properties. Collects credential tokens
     * into a separate {@link JobConf} and builds the launcher configuration. It then either re-attaches to the
     * launcher returned by {@code LauncherMapper.getRecoveryId}, or runs {@code prepare} and submits a new
     * launcher. A user retry always submits a new launcher. The launcher id, job tracker and tracking URL are
     * recorded with {@code context.setStartData}.
     *
     * @param actionFs file system used to check whether the workflow app path is a file
     * @param context action execution context
     * @param action the workflow action; cast to {@link WorkflowActionBean} to read the user-retry flag
     * @throws ActionExecutorException if any step fails, including closing the {@link JobClient} after an
     *         otherwise successful submission
     */
    public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
        JobClient jobClient = null;
        // set when the main block fails, so a JobClient close error is only logged and does not replace it
        boolean exception = false;
        try {
            Path appPathRoot = new Path(context.getWorkflow().getAppPath());

            // app path could be a file
            if (actionFs.isFile(appPathRoot)) {
                appPathRoot = appPathRoot.getParent();
            }

            Element actionXml = XmlUtils.parseXml(action.getConf());

            // action job configuration
            Configuration actionConf = createBaseHadoopConf(context, actionXml);
            setupActionConf(actionConf, context, actionXml, appPathRoot);
            XLog.getLog(getClass()).debug("Setting LibFilesArchives ");
            setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
            String jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow()
                    .getAppName(), action.getName(), context.getWorkflow().getId());
            actionConf.set("mapred.job.name", jobName);
            injectActionCallback(context, actionConf);

            // setting the group owning the Oozie job to allow anybody in that
            // group to kill the jobs.
            actionConf.set("mapreduce.job.acl-modify-job", context.getWorkflow().getGroup());

            // Setting the credential properties in the action conf
            HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
                    action, actionConf);

            // Adding if action need to set more credential tokens
            JobConf credentialsConf = new JobConf(false);
            XConfiguration.copy(actionConf, credentialsConf);
            setCredentialTokens(credentialsConf, context, action, credentialsProperties);

            // insert conf to action conf from credentialsConf
            // (only keys that are not already present in actionConf are added)
            for (Entry<String, String> entry : credentialsConf) {
                if (actionConf.get(entry.getKey()) == null) {
                    actionConf.set(entry.getKey(), entry.getValue());
                }
            }

            JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
            injectLauncherCallback(context, launcherJobConf);
            XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId());
            jobClient = createJobClient(context, launcherJobConf);
            String launcherId = LauncherMapper.getRecoveryId(launcherJobConf, context.getActionDir(), context
                    .getRecoveryId());
            boolean alreadyRunning = launcherId != null;
            RunningJob runningJob;

            // if user-retry is on, always submit new launcher
            boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();

            if (alreadyRunning && !isUserRetry) {
                // recovery: re-attach to the launcher that was already submitted
                runningJob = jobClient.getJob(JobID.forName(launcherId));
                if (runningJob == null) {
                    String jobTracker = launcherJobConf.get("mapred.job.tracker");
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
                            "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
                }
            }
            else {
                prepare(context, actionXml);
                XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId());

                // setting up propagation of the delegation token.
                AuthHelper.get().set(jobClient, launcherJobConf);
                log.debug(WorkflowAppService.HADOOP_JT_KERBEROS_NAME + " = "
                        + launcherJobConf.get(WorkflowAppService.HADOOP_JT_KERBEROS_NAME));
                log.debug(WorkflowAppService.HADOOP_NN_KERBEROS_NAME + " = "
                        + launcherJobConf.get(WorkflowAppService.HADOOP_NN_KERBEROS_NAME));

                // insert credentials tokens to launcher job conf if needed
                // (only when the running Hadoop's JobConf exposes getCredentials())
                if (needInjectCredentials()) {
                    for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) {
                        log.debug("ADDING TOKEN: " + tk.getKind().toString());
                        launcherJobConf.getCredentials().addToken(tk.getKind(), tk);
                    }
                }
                else {
                    log.info("No need to inject credentials.");
                }
                runningJob = jobClient.submitJob(launcherJobConf);
                if (runningJob == null) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
                            "Error submitting launcher for action [{0}]", action.getId());
                }
                launcherId = runningJob.getID().toString();
                XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId);
            }

            String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
            String consoleUrl = runningJob.getTrackingURL();
            context.setStartData(launcherId, jobTracker, consoleUrl);
        }
        catch (Exception ex) {
            exception = true;
            throw convertException(ex);
        }
        finally {
            if (jobClient != null) {
                try {
                    jobClient.close();
                }
                catch (Exception e) {
                    if (exception) {
                        log.error("JobClient error: ", e);
                    }
                    else {
                        throw convertException(e);
                    }
                }
            }
        }
    }
608    
609        private boolean needInjectCredentials() {
610            boolean methodExists = true;
611    
612            Class klass;
613            try {
614                klass = Class.forName("org.apache.hadoop.mapred.JobConf");
615                klass.getMethod("getCredentials");
616            }
617            catch (ClassNotFoundException ex) {
618                methodExists = false;
619            }
620            catch (NoSuchMethodException ex) {
621                methodExists = false;
622            }
623    
624            return methodExists;
625        }
626    
627        protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
628                WorkflowAction action, Configuration actionConf) throws Exception {
629            HashMap<String, CredentialsProperties> credPropertiesMap = null;
630            if (context != null && action != null) {
631                credPropertiesMap = getActionCredentialsProperties(context, action);
632                if (credPropertiesMap != null) {
633                    for (String key : credPropertiesMap.keySet()) {
634                        CredentialsProperties prop = credPropertiesMap.get(key);
635                        if (prop != null) {
636                            log.debug("Credential Properties set for action : " + action.getId());
637                            for (String property : prop.getProperties().keySet()) {
638                                actionConf.set(property, prop.getProperties().get(property));
639                                log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'");
640                            }
641                        }
642                    }
643                }
644                else {
645                    log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
646                }
647            }
648            else {
649                log.warn("context or action is null");
650            }
651            return credPropertiesMap;
652        }
653    
654        protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
655                HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {
656    
657            if (context != null && action != null && credPropertiesMap != null) {
658                for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
659                    String credName = entry.getKey();
660                    CredentialsProperties credProps = entry.getValue();
661                    if (credProps != null) {
662                        CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
663                        Credentials credentialObject = credProvider.createCredentialObject();
664                        if (credentialObject != null) {
665                            credentialObject.addtoJobConf(jobconf, credProps, context);
666                            log.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
667                        }
668                        else {
669                            log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
670                        }
671                    }
672                    else {
673                        log.warn("Could not find credentials properties for: " + credName);
674                    }
675                }
676            }
677    
678        }
679    
680        protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
681                WorkflowAction action) throws Exception {
682            HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();
683            if (context != null && action != null) {
684                String credsInAction = action.getCred();
685                log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId());
686                String[] credNames = credsInAction.split(",");
687                for (String credName : credNames) {
688                    CredentialsProperties credProps = getCredProperties(context, credName);
689                    props.put(credName, credProps);
690                }
691            }
692            else {
693                log.warn("context or action is null");
694            }
695            return props;
696        }
697    
698        @SuppressWarnings("unchecked")
699        protected CredentialsProperties getCredProperties(Context context, String credName)
700                throws Exception {
701            CredentialsProperties credProp = null;
702            String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
703            Element elementJob = XmlUtils.parseXml(workflowXml);
704            Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
705            if (credentials != null) {
706                for (Element credential : (List<Element>) credentials.getChildren("credential", credentials
707                        .getNamespace())) {
708                    String name = credential.getAttributeValue("name");
709                    String type = credential.getAttributeValue("type");
710                    log.debug("getCredProperties: Name: " + name + ", Type: " + type);
711                    if (name.equalsIgnoreCase(credName)) {
712                        credProp = new CredentialsProperties(name, type);
713                        for (Element property : (List<Element>) credential.getChildren("property", credential
714                                .getNamespace())) {
715                            credProp.getProperties().put(property.getChildText("name", property.getNamespace()),
716                                    property.getChildText("value", property.getNamespace()));
717                            log.debug("getCredProperties: Properties name :'"
718                                    + property.getChildText("name", property.getNamespace()) + "', Value : '"
719                                    + property.getChildText("value", property.getNamespace()) + "'");
720                        }
721                    }
722                }
723            }
724            else {
725                log.warn("credentials is null for the action");
726            }
727            return credProp;
728        }
729    
730        void prepare(Context context, Element actionXml) throws ActionExecutorException {
731            Namespace ns = actionXml.getNamespace();
732            Element prepare = actionXml.getChild("prepare", ns);
733            if (prepare != null) {
734                XLog.getLog(getClass()).debug("Preparing the action with FileSystem operation");
735                FsActionExecutor fsAe = new FsActionExecutor();
736                fsAe.doOperations(context, prepare);
737                XLog.getLog(getClass()).debug("FS Operation is completed");
738            }
739        }
740    
741        @Override
742        public void start(Context context, WorkflowAction action) throws ActionExecutorException {
743            try {
744                XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System");
745                FileSystem actionFs = getActionFileSystem(context, action);
746                XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir());
747                prepareActionDir(actionFs, context);
748                XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action ");
749                submitLauncher(actionFs, context, action);
750                XLog.getLog(getClass()).debug("Action submit completed. Performing check ");
751                check(context, action);
752                XLog.getLog(getClass()).debug("Action check is done after submission");
753            }
754            catch (Exception ex) {
755                throw convertException(ex);
756            }
757        }
758    
759        @Override
760        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
761            try {
762                String externalStatus = action.getExternalStatus();
763                WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK
764                        : WorkflowAction.Status.ERROR;
765                context.setEndData(status, getActionSignal(status));
766            }
767            catch (Exception ex) {
768                throw convertException(ex);
769            }
770            finally {
771                try {
772                    FileSystem actionFs = getActionFileSystem(context, action);
773                    cleanUpActionDir(actionFs, context);
774                }
775                catch (Exception ex) {
776                    throw convertException(ex);
777                }
778            }
779        }
780    
781        /**
782         * Create job client object
783         *
784         * @param context
785         * @param jobConf
786         * @return
787         * @throws HadoopAccessorException
788         */
789        protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
790            String user = context.getWorkflow().getUser();
791            String group = context.getWorkflow().getGroup();
792            return Services.get().get(HadoopAccessorService.class).createJobClient(user, group, jobConf);
793        }
794    
795        @Override
796        public void check(Context context, WorkflowAction action) throws ActionExecutorException {
797            JobClient jobClient = null;
798            boolean exception = false;
799            try {
800                Element actionXml = XmlUtils.parseXml(action.getConf());
801                FileSystem actionFs = getActionFileSystem(context, actionXml);
802                Configuration conf = createBaseHadoopConf(context, actionXml);
803                JobConf jobConf = new JobConf();
804                XConfiguration.copy(conf, jobConf);
805                jobClient = createJobClient(context, jobConf);
806                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
807                if (runningJob == null) {
808                    context.setExternalStatus(FAILED);
809                    context.setExecutionData(FAILED, null);
810                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
811                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", action
812                                    .getExternalId(), action.getId());
813                }
814                if (runningJob.isComplete()) {
815                    Path actionDir = context.getActionDir();
816    
817                    String user = context.getWorkflow().getUser();
818                    String group = context.getWorkflow().getGroup();
819                    if (LauncherMapper.hasIdSwap(runningJob, user, group, actionDir)) {
820                        String launcherId = action.getExternalId();
821                        Path idSwapPath = LauncherMapper.getIdSwapPath(context.getActionDir());
822                        InputStream is = actionFs.open(idSwapPath);
823                        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
824                        Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
825                        reader.close();
826                        String newId = props.getProperty("id");
827                        runningJob = jobClient.getJob(JobID.forName(newId));
828                        if (runningJob == null) {
829                            context.setExternalStatus(FAILED);
830                            throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
831                                    "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", newId,
832                                    action.getId());
833                        }
834    
835                        context.setStartData(newId, action.getTrackerUri(), runningJob.getTrackingURL());
836                        XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
837                                newId);
838                    }
839                    if (runningJob.isComplete()) {
840                        XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]",
841                                action.getExternalId());
842                        if (runningJob.isSuccessful() && LauncherMapper.isMainSuccessful(runningJob)) {
843                            Properties props = null;
844                            if (getCaptureOutput(action)) {
845                                props = new Properties();
846                                if (LauncherMapper.hasOutputData(runningJob)) {
847                                    Path actionOutput = LauncherMapper.getOutputDataPath(context.getActionDir());
848                                    InputStream is = actionFs.open(actionOutput);
849                                    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
850                                    props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
851                                    reader.close();
852                                }
853                            }
854                            context.setExecutionData(SUCCEEDED, props);
855                            XLog.getLog(getClass()).info(XLog.STD, "action produced output");
856                        }
857                        else {
858                            XLog log = XLog.getLog(getClass());
859                            String errorReason;
860                            Path actionError = LauncherMapper.getErrorPath(context.getActionDir());
861                            if (actionFs.exists(actionError)) {
862                                InputStream is = actionFs.open(actionError);
863                                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
864                                Properties props = PropertiesUtils.readProperties(reader, -1);
865                                reader.close();
866                                String errorCode = props.getProperty("error.code");
867                                if (errorCode.equals("0")) {
868                                    errorCode = "JA018";
869                                }
870                                if (errorCode.equals("-1")) {
871                                    errorCode = "JA019";
872                                }
873                                errorReason = props.getProperty("error.reason");
874                                log.warn("Launcher ERROR, reason: {0}", errorReason);
875                                String exMsg = props.getProperty("exception.message");
876                                String errorInfo = (exMsg != null) ? exMsg : errorReason;
877                                context.setErrorInfo(errorCode, errorInfo);
878                                String exStackTrace = props.getProperty("exception.stacktrace");
879                                if (exMsg != null) {
880                                    log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
881                                }
882                            }
883                            else {
884                                errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action
885                                        .getTrackerUri(), action.getExternalId());
886                                log.warn(errorReason);
887                            }
888                            context.setExecutionData(FAILED_KILLED, null);
889                        }
890                    }
891                    else {
892                        context.setExternalStatus(RUNNING);
893                        XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
894                                action.getExternalId(), action.getExternalStatus());
895                    }
896                }
897                else {
898                    context.setExternalStatus(RUNNING);
899                    XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
900                            action.getExternalId(), action.getExternalStatus());
901                }
902            }
903            catch (Exception ex) {
904                XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
905                exception = true;
906                throw convertException(ex);
907            }
908            finally {
909                if (jobClient != null) {
910                    try {
911                        jobClient.close();
912                    }
913                    catch (Exception e) {
914                        if (exception) {
915                            log.error("JobClient error: ", e);
916                        }
917                        else {
918                            throw convertException(e);
919                        }
920                    }
921                }
922            }
923        }
924    
925        protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
926            Element eConf = XmlUtils.parseXml(action.getConf());
927            Namespace ns = eConf.getNamespace();
928            Element captureOutput = eConf.getChild("capture-output", ns);
929            return captureOutput != null;
930        }
931    
932        @Override
933        public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
934            JobClient jobClient = null;
935            boolean exception = false;
936            try {
937                Element actionXml = XmlUtils.parseXml(action.getConf());
938                Configuration conf = createBaseHadoopConf(context, actionXml);
939                JobConf jobConf = new JobConf();
940                XConfiguration.copy(conf, jobConf);
941                jobClient = createJobClient(context, jobConf);
942                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
943                if (runningJob != null) {
944                    runningJob.killJob();
945                }
946                context.setExternalStatus(KILLED);
947                context.setExecutionData(KILLED, null);
948            }
949            catch (Exception ex) {
950                exception = true;
951                throw convertException(ex);
952            }
953            finally {
954                try {
955                    FileSystem actionFs = getActionFileSystem(context, action);
956                    cleanUpActionDir(actionFs, context);
957                    if (jobClient != null) {
958                        jobClient.close();
959                    }
960                }
961                catch (Exception ex) {
962                    if (exception) {
963                        log.error("Error: ", ex);
964                    }
965                    else {
966                        throw convertException(ex);
967                    }
968                }
969            }
970        }
971    
972        private static Set<String> FINAL_STATUS = new HashSet<String>();
973    
974        static {
975            FINAL_STATUS.add(SUCCEEDED);
976            FINAL_STATUS.add(KILLED);
977            FINAL_STATUS.add(FAILED);
978            FINAL_STATUS.add(FAILED_KILLED);
979        }
980    
981        @Override
982        public boolean isCompleted(String externalStatus) {
983            return FINAL_STATUS.contains(externalStatus);
984        }
985    
986    }