/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.action.hadoop;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;

public class MapReduceActionExecutor extends JavaActionExecutor {

    public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
    public static final String HADOOP_COUNTERS = "hadoop.counters";
    public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
    private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain";
    public static final String JOB_END_NOTIFICATION_URL = "job.end.notification.url";
    private static final String MAPREDUCE_JOB_NAME = "mapreduce.job.name";
    private XLog log = XLog.getLog(getClass());

    public MapReduceActionExecutor() {
        super("map-reduce");
    }

    @Override
    public List<Class<?>> getLauncherClasses() {
        List<Class<?>> classes = new ArrayList<Class<?>>();
        try {
            classes.add(Class.forName(STREAMING_MAIN_CLASS_NAME));
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found", e);
        }
        return classes;
    }

    @Override
    protected String getActualExternalId(WorkflowAction action) {
        String launcherJobId = action.getExternalId();
        String childId = action.getExternalChildIDs();

        if (childId != null && !childId.isEmpty()) {
            return childId;
        } else {
            return launcherJobId;
        }
    }

    @Override
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        String mainClass;
        Namespace ns = actionXml.getNamespace();
        if (actionXml.getChild("streaming", ns) != null) {
            mainClass = launcherConf.get(LauncherAMUtils.CONF_OOZIE_ACTION_MAIN_CLASS, STREAMING_MAIN_CLASS_NAME);
        }
        else {
            if (actionXml.getChild("pipes", ns) != null) {
                mainClass = launcherConf.get(LauncherAMUtils.CONF_OOZIE_ACTION_MAIN_CLASS, PipesMain.class.getName());
            }
            else {
                mainClass = launcherConf.get(LauncherAMUtils.CONF_OOZIE_ACTION_MAIN_CLASS, MapReduceMain.class.getName());
            }
        }
        return mainClass;
    }

    @Override
    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
            throws ActionExecutorException {
        super.setupLauncherConf(conf, actionXml, appPath, context);
        conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);

        return conf;
    }

    private void injectConfigClass(Configuration conf, Element actionXml) {
        // Inject config-class for launcher to use for action
        Element e = actionXml.getChild("config-class", actionXml.getNamespace());
        if (e != null) {
            conf.set(LauncherAMUtils.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim());
        }
    }

    @Override
    protected Configuration createBaseHadoopConf(Context context, Element actionXml, boolean loadResources) {
        Configuration conf = super.createBaseHadoopConf(context, actionXml, loadResources);
        return conf;
    }

    @Override
    @SuppressWarnings("unchecked")
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        boolean regularMR = false;

        injectConfigClass(actionConf, actionXml);
        Namespace ns = actionXml.getNamespace();
        if (actionXml.getChild("streaming", ns) != null) {
            Element streamingXml = actionXml.getChild("streaming", ns);
            String mapper = streamingXml.getChildTextTrim("mapper", ns);
            String reducer = streamingXml.getChildTextTrim("reducer", ns);
            String recordReader = streamingXml.getChildTextTrim("record-reader", ns);
            List<Element> list = (List<Element>) streamingXml.getChildren("record-reader-mapping", ns);
            String[] recordReaderMapping = new String[list.size()];
            for (int i = 0; i < list.size(); i++) {
                recordReaderMapping[i] = list.get(i).getTextTrim();
            }
            list = (List<Element>) streamingXml.getChildren("env", ns);
            String[] env = new String[list.size()];
            for (int i = 0; i < list.size(); i++) {
                env[i] = list.get(i).getTextTrim();
            }
            setStreaming(actionConf, mapper, reducer, recordReader, recordReaderMapping, env);
        }
        else {
            if (actionXml.getChild("pipes", ns) != null) {
                Element pipesXml = actionXml.getChild("pipes", ns);
                String map = pipesXml.getChildTextTrim("map", ns);
                String reduce = pipesXml.getChildTextTrim("reduce", ns);
                String inputFormat = pipesXml.getChildTextTrim("inputformat", ns);
                String partitioner = pipesXml.getChildTextTrim("partitioner", ns);
                String writer = pipesXml.getChildTextTrim("writer", ns);
                String program = pipesXml.getChildTextTrim("program", ns);
                PipesMain.setPipes(actionConf, map, reduce, inputFormat, partitioner, writer, program, appPath);
            }
            else {
                regularMR = true;
            }
        }
        actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
        setJobName(actionConf, context);

        // For "regular" (not streaming or pipes) MR jobs
        if (regularMR) {
            // Resolve uber jar path (has to be done after super because oozie.mapreduce.uber.jar is under <configuration>)
            String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR);
            if (uberJar != null) {
                if (!ConfigurationService.getBoolean(OOZIE_MAPREDUCE_UBER_JAR_ENABLE)) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "MR003",
                            "{0} property is not allowed. Set {1} to true in oozie-site to enable.",
                            MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, OOZIE_MAPREDUCE_UBER_JAR_ENABLE);
                }
                String nameNode = actionXml.getChildTextTrim("name-node", ns);
                if (nameNode != null) {
                    Path uberJarPath = new Path(uberJar);
                    if (uberJarPath.toUri().getScheme() == null || uberJarPath.toUri().getAuthority() == null) {
                        if (uberJarPath.isAbsolute()) {     // absolute path without namenode --> prepend namenode
                            Path nameNodePath = new Path(nameNode);
                            String nameNodeSchemeAuthority = nameNodePath.toUri().getScheme()
                                    + "://" + nameNodePath.toUri().getAuthority();
                            actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR,
                                    new Path(nameNodeSchemeAuthority + uberJarPath).toString());
                        }
                        else {      // relative path --> prepend app path
                            actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR,
                                    new Path(appPath, uberJarPath).toString());
                        }
                    }
                }
            }
        }
        else {
            if (actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR) != null) {
                log.warn("The " + MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR + " property is only applicable for MapReduce (not"
                        + " streaming nor pipes) workflows, ignoring");
                actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, "");
            }
        }

        // child job cancel delegation token for mapred action
        actionConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);

        return actionConf;
    }

    private void setJobName(Configuration actionConf, Context context) {
        String jobName = getAppName(context);
        if (jobName != null) {
            actionConf.set(MAPREDUCE_JOB_NAME, jobName.replace("oozie:launcher", "oozie:action"));
        }
    }

    @Override
    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
        super.end(context, action);
        JobClient jobClient = null;
        boolean exception = false;
        try {
            if (action.getStatus() == WorkflowAction.Status.OK) {
                Element actionXml = XmlUtils.parseXml(action.getConf());
                Configuration jobConf = createBaseHadoopConf(context, actionXml);
                jobClient = createJobClient(context, jobConf);
                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalChildIDs()));
                if (runningJob == null) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR002",
                            "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!",
                            action.getExternalChildIDs(), action.getId());
                }

                Counters counters = runningJob.getCounters();
                if (counters != null) {
                    ActionStats stats = new MRStats(counters);
                    String statsJsonString = stats.toJSON();
                    context.setVar(HADOOP_COUNTERS, statsJsonString);

                    // If action stats write property is set to false by user or
                    // size of stats is greater than the maximum allowed size,
                    // do not store the action stats
                    if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
                            OOZIE_ACTION_EXTERNAL_STATS_WRITE, "false"))
                            && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
                        context.setExecutionStats(statsJsonString);
                        log.debug(
                                "Printing stats for Map-Reduce action as a JSON string : [{0}]", statsJsonString);
                    }
                }
                else {
                    context.setVar(HADOOP_COUNTERS, "");
                    XLog.getLog(getClass()).warn("Could not find Hadoop Counters for: [{0}]",
                            action.getExternalChildIDs());
                }
            }
        }
        catch (Exception ex) {
            exception = true;
            throw convertException(ex);
        }
        finally {
            if (jobClient != null) {
                try {
                    jobClient.close();
                }
                catch (Exception e) {
                    if (exception) {
                        log.error("JobClient error: ", e);
                    }
                    else {
                        throw convertException(e);
                    }
                }
            }
        }
    }

    // Return the value of the specified configuration property
    private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue)
            throws ActionExecutorException {
        try {
            String ret = defaultValue;
            if (actionConf != null) {
                Namespace ns = actionConf.getNamespace();
                Element e = actionConf.getChild("configuration", ns);
                if (e != null) {
                    String strConf = XmlUtils.prettyPrint(e).toString();
                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
                    ret = inlineConf.get(key, defaultValue);
                }
            }
            return ret;
        }
        catch (IOException ex) {
            throw convertException(ex);
        }
    }

    /**
     * Return the sharelib name for the action.
     *
     * @param actionXml the action XML element
     * @return <code>mapreduce-streaming</code> if this is a streaming action, <code>null</code> otherwise.
     */
    @Override
    protected String getDefaultShareLibName(Element actionXml) {
        Namespace ns = actionXml.getNamespace();
        return (actionXml.getChild("streaming", ns) != null) ? "mapreduce-streaming" : null;
    }

    public static void setStreaming(Configuration conf, String mapper, String reducer, String recordReader,
            String[] recordReaderMapping, String[] env) {
        if (mapper != null) {
            conf.set("oozie.streaming.mapper", mapper);
        }
        if (reducer != null) {
            conf.set("oozie.streaming.reducer", reducer);
        }
        if (recordReader != null) {
            conf.set("oozie.streaming.record-reader", recordReader);
        }
        ActionUtils.setStrings(conf, "oozie.streaming.record-reader-mapping", recordReaderMapping);
        ActionUtils.setStrings(conf, "oozie.streaming.env", env);
    }

    @Override
    protected void injectCallback(Context context, Configuration conf) {
        // add callback for the MapReduce job
        String callback = context.getCallbackUrl("$jobStatus");
        String originalCallbackURL = conf.get(JOB_END_NOTIFICATION_URL);
        if (originalCallbackURL != null) {
            LOG.warn("Overriding the action job end notification URI. Original value: {0}", originalCallbackURL);
        }
        conf.set(JOB_END_NOTIFICATION_URL, callback);

        super.injectCallback(context, conf);
    }

    @Override
    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
        Map<String, String> actionData = Collections.emptyMap();
        Configuration jobConf = null;

        try {
            FileSystem actionFs = context.getAppFileSystem();
            Element actionXml = XmlUtils.parseXml(action.getConf());
            jobConf = createBaseHadoopConf(context, actionXml);
            Path actionDir = context.getActionDir();
            actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf);
        } catch (Exception e) {
            LOG.warn("Exception in check(). Message[{0}]", e.getMessage(), e);
            throw convertException(e);
        }

        final String newId = actionData.get(LauncherAMUtils.ACTION_DATA_NEW_ID);

        // check the Hadoop job if newID is defined (which should be the case here) - otherwise perform the normal check()
        if (newId != null) {
            boolean jobCompleted;
            JobClient jobClient = null;
            boolean exception = false;

            try {
                jobClient = createJobClient(context, new JobConf(jobConf));
                RunningJob runningJob = jobClient.getJob(JobID.forName(newId));

                if (runningJob == null) {
                    context.setExternalStatus(FAILED);
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
                            "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId,
                            action.getId());
                }

                jobCompleted = runningJob.isComplete();
            } catch (Exception e) {
                LOG.warn("Unable to check the state of a running MapReduce job -"
                        + " please check the health of the Job History Server!", e);
                exception = true;
                throw convertException(e);
            } finally {
                if (jobClient != null) {
                    try {
                        jobClient.close();
                    } catch (Exception e) {
                        if (exception) {
                            LOG.error("JobClient error (not re-throwing due to a previous error): ", e);
                        } else {
                            throw convertException(e);
                        }
                    }
                }
            }

            // run original check() if the MR action is completed or there are errors - otherwise mark it as RUNNING
            if (jobCompleted || actionData.containsKey(LauncherAMUtils.ACTION_DATA_ERROR_PROPS)) {
                super.check(context, action);
            } else {
                context.setExternalStatus(RUNNING);
                String externalAppId = TypeConverter.toYarn(JobID.forName(newId)).getAppId().toString();
                context.setExternalChildIDs(externalAppId);
            }
        } else {
            super.check(context, action);
        }
    }

    @Override
    void injectActionCallback(Context context, Configuration actionConf) {
        injectCallback(context, actionConf);
    }

}