This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import java.io.BufferedReader;
021 import java.io.File;
022 import java.io.FileReader;
023 import java.io.IOException;
024 import java.io.InputStream;
025 import java.io.InputStreamReader;
026 import java.io.OutputStream;
027 import java.io.OutputStreamWriter;
028 import java.io.PrintWriter;
029 import java.io.StringWriter;
030 import java.io.Writer;
031 import java.lang.reflect.InvocationTargetException;
032 import java.lang.reflect.Method;
033 import java.net.URI;
034 import java.security.Permission;
035 import java.text.MessageFormat;
036 import java.util.ArrayList;
037 import java.util.Collection;
038 import java.util.List;
039 import java.util.Properties;
040 import java.util.StringTokenizer;
041 import java.util.concurrent.ScheduledThreadPoolExecutor;
042 import java.util.concurrent.TimeUnit;
043
044 import org.apache.hadoop.conf.Configuration;
045 import org.apache.hadoop.fs.FileSystem;
046 import org.apache.hadoop.fs.Path;
047 import org.apache.hadoop.mapred.Counters;
048 import org.apache.hadoop.mapred.JobConf;
049 import org.apache.hadoop.mapred.Mapper;
050 import org.apache.hadoop.mapred.OutputCollector;
051 import org.apache.hadoop.mapred.Reporter;
052 import org.apache.hadoop.mapred.RunningJob;
053 import org.apache.oozie.service.HadoopAccessorException;
054 import org.apache.oozie.service.HadoopAccessorService;
055 import org.apache.oozie.service.Services;
056 import org.apache.oozie.util.XLog;
057
058 public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {
059
060 public static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class";
061 public static final String CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS = "oozie.launcher.action.supported.filesystems";
062
063 public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = "oozie.action.max.output.data";
064
065 private static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = "oozie.action.main.arg.count";
066 private static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = "oozie.action.main.arg.";
067 private static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = "oozie.external.stats.max.size";
068
069 private static final String COUNTER_GROUP = "oozie.launcher";
070 private static final String COUNTER_DO_ID_SWAP = "oozie.do.id.swap";
071 private static final String COUNTER_OUTPUT_DATA = "oozie.output.data";
072 private static final String COUNTER_STATS_DATA = "oozie.stats.data";
073 private static final String COUNTER_LAUNCHER_ERROR = "oozie.launcher.error";
074
075 private static final String OOZIE_JOB_ID = "oozie.job.id";
076 private static final String OOZIE_ACTION_ID = "oozie.action.id";
077
078 private static final String OOZIE_ACTION_DIR_PATH = "oozie.action.dir.path";
079 private static final String OOZIE_ACTION_RECOVERY_ID = "oozie.action.recovery.id";
080
081 public static final String ACTION_PREFIX = "oozie.action.";
082 public static final String EXTERNAL_CHILD_IDS = ACTION_PREFIX + "externalChildIDs.properties";
083 public static final String EXTERNAL_ACTION_STATS = ACTION_PREFIX + "stats.properties";
084
085 static final String ACTION_CONF_XML = "action.xml";
086 public static final String ACTION_PREPARE_XML = "oozie.action.prepare.xml";
087 private static final String ACTION_OUTPUT_PROPS = "output.properties";
088 private static final String ACTION_STATS_PROPS = "stats.properties";
089 private static final String ACTION_EXTERNAL_CHILD_IDS_PROPS = "externalChildIds.properties";
090 private static final String ACTION_NEW_ID_PROPS = "newId.properties";
091 private static final String ACTION_ERROR_PROPS = "error.properties";
092
093 private void setRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) throws LauncherException {
094 try {
095 String jobId = launcherConf.get("mapred.job.id");
096 Path path = new Path(actionDir, recoveryId);
097 FileSystem fs = FileSystem.get(path.toUri(), launcherConf);
098 if (!fs.exists(path)) {
099 try {
100 Writer writer = new OutputStreamWriter(fs.create(path));
101 writer.write(jobId);
102 writer.close();
103 }
104 catch (IOException ex) {
105 failLauncher(0, "IO error", ex);
106 }
107 }
108 else {
109 InputStream is = fs.open(path);
110 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
111 String id = reader.readLine();
112 reader.close();
113 if (!jobId.equals(id)) {
114 failLauncher(0, MessageFormat.format(
115 "Hadoop job Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]", path, id,
116 jobId), null);
117 }
118
119 }
120 }
121 catch (IOException ex) {
122 failLauncher(0, "IO error", ex);
123 }
124 }
125
126 /**
127 * @param launcherConf
128 * @param actionDir
129 * @param recoveryId
130 * @return
131 * @throws HadoopAccessorException
132 * @throws IOException
133 */
134 public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
135 throws HadoopAccessorException, IOException {
136 String jobId = null;
137 Path recoveryFile = new Path(actionDir, recoveryId);
138 FileSystem fs = Services.get().get(HadoopAccessorService.class)
139 .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf);
140
141 if (fs.exists(recoveryFile)) {
142 InputStream is = fs.open(recoveryFile);
143 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
144 jobId = reader.readLine();
145 reader.close();
146 }
147 return jobId;
148
149 }
150
151 public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
152 launcherConf.set(CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
153 }
154
155 public static void setupSupportedFileSystems(Configuration launcherConf, String supportedFileSystems) {
156 launcherConf.set(CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS, supportedFileSystems);
157 }
158
159 public static void setupMainArguments(Configuration launcherConf, String[] args) {
160 launcherConf.setInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
161 for (int i = 0; i < args.length; i++) {
162 launcherConf.set(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
163 }
164 }
165
166 public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
167 launcherConf.setInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
168 }
169
170 /**
171 * Set the maximum value of stats data
172 *
173 * @param launcherConf the oozie launcher configuration
174 * @param maxStatsData the maximum allowed size of stats data
175 */
176 public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){
177 launcherConf.setInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
178 }
179
180 /**
181 * @param launcherConf
182 * @param jobId
183 * @param actionId
184 * @param actionDir
185 * @param recoveryId
186 * @param actionConf
187 * @throws IOException
188 * @throws HadoopAccessorException
189 */
190 public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir,
191 String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
192
193 launcherConf.setMapperClass(LauncherMapper.class);
194 launcherConf.setSpeculativeExecution(false);
195 launcherConf.setNumMapTasks(1);
196 launcherConf.setNumReduceTasks(0);
197
198 launcherConf.set(OOZIE_JOB_ID, jobId);
199 launcherConf.set(OOZIE_ACTION_ID, actionId);
200 launcherConf.set(OOZIE_ACTION_DIR_PATH, actionDir.toString());
201 launcherConf.set(OOZIE_ACTION_RECOVERY_ID, recoveryId);
202 launcherConf.set(ACTION_PREPARE_XML, prepareXML);
203
204 actionConf.set(OOZIE_JOB_ID, jobId);
205 actionConf.set(OOZIE_ACTION_ID, actionId);
206
207 if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) {
208 List<String> purgedEntries = new ArrayList<String>();
209 Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files");
210 for (String entry : entries) {
211 if (entry.contains("#")) {
212 purgedEntries.add(entry);
213 }
214 }
215 actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()]));
216 launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
217 }
218
219 FileSystem fs =
220 Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
221 actionDir.toUri(), launcherConf);
222 fs.mkdirs(actionDir);
223
224 OutputStream os = fs.create(new Path(actionDir, ACTION_CONF_XML));
225 actionConf.writeXml(os);
226 os.close();
227
228 Path inputDir = new Path(actionDir, "input");
229 fs.mkdirs(inputDir);
230 Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "dummy.txt")));
231 writer.write("dummy");
232 writer.close();
233
234 launcherConf.set("mapred.input.dir", inputDir.toString());
235 launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString());
236 }
237
238 public static boolean isMainDone(RunningJob runningJob) throws IOException {
239 return runningJob.isComplete();
240 }
241
242 public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
243 boolean succeeded = runningJob.isSuccessful();
244 if (succeeded) {
245 Counters counters = runningJob.getCounters();
246 if (counters != null) {
247 Counters.Group group = counters.getGroup(COUNTER_GROUP);
248 if (group != null) {
249 succeeded = group.getCounter(COUNTER_LAUNCHER_ERROR) == 0;
250 }
251 }
252 }
253 return succeeded;
254 }
255
256 public static boolean hasOutputData(RunningJob runningJob) throws IOException {
257 boolean output = false;
258 Counters counters = runningJob.getCounters();
259 if (counters != null) {
260 Counters.Group group = counters.getGroup(COUNTER_GROUP);
261 if (group != null) {
262 output = group.getCounter(COUNTER_OUTPUT_DATA) == 1;
263 }
264 }
265 return output;
266 }
267
268 /**
269 * Check whether runningJob has stats data or not
270 *
271 * @param runningJob the runningJob
272 * @return returns whether the running Job has stats data or not
273 * @throws IOException
274 */
275 public static boolean hasStatsData(RunningJob runningJob) throws IOException{
276 boolean output = false;
277 Counters counters = runningJob.getCounters();
278 if (counters != null) {
279 Counters.Group group = counters.getGroup(COUNTER_GROUP);
280 if (group != null) {
281 output = group.getCounter(COUNTER_STATS_DATA) == 1;
282 }
283 }
284 return output;
285 }
286
287 /**
288 * @param runningJob
289 * @return
290 * @throws IOException
291 */
292 public static boolean hasIdSwap(RunningJob runningJob) throws IOException {
293 boolean swap = false;
294 Counters counters = runningJob.getCounters();
295 if (counters != null) {
296 Counters.Group group = counters.getGroup(COUNTER_GROUP);
297 if (group != null) {
298 swap = group.getCounter(COUNTER_DO_ID_SWAP) == 1;
299 }
300 }
301 return swap;
302 }
303
304 /**
305 * @param runningJob
306 * @param user
307 * @param group
308 * @param actionDir
309 * @return
310 * @throws IOException
311 * @throws HadoopAccessorException
312 */
313 public static boolean hasIdSwap(RunningJob runningJob, String user, String group, Path actionDir)
314 throws IOException, HadoopAccessorException {
315 boolean swap = false;
316
317 XLog log = XLog.getLog("org.apache.oozie.action.hadoop.LauncherMapper");
318
319 Counters counters = runningJob.getCounters();
320 if (counters != null) {
321 Counters.Group counterGroup = counters.getGroup(COUNTER_GROUP);
322 if (counterGroup != null) {
323 swap = counterGroup.getCounter(COUNTER_DO_ID_SWAP) == 1;
324 }
325 }
326 // additional check for swapped hadoop ID
327 // Can't rely on hadoop counters existing
328 // we'll check for the newID file in hdfs if the hadoop counters is null
329 else {
330
331 Path p = getIdSwapPath(actionDir);
332 // log.debug("Checking for newId file in: [{0}]", p);
333
334 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
335 Configuration conf = has.createJobConf(p.toUri().getAuthority());
336 FileSystem fs = has.createFileSystem(user, p.toUri(), conf);
337 if (fs.exists(p)) {
338 log.debug("Hadoop Counters is null, but found newID file.");
339
340 swap = true;
341 }
342 else {
343 log.debug("Hadoop Counters is null, and newID file doesn't exist at: [{0}]", p);
344 }
345 }
346 return swap;
347 }
348
349 public static Path getOutputDataPath(Path actionDir) {
350 return new Path(actionDir, ACTION_OUTPUT_PROPS);
351 }
352
353 /**
354 * Get the location of stats file
355 *
356 * @param actionDir the action directory
357 * @return the hdfs location of the file
358 */
359 public static Path getActionStatsDataPath(Path actionDir){
360 return new Path(actionDir, ACTION_STATS_PROPS);
361 }
362
363 /**
364 * Get the location of external Child IDs file
365 *
366 * @param actionDir the action directory
367 * @return the hdfs location of the file
368 */
369 public static Path getExternalChildIDsDataPath(Path actionDir){
370 return new Path(actionDir, ACTION_EXTERNAL_CHILD_IDS_PROPS);
371 }
372
373 public static Path getErrorPath(Path actionDir) {
374 return new Path(actionDir, ACTION_ERROR_PROPS);
375 }
376
377 public static Path getIdSwapPath(Path actionDir) {
378 return new Path(actionDir, ACTION_NEW_ID_PROPS);
379 }
380
381 private JobConf jobConf;
382 private Path actionDir;
383 private ScheduledThreadPoolExecutor timer;
384
385 private boolean configFailure = false;
386 private LauncherException configureFailureEx;
387 public LauncherMapper() {
388 }
389
390 @Override
391 public void configure(JobConf jobConf) {
392 System.out.println();
393 System.out.println("Oozie Launcher starts");
394 System.out.println();
395 this.jobConf = jobConf;
396 actionDir = new Path(getJobConf().get(OOZIE_ACTION_DIR_PATH));
397 String recoveryId = jobConf.get(OOZIE_ACTION_RECOVERY_ID, null);
398 try {
399 setRecoveryId(jobConf, actionDir, recoveryId);
400 }
401 catch (LauncherException ex) {
402 System.out.println("Launcher config error "+ex.getMessage());
403 configureFailureEx = ex;
404 configFailure = true;
405 }
406 }
407
408 @Override
409 public void map(K1 key, V1 value, OutputCollector<K2, V2> collector, Reporter reporter) throws IOException {
410 try {
411 if (configFailure) {
412 throw configureFailureEx;
413 }
414 else {
415 String mainClass = getJobConf().get(CONF_OOZIE_ACTION_MAIN_CLASS);
416 if (getJobConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) {
417 System.err.println("WARNING, workaround for Hadoop 2.0.2-alpha distributed cached issue (MAPREDUCE-4820) enabled");
418 }
419 String msgPrefix = "Main class [" + mainClass + "], ";
420 int errorCode = 0;
421 Throwable errorCause = null;
422 String errorMessage = null;
423
424 try {
425 new LauncherSecurityManager();
426 }
427 catch (SecurityException ex) {
428 errorMessage = "Could not set LauncherSecurityManager";
429 errorCause = ex;
430 }
431
432 try {
433 setupHeartBeater(reporter);
434
435 setupMainConfiguration();
436
437 try {
438 System.out.println("Starting the execution of prepare actions");
439 executePrepare();
440 System.out.println("Completed the execution of prepare actions successfully");
441 } catch (Exception ex) {
442 System.out.println("Prepare execution in the Launcher Mapper has failed");
443 throw new LauncherException(ex.getMessage(), ex);
444 }
445
446 String[] args = getMainArguments(getJobConf());
447
448 printContentsOfCurrentDir();
449
450 System.out.println();
451 System.out.println("Oozie Java/Map-Reduce/Pig action launcher-job configuration");
452 System.out.println("=================================================================");
453 System.out.println("Workflow job id : " + System.getProperty("oozie.job.id"));
454 System.out.println("Workflow action id: " + System.getProperty("oozie.action.id"));
455 System.out.println();
456 System.out.println("Classpath :");
457 System.out.println("------------------------");
458 StringTokenizer st = new StringTokenizer(System.getProperty("java.class.path"), ":");
459 while (st.hasMoreTokens()) {
460 System.out.println(" " + st.nextToken());
461 }
462 System.out.println("------------------------");
463 System.out.println();
464 System.out.println("Main class : " + mainClass);
465 System.out.println();
466 System.out.println("Maximum output : "
467 + getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024));
468 System.out.println();
469 System.out.println("Arguments :");
470 for (String arg : args) {
471 System.out.println(" " + arg);
472 }
473
474 System.out.println();
475 System.out.println("Java System Properties:");
476 System.out.println("------------------------");
477 System.getProperties().store(System.out, "");
478 System.out.flush();
479 System.out.println("------------------------");
480 System.out.println();
481
482 System.out.println("=================================================================");
483 System.out.println();
484 System.out.println(">>> Invoking Main class now >>>");
485 System.out.println();
486 System.out.flush();
487
488 try {
489 Class klass = getJobConf().getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
490 Method mainMethod = klass.getMethod("main", String[].class);
491 mainMethod.invoke(null, (Object) args);
492 }
493 catch (InvocationTargetException ex) {
494 if (LauncherMainException.class.isInstance(ex.getCause())) {
495 errorMessage = msgPrefix + "exit code [" +((LauncherMainException)ex.getCause()).getErrorCode()
496 + "]";
497 errorCause = null;
498 }
499 else if (SecurityException.class.isInstance(ex.getCause())) {
500 if (LauncherSecurityManager.getExitInvoked()) {
501 System.out.println("Intercepting System.exit(" + LauncherSecurityManager.getExitCode()
502 + ")");
503 System.err.println("Intercepting System.exit(" + LauncherSecurityManager.getExitCode()
504 + ")");
505 // if 0 main() method finished successfully
506 // ignoring
507 errorCode = LauncherSecurityManager.getExitCode();
508 if (errorCode != 0) {
509 errorMessage = msgPrefix + "exit code [" + errorCode + "]";
510 errorCause = null;
511 }
512 }
513 }
514 else {
515 throw ex;
516 }
517 }
518 finally {
519 System.out.println();
520 System.out.println("<<< Invocation of Main class completed <<<");
521 System.out.println();
522 handleExternalChildIDs(reporter);
523 }
524 if (errorMessage == null) {
525 File outputData = new File(System.getProperty("oozie.action.output.properties"));
526 if (outputData.exists()) {
527 URI actionDirUri = new Path(actionDir, ACTION_OUTPUT_PROPS).toUri();
528 FileSystem fs = FileSystem.get(actionDirUri, getJobConf());
529 fs.copyFromLocalFile(new Path(outputData.toString()), new Path(actionDir,
530 ACTION_OUTPUT_PROPS));
531 reporter.incrCounter(COUNTER_GROUP, COUNTER_OUTPUT_DATA, 1);
532
533 int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);
534 if (outputData.length() > maxOutputData) {
535 String msg = MessageFormat.format("Output data size [{0}] exceeds maximum [{1}]",
536 outputData.length(), maxOutputData);
537 failLauncher(0, msg, null);
538 }
539 System.out.println();
540 System.out.println("Oozie Launcher, capturing output data:");
541 System.out.println("=======================");
542 Properties props = new Properties();
543 props.load(new FileReader(outputData));
544 props.store(System.out, "");
545 System.out.println();
546 System.out.println("=======================");
547 System.out.println();
548 }
549 handleActionStatsData(reporter);
550 File newId = new File(System.getProperty("oozie.action.newId.properties"));
551 if (newId.exists()) {
552 Properties props = new Properties();
553 props.load(new FileReader(newId));
554 if (props.getProperty("id") == null) {
555 throw new IllegalStateException("ID swap file does not have [id] property");
556 }
557 URI actionDirUri = new Path(actionDir, ACTION_NEW_ID_PROPS).toUri();
558 FileSystem fs = FileSystem.get(actionDirUri, getJobConf());
559 fs.copyFromLocalFile(new Path(newId.toString()), new Path(actionDir, ACTION_NEW_ID_PROPS));
560 reporter.incrCounter(COUNTER_GROUP, COUNTER_DO_ID_SWAP, 1);
561
562 System.out.println("Oozie Launcher, copying new Hadoop job id to file: "
563 + new Path(actionDir, ACTION_NEW_ID_PROPS).toUri());
564
565 System.out.println();
566 System.out.println("Oozie Launcher, propagating new Hadoop job id to Oozie");
567 System.out.println("=======================");
568 System.out.println("id: " + props.getProperty("id"));
569 System.out.println("=======================");
570 System.out.println();
571 }
572 }
573 }
574 catch (NoSuchMethodException ex) {
575 errorMessage = msgPrefix + "main() method not found";
576 errorCause = ex;
577 }
578 catch (InvocationTargetException ex) {
579 errorMessage = msgPrefix + "main() threw exception";
580 errorCause = ex.getTargetException();
581 }
582 catch (Throwable ex) {
583 errorMessage = msgPrefix + "exception invoking main()";
584 errorCause = ex;
585 }
586 finally {
587 destroyHeartBeater();
588 if (errorMessage != null) {
589 failLauncher(errorCode, errorMessage, errorCause);
590 }
591 }
592 }
593 }
594 catch (LauncherException ex) {
595 reporter.incrCounter(COUNTER_GROUP, COUNTER_LAUNCHER_ERROR, 1);
596 System.out.println();
597 System.out.println("Oozie Launcher failed, finishing Hadoop job gracefully");
598 System.out.println();
599 }
600 }
601
602 @Override
603 public void close() throws IOException {
604 System.out.println();
605 System.out.println("Oozie Launcher ends");
606 System.out.println();
607 }
608
609 protected JobConf getJobConf() {
610 return jobConf;
611 }
612
613 private void handleActionStatsData(Reporter reporter) throws IOException, LauncherException {
614 File actionStatsData = new File(System.getProperty(EXTERNAL_ACTION_STATS));
615 // If stats are stored by the action, then stats file should exist
616 if (actionStatsData.exists()) {
617 int statsMaxOutputData = getJobConf().getInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
618 Integer.MAX_VALUE);
619 reporter.incrCounter(COUNTER_GROUP, COUNTER_STATS_DATA, 1);
620 // fail the launcher if size of stats is greater than the maximum allowed size
621 if (actionStatsData.length() > statsMaxOutputData) {
622 String msg = MessageFormat.format("Output stats size [{0}] exceeds maximum [{1}]",
623 actionStatsData.length(), statsMaxOutputData);
624 failLauncher(0, msg, null);
625 }
626 // copy the stats file to hdfs path which can be accessed by Oozie server
627 URI actionDirUri = new Path(actionDir, ACTION_STATS_PROPS).toUri();
628 FileSystem fs = FileSystem.get(actionDirUri, getJobConf());
629 fs.copyFromLocalFile(new Path(actionStatsData.toString()), new Path(actionDir,
630 ACTION_STATS_PROPS));
631 }
632 }
633
634 private void handleExternalChildIDs(Reporter reporter) throws IOException {
635 File externalChildIDs = new File(System.getProperty(EXTERNAL_CHILD_IDS));
636 // if external ChildIDs are stored by the action, then the file should exist
637 if (externalChildIDs.exists()) {
638 // copy the externalChildIDs file to hdfs path which can be accessed by Oozie server
639 URI actionDirUri = new Path(actionDir, ACTION_EXTERNAL_CHILD_IDS_PROPS).toUri();
640 FileSystem fs = FileSystem.get(actionDirUri, getJobConf());
641 fs.copyFromLocalFile(new Path(externalChildIDs.toString()), new Path(actionDir,
642 ACTION_EXTERNAL_CHILD_IDS_PROPS));
643 }
644 }
645
646 private void setupMainConfiguration() throws IOException, HadoopAccessorException {
647 Path pathNew = new Path(new Path(actionDir, ACTION_CONF_XML),
648 new Path(new File(ACTION_CONF_XML).getAbsolutePath()));
649 FileSystem fs = FileSystem.get(pathNew.toUri(), getJobConf());
650 fs.copyToLocalFile(new Path(actionDir, ACTION_CONF_XML),
651 new Path(new File(ACTION_CONF_XML).getAbsolutePath()));
652
653 System.setProperty("oozie.launcher.job.id", getJobConf().get("mapred.job.id"));
654 System.setProperty("oozie.job.id", getJobConf().get(OOZIE_JOB_ID));
655 System.setProperty("oozie.action.id", getJobConf().get(OOZIE_ACTION_ID));
656 System.setProperty("oozie.action.conf.xml", new File(ACTION_CONF_XML).getAbsolutePath());
657 System.setProperty("oozie.action.output.properties", new File(ACTION_OUTPUT_PROPS).getAbsolutePath());
658 System.setProperty(EXTERNAL_ACTION_STATS, new File(ACTION_STATS_PROPS).getAbsolutePath());
659 System.setProperty(EXTERNAL_CHILD_IDS, new File(ACTION_EXTERNAL_CHILD_IDS_PROPS).getAbsolutePath());
660 System.setProperty("oozie.action.newId.properties", new File(ACTION_NEW_ID_PROPS).getAbsolutePath());
661 }
662
663 // Method to execute the prepare actions
664 private void executePrepare() throws IOException, LauncherException {
665 String prepareXML = getJobConf().get(ACTION_PREPARE_XML);
666 if (prepareXML != null) {
667 if (!prepareXML.equals("")) {
668 PrepareActionsDriver.doOperations(
669 getJobConf().getStringCollection(CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS), prepareXML);
670 } else {
671 System.out.println("There are no prepare actions to execute.");
672 }
673 }
674 }
675
676 public static String[] getMainArguments(Configuration conf) {
677 String[] args = new String[conf.getInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, 0)];
678 for (int i = 0; i < args.length; i++) {
679 args[i] = conf.get(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i);
680 }
681 return args;
682 }
683
684 private void setupHeartBeater(Reporter reporter) {
685 timer = new ScheduledThreadPoolExecutor(1);
686 timer.scheduleAtFixedRate(new LauncherMapper(reporter), 0, 30, TimeUnit.SECONDS);
687 }
688
689 private void destroyHeartBeater() {
690 timer.shutdownNow();
691 }
692
693 private Reporter reporter;
694
695 private LauncherMapper(Reporter reporter) {
696 this.reporter = reporter;
697 }
698
699 @Override
700 public void run() {
701 System.out.println("Heart beat");
702 reporter.progress();
703 }
704
705 private void failLauncher(int errorCode, String reason, Throwable ex) throws LauncherException {
706 try {
707 if (ex != null) {
708 reason += ", " + ex.getMessage();
709 }
710 Properties errorProps = new Properties();
711 errorProps.setProperty("error.code", Integer.toString(errorCode));
712 errorProps.setProperty("error.reason", reason);
713 if (ex != null) {
714 if (ex.getMessage() != null) {
715 errorProps.setProperty("exception.message", ex.getMessage());
716 }
717 StringWriter sw = new StringWriter();
718 PrintWriter pw = new PrintWriter(sw);
719 ex.printStackTrace(pw);
720 pw.close();
721 errorProps.setProperty("exception.stacktrace", sw.toString());
722 }
723 FileSystem fs = FileSystem.get((new Path(actionDir, ACTION_ERROR_PROPS)).toUri(), getJobConf());
724 OutputStream os = fs.create(new Path(actionDir, ACTION_ERROR_PROPS));
725 errorProps.store(os, "");
726 os.close();
727
728 System.out.print("Failing Oozie Launcher, " + reason + "\n");
729 System.err.print("Failing Oozie Launcher, " + reason + "\n");
730 if (ex != null) {
731 ex.printStackTrace(System.out);
732 ex.printStackTrace(System.err);
733 }
734 throw new LauncherException(reason, ex);
735 }
736 catch (IOException rex) {
737 throw new RuntimeException("Error while failing launcher, " + rex.getMessage(), rex);
738 }
739 }
740
741 /**
742 * Print files and directories in current directory. Will list files in the sub-directory (only 1 level deep)
743 */
744 protected void printContentsOfCurrentDir() {
745 File folder = new File(".");
746 System.out.println();
747 System.out.println("Files in current dir:" + folder.getAbsolutePath());
748 System.out.println("======================");
749
750 File[] listOfFiles = folder.listFiles();
751 for (File fileName : listOfFiles) {
752 if (fileName.isFile()) {
753 System.out.println("File: " + fileName.getName());
754 }
755 else if (fileName.isDirectory()) {
756 System.out.println("Dir: " + fileName.getName());
757 File subDir = new File(fileName.getName());
758 File[] moreFiles = subDir.listFiles();
759 for (File subFileName : moreFiles) {
760 if (subFileName.isFile()) {
761 System.out.println(" File: " + subFileName.getName());
762 }
763 else if (subFileName.isDirectory()) {
764 System.out.println(" Dir: " + subFileName.getName());
765 }
766 }
767 }
768 }
769 }
770
771 }
772
/**
 * Security manager that intercepts {@code System.exit()}: every exit check
 * records the requested status and is rejected with a
 * {@link SecurityException}. Permission checks are forwarded to the security
 * manager that was installed before this one, if there was one.
 * <p>
 * Creating an instance installs it as the system security manager.
 */
class LauncherSecurityManager extends SecurityManager {
    /** Whether an exit has been intercepted since the last {@link #reset()}. */
    private static boolean exitInvoked;
    /** Status passed to the last intercepted exit. */
    private static int exitCode;
    /** Security manager that was installed before this one; may be {@code null}. */
    private SecurityManager previousManager;

    /**
     * Clears the recorded exit state, remembers the currently installed
     * security manager and installs this instance in its place.
     */
    public LauncherSecurityManager() {
        reset();
        this.previousManager = System.getSecurityManager();
        System.setSecurityManager(this);
    }

    /** Forwards the check to the original security manager; allows everything if there was none. */
    @Override
    public void checkPermission(Permission perm, Object context) {
        if (previousManager == null) {
            return;
        }
        previousManager.checkPermission(perm, context);
    }

    /** Forwards the check to the original security manager; allows everything if there was none. */
    @Override
    public void checkPermission(Permission perm) {
        if (previousManager == null) {
            return;
        }
        previousManager.checkPermission(perm);
    }

    /**
     * Records the exit request and rejects it.
     *
     * @param status the requested exit status
     * @throws SecurityException always
     */
    @Override
    public void checkExit(int status) throws SecurityException {
        exitCode = status;
        exitInvoked = true;
        throw new SecurityException("Intercepted System.exit(" + status + ")");
    }

    /**
     * @return {@code true} if an exit has been intercepted since the last reset
     */
    public static boolean getExitInvoked() {
        return exitInvoked;
    }

    /**
     * @return the status of the last intercepted exit, or 0 after a reset
     */
    public static int getExitCode() {
        return exitCode;
    }

    /** Clears the recorded exit state. */
    public static void reset() {
        exitCode = 0;
        exitInvoked = false;
    }
}
820
/**
 * Checked exception signalling that the launcher has failed.
 */
class LauncherException extends Exception {

    /**
     * @param message failure description
     */
    LauncherException(final String message) {
        super(message);
    }

    /**
     * @param message failure description
     * @param cause underlying cause, may be {@code null}
     */
    LauncherException(final String message, final Throwable cause) {
        super(message, cause);
    }
}