/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import java.io.IOException;
import java.io.StringReader;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;
import org.json.simple.JSONObject;

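/**
 * Action executor for the Oozie <code>map-reduce</code> workflow action. It launches plain
 * MapReduce, streaming and pipes jobs through the launcher job set up by {@link JavaActionExecutor},
 * and on action end records the Hadoop counters of the completed job.
 */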
public class MapReduceActionExecutor extends JavaActionExecutor {

    public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
    public static final String HADOOP_COUNTERS = "hadoop.counters";
    private XLog log = XLog.getLog(getClass());

    public MapReduceActionExecutor() {
        super("map-reduce");
    }

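    /**
     * Return the classes to be made available to the launcher job: the classes required by
     * {@link JavaActionExecutor} plus the launcher, MapReduce, streaming and pipes main classes.
     */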
    @Override
    protected List<Class> getLauncherClasses() {
        List<Class> classes = super.getLauncherClasses();
        classes.add(LauncherMain.class);
        classes.add(MapReduceMain.class);
        classes.add(StreamingMain.class);
        classes.add(PipesMain.class);
        return classes;
    }

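    /**
     * Return the launcher main class to run, choosing the streaming, pipes or plain MapReduce main
     * class depending on which child element is present in the action XML.
     */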
    @Override
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        String mainClass;
        Namespace ns = actionXml.getNamespace();
        if (actionXml.getChild("streaming", ns) != null) {
            mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, StreamingMain.class.getName());
        }
        else if (actionXml.getChild("pipes", ns) != null) {
            mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PipesMain.class.getName());
        }
        else {
            mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, MapReduceMain.class.getName());
        }
        return mainClass;
    }

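    /**
     * Set up the launcher job configuration, making sure the job's delegation tokens are cancelled
     * once the launcher job completes.
     */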
    @Override
    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
            throws ActionExecutorException {
        super.setupLauncherConf(conf, actionXml, appPath, context);
        conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
        return conf;
    }

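    /**
     * Set up the action configuration, injecting the streaming or pipes settings declared in the
     * action XML before delegating to the {@link JavaActionExecutor} setup.
     */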
    @Override
    @SuppressWarnings("unchecked")
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        Namespace ns = actionXml.getNamespace();
        if (actionXml.getChild("streaming", ns) != null) {
            Element streamingXml = actionXml.getChild("streaming", ns);
            String mapper = streamingXml.getChildTextTrim("mapper", ns);
            String reducer = streamingXml.getChildTextTrim("reducer", ns);
            String recordReader = streamingXml.getChildTextTrim("record-reader", ns);
            List<Element> list = (List<Element>) streamingXml.getChildren("record-reader-mapping", ns);
            String[] recordReaderMapping = new String[list.size()];
            for (int i = 0; i < list.size(); i++) {
                recordReaderMapping[i] = list.get(i).getTextTrim();
            }
            list = (List<Element>) streamingXml.getChildren("env", ns);
            String[] env = new String[list.size()];
            for (int i = 0; i < list.size(); i++) {
                env[i] = list.get(i).getTextTrim();
            }
            StreamingMain.setStreaming(actionConf, mapper, reducer, recordReader, recordReaderMapping, env);
        }
        else if (actionXml.getChild("pipes", ns) != null) {
            Element pipesXml = actionXml.getChild("pipes", ns);
            String map = pipesXml.getChildTextTrim("map", ns);
            String reduce = pipesXml.getChildTextTrim("reduce", ns);
            String inputFormat = pipesXml.getChildTextTrim("inputformat", ns);
            String partitioner = pipesXml.getChildTextTrim("partitioner", ns);
            String writer = pipesXml.getChildTextTrim("writer", ns);
            String program = pipesXml.getChildTextTrim("program", ns);
            PipesMain.setPipes(actionConf, map, reduce, inputFormat, partitioner, writer, program, appPath);
        }
        actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
        return actionConf;
    }

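    /**
     * Complete the action: look up the finished Hadoop job, check that the launcher swapped in the
     * real job id, publish the job counters in the <code>hadoop.counters</code> action variable and,
     * when external stats writing is enabled and the stats fit within the size limit, store them as
     * the action execution stats.
     */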
    @Override
    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
        super.end(context, action);
        JobClient jobClient = null;
        boolean exception = false;
        try {
            if (action.getStatus() == WorkflowAction.Status.OK) {
                Element actionXml = XmlUtils.parseXml(action.getConf());
                JobConf jobConf = createBaseHadoopConf(context, actionXml);
                jobClient = createJobClient(context, jobConf);
                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
                if (runningJob == null) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR002",
                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!",
                            action.getExternalId(), action.getId());
                }

                // TODO this has to be done in a better way
                if (!runningJob.getJobName().startsWith("oozie:action:")) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR001",
                            "ID swap should have happened in launcher job [{0}]", action.getExternalId());
                }

                Counters counters = runningJob.getCounters();
                if (counters != null) {
                    ActionStats stats = new MRStats(counters);
                    String statsJsonString = stats.toJSON();
                    context.setVar(HADOOP_COUNTERS, statsJsonString);

                    // Store the action stats only if the user enabled external stats writing and the
                    // serialized stats do not exceed the maximum allowed size
                    if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
                            OOZIE_ACTION_EXTERNAL_STATS_WRITE, "false"))
                            && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
                        context.setExecutionStats(statsJsonString);
                        log.debug("Printing stats for Map-Reduce action as a JSON string : [{0}]", statsJsonString);
                    }
                }
                else {
                    context.setVar(HADOOP_COUNTERS, "");
                    XLog.getLog(getClass()).warn("Could not find Hadoop Counters for: [{0}]", action.getExternalId());
                }
            }
        }
        catch (Exception ex) {
            exception = true;
            throw convertException(ex);
        }
        finally {
            if (jobClient != null) {
                try {
                    jobClient.close();
                }
                catch (Exception e) {
                    if (exception) {
                        log.error("JobClient error: ", e);
                    }
                    else {
                        throw convertException(e);
                    }
                }
            }
        }
    }

    // Return the value of the specified property from the action's inline <configuration> element,
    // or the given default value; returns an empty string if there is no action XML.
    private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue)
            throws ActionExecutorException {
        try {
            if (actionConf != null) {
                Namespace ns = actionConf.getNamespace();
                Element e = actionConf.getChild("configuration", ns);
                String strConf = XmlUtils.prettyPrint(e).toString();
                XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
                return inlineConf.get(key, defaultValue);
            }
            return "";
        }
        catch (IOException ex) {
            throw convertException(ex);
        }
    }

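    /**
     * Convert Hadoop job counters into a JSON object keyed by counter group name, with each group
     * mapping counter names to their values. Returns <code>null</code> when no counters are given.
     */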
    @SuppressWarnings("unchecked")
    private JSONObject counterstoJson(Counters counters) {

        if (counters == null) {
            return null;
        }

        JSONObject groups = new JSONObject();
        for (String gName : counters.getGroupNames()) {
            JSONObject group = new JSONObject();
            for (Counters.Counter counter : counters.getGroup(gName)) {
                String cName = counter.getName();
                Long cValue = counter.getCounter();
                group.put(cName, cValue);
            }
            groups.put(gName, group);
        }
        return groups;
    }

    /**
     * Return the sharelib postfix for the action.
     *
     * @param context executor context.
     * @param actionXml the action XML.
     * @return the action sharelib postfix; this implementation returns <code>null</code>, or
     * <code>mapreduce-streaming</code> if the map-reduce action is a streaming action.
     */
    protected String getShareLibPostFix(Context context, Element actionXml) {
        Namespace ns = actionXml.getNamespace();
        return (actionXml.getChild("streaming", ns) != null) ? "mapreduce-streaming" : null;
    }

}