001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import java.io.BufferedReader; 021 import java.io.IOException; 022 import java.io.InputStream; 023 import java.io.InputStreamReader; 024 import java.io.OutputStream; 025 import java.io.OutputStreamWriter; 026 import java.io.Writer; 027 import java.util.ArrayList; 028 import java.util.Collection; 029 import java.util.List; 030 import java.util.Map; 031 032 import org.apache.hadoop.conf.Configuration; 033 import org.apache.hadoop.fs.FileSystem; 034 import org.apache.hadoop.fs.Path; 035 import org.apache.hadoop.mapred.Counters; 036 import org.apache.hadoop.mapred.JobConf; 037 import org.apache.hadoop.mapred.RunningJob; 038 import org.apache.oozie.service.HadoopAccessorException; 039 import org.apache.oozie.service.HadoopAccessorService; 040 import org.apache.oozie.service.Services; 041 import org.apache.oozie.service.URIHandlerService; 042 import org.apache.oozie.util.XLog; 043 044 public class LauncherMapperHelper { 045 046 public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) 047 throws HadoopAccessorException, IOException { 048 String jobId = null; 049 Path recoveryFile = new Path(actionDir, recoveryId); 050 FileSystem fs = Services.get().get(HadoopAccessorService.class) 051 .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf); 052 053 if (fs.exists(recoveryFile)) { 054 InputStream is = fs.open(recoveryFile); 055 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 056 jobId = reader.readLine(); 057 reader.close(); 058 } 059 return jobId; 060 061 } 062 063 public static void setupMainClass(Configuration launcherConf, String javaMainClass) { 064 // Only set the javaMainClass if its not null or empty string (should be the case except for java action), this way the user 065 // can override the action's main class via <configuration> property 066 if (javaMainClass != null && !javaMainClass.equals("")) { 067 launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass); 068 } 069 } 070 071 public static void setupLauncherURIHandlerConf(Configuration launcherConf) { 072 for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) { 073 launcherConf.set(entry.getKey(), entry.getValue()); 074 } 075 } 076 077 public static void setupMainArguments(Configuration launcherConf, String[] args) { 078 launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length); 079 for (int i = 0; i < args.length; i++) { 080 launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]); 081 } 082 } 083 084 public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) { 085 launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData); 086 } 087 088 /** 089 * Set the maximum value of stats data 090 * 091 * @param launcherConf the oozie launcher configuration 092 * @param maxStatsData the maximum allowed size of stats data 093 */ 094 public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){ 095 launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData); 096 } 097 098 public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir, 099 String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException { 100 101 launcherConf.setMapperClass(LauncherMapper.class); 102 launcherConf.setSpeculativeExecution(false); 103 launcherConf.setNumMapTasks(1); 104 launcherConf.setNumReduceTasks(0); 105 106 launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId); 107 launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId); 108 launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString()); 109 launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId); 110 launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML); 111 112 actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId); 113 actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId); 114 115 if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) { 116 List<String> purgedEntries = new ArrayList<String>(); 117 Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files"); 118 for (String entry : entries) { 119 if (entry.contains("#")) { 120 purgedEntries.add(entry); 121 } 122 } 123 actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()])); 124 launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true); 125 } 126 127 FileSystem fs = 128 Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"), 129 actionDir.toUri(), launcherConf); 130 fs.mkdirs(actionDir); 131 132 OutputStream os = fs.create(new Path(actionDir, LauncherMapper.ACTION_CONF_XML)); 133 actionConf.writeXml(os); 134 os.close(); 135 136 Path inputDir = new Path(actionDir, "input"); 137 fs.mkdirs(inputDir); 138 Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "dummy.txt"))); 139 writer.write("dummy"); 140 writer.close(); 141 142 launcherConf.set("mapred.input.dir", inputDir.toString()); 143 launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString()); 144 } 145 146 public static boolean isMainDone(RunningJob runningJob) throws IOException { 147 return runningJob.isComplete(); 148 } 149 150 public static boolean isMainSuccessful(RunningJob runningJob) throws IOException { 151 boolean succeeded = runningJob.isSuccessful(); 152 if (succeeded) { 153 Counters counters = runningJob.getCounters(); 154 if (counters != null) { 155 Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP); 156 if (group != null) { 157 succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0; 158 } 159 } 160 } 161 return succeeded; 162 } 163 164 public static boolean hasOutputData(RunningJob runningJob) throws IOException { 165 boolean output = false; 166 Counters counters = runningJob.getCounters(); 167 if (counters != null) { 168 Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP); 169 if (group != null) { 170 output = group.getCounter(LauncherMapper.COUNTER_OUTPUT_DATA) == 1; 171 } 172 } 173 return output; 174 } 175 176 /** 177 * Check whether runningJob has stats data or not 178 * 179 * @param runningJob the runningJob 180 * @return returns whether the running Job has stats data or not 181 * @throws IOException 182 */ 183 public static boolean hasStatsData(RunningJob runningJob) throws IOException{ 184 boolean output = false; 185 Counters counters = runningJob.getCounters(); 186 if (counters != null) { 187 Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP); 188 if (group != null) { 189 output = group.getCounter(LauncherMapper.COUNTER_STATS_DATA) == 1; 190 } 191 } 192 return output; 193 } 194 195 public static boolean hasIdSwap(RunningJob runningJob) throws IOException { 196 boolean swap = false; 197 Counters counters = runningJob.getCounters(); 198 if (counters != null) { 199 Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP); 200 if (group != null) { 201 swap = group.getCounter(LauncherMapper.COUNTER_DO_ID_SWAP) == 1; 202 } 203 } 204 return swap; 205 } 206 207 public static boolean hasIdSwap(RunningJob runningJob, String user, String group, Path actionDir) 208 throws IOException, HadoopAccessorException { 209 boolean swap = false; 210 211 XLog log = XLog.getLog("org.apache.oozie.action.hadoop.LauncherMapper"); 212 213 Counters counters = runningJob.getCounters(); 214 if (counters != null) { 215 Counters.Group counterGroup = counters.getGroup(LauncherMapper.COUNTER_GROUP); 216 if (counterGroup != null) { 217 swap = counterGroup.getCounter(LauncherMapper.COUNTER_DO_ID_SWAP) == 1; 218 } 219 } 220 // additional check for swapped hadoop ID 221 // Can't rely on hadoop counters existing 222 // we'll check for the newID file in hdfs if the hadoop counters is null 223 else { 224 225 Path p = getIdSwapPath(actionDir); 226 // log.debug("Checking for newId file in: [{0}]", p); 227 228 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 229 Configuration conf = has.createJobConf(p.toUri().getAuthority()); 230 FileSystem fs = has.createFileSystem(user, p.toUri(), conf); 231 if (fs.exists(p)) { 232 log.debug("Hadoop Counters is null, but found newID file."); 233 234 swap = true; 235 } 236 else { 237 log.debug("Hadoop Counters is null, and newID file doesn't exist at: [{0}]", p); 238 } 239 } 240 return swap; 241 } 242 243 public static Path getOutputDataPath(Path actionDir) { 244 return new Path(actionDir, LauncherMapper.ACTION_OUTPUT_PROPS); 245 } 246 247 /** 248 * Get the location of stats file 249 * 250 * @param actionDir the action directory 251 * @return the hdfs location of the file 252 */ 253 public static Path getActionStatsDataPath(Path actionDir){ 254 return new Path(actionDir, LauncherMapper.ACTION_STATS_PROPS); 255 } 256 257 /** 258 * Get the location of external Child IDs file 259 * 260 * @param actionDir the action directory 261 * @return the hdfs location of the file 262 */ 263 public static Path getExternalChildIDsDataPath(Path actionDir){ 264 return new Path(actionDir, LauncherMapper.ACTION_EXTERNAL_CHILD_IDS_PROPS); 265 } 266 267 public static Path getErrorPath(Path actionDir) { 268 return new Path(actionDir, LauncherMapper.ACTION_ERROR_PROPS); 269 } 270 271 public static Path getIdSwapPath(Path actionDir) { 272 return new Path(actionDir, LauncherMapper.ACTION_NEW_ID_PROPS); 273 } 274 }