001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.action.hadoop;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.util.ArrayList;
024import java.util.List;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.mapred.Counters;
030import org.apache.hadoop.mapred.JobClient;
031import org.apache.hadoop.mapred.JobConf;
032import org.apache.hadoop.mapred.JobID;
033import org.apache.hadoop.mapred.RunningJob;
034import org.apache.oozie.action.ActionExecutorException;
035import org.apache.oozie.client.WorkflowAction;
036import org.apache.oozie.service.ConfigurationService;
037import org.apache.oozie.util.XConfiguration;
038import org.apache.oozie.util.XLog;
039import org.apache.oozie.util.XmlUtils;
040import org.jdom.Element;
041import org.jdom.Namespace;
042import org.json.simple.JSONObject;
043
044public class MapReduceActionExecutor extends JavaActionExecutor {
045
046    public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
047    public static final String HADOOP_COUNTERS = "hadoop.counters";
048    public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
049    private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain";
050    private XLog log = XLog.getLog(getClass());
051
052    public MapReduceActionExecutor() {
053        super("map-reduce");
054    }
055
056    @SuppressWarnings("rawtypes")
057    @Override
058    public List<Class> getLauncherClasses() {
059        List<Class> classes = new ArrayList<Class>();
060        try {
061            classes.add(Class.forName(STREAMING_MAIN_CLASS_NAME));
062        }
063        catch (ClassNotFoundException e) {
064            throw new RuntimeException("Class not found", e);
065        }
066        return classes;
067    }
068
069    @Override
070    protected String getActualExternalId(WorkflowAction action) {
071        String launcherJobId = action.getExternalId();
072        String childId = action.getExternalChildIDs();
073
074        if (childId != null && !childId.isEmpty()) {
075            return childId;
076        } else {
077            return launcherJobId;
078        }
079    }
080
081    @Override
082    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
083        String mainClass;
084        Namespace ns = actionXml.getNamespace();
085        if (actionXml.getChild("streaming", ns) != null) {
086            mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, STREAMING_MAIN_CLASS_NAME);
087        }
088        else {
089            if (actionXml.getChild("pipes", ns) != null) {
090                mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PipesMain.class.getName());
091            }
092            else {
093                mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, MapReduceMain.class.getName());
094            }
095        }
096        return mainClass;
097    }
098
099    @Override
100    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) throws ActionExecutorException {
101        super.setupLauncherConf(conf, actionXml, appPath, context);
102        conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
103        return conf;
104    }
105
106    @Override
107    @SuppressWarnings("unchecked")
108    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
109            throws ActionExecutorException {
110        boolean regularMR = false;
111        Namespace ns = actionXml.getNamespace();
112        if (actionXml.getChild("streaming", ns) != null) {
113            Element streamingXml = actionXml.getChild("streaming", ns);
114            String mapper = streamingXml.getChildTextTrim("mapper", ns);
115            String reducer = streamingXml.getChildTextTrim("reducer", ns);
116            String recordReader = streamingXml.getChildTextTrim("record-reader", ns);
117            List<Element> list = (List<Element>) streamingXml.getChildren("record-reader-mapping", ns);
118            String[] recordReaderMapping = new String[list.size()];
119            for (int i = 0; i < list.size(); i++) {
120                recordReaderMapping[i] = list.get(i).getTextTrim();
121            }
122            list = (List<Element>) streamingXml.getChildren("env", ns);
123            String[] env = new String[list.size()];
124            for (int i = 0; i < list.size(); i++) {
125                env[i] = list.get(i).getTextTrim();
126            }
127            setStreaming(actionConf, mapper, reducer, recordReader, recordReaderMapping, env);
128        }
129        else {
130            if (actionXml.getChild("pipes", ns) != null) {
131                Element pipesXml = actionXml.getChild("pipes", ns);
132                String map = pipesXml.getChildTextTrim("map", ns);
133                String reduce = pipesXml.getChildTextTrim("reduce", ns);
134                String inputFormat = pipesXml.getChildTextTrim("inputformat", ns);
135                String partitioner = pipesXml.getChildTextTrim("partitioner", ns);
136                String writer = pipesXml.getChildTextTrim("writer", ns);
137                String program = pipesXml.getChildTextTrim("program", ns);
138                PipesMain.setPipes(actionConf, map, reduce, inputFormat, partitioner, writer, program, appPath);
139            }
140            else {
141                regularMR = true;
142            }
143        }
144        actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
145
146        // For "regular" (not streaming or pipes) MR jobs
147        if (regularMR) {
148            // Resolve uber jar path (has to be done after super because oozie.mapreduce.uber.jar is under <configuration>)
149            String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR);
150            if (uberJar != null) {
151                if (!ConfigurationService.getBoolean(OOZIE_MAPREDUCE_UBER_JAR_ENABLE)){
152                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "MR003",
153                            "{0} property is not allowed.  Set {1} to true in oozie-site to enable.",
154                            MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, OOZIE_MAPREDUCE_UBER_JAR_ENABLE);
155                }
156                String nameNode = actionXml.getChildTextTrim("name-node", ns);
157                if (nameNode != null) {
158                    Path uberJarPath = new Path(uberJar);
159                    if (uberJarPath.toUri().getScheme() == null || uberJarPath.toUri().getAuthority() == null) {
160                        if (uberJarPath.isAbsolute()) {     // absolute path without namenode --> prepend namenode
161                            Path nameNodePath = new Path(nameNode);
162                            String nameNodeSchemeAuthority = nameNodePath.toUri().getScheme()
163                                    + "://" + nameNodePath.toUri().getAuthority();
164                            actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR,
165                                    new Path(nameNodeSchemeAuthority + uberJarPath).toString());
166                        }
167                        else {                              // relative path --> prepend app path
168                            actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, new Path(appPath, uberJarPath).toString());
169                        }
170                    }
171                }
172            }
173        }
174        else {
175            if (actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR) != null) {
176                log.warn("The " + MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR + " property is only applicable for MapReduce (not"
177                        + "streaming nor pipes) workflows, ignoring");
178                actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, "");
179            }
180        }
181
182        // child job cancel delegation token for mapred action
183        actionConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
184
185        return actionConf;
186    }
187
188    @Override
189    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
190        super.end(context, action);
191        JobClient jobClient = null;
192        boolean exception = false;
193        try {
194            if (action.getStatus() == WorkflowAction.Status.OK) {
195                Element actionXml = XmlUtils.parseXml(action.getConf());
196                JobConf jobConf = createBaseHadoopConf(context, actionXml);
197                jobClient = createJobClient(context, jobConf);
198                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalChildIDs()));
199                if (runningJob == null) {
200                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR002",
201                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!",
202                            action.getExternalChildIDs(), action.getId());
203                }
204
205                Counters counters = runningJob.getCounters();
206                if (counters != null) {
207                    ActionStats stats = new MRStats(counters);
208                    String statsJsonString = stats.toJSON();
209                    context.setVar(HADOOP_COUNTERS, statsJsonString);
210
211                    // If action stats write property is set to false by user or
212                    // size of stats is greater than the maximum allowed size,
213                    // do not store the action stats
214                    if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
215                            OOZIE_ACTION_EXTERNAL_STATS_WRITE, "false"))
216                            && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
217                        context.setExecutionStats(statsJsonString);
218                        log.debug(
219                                "Printing stats for Map-Reduce action as a JSON string : [{0}]", statsJsonString);
220                    }
221                }
222                else {
223                    context.setVar(HADOOP_COUNTERS, "");
224                    XLog.getLog(getClass()).warn("Could not find Hadoop Counters for: [{0}]",
225                            action.getExternalChildIDs());
226                }
227            }
228        }
229        catch (Exception ex) {
230            exception = true;
231            throw convertException(ex);
232        }
233        finally {
234            if (jobClient != null) {
235                try {
236                    jobClient.close();
237                }
238                catch (Exception e) {
239                    if (exception) {
240                        log.error("JobClient error: ", e);
241                    }
242                    else {
243                        throw convertException(e);
244                    }
245                }
246            }
247        }
248    }
249
250    // Return the value of the specified configuration property
251    private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue) throws ActionExecutorException {
252        try {
253            String ret = defaultValue;
254            if (actionConf != null) {
255                Namespace ns = actionConf.getNamespace();
256                Element e = actionConf.getChild("configuration", ns);
257                if(e != null) {
258                    String strConf = XmlUtils.prettyPrint(e).toString();
259                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
260                    ret = inlineConf.get(key, defaultValue);
261                }
262            }
263            return ret;
264        }
265        catch (IOException ex) {
266            throw convertException(ex);
267        }
268    }
269
270    @SuppressWarnings("unchecked")
271    private JSONObject counterstoJson(Counters counters) {
272
273        if (counters == null) {
274            return null;
275        }
276
277        JSONObject groups = new JSONObject();
278        for (String gName : counters.getGroupNames()) {
279            JSONObject group = new JSONObject();
280            for (Counters.Counter counter : counters.getGroup(gName)) {
281                String cName = counter.getName();
282                Long cValue = counter.getCounter();
283                group.put(cName, cValue);
284            }
285            groups.put(gName, group);
286        }
287        return groups;
288    }
289
290    /**
291     * Return the sharelib name for the action.
292     *
293     * @return returns <code>streaming</code> if mapreduce-streaming action, <code>NULL</code> otherwise.
294     * @param actionXml
295     */
296    @Override
297    protected String getDefaultShareLibName(Element actionXml) {
298        Namespace ns = actionXml.getNamespace();
299        return (actionXml.getChild("streaming", ns) != null) ? "mapreduce-streaming" : null;
300    }
301
302    @Override
303    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
304            Configuration actionConf) throws ActionExecutorException {
305        // If the user is using a regular MapReduce job and specified an uber jar, we need to also set it for the launcher;
306        // so we override createLauncherConf to call super and then to set the uber jar if specified. At this point, checking that
307        // uber jars are enabled and resolving the uber jar path is already done by setupActionConf() when it parsed the actionConf
308        // argument and we can just look up the uber jar in the actionConf argument.
309        JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
310        Namespace ns = actionXml.getNamespace();
311        if (actionXml.getChild("streaming", ns) == null && actionXml.getChild("pipes", ns) == null) {
312            // Set for uber jar
313            String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR);
314            if (uberJar != null && uberJar.trim().length() > 0) {
315                launcherJobConf.setJar(uberJar);
316            }
317        }
318        return launcherJobConf;
319    }
320
321    public static void setStreaming(Configuration conf, String mapper, String reducer, String recordReader,
322                                    String[] recordReaderMapping, String[] env) {
323        if (mapper != null) {
324            conf.set("oozie.streaming.mapper", mapper);
325        }
326        if (reducer != null) {
327            conf.set("oozie.streaming.reducer", reducer);
328        }
329        if (recordReader != null) {
330            conf.set("oozie.streaming.record-reader", recordReader);
331        }
332        MapReduceMain.setStrings(conf, "oozie.streaming.record-reader-mapping", recordReaderMapping);
333        MapReduceMain.setStrings(conf, "oozie.streaming.env", env);
334    }
335
336    @Override
337    protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
338        RunningJob runningJob = null;
339        String jobId = getActualExternalId(action);
340        if (jobId != null) {
341            runningJob = jobClient.getJob(JobID.forName(jobId));
342        }
343        return runningJob;
344    }
345
346    @Override
347    void injectActionCallback(Context context, Configuration actionConf) {
348        injectCallback(context, actionConf);
349    }
350
351}