/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;
import org.json.simple.JSONObject;

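/**
 * Action executor for the Oozie <code>map-reduce</code> action, including its
 * streaming and pipes variants. Extends {@link JavaActionExecutor} and, once the
 * Hadoop job completes, records its counters in the action context.
 */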
public class MapReduceActionExecutor extends JavaActionExecutor {

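    /** Name of the context variable that holds the JSON-encoded Hadoop counters of the completed job. */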
    public static final String HADOOP_COUNTERS = "hadoop.counters";
    private XLog log = XLog.getLog(getClass());

    public MapReduceActionExecutor() {
        super("map-reduce");
    }

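    /**
     * Adds the map-reduce, streaming and pipes main classes to the classes
     * shipped with the launcher job.
     */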
    @Override
    protected List<Class> getLauncherClasses() {
        List<Class> classes = super.getLauncherClasses();
        classes.add(LauncherMain.class);
        classes.add(MapReduceMain.class);
        classes.add(StreamingMain.class);
        classes.add(PipesMain.class);
        return classes;
    }

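    /**
     * Picks the launcher main class from the action XML: {@link StreamingMain} for a
     * <code>streaming</code> section, {@link PipesMain} for a <code>pipes</code> section,
     * otherwise {@link MapReduceMain}. An explicit main class set in the launcher
     * configuration takes precedence.
     */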
    @Override
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        String mainClass;
        Namespace ns = actionXml.getNamespace();
        if (actionXml.getChild("streaming", ns) != null) {
            mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, StreamingMain.class.getName());
        }
        else if (actionXml.getChild("pipes", ns) != null) {
            mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PipesMain.class.getName());
        }
        else {
            mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, MapReduceMain.class.getName());
        }
        return mainClass;
    }

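    /**
     * Sets up the launcher configuration and makes sure delegation tokens are
     * cancelled once the MapReduce job completes.
     */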
    @Override
    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
            throws ActionExecutorException {
        super.setupLauncherConf(conf, actionXml, appPath, context);
        conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
        return conf;
    }

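    /**
     * Translates the <code>streaming</code> or <code>pipes</code> section of the action XML
     * into the corresponding entries of the action configuration before delegating to the
     * parent executor.
     */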
    @Override
    @SuppressWarnings("unchecked")
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        Namespace ns = actionXml.getNamespace();
        if (actionXml.getChild("streaming", ns) != null) {
            Element streamingXml = actionXml.getChild("streaming", ns);
            String mapper = streamingXml.getChildTextTrim("mapper", ns);
            String reducer = streamingXml.getChildTextTrim("reducer", ns);
            String recordReader = streamingXml.getChildTextTrim("record-reader", ns);
            List<Element> list = (List<Element>) streamingXml.getChildren("record-reader-mapping", ns);
            String[] recordReaderMapping = new String[list.size()];
            for (int i = 0; i < list.size(); i++) {
                recordReaderMapping[i] = list.get(i).getTextTrim();
            }
            list = (List<Element>) streamingXml.getChildren("env", ns);
            String[] env = new String[list.size()];
            for (int i = 0; i < list.size(); i++) {
                env[i] = list.get(i).getTextTrim();
            }
            StreamingMain.setStreaming(actionConf, mapper, reducer, recordReader, recordReaderMapping, env);
        }
        else if (actionXml.getChild("pipes", ns) != null) {
            Element pipesXml = actionXml.getChild("pipes", ns);
            String map = pipesXml.getChildTextTrim("map", ns);
            String reduce = pipesXml.getChildTextTrim("reduce", ns);
            String inputFormat = pipesXml.getChildTextTrim("inputformat", ns);
            String partitioner = pipesXml.getChildTextTrim("partitioner", ns);
            String writer = pipesXml.getChildTextTrim("writer", ns);
            String program = pipesXml.getChildTextTrim("program", ns);
            PipesMain.setPipes(actionConf, map, reduce, inputFormat, partitioner, writer, program, appPath);
        }
        actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
        return actionConf;
    }

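    /**
     * Completes the action. If the action finished OK, looks up the Hadoop job by its
     * external ID, verifies that the launcher swapped in the real job ID, and stores the
     * job counters as JSON in the {@link #HADOOP_COUNTERS} context variable (or an empty
     * value when no counters are available).
     */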
    @Override
    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
        super.end(context, action);
        JobClient jobClient = null;
        boolean exception = false;
        try {
            if (action.getStatus() == WorkflowAction.Status.OK) {
                Element actionXml = XmlUtils.parseXml(action.getConf());
                Configuration conf = createBaseHadoopConf(context, actionXml);
                JobConf jobConf = new JobConf();
                XConfiguration.copy(conf, jobConf);
                jobClient = createJobClient(context, jobConf);
                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
                if (runningJob == null) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR002",
                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!",
                            action.getExternalId(), action.getId());
                }

                // TODO this has to be done in a better way
                if (!runningJob.getJobName().startsWith("oozie:action:")) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR001",
                            "ID swap should have happened in launcher job [{0}]", action.getExternalId());
                }
                Counters counters = runningJob.getCounters();
                if (counters != null) {
                    JSONObject json = countersToJson(counters);
                    context.setVar(HADOOP_COUNTERS, json.toJSONString());
                }
                else {
                    context.setVar(HADOOP_COUNTERS, "");
                    log.warn("Could not find Hadoop Counters for: [{0}]", action.getExternalId());
                }
            }
        }
        catch (Exception ex) {
            exception = true;
            throw convertException(ex);
        }
        finally {
            if (jobClient != null) {
                try {
                    jobClient.close();
                }
                catch (Exception e) {
                    if (exception) {
                        log.error("JobClient error: ", e);
                    }
                    else {
                        throw convertException(e);
                    }
                }
            }
        }
    }

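    /**
     * Converts the Hadoop counters into a JSON object keyed by group name, with each group
     * mapping counter names to their values. Returns <code>null</code> if the counters are
     * <code>null</code>.
     */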
    @SuppressWarnings("unchecked")
    private JSONObject countersToJson(Counters counters) {

        if (counters == null) {
            return null;
        }

        JSONObject groups = new JSONObject();
        for (String gName : counters.getGroupNames()) {
            JSONObject group = new JSONObject();
            for (Counters.Counter counter : counters.getGroup(gName)) {
                String cName = counter.getName();
                Long cValue = counter.getCounter();
                group.put(cName, cValue);
            }
            groups.put(gName, group);
        }
        return groups;
    }

}