001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.action.hadoop; 020 021import java.io.BufferedReader; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.InputStreamReader; 025import java.io.OutputStream; 026import java.math.BigInteger; 027import java.security.MessageDigest; 028import java.security.NoSuchAlgorithmException; 029import java.security.PrivilegedExceptionAction; 030import java.util.ArrayList; 031import java.util.Collection; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Properties; 036 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.io.SequenceFile; 041import org.apache.hadoop.io.Text; 042import org.apache.hadoop.mapred.JobConf; 043import org.apache.hadoop.mapred.RunningJob; 044import org.apache.hadoop.mapred.Counters; 045import org.apache.hadoop.security.UserGroupInformation; 046import org.apache.oozie.client.OozieClient; 047import org.apache.oozie.client.WorkflowAction; 048import org.apache.oozie.service.HadoopAccessorException; 049import org.apache.oozie.service.HadoopAccessorService; 050import org.apache.oozie.service.Services; 051import org.apache.oozie.service.URIHandlerService; 052import org.apache.oozie.service.UserGroupInformationService; 053import org.apache.oozie.util.IOUtils; 054import org.apache.oozie.util.PropertiesUtils; 055 056public class LauncherMapperHelper { 057 058 public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag"; 059 060 public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) 061 throws HadoopAccessorException, IOException { 062 String jobId = null; 063 Path recoveryFile = new Path(actionDir, recoveryId); 064 FileSystem fs = Services.get().get(HadoopAccessorService.class) 065 .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf); 066 067 if (fs.exists(recoveryFile)) { 068 InputStream is = fs.open(recoveryFile); 069 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 070 jobId = reader.readLine(); 071 reader.close(); 072 } 073 return jobId; 074 075 } 076 077 public static void setupMainClass(Configuration launcherConf, String javaMainClass) { 078 // Only set the javaMainClass if its not null or empty string, this way the user can override the action's main class via 079 // <configuration> property 080 if (javaMainClass != null && !javaMainClass.equals("")) { 081 launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass); 082 } 083 } 084 085 public static void setupLauncherURIHandlerConf(Configuration launcherConf) { 086 for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) { 087 launcherConf.set(entry.getKey(), entry.getValue()); 088 } 089 } 090 091 public static void setupMainArguments(Configuration launcherConf, String[] args) { 092 launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length); 093 for (int i = 0; i < args.length; i++) { 094 launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]); 095 } 096 } 097 098 public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) { 099 launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData); 100 } 101 102 /** 103 * Set the maximum value of stats data 104 * 105 * @param launcherConf the oozie launcher configuration 106 * @param maxStatsData the maximum allowed size of stats data 107 */ 108 public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){ 109 launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData); 110 } 111 112 /** 113 * Set the maximum number of globbed files/dirs 114 * 115 * @param launcherConf the oozie launcher configuration 116 * @param fsGlobMax the maximum number of files/dirs for FS operation 117 */ 118 public static void setupMaxFSGlob(Configuration launcherConf, int fsGlobMax){ 119 launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, fsGlobMax); 120 } 121 122 public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir, 123 String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException { 124 125 launcherConf.setMapperClass(LauncherMapper.class); 126 launcherConf.setSpeculativeExecution(false); 127 launcherConf.setNumMapTasks(1); 128 launcherConf.setNumReduceTasks(0); 129 130 launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId); 131 launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId); 132 launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString()); 133 launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId); 134 launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML); 135 136 actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId); 137 actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId); 138 139 if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) { 140 List<String> purgedEntries = new ArrayList<String>(); 141 Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files"); 142 for (String entry : entries) { 143 if (entry.contains("#")) { 144 purgedEntries.add(entry); 145 } 146 } 147 actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()])); 148 launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true); 149 } 150 151 FileSystem fs = 152 Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"), 153 actionDir.toUri(), launcherConf); 154 fs.mkdirs(actionDir); 155 156 OutputStream os = fs.create(new Path(actionDir, LauncherMapper.ACTION_CONF_XML)); 157 try { 158 actionConf.writeXml(os); 159 } finally { 160 IOUtils.closeSafely(os); 161 } 162 163 launcherConf.setInputFormat(OozieLauncherInputFormat.class); 164 launcherConf.setOutputFormat(OozieLauncherOutputFormat.class); 165 launcherConf.setOutputCommitter(OozieLauncherOutputCommitter.class); 166 } 167 168 public static void setupYarnRestartHandling(JobConf launcherJobConf, Configuration actionConf, String launcherTag, 169 long launcherTime) 170 throws NoSuchAlgorithmException { 171 launcherJobConf.setLong(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, launcherTime); 172 // Tags are limited to 100 chars so we need to hash them to make sure (the actionId otherwise doesn't have a max length) 173 String tag = getTag(launcherTag); 174 // keeping the oozie.child.mapreduce.job.tags instead of mapreduce.job.tags to avoid killing launcher itself. 175 // mapreduce.job.tags should only go to child job launch by launcher. 176 actionConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, tag); 177 } 178 179 public static String getTag(String launcherTag) throws NoSuchAlgorithmException { 180 MessageDigest digest = MessageDigest.getInstance("MD5"); 181 digest.update(launcherTag.getBytes(), 0, launcherTag.length()); 182 String md5 = "oozie-" + new BigInteger(1, digest.digest()).toString(16); 183 return md5; 184 } 185 186 public static boolean isMainDone(RunningJob runningJob) throws IOException { 187 return runningJob.isComplete(); 188 } 189 190 public static boolean isMainSuccessful(RunningJob runningJob) throws IOException { 191 boolean succeeded = runningJob.isSuccessful(); 192 if (succeeded) { 193 Counters counters = runningJob.getCounters(); 194 if (counters != null) { 195 Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP); 196 if (group != null) { 197 succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0; 198 } 199 } 200 } 201 return succeeded; 202 } 203 204 /** 205 * Determine whether action has external child jobs or not 206 * @param actionData 207 * @return true/false 208 * @throws IOException 209 */ 210 public static boolean hasExternalChildJobs(Map<String, String> actionData) throws IOException { 211 return actionData.containsKey(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS); 212 } 213 214 /** 215 * Determine whether action has output data or not 216 * @param actionData 217 * @return true/false 218 * @throws IOException 219 */ 220 public static boolean hasOutputData(Map<String, String> actionData) throws IOException { 221 return actionData.containsKey(LauncherMapper.ACTION_DATA_OUTPUT_PROPS); 222 } 223 224 /** 225 * Determine whether action has external stats or not 226 * @param actionData 227 * @return true/false 228 * @throws IOException 229 */ 230 public static boolean hasStatsData(Map<String, String> actionData) throws IOException{ 231 return actionData.containsKey(LauncherMapper.ACTION_DATA_STATS); 232 } 233 234 /** 235 * Determine whether action has new id (id swap) or not 236 * @param actionData 237 * @return true/false 238 * @throws IOException 239 */ 240 public static boolean hasIdSwap(Map<String, String> actionData) throws IOException { 241 return actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID); 242 } 243 244 /** 245 * Get the sequence file path storing all action data 246 * @param actionDir 247 * @return 248 */ 249 public static Path getActionDataSequenceFilePath(Path actionDir) { 250 return new Path(actionDir, LauncherMapper.ACTION_DATA_SEQUENCE_FILE); 251 } 252 253 /** 254 * Utility function to load the contents of action data sequence file into 255 * memory object 256 * 257 * @param fs Action Filesystem 258 * @param actionDir Path 259 * @param conf Configuration 260 * @return Map action data 261 * @throws IOException 262 * @throws InterruptedException 263 */ 264 public static Map<String, String> getActionData(final FileSystem fs, final Path actionDir, final Configuration conf) 265 throws IOException, InterruptedException { 266 UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class); 267 UserGroupInformation ugi = ugiService.getProxyUser(conf.get(OozieClient.USER_NAME)); 268 269 return ugi.doAs(new PrivilegedExceptionAction<Map<String, String>>() { 270 @Override 271 public Map<String, String> run() throws IOException { 272 Map<String, String> ret = new HashMap<String, String>(); 273 Path seqFilePath = getActionDataSequenceFilePath(actionDir); 274 if (fs.exists(seqFilePath)) { 275 SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, seqFilePath, conf); 276 Text key = new Text(), value = new Text(); 277 while (seqFile.next(key, value)) { 278 ret.put(key.toString(), value.toString()); 279 } 280 seqFile.close(); 281 } 282 else { // maintain backward-compatibility. to be deprecated 283 org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(actionDir); 284 InputStream is; 285 BufferedReader reader = null; 286 Properties props; 287 if (files != null && files.length > 0) { 288 for (int x = 0; x < files.length; x++) { 289 Path file = files[x].getPath(); 290 if (file.equals(new Path(actionDir, "externalChildIds.properties"))) { 291 is = fs.open(file); 292 reader = new BufferedReader(new InputStreamReader(is)); 293 ret.put(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS, 294 IOUtils.getReaderAsString(reader, -1)); 295 } 296 else if (file.equals(new Path(actionDir, "newId.properties"))) { 297 is = fs.open(file); 298 reader = new BufferedReader(new InputStreamReader(is)); 299 props = PropertiesUtils.readProperties(reader, -1); 300 ret.put(LauncherMapper.ACTION_DATA_NEW_ID, props.getProperty("id")); 301 } 302 else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_OUTPUT_PROPS))) { 303 int maxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 304 2 * 1024); 305 is = fs.open(file); 306 reader = new BufferedReader(new InputStreamReader(is)); 307 ret.put(LauncherMapper.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils 308 .propertiesToString(PropertiesUtils.readProperties(reader, maxOutputData))); 309 } 310 else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_STATS))) { 311 int statsMaxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, 312 Integer.MAX_VALUE); 313 is = fs.open(file); 314 reader = new BufferedReader(new InputStreamReader(is)); 315 ret.put(LauncherMapper.ACTION_DATA_STATS, PropertiesUtils 316 .propertiesToString(PropertiesUtils.readProperties(reader, statsMaxOutputData))); 317 } 318 else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_ERROR_PROPS))) { 319 is = fs.open(file); 320 reader = new BufferedReader(new InputStreamReader(is)); 321 ret.put(LauncherMapper.ACTION_DATA_ERROR_PROPS, IOUtils.getReaderAsString(reader, -1)); 322 } 323 } 324 } 325 } 326 return ret; 327 } 328 }); 329 } 330 331 public static String getActionYarnTag(Configuration conf, String parentId, WorkflowAction wfAction) { 332 String tag; 333 if ( conf != null && conf.get(OOZIE_ACTION_YARN_TAG) != null) { 334 tag = conf.get(OOZIE_ACTION_YARN_TAG) + "@" + wfAction.getName(); 335 } else if (parentId != null) { 336 tag = parentId + "@" + wfAction.getName(); 337 } else { 338 tag = wfAction.getId(); 339 } 340 return tag; 341 } 342 343}