This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;
import org.json.simple.JSONObject;
041
042 public class MapReduceActionExecutor extends JavaActionExecutor {
043
044 public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
045 public static final String HADOOP_COUNTERS = "hadoop.counters";
046 public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
047 private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain";
048 private XLog log = XLog.getLog(getClass());
049
050 public MapReduceActionExecutor() {
051 super("map-reduce");
052 }
053
054 @Override
055 protected List<Class> getLauncherClasses() {
056 List<Class> classes = super.getLauncherClasses();
057 classes.add(LauncherMain.class);
058 classes.add(MapReduceMain.class);
059 classes.add(PipesMain.class);
060 try {
061 classes.add(Class.forName(STREAMING_MAIN_CLASS_NAME));
062 }
063 catch (ClassNotFoundException e) {
064 throw new RuntimeException("Class not found", e);
065 }
066 return classes;
067 }
068
069 @Override
070 protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
071 String mainClass;
072 Namespace ns = actionXml.getNamespace();
073 if (actionXml.getChild("streaming", ns) != null) {
074 mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, STREAMING_MAIN_CLASS_NAME);
075 }
076 else {
077 if (actionXml.getChild("pipes", ns) != null) {
078 mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PipesMain.class.getName());
079 }
080 else {
081 mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, MapReduceMain.class.getName());
082 }
083 }
084 return mainClass;
085 }
086
087 @Override
088 Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) throws ActionExecutorException {
089 super.setupLauncherConf(conf, actionXml, appPath, context);
090 conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
091 return conf;
092 }
093
094 @Override
095 @SuppressWarnings("unchecked")
096 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
097 throws ActionExecutorException {
098 boolean regularMR = false;
099 Namespace ns = actionXml.getNamespace();
100 if (actionXml.getChild("streaming", ns) != null) {
101 Element streamingXml = actionXml.getChild("streaming", ns);
102 String mapper = streamingXml.getChildTextTrim("mapper", ns);
103 String reducer = streamingXml.getChildTextTrim("reducer", ns);
104 String recordReader = streamingXml.getChildTextTrim("record-reader", ns);
105 List<Element> list = (List<Element>) streamingXml.getChildren("record-reader-mapping", ns);
106 String[] recordReaderMapping = new String[list.size()];
107 for (int i = 0; i < list.size(); i++) {
108 recordReaderMapping[i] = list.get(i).getTextTrim();
109 }
110 list = (List<Element>) streamingXml.getChildren("env", ns);
111 String[] env = new String[list.size()];
112 for (int i = 0; i < list.size(); i++) {
113 env[i] = list.get(i).getTextTrim();
114 }
115 setStreaming(actionConf, mapper, reducer, recordReader, recordReaderMapping, env);
116 }
117 else {
118 if (actionXml.getChild("pipes", ns) != null) {
119 Element pipesXml = actionXml.getChild("pipes", ns);
120 String map = pipesXml.getChildTextTrim("map", ns);
121 String reduce = pipesXml.getChildTextTrim("reduce", ns);
122 String inputFormat = pipesXml.getChildTextTrim("inputformat", ns);
123 String partitioner = pipesXml.getChildTextTrim("partitioner", ns);
124 String writer = pipesXml.getChildTextTrim("writer", ns);
125 String program = pipesXml.getChildTextTrim("program", ns);
126 PipesMain.setPipes(actionConf, map, reduce, inputFormat, partitioner, writer, program, appPath);
127 }
128 else {
129 regularMR = true;
130 }
131 }
132 actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
133
134 // For "regular" (not streaming or pipes) MR jobs
135 if (regularMR) {
136 // Resolve uber jar path (has to be done after super because oozie.mapreduce.uber.jar is under <configuration>)
137 String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR);
138 if (uberJar != null) {
139 if (!Services.get().getConf().getBoolean(OOZIE_MAPREDUCE_UBER_JAR_ENABLE, false)) {
140 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "MR003",
141 "{0} property is not allowed. Set {1} to true in oozie-site to enable.",
142 MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, OOZIE_MAPREDUCE_UBER_JAR_ENABLE);
143 }
144 String nameNode = actionXml.getChildTextTrim("name-node", ns);
145 if (nameNode != null) {
146 Path uberJarPath = new Path(uberJar);
147 if (uberJarPath.toUri().getScheme() == null || uberJarPath.toUri().getAuthority() == null) {
148 if (uberJarPath.isAbsolute()) { // absolute path without namenode --> prepend namenode
149 Path nameNodePath = new Path(nameNode);
150 String nameNodeSchemeAuthority = nameNodePath.toUri().getScheme()
151 + "://" + nameNodePath.toUri().getAuthority();
152 actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR,
153 new Path(nameNodeSchemeAuthority + uberJarPath).toString());
154 }
155 else { // relative path --> prepend app path
156 actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, new Path(appPath, uberJarPath).toString());
157 }
158 }
159 }
160 }
161 }
162 else {
163 if (actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR) != null) {
164 log.warn("The " + MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR + " property is only applicable for MapReduce (not"
165 + "streaming nor pipes) workflows, ignoring");
166 actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, "");
167 }
168 }
169
170 return actionConf;
171 }
172
173 @Override
174 public void end(Context context, WorkflowAction action) throws ActionExecutorException {
175 super.end(context, action);
176 JobClient jobClient = null;
177 boolean exception = false;
178 try {
179 if (action.getStatus() == WorkflowAction.Status.OK) {
180 Element actionXml = XmlUtils.parseXml(action.getConf());
181 JobConf jobConf = createBaseHadoopConf(context, actionXml);
182 jobClient = createJobClient(context, jobConf);
183 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalChildIDs()));
184 if (runningJob == null) {
185 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR002",
186 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!",
187 action.getExternalChildIDs(), action.getId());
188 }
189
190 Counters counters = runningJob.getCounters();
191 if (counters != null) {
192 ActionStats stats = new MRStats(counters);
193 String statsJsonString = stats.toJSON();
194 context.setVar(HADOOP_COUNTERS, statsJsonString);
195
196 // If action stats write property is set to false by user or
197 // size of stats is greater than the maximum allowed size,
198 // do not store the action stats
199 if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
200 OOZIE_ACTION_EXTERNAL_STATS_WRITE, "false"))
201 && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
202 context.setExecutionStats(statsJsonString);
203 log.debug(
204 "Printing stats for Map-Reduce action as a JSON string : [{0}]", statsJsonString);
205 }
206 }
207 else {
208 context.setVar(HADOOP_COUNTERS, "");
209 XLog.getLog(getClass()).warn("Could not find Hadoop Counters for: [{0}]",
210 action.getExternalChildIDs());
211 }
212 }
213 }
214 catch (Exception ex) {
215 exception = true;
216 throw convertException(ex);
217 }
218 finally {
219 if (jobClient != null) {
220 try {
221 jobClient.close();
222 }
223 catch (Exception e) {
224 if (exception) {
225 log.error("JobClient error: ", e);
226 }
227 else {
228 throw convertException(e);
229 }
230 }
231 }
232 }
233 }
234
235 // Return the value of the specified configuration property
236 private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue) throws ActionExecutorException {
237 try {
238 if (actionConf != null) {
239 Namespace ns = actionConf.getNamespace();
240 Element e = actionConf.getChild("configuration", ns);
241 String strConf = XmlUtils.prettyPrint(e).toString();
242 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
243 return inlineConf.get(key, defaultValue);
244 }
245 return "";
246 }
247 catch (IOException ex) {
248 throw convertException(ex);
249 }
250 }
251
252 @SuppressWarnings("unchecked")
253 private JSONObject counterstoJson(Counters counters) {
254
255 if (counters == null) {
256 return null;
257 }
258
259 JSONObject groups = new JSONObject();
260 for (String gName : counters.getGroupNames()) {
261 JSONObject group = new JSONObject();
262 for (Counters.Counter counter : counters.getGroup(gName)) {
263 String cName = counter.getName();
264 Long cValue = counter.getCounter();
265 group.put(cName, cValue);
266 }
267 groups.put(gName, group);
268 }
269 return groups;
270 }
271
272 /**
273 * Return the sharelib name for the action.
274 *
275 * @return returns <code>streaming</code> if mapreduce-streaming action, <code>NULL</code> otherwise.
276 * @param actionXml
277 */
278 @Override
279 protected String getDefaultShareLibName(Element actionXml) {
280 Namespace ns = actionXml.getNamespace();
281 return (actionXml.getChild("streaming", ns) != null) ? "mapreduce-streaming" : null;
282 }
283
284 @Override
285 JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
286 Configuration actionConf) throws ActionExecutorException {
287 // If the user is using a regular MapReduce job and specified an uber jar, we need to also set it for the launcher;
288 // so we override createLauncherConf to call super and then to set the uber jar if specified. At this point, checking that
289 // uber jars are enabled and resolving the uber jar path is already done by setupActionConf() when it parsed the actionConf
290 // argument and we can just look up the uber jar in the actionConf argument.
291 JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
292 Namespace ns = actionXml.getNamespace();
293 if (actionXml.getChild("streaming", ns) == null && actionXml.getChild("pipes", ns) == null) {
294 // Set for uber jar
295 String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR);
296 if (uberJar != null && uberJar.trim().length() > 0) {
297 launcherJobConf.setJar(uberJar);
298 }
299 }
300 return launcherJobConf;
301 }
302
303 public static void setStreaming(Configuration conf, String mapper, String reducer, String recordReader,
304 String[] recordReaderMapping, String[] env) {
305 if (mapper != null) {
306 conf.set("oozie.streaming.mapper", mapper);
307 }
308 if (reducer != null) {
309 conf.set("oozie.streaming.reducer", reducer);
310 }
311 if (recordReader != null) {
312 conf.set("oozie.streaming.record-reader", recordReader);
313 }
314 MapReduceMain.setStrings(conf, "oozie.streaming.record-reader-mapping", recordReaderMapping);
315 MapReduceMain.setStrings(conf, "oozie.streaming.env", env);
316 }
317
318 @Override
319 protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
320
321 RunningJob runningJob;
322 String launcherJobId = action.getExternalId();
323 String childJobId = action.getExternalChildIDs();
324
325 if (childJobId != null && childJobId.length() > 0) {
326 runningJob = jobClient.getJob(JobID.forName(childJobId));
327 }
328 else {
329 runningJob = jobClient.getJob(JobID.forName(launcherJobId));
330 }
331
332 return runningJob;
333 }
334 }