/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;
import org.json.simple.JSONObject;
import org.mortbay.util.ajax.JSON;

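/**
 * ActionExecutor for the <code>map-reduce</code> workflow action. It handles plain
 * MapReduce jobs as well as Hadoop Streaming and Hadoop Pipes variants, and collects
 * the Hadoop counters of the finished job as action statistics.
 */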
public class MapReduceActionExecutor extends JavaActionExecutor {

    public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
    public static final String HADOOP_COUNTERS = "hadoop.counters";
    private XLog log = XLog.getLog(getClass());

    public MapReduceActionExecutor() {
        super("map-reduce");
    }

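    /**
     * Returns the classes that must be available to the launcher job, adding the main
     * classes for plain map-reduce, streaming and pipes jobs to the ones required by
     * {@link JavaActionExecutor}.
     */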
    @Override
    protected List<Class> getLauncherClasses() {
        List<Class> classes = super.getLauncherClasses();
        classes.add(LauncherMain.class);
        classes.add(MapReduceMain.class);
        classes.add(StreamingMain.class);
        classes.add(PipesMain.class);
        return classes;
    }

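    /**
     * Determines the main class the launcher should run. If the action XML contains a
     * <code>streaming</code> element, {@link StreamingMain} is used; if it contains a
     * <code>pipes</code> element, {@link PipesMain}; otherwise {@link MapReduceMain}.
     * The choice can be overridden through the launcher configuration property
     * {@link LauncherMapper#CONF_OOZIE_ACTION_MAIN_CLASS}.
     */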
    @Override
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        String mainClass;
        Namespace ns = actionXml.getNamespace();
        if (actionXml.getChild("streaming", ns) != null) {
            mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, StreamingMain.class.getName());
        }
        else {
            if (actionXml.getChild("pipes", ns) != null) {
                mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PipesMain.class.getName());
            }
            else {
                mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, MapReduceMain.class.getName());
            }
        }
        return mainClass;
    }

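    /**
     * Extends the launcher configuration prepared by the superclass so that Hadoop
     * cancels the job's delegation tokens once the launched job completes.
     */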
    @Override
    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
            throws ActionExecutorException {
        super.setupLauncherConf(conf, actionXml, appPath, context);
        conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
        return conf;
    }

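    /**
     * Populates the action configuration. For a <code>streaming</code> action the mapper,
     * reducer, record reader, record-reader mappings and environment entries are read from
     * the action XML and passed to {@link StreamingMain#setStreaming}; for a
     * <code>pipes</code> action the corresponding elements are passed to
     * {@link PipesMain#setPipes}. An illustrative (not exhaustive) streaming fragment:
     *
     * <pre>
     * &lt;streaming&gt;
     *     &lt;mapper&gt;/bin/cat&lt;/mapper&gt;
     *     &lt;reducer&gt;/usr/bin/wc&lt;/reducer&gt;
     * &lt;/streaming&gt;
     * </pre>
     */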
    @Override
    @SuppressWarnings("unchecked")
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        Namespace ns = actionXml.getNamespace();
        if (actionXml.getChild("streaming", ns) != null) {
            Element streamingXml = actionXml.getChild("streaming", ns);
            String mapper = streamingXml.getChildTextTrim("mapper", ns);
            String reducer = streamingXml.getChildTextTrim("reducer", ns);
            String recordReader = streamingXml.getChildTextTrim("record-reader", ns);
            List<Element> list = (List<Element>) streamingXml.getChildren("record-reader-mapping", ns);
            String[] recordReaderMapping = new String[list.size()];
            for (int i = 0; i < list.size(); i++) {
                recordReaderMapping[i] = list.get(i).getTextTrim();
            }
            list = (List<Element>) streamingXml.getChildren("env", ns);
            String[] env = new String[list.size()];
            for (int i = 0; i < list.size(); i++) {
                env[i] = list.get(i).getTextTrim();
            }
            StreamingMain.setStreaming(actionConf, mapper, reducer, recordReader, recordReaderMapping, env);
        }
        else {
            if (actionXml.getChild("pipes", ns) != null) {
                Element pipesXml = actionXml.getChild("pipes", ns);
                String map = pipesXml.getChildTextTrim("map", ns);
                String reduce = pipesXml.getChildTextTrim("reduce", ns);
                String inputFormat = pipesXml.getChildTextTrim("inputformat", ns);
                String partitioner = pipesXml.getChildTextTrim("partitioner", ns);
                String writer = pipesXml.getChildTextTrim("writer", ns);
                String program = pipesXml.getChildTextTrim("program", ns);
                PipesMain.setPipes(actionConf, map, reduce, inputFormat, partitioner, writer, program, appPath);
            }
        }
        actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
        return actionConf;
    }

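    /**
     * Completes the action. When the workflow action finished in <code>OK</code> status,
     * the Hadoop job is looked up through a {@link JobClient}, its counters are converted
     * to JSON and stored in the {@link #HADOOP_COUNTERS} variable, and, if enabled via
     * {@link #OOZIE_ACTION_EXTERNAL_STATS_WRITE} and within the configured size limit,
     * also persisted as the action's execution statistics.
     */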
    @Override
    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
        super.end(context, action);
        JobClient jobClient = null;
        boolean exception = false;
        try {
            if (action.getStatus() == WorkflowAction.Status.OK) {
                Element actionXml = XmlUtils.parseXml(action.getConf());
                JobConf jobConf = createBaseHadoopConf(context, actionXml);
                jobClient = createJobClient(context, jobConf);
                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
                if (runningJob == null) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR002",
                            "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!",
                            action.getExternalId(), action.getId());
                }

                // TODO this has to be done in a better way
                if (!runningJob.getJobName().startsWith("oozie:action:")) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR001",
                            "ID swap should have happened in launcher job [{0}]", action.getExternalId());
                }

                Counters counters = runningJob.getCounters();
                if (counters != null) {
                    ActionStats stats = new MRStats(counters);
                    String statsJsonString = stats.toJSON();
                    context.setVar(HADOOP_COUNTERS, statsJsonString);

                    // Do not store the action stats if the user disabled the external stats
                    // write property or the stats exceed the maximum allowed size.
                    if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
                            OOZIE_ACTION_EXTERNAL_STATS_WRITE, "false"))
                            && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
                        context.setExecutionStats(statsJsonString);
                        log.debug("Printing stats for Map-Reduce action as a JSON string : [{0}]", statsJsonString);
                    }
                }
                else {
                    context.setVar(HADOOP_COUNTERS, "");
                    XLog.getLog(getClass()).warn("Could not find Hadoop Counters for: [{0}]", action.getExternalId());
                }
            }
        }
        catch (Exception ex) {
            exception = true;
            throw convertException(ex);
        }
        finally {
            if (jobClient != null) {
                try {
                    jobClient.close();
                }
                catch (Exception e) {
                    if (exception) {
                        log.error("JobClient error: ", e);
                    }
                    else {
                        throw convertException(e);
                    }
                }
            }
        }
    }

    // Return the value of the specified configuration property
    private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue)
            throws ActionExecutorException {
        try {
            if (actionConf != null) {
                Namespace ns = actionConf.getNamespace();
                Element e = actionConf.getChild("configuration", ns);
                String strConf = XmlUtils.prettyPrint(e).toString();
                XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
                return inlineConf.get(key, defaultValue);
            }
            return "";
        }
        catch (IOException ex) {
            throw convertException(ex);
        }
    }

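    /**
     * Converts the Hadoop counters of the job into a JSON object keyed by counter group,
     * where each group maps counter names to their values, for example (illustrative only):
     * <code>{"org.apache.hadoop.mapred.Task$Counter":{"MAP_INPUT_RECORDS":42}}</code>.
     * Returns <code>null</code> if no counters are available.
     */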
    @SuppressWarnings("unchecked")
    private JSONObject counterstoJson(Counters counters) {

        if (counters == null) {
            return null;
        }

        JSONObject groups = new JSONObject();
        for (String gName : counters.getGroupNames()) {
            JSONObject group = new JSONObject();
            for (Counters.Counter counter : counters.getGroup(gName)) {
                String cName = counter.getName();
                Long cValue = counter.getCounter();
                group.put(cName, cValue);
            }
            groups.put(gName, group);
        }
        return groups;
    }

    /**
     * Return the sharelib postfix for the action.
     *
     * @param context executor context.
     * @param actionXml the action XML.
     * @return the action sharelib postfix; this implementation returns <code>null</code>,
     *         or <code>mapreduce-streaming</code> if the map-reduce action is a streaming action.
     */
    protected String getShareLibPostFix(Context context, Element actionXml) {
        Namespace ns = actionXml.getNamespace();
        return (actionXml.getChild("streaming", ns) != null) ? "mapreduce-streaming" : null;
    }

}