/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import java.io.IOException;
import java.io.StringReader;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;
import org.json.simple.JSONObject;

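/**
 * ActionExecutor for the Oozie <code>map-reduce</code> action. The launcher main class is chosen from the
 * action XML: streaming if a <code>streaming</code> element is present, pipes if a <code>pipes</code> element
 * is present, and plain MapReduce otherwise. Plain MapReduce actions may additionally configure an uber jar
 * via {@link #OOZIE_MAPREDUCE_UBER_JAR}.
 *
 * Illustrative sketch of a plain map-reduce action with an uber jar, using only the element and property
 * names read by this class (values are placeholders; other elements of the action, such as the job tracker
 * and the job input/output configuration, are omitted):
 *
 * <pre>{@code
 * <map-reduce>
 *     <name-node>hdfs://namenode-host:8020</name-node>
 *     <configuration>
 *         <property>
 *             <name>oozie.mapreduce.uber.jar</name>
 *             <value>/user/someuser/lib/uber.jar</value>
 *         </property>
 *     </configuration>
 * </map-reduce>
 * }</pre>
 */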
public class MapReduceActionExecutor extends JavaActionExecutor {

    public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
    public static final String HADOOP_COUNTERS = "hadoop.counters";
    public static final String OOZIE_MAPREDUCE_UBER_JAR = "oozie.mapreduce.uber.jar";
    public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
    private XLog log = XLog.getLog(getClass());

    public MapReduceActionExecutor() {
        super("map-reduce");
    }

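    /**
     * Adds the launcher main classes used by this executor (LauncherMain, MapReduceMain, StreamingMain
     * and PipesMain) to the classes provided by the parent executor.
     */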
    @Override
    protected List<Class> getLauncherClasses() {
        List<Class> classes = super.getLauncherClasses();
        classes.add(LauncherMain.class);
        classes.add(MapReduceMain.class);
        classes.add(StreamingMain.class);
        classes.add(PipesMain.class);
        return classes;
    }

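    /**
     * Selects the launcher main class: {@code StreamingMain} if the action XML contains a {@code streaming}
     * element, {@code PipesMain} if it contains a {@code pipes} element, and {@code MapReduceMain} otherwise.
     * An explicit value for {@code LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS} in the launcher configuration
     * takes precedence.
     */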
    @Override
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        String mainClass;
        Namespace ns = actionXml.getNamespace();
        if (actionXml.getChild("streaming", ns) != null) {
            mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, StreamingMain.class.getName());
        }
        else {
            if (actionXml.getChild("pipes", ns) != null) {
                mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PipesMain.class.getName());
            }
            else {
                mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, MapReduceMain.class.getName());
            }
        }
        return mainClass;
    }

    @Override
    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
            throws ActionExecutorException {
        super.setupLauncherConf(conf, actionXml, appPath, context);
        conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
        return conf;
    }

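    /**
     * Populates the action configuration from the {@code streaming} or {@code pipes} section of the action XML
     * when present, then delegates to the parent class. For plain MapReduce actions it also resolves the
     * {@code oozie.mapreduce.uber.jar} path (when configured) against the name-node or application path;
     * specifying an uber jar while {@code oozie.action.mapreduce.uber.jar.enable} is false results in an error.
     */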
    @Override
    @SuppressWarnings("unchecked")
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        boolean regularMR = false;
        Namespace ns = actionXml.getNamespace();
        if (actionXml.getChild("streaming", ns) != null) {
            Element streamingXml = actionXml.getChild("streaming", ns);
            String mapper = streamingXml.getChildTextTrim("mapper", ns);
            String reducer = streamingXml.getChildTextTrim("reducer", ns);
            String recordReader = streamingXml.getChildTextTrim("record-reader", ns);
            List<Element> list = (List<Element>) streamingXml.getChildren("record-reader-mapping", ns);
            String[] recordReaderMapping = new String[list.size()];
            for (int i = 0; i < list.size(); i++) {
                recordReaderMapping[i] = list.get(i).getTextTrim();
            }
            list = (List<Element>) streamingXml.getChildren("env", ns);
            String[] env = new String[list.size()];
            for (int i = 0; i < list.size(); i++) {
                env[i] = list.get(i).getTextTrim();
            }
            StreamingMain.setStreaming(actionConf, mapper, reducer, recordReader, recordReaderMapping, env);
        }
        else {
            if (actionXml.getChild("pipes", ns) != null) {
                Element pipesXml = actionXml.getChild("pipes", ns);
                String map = pipesXml.getChildTextTrim("map", ns);
                String reduce = pipesXml.getChildTextTrim("reduce", ns);
                String inputFormat = pipesXml.getChildTextTrim("inputformat", ns);
                String partitioner = pipesXml.getChildTextTrim("partitioner", ns);
                String writer = pipesXml.getChildTextTrim("writer", ns);
                String program = pipesXml.getChildTextTrim("program", ns);
                PipesMain.setPipes(actionConf, map, reduce, inputFormat, partitioner, writer, program, appPath);
            }
            else {
                regularMR = true;
            }
        }
        actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);

        // For "regular" (not streaming or pipes) MR jobs
        if (regularMR) {
            // Resolve uber jar path (has to be done after super because oozie.mapreduce.uber.jar is under <configuration>)
            String uberJar = actionConf.get(OOZIE_MAPREDUCE_UBER_JAR);
            if (uberJar != null) {
                if (!Services.get().getConf().getBoolean(OOZIE_MAPREDUCE_UBER_JAR_ENABLE, false)) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "MR003",
                            "{0} property is not allowed. Set {1} to true in oozie-site to enable.",
                            OOZIE_MAPREDUCE_UBER_JAR, OOZIE_MAPREDUCE_UBER_JAR_ENABLE);
                }
                String nameNode = actionXml.getChildTextTrim("name-node", ns);
                if (nameNode != null) {
                    Path uberJarPath = new Path(uberJar);
                    if (uberJarPath.toUri().getScheme() == null || uberJarPath.toUri().getAuthority() == null) {
                        if (uberJarPath.isAbsolute()) { // absolute path without namenode --> prepend namenode
                            Path nameNodePath = new Path(nameNode);
                            String nameNodeSchemeAuthority = nameNodePath.toUri().getScheme()
                                    + "://" + nameNodePath.toUri().getAuthority();
                            actionConf.set(OOZIE_MAPREDUCE_UBER_JAR, new Path(nameNodeSchemeAuthority + uberJarPath).toString());
                        }
                        else { // relative path --> prepend app path
                            actionConf.set(OOZIE_MAPREDUCE_UBER_JAR, new Path(appPath, uberJarPath).toString());
                        }
                    }
                }
            }
        }
        else {
            if (actionConf.get(OOZIE_MAPREDUCE_UBER_JAR) != null) {
                log.warn("The " + OOZIE_MAPREDUCE_UBER_JAR + " property is only applicable for MapReduce (not streaming nor pipes)"
                        + " workflows, ignoring");
                actionConf.set(OOZIE_MAPREDUCE_UBER_JAR, "");
            }
        }

        return actionConf;
    }

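    /**
     * Called when the action completes. If the action ended in {@code OK} status, looks up the Hadoop job,
     * stores its counters as a JSON string in the {@code hadoop.counters} variable and, if external stats
     * writing is enabled by the user and the JSON is within the configured maximum size, records it as
     * the action's execution stats.
     */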
    @Override
    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
        super.end(context, action);
        JobClient jobClient = null;
        boolean exception = false;
        try {
            if (action.getStatus() == WorkflowAction.Status.OK) {
                Element actionXml = XmlUtils.parseXml(action.getConf());
                JobConf jobConf = createBaseHadoopConf(context, actionXml);
                jobClient = createJobClient(context, jobConf);
                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
                if (runningJob == null) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR002",
                            "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!",
                            action.getExternalId(), action.getId());
                }

                Counters counters = runningJob.getCounters();
                if (counters != null) {
                    ActionStats stats = new MRStats(counters);
                    String statsJsonString = stats.toJSON();
                    context.setVar(HADOOP_COUNTERS, statsJsonString);

                    // If the action stats write property is set to false by the user, or the size of the
                    // stats is greater than the maximum allowed size, do not store the action stats
                    if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
                            OOZIE_ACTION_EXTERNAL_STATS_WRITE, "false"))
                            && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
                        context.setExecutionStats(statsJsonString);
                        log.debug("Printing stats for Map-Reduce action as a JSON string : [{0}]", statsJsonString);
                    }
                }
                else {
                    context.setVar(HADOOP_COUNTERS, "");
                    XLog.getLog(getClass()).warn("Could not find Hadoop Counters for: [{0}]", action.getExternalId());
                }
            }
        }
        catch (Exception ex) {
            exception = true;
            throw convertException(ex);
        }
        finally {
            if (jobClient != null) {
                try {
                    jobClient.close();
                }
                catch (Exception e) {
                    if (exception) {
                        log.error("JobClient error: ", e);
                    }
                    else {
                        throw convertException(e);
                    }
                }
            }
        }
    }

    // Return the value of the specified configuration property
    private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue)
            throws ActionExecutorException {
        try {
            if (actionConf != null) {
                Namespace ns = actionConf.getNamespace();
                Element e = actionConf.getChild("configuration", ns);
                String strConf = XmlUtils.prettyPrint(e).toString();
                XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
                return inlineConf.get(key, defaultValue);
            }
            return "";
        }
        catch (IOException ex) {
            throw convertException(ex);
        }
    }

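    // Convert the Hadoop counters into a JSON object: one nested object per counter group,
    // keyed by group name, mapping each counter name to its value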
    @SuppressWarnings("unchecked")
    private JSONObject counterstoJson(Counters counters) {

        if (counters == null) {
            return null;
        }

        JSONObject groups = new JSONObject();
        for (String gName : counters.getGroupNames()) {
            JSONObject group = new JSONObject();
            for (Counters.Counter counter : counters.getGroup(gName)) {
                String cName = counter.getName();
                Long cValue = counter.getCounter();
                group.put(cName, cValue);
            }
            groups.put(gName, group);
        }
        return groups;
    }

    /**
     * Return the sharelib name for the action.
     *
     * @param actionXml the action XML element
     * @return <code>mapreduce-streaming</code> if this is a streaming action, <code>null</code> otherwise.
     */
    @Override
    protected String getDefaultShareLibName(Element actionXml) {
        Namespace ns = actionXml.getNamespace();
        return (actionXml.getChild("streaming", ns) != null) ? "mapreduce-streaming" : null;
    }

    @Override
    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
            Configuration actionConf) throws ActionExecutorException {
        // If the user is using a regular MapReduce job and specified an uber jar, we need to also set it for the launcher;
        // so we override createLauncherConf to call super and then set the uber jar if specified. At this point, checking
        // that uber jars are enabled and resolving the uber jar path has already been done by setupActionConf() when it
        // parsed the actionConf argument, so we can simply look up the uber jar in the actionConf argument.
        JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
        Namespace ns = actionXml.getNamespace();
        if (actionXml.getChild("streaming", ns) == null && actionXml.getChild("pipes", ns) == null) {
            // Set the uber jar
            String uberJar = actionConf.get(MapReduceActionExecutor.OOZIE_MAPREDUCE_UBER_JAR);
            if (uberJar != null && uberJar.trim().length() > 0) {
                launcherJobConf.setJar(uberJar);
            }
        }
        return launcherJobConf;
    }
}