This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import java.io.BufferedReader;
021 import java.io.IOException;
022 import java.io.InputStream;
023 import java.io.InputStreamReader;
024 import java.io.OutputStream;
025 import java.io.OutputStreamWriter;
026 import java.io.Writer;
027 import java.util.ArrayList;
028 import java.util.Collection;
029 import java.util.List;
030 import java.util.Map;
031
032 import org.apache.hadoop.conf.Configuration;
033 import org.apache.hadoop.fs.FileSystem;
034 import org.apache.hadoop.fs.Path;
035 import org.apache.hadoop.mapred.Counters;
036 import org.apache.hadoop.mapred.JobConf;
037 import org.apache.hadoop.mapred.RunningJob;
038 import org.apache.oozie.service.HadoopAccessorException;
039 import org.apache.oozie.service.HadoopAccessorService;
040 import org.apache.oozie.service.Services;
041 import org.apache.oozie.service.URIHandlerService;
042 import org.apache.oozie.util.XLog;
043
044 public class LauncherMapperHelper {
045
046 public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
047 throws HadoopAccessorException, IOException {
048 String jobId = null;
049 Path recoveryFile = new Path(actionDir, recoveryId);
050 FileSystem fs = Services.get().get(HadoopAccessorService.class)
051 .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf);
052
053 if (fs.exists(recoveryFile)) {
054 InputStream is = fs.open(recoveryFile);
055 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
056 jobId = reader.readLine();
057 reader.close();
058 }
059 return jobId;
060
061 }
062
063 public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
064 // Only set the javaMainClass if its not null or empty string (should be the case except for java action), this way the user
065 // can override the action's main class via <configuration> property
066 if (javaMainClass != null && !javaMainClass.equals("")) {
067 launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
068 }
069 }
070
071 public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
072 for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) {
073 launcherConf.set(entry.getKey(), entry.getValue());
074 }
075 }
076
077 public static void setupMainArguments(Configuration launcherConf, String[] args) {
078 launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
079 for (int i = 0; i < args.length; i++) {
080 launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
081 }
082 }
083
084 public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
085 launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
086 }
087
088 /**
089 * Set the maximum value of stats data
090 *
091 * @param launcherConf the oozie launcher configuration
092 * @param maxStatsData the maximum allowed size of stats data
093 */
094 public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){
095 launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
096 }
097
098 public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir,
099 String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
100
101 launcherConf.setMapperClass(LauncherMapper.class);
102 launcherConf.setSpeculativeExecution(false);
103 launcherConf.setNumMapTasks(1);
104 launcherConf.setNumReduceTasks(0);
105
106 launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
107 launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
108 launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString());
109 launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId);
110 launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML);
111
112 actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
113 actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
114
115 if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) {
116 List<String> purgedEntries = new ArrayList<String>();
117 Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files");
118 for (String entry : entries) {
119 if (entry.contains("#")) {
120 purgedEntries.add(entry);
121 }
122 }
123 actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()]));
124 launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
125 }
126
127 FileSystem fs =
128 Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
129 actionDir.toUri(), launcherConf);
130 fs.mkdirs(actionDir);
131
132 OutputStream os = fs.create(new Path(actionDir, LauncherMapper.ACTION_CONF_XML));
133 actionConf.writeXml(os);
134 os.close();
135
136 Path inputDir = new Path(actionDir, "input");
137 fs.mkdirs(inputDir);
138 Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "dummy.txt")));
139 writer.write("dummy");
140 writer.close();
141
142 launcherConf.set("mapred.input.dir", inputDir.toString());
143 launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString());
144 }
145
146 public static boolean isMainDone(RunningJob runningJob) throws IOException {
147 return runningJob.isComplete();
148 }
149
150 public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
151 boolean succeeded = runningJob.isSuccessful();
152 if (succeeded) {
153 Counters counters = runningJob.getCounters();
154 if (counters != null) {
155 Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
156 if (group != null) {
157 succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0;
158 }
159 }
160 }
161 return succeeded;
162 }
163
164 public static boolean hasOutputData(RunningJob runningJob) throws IOException {
165 boolean output = false;
166 Counters counters = runningJob.getCounters();
167 if (counters != null) {
168 Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
169 if (group != null) {
170 output = group.getCounter(LauncherMapper.COUNTER_OUTPUT_DATA) == 1;
171 }
172 }
173 return output;
174 }
175
176 /**
177 * Check whether runningJob has stats data or not
178 *
179 * @param runningJob the runningJob
180 * @return returns whether the running Job has stats data or not
181 * @throws IOException
182 */
183 public static boolean hasStatsData(RunningJob runningJob) throws IOException{
184 boolean output = false;
185 Counters counters = runningJob.getCounters();
186 if (counters != null) {
187 Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
188 if (group != null) {
189 output = group.getCounter(LauncherMapper.COUNTER_STATS_DATA) == 1;
190 }
191 }
192 return output;
193 }
194
195 public static boolean hasIdSwap(RunningJob runningJob) throws IOException {
196 boolean swap = false;
197 Counters counters = runningJob.getCounters();
198 if (counters != null) {
199 Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
200 if (group != null) {
201 swap = group.getCounter(LauncherMapper.COUNTER_DO_ID_SWAP) == 1;
202 }
203 }
204 return swap;
205 }
206
207 public static boolean hasIdSwap(RunningJob runningJob, String user, String group, Path actionDir)
208 throws IOException, HadoopAccessorException {
209 boolean swap = false;
210
211 XLog log = XLog.getLog("org.apache.oozie.action.hadoop.LauncherMapper");
212
213 Counters counters = runningJob.getCounters();
214 if (counters != null) {
215 Counters.Group counterGroup = counters.getGroup(LauncherMapper.COUNTER_GROUP);
216 if (counterGroup != null) {
217 swap = counterGroup.getCounter(LauncherMapper.COUNTER_DO_ID_SWAP) == 1;
218 }
219 }
220 // additional check for swapped hadoop ID
221 // Can't rely on hadoop counters existing
222 // we'll check for the newID file in hdfs if the hadoop counters is null
223 else {
224
225 Path p = getIdSwapPath(actionDir);
226 // log.debug("Checking for newId file in: [{0}]", p);
227
228 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
229 Configuration conf = has.createJobConf(p.toUri().getAuthority());
230 FileSystem fs = has.createFileSystem(user, p.toUri(), conf);
231 if (fs.exists(p)) {
232 log.debug("Hadoop Counters is null, but found newID file.");
233
234 swap = true;
235 }
236 else {
237 log.debug("Hadoop Counters is null, and newID file doesn't exist at: [{0}]", p);
238 }
239 }
240 return swap;
241 }
242
243 public static Path getOutputDataPath(Path actionDir) {
244 return new Path(actionDir, LauncherMapper.ACTION_OUTPUT_PROPS);
245 }
246
247 /**
248 * Get the location of stats file
249 *
250 * @param actionDir the action directory
251 * @return the hdfs location of the file
252 */
253 public static Path getActionStatsDataPath(Path actionDir){
254 return new Path(actionDir, LauncherMapper.ACTION_STATS_PROPS);
255 }
256
257 /**
258 * Get the location of external Child IDs file
259 *
260 * @param actionDir the action directory
261 * @return the hdfs location of the file
262 */
263 public static Path getExternalChildIDsDataPath(Path actionDir){
264 return new Path(actionDir, LauncherMapper.ACTION_EXTERNAL_CHILD_IDS_PROPS);
265 }
266
267 public static Path getErrorPath(Path actionDir) {
268 return new Path(actionDir, LauncherMapper.ACTION_ERROR_PROPS);
269 }
270
271 public static Path getIdSwapPath(Path actionDir) {
272 return new Path(actionDir, LauncherMapper.ACTION_NEW_ID_PROPS);
273 }
274 }