001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.List;
023    
024    import org.apache.hadoop.conf.Configuration;
025    import org.apache.hadoop.fs.FileSystem;
026    import org.apache.hadoop.fs.Path;
027    import org.apache.hadoop.mapred.Counters;
028    import org.apache.hadoop.mapred.JobClient;
029    import org.apache.hadoop.mapred.JobConf;
030    import org.apache.hadoop.mapred.JobID;
031    import org.apache.hadoop.mapred.RunningJob;
032    import org.apache.oozie.action.ActionExecutorException;
033    import org.apache.oozie.client.WorkflowAction;
034    import org.apache.oozie.service.Services;
035    import org.apache.oozie.util.XConfiguration;
036    import org.apache.oozie.util.XLog;
037    import org.apache.oozie.util.XmlUtils;
038    import org.jdom.Element;
039    import org.jdom.Namespace;
040    import org.json.simple.JSONObject;
041    
042    public class MapReduceActionExecutor extends JavaActionExecutor {
043    
044        public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
045        public static final String HADOOP_COUNTERS = "hadoop.counters";
046        public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
047        private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain";
048        private XLog log = XLog.getLog(getClass());
049    
050        public MapReduceActionExecutor() {
051            super("map-reduce");
052        }
053    
054        @Override
055        protected List<Class> getLauncherClasses() {
056            List<Class> classes = super.getLauncherClasses();
057            classes.add(LauncherMain.class);
058            classes.add(MapReduceMain.class);
059            classes.add(PipesMain.class);
060            try {
061                classes.add(Class.forName(STREAMING_MAIN_CLASS_NAME));
062            }
063            catch (ClassNotFoundException e) {
064                throw new RuntimeException("Class not found", e);
065            }
066            return classes;
067        }
068    
069        @Override
070        protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
071            String mainClass;
072            Namespace ns = actionXml.getNamespace();
073            if (actionXml.getChild("streaming", ns) != null) {
074                mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, STREAMING_MAIN_CLASS_NAME);
075            }
076            else {
077                if (actionXml.getChild("pipes", ns) != null) {
078                    mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PipesMain.class.getName());
079                }
080                else {
081                    mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, MapReduceMain.class.getName());
082                }
083            }
084            return mainClass;
085        }
086    
087        @Override
088        Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) throws ActionExecutorException {
089            super.setupLauncherConf(conf, actionXml, appPath, context);
090            conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
091            return conf;
092        }
093    
094        @Override
095        @SuppressWarnings("unchecked")
096        Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
097                throws ActionExecutorException {
098            boolean regularMR = false;
099            Namespace ns = actionXml.getNamespace();
100            if (actionXml.getChild("streaming", ns) != null) {
101                Element streamingXml = actionXml.getChild("streaming", ns);
102                String mapper = streamingXml.getChildTextTrim("mapper", ns);
103                String reducer = streamingXml.getChildTextTrim("reducer", ns);
104                String recordReader = streamingXml.getChildTextTrim("record-reader", ns);
105                List<Element> list = (List<Element>) streamingXml.getChildren("record-reader-mapping", ns);
106                String[] recordReaderMapping = new String[list.size()];
107                for (int i = 0; i < list.size(); i++) {
108                    recordReaderMapping[i] = list.get(i).getTextTrim();
109                }
110                list = (List<Element>) streamingXml.getChildren("env", ns);
111                String[] env = new String[list.size()];
112                for (int i = 0; i < list.size(); i++) {
113                    env[i] = list.get(i).getTextTrim();
114                }
115                setStreaming(actionConf, mapper, reducer, recordReader, recordReaderMapping, env);
116            }
117            else {
118                if (actionXml.getChild("pipes", ns) != null) {
119                    Element pipesXml = actionXml.getChild("pipes", ns);
120                    String map = pipesXml.getChildTextTrim("map", ns);
121                    String reduce = pipesXml.getChildTextTrim("reduce", ns);
122                    String inputFormat = pipesXml.getChildTextTrim("inputformat", ns);
123                    String partitioner = pipesXml.getChildTextTrim("partitioner", ns);
124                    String writer = pipesXml.getChildTextTrim("writer", ns);
125                    String program = pipesXml.getChildTextTrim("program", ns);
126                    PipesMain.setPipes(actionConf, map, reduce, inputFormat, partitioner, writer, program, appPath);
127                }
128                else {
129                    regularMR = true;
130                }
131            }
132            actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
133    
134            // For "regular" (not streaming or pipes) MR jobs
135            if (regularMR) {
136                // Resolve uber jar path (has to be done after super because oozie.mapreduce.uber.jar is under <configuration>)
137                String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR);
138                if (uberJar != null) {
139                    if (!Services.get().getConf().getBoolean(OOZIE_MAPREDUCE_UBER_JAR_ENABLE, false)) {
140                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "MR003",
141                                "{0} property is not allowed.  Set {1} to true in oozie-site to enable.",
142                                MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, OOZIE_MAPREDUCE_UBER_JAR_ENABLE);
143                    }
144                    String nameNode = actionXml.getChildTextTrim("name-node", ns);
145                    if (nameNode != null) {
146                        Path uberJarPath = new Path(uberJar);
147                        if (uberJarPath.toUri().getScheme() == null || uberJarPath.toUri().getAuthority() == null) {
148                            if (uberJarPath.isAbsolute()) {     // absolute path without namenode --> prepend namenode
149                                Path nameNodePath = new Path(nameNode);
150                                String nameNodeSchemeAuthority = nameNodePath.toUri().getScheme()
151                                        + "://" + nameNodePath.toUri().getAuthority();
152                                actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR,
153                                        new Path(nameNodeSchemeAuthority + uberJarPath).toString());
154                            }
155                            else {                              // relative path --> prepend app path
156                                actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, new Path(appPath, uberJarPath).toString());
157                            }
158                        }
159                    }
160                }
161            }
162            else {
163                if (actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR) != null) {
164                    log.warn("The " + MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR + " property is only applicable for MapReduce (not"
165                            + "streaming nor pipes) workflows, ignoring");
166                    actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, "");
167                }
168            }
169    
170            return actionConf;
171        }
172    
173        @Override
174        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
175            super.end(context, action);
176            JobClient jobClient = null;
177            boolean exception = false;
178            try {
179                if (action.getStatus() == WorkflowAction.Status.OK) {
180                    Element actionXml = XmlUtils.parseXml(action.getConf());
181                    JobConf jobConf = createBaseHadoopConf(context, actionXml);
182                    jobClient = createJobClient(context, jobConf);
183                    RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalChildIDs()));
184                    if (runningJob == null) {
185                        throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR002",
186                                "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!",
187                                action.getExternalChildIDs(), action.getId());
188                    }
189    
190                    Counters counters = runningJob.getCounters();
191                    if (counters != null) {
192                        ActionStats stats = new MRStats(counters);
193                        String statsJsonString = stats.toJSON();
194                        context.setVar(HADOOP_COUNTERS, statsJsonString);
195    
196                        // If action stats write property is set to false by user or
197                        // size of stats is greater than the maximum allowed size,
198                        // do not store the action stats
199                        if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
200                                OOZIE_ACTION_EXTERNAL_STATS_WRITE, "false"))
201                                && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
202                            context.setExecutionStats(statsJsonString);
203                            log.debug(
204                                    "Printing stats for Map-Reduce action as a JSON string : [{0}]", statsJsonString);
205                        }
206                    }
207                    else {
208                        context.setVar(HADOOP_COUNTERS, "");
209                        XLog.getLog(getClass()).warn("Could not find Hadoop Counters for: [{0}]",
210                                action.getExternalChildIDs());
211                    }
212                }
213            }
214            catch (Exception ex) {
215                exception = true;
216                throw convertException(ex);
217            }
218            finally {
219                if (jobClient != null) {
220                    try {
221                        jobClient.close();
222                    }
223                    catch (Exception e) {
224                        if (exception) {
225                            log.error("JobClient error: ", e);
226                        }
227                        else {
228                            throw convertException(e);
229                        }
230                    }
231                }
232            }
233        }
234    
235        // Return the value of the specified configuration property
236        private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue) throws ActionExecutorException {
237            try {
238                if (actionConf != null) {
239                    Namespace ns = actionConf.getNamespace();
240                    Element e = actionConf.getChild("configuration", ns);
241                    String strConf = XmlUtils.prettyPrint(e).toString();
242                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
243                    return inlineConf.get(key, defaultValue);
244                }
245                return "";
246            }
247            catch (IOException ex) {
248                throw convertException(ex);
249            }
250        }
251    
252        @SuppressWarnings("unchecked")
253        private JSONObject counterstoJson(Counters counters) {
254    
255            if (counters == null) {
256                return null;
257            }
258    
259            JSONObject groups = new JSONObject();
260            for (String gName : counters.getGroupNames()) {
261                JSONObject group = new JSONObject();
262                for (Counters.Counter counter : counters.getGroup(gName)) {
263                    String cName = counter.getName();
264                    Long cValue = counter.getCounter();
265                    group.put(cName, cValue);
266                }
267                groups.put(gName, group);
268            }
269            return groups;
270        }
271    
272        /**
273         * Return the sharelib name for the action.
274         *
275         * @return returns <code>streaming</code> if mapreduce-streaming action, <code>NULL</code> otherwise.
276         * @param actionXml
277         */
278        @Override
279        protected String getDefaultShareLibName(Element actionXml) {
280            Namespace ns = actionXml.getNamespace();
281            return (actionXml.getChild("streaming", ns) != null) ? "mapreduce-streaming" : null;
282        }
283    
284        @Override
285        JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
286                Configuration actionConf) throws ActionExecutorException {
287            // If the user is using a regular MapReduce job and specified an uber jar, we need to also set it for the launcher;
288            // so we override createLauncherConf to call super and then to set the uber jar if specified. At this point, checking that
289            // uber jars are enabled and resolving the uber jar path is already done by setupActionConf() when it parsed the actionConf
290            // argument and we can just look up the uber jar in the actionConf argument.
291            JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
292            Namespace ns = actionXml.getNamespace();
293            if (actionXml.getChild("streaming", ns) == null && actionXml.getChild("pipes", ns) == null) {
294                // Set for uber jar
295                String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR);
296                if (uberJar != null && uberJar.trim().length() > 0) {
297                    launcherJobConf.setJar(uberJar);
298                }
299            }
300            return launcherJobConf;
301        }
302    
303        public static void setStreaming(Configuration conf, String mapper, String reducer, String recordReader,
304                                        String[] recordReaderMapping, String[] env) {
305            if (mapper != null) {
306                conf.set("oozie.streaming.mapper", mapper);
307            }
308            if (reducer != null) {
309                conf.set("oozie.streaming.reducer", reducer);
310            }
311            if (recordReader != null) {
312                conf.set("oozie.streaming.record-reader", recordReader);
313            }
314            MapReduceMain.setStrings(conf, "oozie.streaming.record-reader-mapping", recordReaderMapping);
315            MapReduceMain.setStrings(conf, "oozie.streaming.env", env);
316        }
317    
318        @Override
319        protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
320    
321            RunningJob runningJob;
322            String launcherJobId = action.getExternalId();
323            String childJobId = action.getExternalChildIDs();
324    
325            if (childJobId != null && childJobId.length() > 0) {
326                runningJob = jobClient.getJob(JobID.forName(childJobId));
327            }
328            else {
329                runningJob = jobClient.getJob(JobID.forName(launcherJobId));
330            }
331    
332            return runningJob;
333        }
334    }