001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.action.hadoop; 019 020import java.io.BufferedReader; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.InputStreamReader; 024import java.io.OutputStream; 025import java.io.OutputStreamWriter; 026import java.io.Writer; 027import java.math.BigInteger; 028import java.security.MessageDigest; 029import java.security.NoSuchAlgorithmException; 030import java.security.PrivilegedExceptionAction; 031import java.util.ArrayList; 032import java.util.Collection; 033import java.util.HashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Properties; 037 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.FileSystem; 040import org.apache.hadoop.fs.Path; 041import org.apache.hadoop.io.SequenceFile; 042import org.apache.hadoop.io.Text; 043import org.apache.hadoop.mapred.JobConf; 044import org.apache.hadoop.mapred.RunningJob; 045import org.apache.hadoop.mapred.Counters; 046import org.apache.hadoop.security.UserGroupInformation; 047import org.apache.oozie.client.OozieClient; 048import org.apache.oozie.client.WorkflowAction; 049import org.apache.oozie.service.HadoopAccessorException; 050import org.apache.oozie.service.HadoopAccessorService; 051import org.apache.oozie.service.Services; 052import org.apache.oozie.service.URIHandlerService; 053import org.apache.oozie.service.UserGroupInformationService; 054import org.apache.oozie.util.IOUtils; 055import org.apache.oozie.util.PropertiesUtils; 056 057public class LauncherMapperHelper { 058 059 public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) 060 throws HadoopAccessorException, IOException { 061 String jobId = null; 062 Path recoveryFile = new Path(actionDir, recoveryId); 063 FileSystem fs = Services.get().get(HadoopAccessorService.class) 064 .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf); 065 066 if (fs.exists(recoveryFile)) { 067 InputStream is = fs.open(recoveryFile); 068 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 069 jobId = reader.readLine(); 070 reader.close(); 071 } 072 return jobId; 073 074 } 075 076 public static void setupMainClass(Configuration launcherConf, String javaMainClass) { 077 // Only set the javaMainClass if its not null or empty string, this way the user can override the action's main class via 078 // <configuration> property 079 if (javaMainClass != null && !javaMainClass.equals("")) { 080 launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass); 081 } 082 } 083 084 public static void setupLauncherURIHandlerConf(Configuration launcherConf) { 085 for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) { 086 launcherConf.set(entry.getKey(), entry.getValue()); 087 } 088 } 089 090 public static void setupMainArguments(Configuration launcherConf, String[] args) { 091 launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length); 092 for (int i = 0; i < args.length; i++) { 093 launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]); 094 } 095 } 096 097 public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) { 098 launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData); 099 } 100 101 /** 102 * Set the maximum value of stats data 103 * 104 * @param launcherConf the oozie launcher configuration 105 * @param maxStatsData the maximum allowed size of stats data 106 */ 107 public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){ 108 launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData); 109 } 110 111 /** 112 * Set the maximum number of globbed files/dirs 113 * 114 * @param launcherConf the oozie launcher configuration 115 * @param fsGlobMax the maximum number of files/dirs for FS operation 116 */ 117 public static void setupMaxFSGlob(Configuration launcherConf, int fsGlobMax){ 118 launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, fsGlobMax); 119 } 120 121 public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir, 122 String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException { 123 124 launcherConf.setMapperClass(LauncherMapper.class); 125 launcherConf.setSpeculativeExecution(false); 126 launcherConf.setNumMapTasks(1); 127 launcherConf.setNumReduceTasks(0); 128 129 launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId); 130 launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId); 131 launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString()); 132 launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId); 133 launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML); 134 135 actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId); 136 actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId); 137 138 if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) { 139 List<String> purgedEntries = new ArrayList<String>(); 140 Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files"); 141 for (String entry : entries) { 142 if (entry.contains("#")) { 143 purgedEntries.add(entry); 144 } 145 } 146 actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()])); 147 launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true); 148 } 149 150 FileSystem fs = 151 Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"), 152 actionDir.toUri(), launcherConf); 153 fs.mkdirs(actionDir); 154 155 OutputStream os = fs.create(new Path(actionDir, LauncherMapper.ACTION_CONF_XML)); 156 try { 157 actionConf.writeXml(os); 158 } finally { 159 IOUtils.closeSafely(os); 160 } 161 162 launcherConf.setInputFormat(OozieLauncherInputFormat.class); 163 launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString()); 164 } 165 166 public static void setupYarnRestartHandling(JobConf launcherJobConf, Configuration actionConf, String actionId) 167 throws NoSuchAlgorithmException { 168 launcherJobConf.setLong("oozie.job.launch.time", System.currentTimeMillis()); 169 // Tags are limited to 100 chars so we need to hash them to make sure (the actionId otherwise doesn't have a max length) 170 String tag = getTag(actionId); 171 actionConf.set("mapreduce.job.tags", tag); 172 } 173 174 private static String getTag(String actionId) throws NoSuchAlgorithmException { 175 MessageDigest digest = MessageDigest.getInstance("MD5"); 176 digest.update(actionId.getBytes(), 0, actionId.length()); 177 String md5 = "oozie-" + new BigInteger(1, digest.digest()).toString(16); 178 return md5; 179 } 180 181 public static boolean isMainDone(RunningJob runningJob) throws IOException { 182 return runningJob.isComplete(); 183 } 184 185 public static boolean isMainSuccessful(RunningJob runningJob) throws IOException { 186 boolean succeeded = runningJob.isSuccessful(); 187 if (succeeded) { 188 Counters counters = runningJob.getCounters(); 189 if (counters != null) { 190 Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP); 191 if (group != null) { 192 succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0; 193 } 194 } 195 } 196 return succeeded; 197 } 198 199 /** 200 * Determine whether action has external child jobs or not 201 * @param actionData 202 * @return true/false 203 * @throws IOException 204 */ 205 public static boolean hasExternalChildJobs(Map<String, String> actionData) throws IOException { 206 return actionData.containsKey(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS); 207 } 208 209 /** 210 * Determine whether action has output data or not 211 * @param actionData 212 * @return true/false 213 * @throws IOException 214 */ 215 public static boolean hasOutputData(Map<String, String> actionData) throws IOException { 216 return actionData.containsKey(LauncherMapper.ACTION_DATA_OUTPUT_PROPS); 217 } 218 219 /** 220 * Determine whether action has external stats or not 221 * @param actionData 222 * @return true/false 223 * @throws IOException 224 */ 225 public static boolean hasStatsData(Map<String, String> actionData) throws IOException{ 226 return actionData.containsKey(LauncherMapper.ACTION_DATA_STATS); 227 } 228 229 /** 230 * Determine whether action has new id (id swap) or not 231 * @param actionData 232 * @return true/false 233 * @throws IOException 234 */ 235 public static boolean hasIdSwap(Map<String, String> actionData) throws IOException { 236 return actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID); 237 } 238 239 /** 240 * Get the sequence file path storing all action data 241 * @param actionDir 242 * @return 243 */ 244 public static Path getActionDataSequenceFilePath(Path actionDir) { 245 return new Path(actionDir, LauncherMapper.ACTION_DATA_SEQUENCE_FILE); 246 } 247 248 /** 249 * Utility function to load the contents of action data sequence file into 250 * memory object 251 * 252 * @param fs Action Filesystem 253 * @param actionDir Path 254 * @param conf Configuration 255 * @return Map action data 256 * @throws IOException 257 * @throws InterruptedException 258 */ 259 public static Map<String, String> getActionData(final FileSystem fs, final Path actionDir, final Configuration conf) 260 throws IOException, InterruptedException { 261 UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class); 262 UserGroupInformation ugi = ugiService.getProxyUser(conf.get(OozieClient.USER_NAME)); 263 264 return ugi.doAs(new PrivilegedExceptionAction<Map<String, String>>() { 265 @Override 266 public Map<String, String> run() throws IOException { 267 Map<String, String> ret = new HashMap<String, String>(); 268 Path seqFilePath = getActionDataSequenceFilePath(actionDir); 269 if (fs.exists(seqFilePath)) { 270 SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, seqFilePath, conf); 271 Text key = new Text(), value = new Text(); 272 while (seqFile.next(key, value)) { 273 ret.put(key.toString(), value.toString()); 274 } 275 seqFile.close(); 276 } 277 else { // maintain backward-compatibility. to be deprecated 278 org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(actionDir); 279 InputStream is; 280 BufferedReader reader = null; 281 Properties props; 282 if (files != null && files.length > 0) { 283 for (int x = 0; x < files.length; x++) { 284 Path file = files[x].getPath(); 285 if (file.equals(new Path(actionDir, "externalChildIds.properties"))) { 286 is = fs.open(file); 287 reader = new BufferedReader(new InputStreamReader(is)); 288 ret.put(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS, 289 IOUtils.getReaderAsString(reader, -1)); 290 } 291 else if (file.equals(new Path(actionDir, "newId.properties"))) { 292 is = fs.open(file); 293 reader = new BufferedReader(new InputStreamReader(is)); 294 props = PropertiesUtils.readProperties(reader, -1); 295 ret.put(LauncherMapper.ACTION_DATA_NEW_ID, props.getProperty("id")); 296 } 297 else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_OUTPUT_PROPS))) { 298 int maxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 299 2 * 1024); 300 is = fs.open(file); 301 reader = new BufferedReader(new InputStreamReader(is)); 302 ret.put(LauncherMapper.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils 303 .propertiesToString(PropertiesUtils.readProperties(reader, maxOutputData))); 304 } 305 else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_STATS))) { 306 int statsMaxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, 307 Integer.MAX_VALUE); 308 is = fs.open(file); 309 reader = new BufferedReader(new InputStreamReader(is)); 310 ret.put(LauncherMapper.ACTION_DATA_STATS, PropertiesUtils 311 .propertiesToString(PropertiesUtils.readProperties(reader, statsMaxOutputData))); 312 } 313 else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_ERROR_PROPS))) { 314 is = fs.open(file); 315 reader = new BufferedReader(new InputStreamReader(is)); 316 ret.put(LauncherMapper.ACTION_DATA_ERROR_PROPS, IOUtils.getReaderAsString(reader, -1)); 317 } 318 } 319 } 320 } 321 return ret; 322 } 323 }); 324 } 325}