001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.action.hadoop;
019
020import java.io.IOException;
021import java.io.StringReader;
022import java.util.ArrayList;
023import java.util.List;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.mapred.Counters;
029import org.apache.hadoop.mapred.JobClient;
030import org.apache.hadoop.mapred.JobConf;
031import org.apache.hadoop.mapred.JobID;
032import org.apache.hadoop.mapred.RunningJob;
033import org.apache.oozie.action.ActionExecutorException;
034import org.apache.oozie.client.WorkflowAction;
035import org.apache.oozie.service.Services;
036import org.apache.oozie.util.XConfiguration;
037import org.apache.oozie.util.XLog;
038import org.apache.oozie.util.XmlUtils;
039import org.jdom.Element;
040import org.jdom.Namespace;
041import org.json.simple.JSONObject;
042
043public class MapReduceActionExecutor extends JavaActionExecutor {
044
045    public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
046    public static final String HADOOP_COUNTERS = "hadoop.counters";
047    public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
048    private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain";
049    private XLog log = XLog.getLog(getClass());
050
051    public MapReduceActionExecutor() {
052        super("map-reduce");
053    }
054
055    @SuppressWarnings("rawtypes")
056    @Override
057    public List<Class> getLauncherClasses() {
058        List<Class> classes = new ArrayList<Class>();
059        try {
060            classes.add(Class.forName(STREAMING_MAIN_CLASS_NAME));
061        }
062        catch (ClassNotFoundException e) {
063            throw new RuntimeException("Class not found", e);
064        }
065        return classes;
066    }
067
068    @Override
069    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
070        String mainClass;
071        Namespace ns = actionXml.getNamespace();
072        if (actionXml.getChild("streaming", ns) != null) {
073            mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, STREAMING_MAIN_CLASS_NAME);
074        }
075        else {
076            if (actionXml.getChild("pipes", ns) != null) {
077                mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PipesMain.class.getName());
078            }
079            else {
080                mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, MapReduceMain.class.getName());
081            }
082        }
083        return mainClass;
084    }
085
086    @Override
087    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) throws ActionExecutorException {
088        super.setupLauncherConf(conf, actionXml, appPath, context);
089        conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
090        return conf;
091    }
092
093    @Override
094    @SuppressWarnings("unchecked")
095    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
096            throws ActionExecutorException {
097        boolean regularMR = false;
098        Namespace ns = actionXml.getNamespace();
099        if (actionXml.getChild("streaming", ns) != null) {
100            Element streamingXml = actionXml.getChild("streaming", ns);
101            String mapper = streamingXml.getChildTextTrim("mapper", ns);
102            String reducer = streamingXml.getChildTextTrim("reducer", ns);
103            String recordReader = streamingXml.getChildTextTrim("record-reader", ns);
104            List<Element> list = (List<Element>) streamingXml.getChildren("record-reader-mapping", ns);
105            String[] recordReaderMapping = new String[list.size()];
106            for (int i = 0; i < list.size(); i++) {
107                recordReaderMapping[i] = list.get(i).getTextTrim();
108            }
109            list = (List<Element>) streamingXml.getChildren("env", ns);
110            String[] env = new String[list.size()];
111            for (int i = 0; i < list.size(); i++) {
112                env[i] = list.get(i).getTextTrim();
113            }
114            setStreaming(actionConf, mapper, reducer, recordReader, recordReaderMapping, env);
115        }
116        else {
117            if (actionXml.getChild("pipes", ns) != null) {
118                Element pipesXml = actionXml.getChild("pipes", ns);
119                String map = pipesXml.getChildTextTrim("map", ns);
120                String reduce = pipesXml.getChildTextTrim("reduce", ns);
121                String inputFormat = pipesXml.getChildTextTrim("inputformat", ns);
122                String partitioner = pipesXml.getChildTextTrim("partitioner", ns);
123                String writer = pipesXml.getChildTextTrim("writer", ns);
124                String program = pipesXml.getChildTextTrim("program", ns);
125                PipesMain.setPipes(actionConf, map, reduce, inputFormat, partitioner, writer, program, appPath);
126            }
127            else {
128                regularMR = true;
129            }
130        }
131        actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
132
133        // For "regular" (not streaming or pipes) MR jobs
134        if (regularMR) {
135            // Resolve uber jar path (has to be done after super because oozie.mapreduce.uber.jar is under <configuration>)
136            String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR);
137            if (uberJar != null) {
138                if (!Services.get().getConf().getBoolean(OOZIE_MAPREDUCE_UBER_JAR_ENABLE, false)) {
139                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "MR003",
140                            "{0} property is not allowed.  Set {1} to true in oozie-site to enable.",
141                            MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, OOZIE_MAPREDUCE_UBER_JAR_ENABLE);
142                }
143                String nameNode = actionXml.getChildTextTrim("name-node", ns);
144                if (nameNode != null) {
145                    Path uberJarPath = new Path(uberJar);
146                    if (uberJarPath.toUri().getScheme() == null || uberJarPath.toUri().getAuthority() == null) {
147                        if (uberJarPath.isAbsolute()) {     // absolute path without namenode --> prepend namenode
148                            Path nameNodePath = new Path(nameNode);
149                            String nameNodeSchemeAuthority = nameNodePath.toUri().getScheme()
150                                    + "://" + nameNodePath.toUri().getAuthority();
151                            actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR,
152                                    new Path(nameNodeSchemeAuthority + uberJarPath).toString());
153                        }
154                        else {                              // relative path --> prepend app path
155                            actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, new Path(appPath, uberJarPath).toString());
156                        }
157                    }
158                }
159            }
160        }
161        else {
162            if (actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR) != null) {
163                log.warn("The " + MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR + " property is only applicable for MapReduce (not"
164                        + "streaming nor pipes) workflows, ignoring");
165                actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, "");
166            }
167        }
168
169        return actionConf;
170    }
171
172    @Override
173    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
174        super.end(context, action);
175        JobClient jobClient = null;
176        boolean exception = false;
177        try {
178            if (action.getStatus() == WorkflowAction.Status.OK) {
179                Element actionXml = XmlUtils.parseXml(action.getConf());
180                JobConf jobConf = createBaseHadoopConf(context, actionXml);
181                jobClient = createJobClient(context, jobConf);
182                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalChildIDs()));
183                if (runningJob == null) {
184                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR002",
185                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!",
186                            action.getExternalChildIDs(), action.getId());
187                }
188
189                Counters counters = runningJob.getCounters();
190                if (counters != null) {
191                    ActionStats stats = new MRStats(counters);
192                    String statsJsonString = stats.toJSON();
193                    context.setVar(HADOOP_COUNTERS, statsJsonString);
194
195                    // If action stats write property is set to false by user or
196                    // size of stats is greater than the maximum allowed size,
197                    // do not store the action stats
198                    if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
199                            OOZIE_ACTION_EXTERNAL_STATS_WRITE, "false"))
200                            && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
201                        context.setExecutionStats(statsJsonString);
202                        log.debug(
203                                "Printing stats for Map-Reduce action as a JSON string : [{0}]", statsJsonString);
204                    }
205                }
206                else {
207                    context.setVar(HADOOP_COUNTERS, "");
208                    XLog.getLog(getClass()).warn("Could not find Hadoop Counters for: [{0}]",
209                            action.getExternalChildIDs());
210                }
211            }
212        }
213        catch (Exception ex) {
214            exception = true;
215            throw convertException(ex);
216        }
217        finally {
218            if (jobClient != null) {
219                try {
220                    jobClient.close();
221                }
222                catch (Exception e) {
223                    if (exception) {
224                        log.error("JobClient error: ", e);
225                    }
226                    else {
227                        throw convertException(e);
228                    }
229                }
230            }
231        }
232    }
233
234    // Return the value of the specified configuration property
235    private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue) throws ActionExecutorException {
236        try {
237            String ret = defaultValue;
238            if (actionConf != null) {
239                Namespace ns = actionConf.getNamespace();
240                Element e = actionConf.getChild("configuration", ns);
241                if(e != null) {
242                    String strConf = XmlUtils.prettyPrint(e).toString();
243                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
244                    ret = inlineConf.get(key, defaultValue);
245                }
246            }
247            return ret;
248        }
249        catch (IOException ex) {
250            throw convertException(ex);
251        }
252    }
253
254    @SuppressWarnings("unchecked")
255    private JSONObject counterstoJson(Counters counters) {
256
257        if (counters == null) {
258            return null;
259        }
260
261        JSONObject groups = new JSONObject();
262        for (String gName : counters.getGroupNames()) {
263            JSONObject group = new JSONObject();
264            for (Counters.Counter counter : counters.getGroup(gName)) {
265                String cName = counter.getName();
266                Long cValue = counter.getCounter();
267                group.put(cName, cValue);
268            }
269            groups.put(gName, group);
270        }
271        return groups;
272    }
273
274    /**
275     * Return the sharelib name for the action.
276     *
277     * @return returns <code>streaming</code> if mapreduce-streaming action, <code>NULL</code> otherwise.
278     * @param actionXml
279     */
280    @Override
281    protected String getDefaultShareLibName(Element actionXml) {
282        Namespace ns = actionXml.getNamespace();
283        return (actionXml.getChild("streaming", ns) != null) ? "mapreduce-streaming" : null;
284    }
285
286    @Override
287    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
288            Configuration actionConf) throws ActionExecutorException {
289        // If the user is using a regular MapReduce job and specified an uber jar, we need to also set it for the launcher;
290        // so we override createLauncherConf to call super and then to set the uber jar if specified. At this point, checking that
291        // uber jars are enabled and resolving the uber jar path is already done by setupActionConf() when it parsed the actionConf
292        // argument and we can just look up the uber jar in the actionConf argument.
293        JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
294        Namespace ns = actionXml.getNamespace();
295        if (actionXml.getChild("streaming", ns) == null && actionXml.getChild("pipes", ns) == null) {
296            // Set for uber jar
297            String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR);
298            if (uberJar != null && uberJar.trim().length() > 0) {
299                launcherJobConf.setJar(uberJar);
300            }
301        }
302        return launcherJobConf;
303    }
304
305    public static void setStreaming(Configuration conf, String mapper, String reducer, String recordReader,
306                                    String[] recordReaderMapping, String[] env) {
307        if (mapper != null) {
308            conf.set("oozie.streaming.mapper", mapper);
309        }
310        if (reducer != null) {
311            conf.set("oozie.streaming.reducer", reducer);
312        }
313        if (recordReader != null) {
314            conf.set("oozie.streaming.record-reader", recordReader);
315        }
316        MapReduceMain.setStrings(conf, "oozie.streaming.record-reader-mapping", recordReaderMapping);
317        MapReduceMain.setStrings(conf, "oozie.streaming.env", env);
318    }
319
320    @Override
321    protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
322
323        RunningJob runningJob;
324        String launcherJobId = action.getExternalId();
325        String childJobId = action.getExternalChildIDs();
326
327        if (childJobId != null && childJobId.length() > 0) {
328            runningJob = jobClient.getJob(JobID.forName(childJobId));
329        }
330        else {
331            runningJob = jobClient.getJob(JobID.forName(launcherJobId));
332        }
333
334        return runningJob;
335    }
336}