001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.action.hadoop;
020
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;
045
046public class MapReduceActionExecutor extends JavaActionExecutor {
047
048    public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
049    public static final String HADOOP_COUNTERS = "hadoop.counters";
050    public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
051    private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain";
052    public static final String JOB_END_NOTIFICATION_URL = "job.end.notification.url";
053    private static final String MAPREDUCE_JOB_NAME = "mapreduce.job.name";
054    private XLog log = XLog.getLog(getClass());
055
056    public MapReduceActionExecutor() {
057        super("map-reduce");
058    }
059
060    @Override
061    public List<Class<?>> getLauncherClasses() {
062        List<Class<?>> classes = new ArrayList<Class<?>>();
063        try {
064            classes.add(Class.forName(STREAMING_MAIN_CLASS_NAME));
065        }
066        catch (ClassNotFoundException e) {
067            throw new RuntimeException("Class not found", e);
068        }
069        return classes;
070    }
071
072    @Override
073    protected String getActualExternalId(WorkflowAction action) {
074        String launcherJobId = action.getExternalId();
075        String childId = action.getExternalChildIDs();
076
077        if (childId != null && !childId.isEmpty()) {
078            return childId;
079        } else {
080            return launcherJobId;
081        }
082    }
083
084    @Override
085    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
086        String mainClass;
087        Namespace ns = actionXml.getNamespace();
088        if (actionXml.getChild("streaming", ns) != null) {
089            mainClass = launcherConf.get(LauncherAMUtils.CONF_OOZIE_ACTION_MAIN_CLASS, STREAMING_MAIN_CLASS_NAME);
090        }
091        else {
092            if (actionXml.getChild("pipes", ns) != null) {
093                mainClass = launcherConf.get(LauncherAMUtils.CONF_OOZIE_ACTION_MAIN_CLASS, PipesMain.class.getName());
094            }
095            else {
096                mainClass = launcherConf.get(LauncherAMUtils.CONF_OOZIE_ACTION_MAIN_CLASS, MapReduceMain.class.getName());
097            }
098        }
099        return mainClass;
100    }
101
102    @Override
103    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
104            throws ActionExecutorException {
105        super.setupLauncherConf(conf, actionXml, appPath, context);
106        conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
107
108        return conf;
109    }
110
111    private void injectConfigClass(Configuration conf, Element actionXml) {
112        // Inject config-class for launcher to use for action
113        Element e = actionXml.getChild("config-class", actionXml.getNamespace());
114        if (e != null) {
115            conf.set(LauncherAMUtils.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim());
116        }
117    }
118
119    @Override
120    protected Configuration createBaseHadoopConf(Context context, Element actionXml, boolean loadResources) {
121        Configuration conf = super.createBaseHadoopConf(context, actionXml, loadResources);
122        return conf;
123    }
124
125    @Override
126    @SuppressWarnings("unchecked")
127    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
128            throws ActionExecutorException {
129        boolean regularMR = false;
130
131        injectConfigClass(actionConf, actionXml);
132        Namespace ns = actionXml.getNamespace();
133        if (actionXml.getChild("streaming", ns) != null) {
134            Element streamingXml = actionXml.getChild("streaming", ns);
135            String mapper = streamingXml.getChildTextTrim("mapper", ns);
136            String reducer = streamingXml.getChildTextTrim("reducer", ns);
137            String recordReader = streamingXml.getChildTextTrim("record-reader", ns);
138            List<Element> list = (List<Element>) streamingXml.getChildren("record-reader-mapping", ns);
139            String[] recordReaderMapping = new String[list.size()];
140            for (int i = 0; i < list.size(); i++) {
141                recordReaderMapping[i] = list.get(i).getTextTrim();
142            }
143            list = (List<Element>) streamingXml.getChildren("env", ns);
144            String[] env = new String[list.size()];
145            for (int i = 0; i < list.size(); i++) {
146                env[i] = list.get(i).getTextTrim();
147            }
148            setStreaming(actionConf, mapper, reducer, recordReader, recordReaderMapping, env);
149        }
150        else {
151            if (actionXml.getChild("pipes", ns) != null) {
152                Element pipesXml = actionXml.getChild("pipes", ns);
153                String map = pipesXml.getChildTextTrim("map", ns);
154                String reduce = pipesXml.getChildTextTrim("reduce", ns);
155                String inputFormat = pipesXml.getChildTextTrim("inputformat", ns);
156                String partitioner = pipesXml.getChildTextTrim("partitioner", ns);
157                String writer = pipesXml.getChildTextTrim("writer", ns);
158                String program = pipesXml.getChildTextTrim("program", ns);
159                PipesMain.setPipes(actionConf, map, reduce, inputFormat, partitioner, writer, program, appPath);
160            }
161            else {
162                regularMR = true;
163            }
164        }
165        actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
166        setJobName(actionConf, context);
167
168        // For "regular" (not streaming or pipes) MR jobs
169        if (regularMR) {
170            // Resolve uber jar path (has to be done after super because oozie.mapreduce.uber.jar is under <configuration>)
171            String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR);
172            if (uberJar != null) {
173                if (!ConfigurationService.getBoolean(OOZIE_MAPREDUCE_UBER_JAR_ENABLE)){
174                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "MR003",
175                            "{0} property is not allowed.  Set {1} to true in oozie-site to enable.",
176                            MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, OOZIE_MAPREDUCE_UBER_JAR_ENABLE);
177                }
178                String nameNode = actionXml.getChildTextTrim("name-node", ns);
179                if (nameNode != null) {
180                    Path uberJarPath = new Path(uberJar);
181                    if (uberJarPath.toUri().getScheme() == null || uberJarPath.toUri().getAuthority() == null) {
182                        if (uberJarPath.isAbsolute()) {     // absolute path without namenode --> prepend namenode
183                            Path nameNodePath = new Path(nameNode);
184                            String nameNodeSchemeAuthority = nameNodePath.toUri().getScheme()
185                                    + "://" + nameNodePath.toUri().getAuthority();
186                            actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR,
187                                    new Path(nameNodeSchemeAuthority + uberJarPath).toString());
188                        }
189                        else {                              // relative path --> prepend app path
190                            actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, new Path(appPath, uberJarPath).toString());
191                        }
192                    }
193                }
194            }
195        }
196        else {
197            if (actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR) != null) {
198                log.warn("The " + MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR + " property is only applicable for MapReduce (not"
199                        + "streaming nor pipes) workflows, ignoring");
200                actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, "");
201            }
202        }
203
204        // child job cancel delegation token for mapred action
205        actionConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
206
207        return actionConf;
208    }
209
210    private void setJobName(Configuration actionConf, Context context) {
211        String jobName = getAppName(context);
212        if (jobName != null) {
213            actionConf.set(MAPREDUCE_JOB_NAME, jobName.replace("oozie:launcher", "oozie:action"));
214        }
215    }
216
217    @Override
218    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
219        super.end(context, action);
220        JobClient jobClient = null;
221        boolean exception = false;
222        try {
223            if (action.getStatus() == WorkflowAction.Status.OK) {
224                Element actionXml = XmlUtils.parseXml(action.getConf());
225                Configuration jobConf = createBaseHadoopConf(context, actionXml);
226                jobClient = createJobClient(context, jobConf);
227                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalChildIDs()));
228                if (runningJob == null) {
229                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR002",
230                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!",
231                            action.getExternalChildIDs(), action.getId());
232                }
233
234                Counters counters = runningJob.getCounters();
235                if (counters != null) {
236                    ActionStats stats = new MRStats(counters);
237                    String statsJsonString = stats.toJSON();
238                    context.setVar(HADOOP_COUNTERS, statsJsonString);
239
240                    // If action stats write property is set to false by user or
241                    // size of stats is greater than the maximum allowed size,
242                    // do not store the action stats
243                    if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
244                            OOZIE_ACTION_EXTERNAL_STATS_WRITE, "false"))
245                            && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
246                        context.setExecutionStats(statsJsonString);
247                        log.debug(
248                                "Printing stats for Map-Reduce action as a JSON string : [{0}]", statsJsonString);
249                    }
250                }
251                else {
252                    context.setVar(HADOOP_COUNTERS, "");
253                    XLog.getLog(getClass()).warn("Could not find Hadoop Counters for: [{0}]",
254                            action.getExternalChildIDs());
255                }
256            }
257        }
258        catch (Exception ex) {
259            exception = true;
260            throw convertException(ex);
261        }
262        finally {
263            if (jobClient != null) {
264                try {
265                    jobClient.close();
266                }
267                catch (Exception e) {
268                    if (exception) {
269                        log.error("JobClient error: ", e);
270                    }
271                    else {
272                        throw convertException(e);
273                    }
274                }
275            }
276        }
277    }
278
279    // Return the value of the specified configuration property
280    private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue)
281            throws ActionExecutorException {
282        try {
283            String ret = defaultValue;
284            if (actionConf != null) {
285                Namespace ns = actionConf.getNamespace();
286                Element e = actionConf.getChild("configuration", ns);
287                if(e != null) {
288                    String strConf = XmlUtils.prettyPrint(e).toString();
289                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
290                    ret = inlineConf.get(key, defaultValue);
291                }
292            }
293            return ret;
294        }
295        catch (IOException ex) {
296            throw convertException(ex);
297        }
298    }
299
300    /**
301     * Return the sharelib name for the action.
302     *
303     * @return returns <code>streaming</code> if mapreduce-streaming action, <code>NULL</code> otherwise.
304     * @param actionXml
305     */
306    @Override
307    protected String getDefaultShareLibName(Element actionXml) {
308        Namespace ns = actionXml.getNamespace();
309        return (actionXml.getChild("streaming", ns) != null) ? "mapreduce-streaming" : null;
310    }
311
312    public static void setStreaming(Configuration conf, String mapper, String reducer, String recordReader,
313                                    String[] recordReaderMapping, String[] env) {
314        if (mapper != null) {
315            conf.set("oozie.streaming.mapper", mapper);
316        }
317        if (reducer != null) {
318            conf.set("oozie.streaming.reducer", reducer);
319        }
320        if (recordReader != null) {
321            conf.set("oozie.streaming.record-reader", recordReader);
322        }
323        ActionUtils.setStrings(conf, "oozie.streaming.record-reader-mapping", recordReaderMapping);
324        ActionUtils.setStrings(conf, "oozie.streaming.env", env);
325    }
326
327    @Override
328    protected void injectCallback(Context context, Configuration conf) {
329        // add callback for the MapReduce job
330        String callback = context.getCallbackUrl("$jobStatus");
331        String originalCallbackURL = conf.get(JOB_END_NOTIFICATION_URL);
332        if (originalCallbackURL != null) {
333            LOG.warn("Overriding the action job end notification URI. Original value: {0}", originalCallbackURL);
334        }
335        conf.set(JOB_END_NOTIFICATION_URL, callback);
336
337        super.injectCallback(context, conf);
338    }
339
340    @Override
341    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
342        Map<String, String> actionData = Collections.emptyMap();
343        Configuration jobConf = null;
344
345        try {
346            FileSystem actionFs = context.getAppFileSystem();
347            Element actionXml = XmlUtils.parseXml(action.getConf());
348            jobConf = createBaseHadoopConf(context, actionXml);
349            Path actionDir = context.getActionDir();
350            actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf);
351        } catch (Exception e) {
352            LOG.warn("Exception in check(). Message[{0}]", e.getMessage(), e);
353            throw convertException(e);
354        }
355
356        final String newId = actionData.get(LauncherAMUtils.ACTION_DATA_NEW_ID);
357
358        // check the Hadoop job if newID is defined (which should be the case here) - otherwise perform the normal check()
359        if (newId != null) {
360            boolean jobCompleted;
361            JobClient jobClient = null;
362            boolean exception = false;
363
364            try {
365                jobClient = createJobClient(context, new JobConf(jobConf));
366                RunningJob runningJob = jobClient.getJob(JobID.forName(newId));
367
368                if (runningJob == null) {
369                    context.setExternalStatus(FAILED);
370                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
371                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", newId,
372                            action.getId());
373                }
374
375                jobCompleted = runningJob.isComplete();
376            } catch (Exception e) {
377                LOG.warn("Unable to check the state of a running MapReduce job -"
378                        + " please check the health of the Job History Server!", e);
379                exception = true;
380                throw convertException(e);
381            } finally {
382                if (jobClient != null) {
383                    try {
384                        jobClient.close();
385                    } catch (Exception e) {
386                        if (exception) {
387                            LOG.error("JobClient error (not re-throwing due to a previous error): ", e);
388                        } else {
389                            throw convertException(e);
390                        }
391                    }
392                }
393            }
394
395            // run original check() if the MR action is completed or there are errors - otherwise mark it as RUNNING
396            if (jobCompleted || actionData.containsKey(LauncherAMUtils.ACTION_DATA_ERROR_PROPS)) {
397                super.check(context, action);
398            } else {
399                context.setExternalStatus(RUNNING);
400                String externalAppId = TypeConverter.toYarn(JobID.forName(newId)).getAppId().toString();
401                context.setExternalChildIDs(externalAppId);
402            }
403        } else {
404            super.check(context, action);
405        }
406    }
407
408    @Override
409    void injectActionCallback(Context context, Configuration actionConf) {
410        injectCallback(context, actionConf);
411    }
412
413}