This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import java.util.List;
021
022 import org.apache.hadoop.conf.Configuration;
023 import org.apache.hadoop.fs.Path;
024 import org.apache.hadoop.mapred.Counters;
025 import org.apache.hadoop.mapred.JobClient;
026 import org.apache.hadoop.mapred.JobConf;
027 import org.apache.hadoop.mapred.JobID;
028 import org.apache.hadoop.mapred.RunningJob;
029 import org.apache.oozie.action.ActionExecutorException;
030 import org.apache.oozie.client.WorkflowAction;
031 import org.apache.oozie.util.XConfiguration;
032 import org.apache.oozie.util.XLog;
033 import org.apache.oozie.util.XmlUtils;
034 import org.jdom.Element;
035 import org.jdom.Namespace;
036 import org.json.simple.JSONObject;
037
038 public class MapReduceActionExecutor extends JavaActionExecutor {
039
040 public static final String HADOOP_COUNTERS = "hadoop.counters";
041 private XLog log = XLog.getLog(getClass());
042
043 public MapReduceActionExecutor() {
044 super("map-reduce");
045 }
046
047 @Override
048 protected List<Class> getLauncherClasses() {
049 List<Class> classes = super.getLauncherClasses();
050 classes.add(LauncherMain.class);
051 classes.add(MapReduceMain.class);
052 classes.add(StreamingMain.class);
053 classes.add(PipesMain.class);
054 return classes;
055 }
056
057 @Override
058 protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
059 String mainClass;
060 Namespace ns = actionXml.getNamespace();
061 if (actionXml.getChild("streaming", ns) != null) {
062 mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, StreamingMain.class.getName());
063 }
064 else {
065 if (actionXml.getChild("pipes", ns) != null) {
066 mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PipesMain.class.getName());
067 }
068 else {
069 mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, MapReduceMain.class.getName());
070 }
071 }
072 return mainClass;
073 }
074
075 @Override
076 Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) throws ActionExecutorException {
077 super.setupLauncherConf(conf, actionXml, appPath, context);
078 conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
079 return conf;
080 }
081
082 @Override
083 @SuppressWarnings("unchecked")
084 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
085 throws ActionExecutorException {
086 Namespace ns = actionXml.getNamespace();
087 if (actionXml.getChild("streaming", ns) != null) {
088 Element streamingXml = actionXml.getChild("streaming", ns);
089 String mapper = streamingXml.getChildTextTrim("mapper", ns);
090 String reducer = streamingXml.getChildTextTrim("reducer", ns);
091 String recordReader = streamingXml.getChildTextTrim("record-reader", ns);
092 List<Element> list = (List<Element>) streamingXml.getChildren("record-reader-mapping", ns);
093 String[] recordReaderMapping = new String[list.size()];
094 for (int i = 0; i < list.size(); i++) {
095 recordReaderMapping[i] = list.get(i).getTextTrim();
096 }
097 list = (List<Element>) streamingXml.getChildren("env", ns);
098 String[] env = new String[list.size()];
099 for (int i = 0; i < list.size(); i++) {
100 env[i] = list.get(i).getTextTrim();
101 }
102 StreamingMain.setStreaming(actionConf, mapper, reducer, recordReader, recordReaderMapping, env);
103 }
104 else {
105 if (actionXml.getChild("pipes", ns) != null) {
106 Element pipesXml = actionXml.getChild("pipes", ns);
107 String map = pipesXml.getChildTextTrim("map", ns);
108 String reduce = pipesXml.getChildTextTrim("reduce", ns);
109 String inputFormat = pipesXml.getChildTextTrim("inputformat", ns);
110 String partitioner = pipesXml.getChildTextTrim("partitioner", ns);
111 String writer = pipesXml.getChildTextTrim("writer", ns);
112 String program = pipesXml.getChildTextTrim("program", ns);
113 PipesMain.setPipes(actionConf, map, reduce, inputFormat, partitioner, writer, program, appPath);
114 }
115 }
116 actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
117 return actionConf;
118 }
119
120 @Override
121 public void end(Context context, WorkflowAction action) throws ActionExecutorException {
122 super.end(context, action);
123 JobClient jobClient = null;
124 boolean exception = false;
125 try {
126 if (action.getStatus() == WorkflowAction.Status.OK) {
127 Element actionXml = XmlUtils.parseXml(action.getConf());
128 Configuration conf = createBaseHadoopConf(context, actionXml);
129 JobConf jobConf = new JobConf();
130 XConfiguration.copy(conf, jobConf);
131 jobClient = createJobClient(context, jobConf);
132 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
133 if (runningJob == null) {
134 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR002",
135 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", action
136 .getExternalId(), action.getId());
137 }
138
139 // TODO this has to be done in a better way
140 if (!runningJob.getJobName().startsWith("oozie:action:")) {
141 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR001",
142 "ID swap should have happened in launcher job [{0}]", action.getExternalId());
143 }
144 Counters counters = runningJob.getCounters();
145 if (counters != null) {
146 JSONObject json = counterstoJson(counters);
147 context.setVar(HADOOP_COUNTERS, json.toJSONString());
148 }
149 else {
150
151 context.setVar(HADOOP_COUNTERS, "");
152
153 XLog.getLog(getClass()).warn("Could not find Hadoop Counters for: [{0}]", action.getExternalId());
154 }
155 }
156 }
157 catch (Exception ex) {
158 exception = true;
159 throw convertException(ex);
160 }
161 finally {
162 if (jobClient != null) {
163 try {
164 jobClient.close();
165 }
166 catch (Exception e) {
167 if (exception) {
168 log.error("JobClient error: ", e);
169 }
170 else {
171 throw convertException(e);
172 }
173 }
174 }
175 }
176 }
177
178 @SuppressWarnings("unchecked")
179 private JSONObject counterstoJson(Counters counters) {
180
181 if (counters == null) {
182 return null;
183 }
184
185 JSONObject groups = new JSONObject();
186 for (String gName : counters.getGroupNames()) {
187 JSONObject group = new JSONObject();
188 for (Counters.Counter counter : counters.getGroup(gName)) {
189 String cName = counter.getName();
190 Long cValue = counter.getCounter();
191 group.put(cName, cValue);
192 }
193 groups.put(gName, group);
194 }
195 return groups;
196 }
197
198 }