001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.List;
023    
024    import org.apache.hadoop.conf.Configuration;
025    import org.apache.hadoop.fs.FileSystem;
026    import org.apache.hadoop.fs.Path;
027    import org.apache.hadoop.mapred.Counters;
028    import org.apache.hadoop.mapred.JobClient;
029    import org.apache.hadoop.mapred.JobConf;
030    import org.apache.hadoop.mapred.JobID;
031    import org.apache.hadoop.mapred.RunningJob;
032    import org.apache.oozie.action.ActionExecutorException;
033    import org.apache.oozie.client.WorkflowAction;
034    import org.apache.oozie.service.Services;
035    import org.apache.oozie.util.XConfiguration;
036    import org.apache.oozie.util.XLog;
037    import org.apache.oozie.util.XmlUtils;
038    import org.jdom.Element;
039    import org.jdom.Namespace;
040    import org.json.simple.JSONObject;
041    
042    public class MapReduceActionExecutor extends JavaActionExecutor {
043    
044        public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
045        public static final String HADOOP_COUNTERS = "hadoop.counters";
046        public static final String OOZIE_MAPREDUCE_UBER_JAR = "oozie.mapreduce.uber.jar";
047        public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
048        private XLog log = XLog.getLog(getClass());
049    
050        public MapReduceActionExecutor() {
051            super("map-reduce");
052        }
053    
054        @Override
055        protected List<Class> getLauncherClasses() {
056            List<Class> classes = super.getLauncherClasses();
057            classes.add(LauncherMain.class);
058            classes.add(MapReduceMain.class);
059            classes.add(StreamingMain.class);
060            classes.add(PipesMain.class);
061            return classes;
062        }
063    
064        @Override
065        protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
066            String mainClass;
067            Namespace ns = actionXml.getNamespace();
068            if (actionXml.getChild("streaming", ns) != null) {
069                mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, StreamingMain.class.getName());
070            }
071            else {
072                if (actionXml.getChild("pipes", ns) != null) {
073                    mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PipesMain.class.getName());
074                }
075                else {
076                    mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, MapReduceMain.class.getName());
077                }
078            }
079            return mainClass;
080        }
081    
082        @Override
083        Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) throws ActionExecutorException {
084            super.setupLauncherConf(conf, actionXml, appPath, context);
085            conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
086            return conf;
087        }
088    
089        @Override
090        @SuppressWarnings("unchecked")
091        Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
092                throws ActionExecutorException {
093            boolean regularMR = false;
094            Namespace ns = actionXml.getNamespace();
095            if (actionXml.getChild("streaming", ns) != null) {
096                Element streamingXml = actionXml.getChild("streaming", ns);
097                String mapper = streamingXml.getChildTextTrim("mapper", ns);
098                String reducer = streamingXml.getChildTextTrim("reducer", ns);
099                String recordReader = streamingXml.getChildTextTrim("record-reader", ns);
100                List<Element> list = (List<Element>) streamingXml.getChildren("record-reader-mapping", ns);
101                String[] recordReaderMapping = new String[list.size()];
102                for (int i = 0; i < list.size(); i++) {
103                    recordReaderMapping[i] = list.get(i).getTextTrim();
104                }
105                list = (List<Element>) streamingXml.getChildren("env", ns);
106                String[] env = new String[list.size()];
107                for (int i = 0; i < list.size(); i++) {
108                    env[i] = list.get(i).getTextTrim();
109                }
110                StreamingMain.setStreaming(actionConf, mapper, reducer, recordReader, recordReaderMapping, env);
111            }
112            else {
113                if (actionXml.getChild("pipes", ns) != null) {
114                    Element pipesXml = actionXml.getChild("pipes", ns);
115                    String map = pipesXml.getChildTextTrim("map", ns);
116                    String reduce = pipesXml.getChildTextTrim("reduce", ns);
117                    String inputFormat = pipesXml.getChildTextTrim("inputformat", ns);
118                    String partitioner = pipesXml.getChildTextTrim("partitioner", ns);
119                    String writer = pipesXml.getChildTextTrim("writer", ns);
120                    String program = pipesXml.getChildTextTrim("program", ns);
121                    PipesMain.setPipes(actionConf, map, reduce, inputFormat, partitioner, writer, program, appPath);
122                }
123                else {
124                    regularMR = true;
125                }
126            }
127            actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
128    
129            // For "regular" (not streaming or pipes) MR jobs
130            if (regularMR) {
131                // Resolve uber jar path (has to be done after super because oozie.mapreduce.uber.jar is under <configuration>)
132                String uberJar = actionConf.get(OOZIE_MAPREDUCE_UBER_JAR);
133                if (uberJar != null) {
134                    if (!Services.get().getConf().getBoolean(OOZIE_MAPREDUCE_UBER_JAR_ENABLE, false)) {
135                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "MR003",
136                                "{0} property is not allowed.  Set {1} to true in oozie-site to enable.", OOZIE_MAPREDUCE_UBER_JAR,
137                                OOZIE_MAPREDUCE_UBER_JAR_ENABLE);
138                    }
139                    String nameNode = actionXml.getChildTextTrim("name-node", ns);
140                    if (nameNode != null) {
141                        Path uberJarPath = new Path(uberJar);
142                        if (uberJarPath.toUri().getScheme() == null || uberJarPath.toUri().getAuthority() == null) {
143                            if (uberJarPath.isAbsolute()) {     // absolute path without namenode --> prepend namenode
144                                Path nameNodePath = new Path(nameNode);
145                                String nameNodeSchemeAuthority = nameNodePath.toUri().getScheme()
146                                        + "://" + nameNodePath.toUri().getAuthority();
147                                actionConf.set(OOZIE_MAPREDUCE_UBER_JAR, new Path(nameNodeSchemeAuthority + uberJarPath).toString());
148                            }
149                            else {                              // relative path --> prepend app path
150                                actionConf.set(OOZIE_MAPREDUCE_UBER_JAR, new Path(appPath, uberJarPath).toString());
151                            }
152                        }
153                    }
154                }
155            }
156            else {
157                if (actionConf.get(OOZIE_MAPREDUCE_UBER_JAR) != null) {
158                    log.warn("The " + OOZIE_MAPREDUCE_UBER_JAR + " property is only applicable for MapReduce (not streaming nor pipes)"
159                            + " workflows, ignoring");
160                    actionConf.set(OOZIE_MAPREDUCE_UBER_JAR, "");
161                }
162            }
163    
164            return actionConf;
165        }
166    
167        @Override
168        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
169            super.end(context, action);
170            JobClient jobClient = null;
171            boolean exception = false;
172            try {
173                if (action.getStatus() == WorkflowAction.Status.OK) {
174                    Element actionXml = XmlUtils.parseXml(action.getConf());
175                    JobConf jobConf = createBaseHadoopConf(context, actionXml);
176                    jobClient = createJobClient(context, jobConf);
177                    RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
178                    if (runningJob == null) {
179                        throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR002",
180                                                          "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", action
181                                .getExternalId(), action.getId());
182                    }
183    
184                    Counters counters = runningJob.getCounters();
185                    if (counters != null) {
186                        ActionStats stats = new MRStats(counters);
187                        String statsJsonString = stats.toJSON();
188                        context.setVar(HADOOP_COUNTERS, statsJsonString);
189    
190                        // If action stats write property is set to false by user or
191                        // size of stats is greater than the maximum allowed size,
192                        // do not store the action stats
193                        if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
194                                OOZIE_ACTION_EXTERNAL_STATS_WRITE, "false"))
195                                && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
196                            context.setExecutionStats(statsJsonString);
197                            log.debug(
198                                    "Printing stats for Map-Reduce action as a JSON string : [{0}]", statsJsonString);
199                        }
200                    }
201                    else {
202                        context.setVar(HADOOP_COUNTERS, "");
203                        XLog.getLog(getClass()).warn("Could not find Hadoop Counters for: [{0}]", action.getExternalId());
204                    }
205                }
206            }
207            catch (Exception ex) {
208                exception = true;
209                throw convertException(ex);
210            }
211            finally {
212                if (jobClient != null) {
213                    try {
214                        jobClient.close();
215                    }
216                    catch (Exception e) {
217                        if (exception) {
218                            log.error("JobClient error: ", e);
219                        }
220                        else {
221                            throw convertException(e);
222                        }
223                    }
224                }
225            }
226        }
227    
228        // Return the value of the specified configuration property
229        private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue) throws ActionExecutorException {
230            try {
231                if (actionConf != null) {
232                    Namespace ns = actionConf.getNamespace();
233                    Element e = actionConf.getChild("configuration", ns);
234                    String strConf = XmlUtils.prettyPrint(e).toString();
235                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
236                    return inlineConf.get(key, defaultValue);
237                }
238                return "";
239            }
240            catch (IOException ex) {
241                throw convertException(ex);
242            }
243        }
244    
245        @SuppressWarnings("unchecked")
246        private JSONObject counterstoJson(Counters counters) {
247    
248            if (counters == null) {
249                return null;
250            }
251    
252            JSONObject groups = new JSONObject();
253            for (String gName : counters.getGroupNames()) {
254                JSONObject group = new JSONObject();
255                for (Counters.Counter counter : counters.getGroup(gName)) {
256                    String cName = counter.getName();
257                    Long cValue = counter.getCounter();
258                    group.put(cName, cValue);
259                }
260                groups.put(gName, group);
261            }
262            return groups;
263        }
264    
265        /**
266         * Return the sharelib name for the action.
267         *
268         * @return returns <code>streaming</code> if mapreduce-streaming action, <code>NULL</code> otherwise.
269         * @param actionXml
270         */
271        @Override
272        protected String getDefaultShareLibName(Element actionXml) {
273            Namespace ns = actionXml.getNamespace();
274            return (actionXml.getChild("streaming", ns) != null) ? "mapreduce-streaming" : null;
275        }
276    
277        @Override
278        JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
279                Configuration actionConf) throws ActionExecutorException {
280            // If the user is using a regular MapReduce job and specified an uber jar, we need to also set it for the launcher;
281            // so we override createLauncherConf to call super and then to set the uber jar if specified. At this point, checking that
282            // uber jars are enabled and resolving the uber jar path is already done by setupActionConf() when it parsed the actionConf
283            // argument and we can just look up the uber jar in the actionConf argument.
284            JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
285            Namespace ns = actionXml.getNamespace();
286            if (actionXml.getChild("streaming", ns) == null && actionXml.getChild("pipes", ns) == null) {
287                // Set for uber jar
288                String uberJar = actionConf.get(MapReduceActionExecutor.OOZIE_MAPREDUCE_UBER_JAR);
289                if (uberJar != null && uberJar.trim().length() > 0) {
290                    launcherJobConf.setJar(uberJar);
291                }
292            }
293            return launcherJobConf;
294        }
295    }