This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import java.io.BufferedReader;
021 import java.io.File;
022 import java.io.FileReader;
023 import java.io.IOException;
024 import java.io.InputStream;
025 import java.io.InputStreamReader;
026 import java.io.OutputStream;
027 import java.io.OutputStreamWriter;
028 import java.io.PrintWriter;
029 import java.io.StringWriter;
030 import java.io.Writer;
031 import java.lang.reflect.InvocationTargetException;
032 import java.lang.reflect.Method;
033 import java.security.Permission;
034 import java.text.MessageFormat;
035 import java.util.Properties;
036 import java.util.StringTokenizer;
037 import java.util.concurrent.ScheduledThreadPoolExecutor;
038 import java.util.concurrent.TimeUnit;
039
040 import org.apache.hadoop.conf.Configuration;
041 import org.apache.hadoop.fs.FileSystem;
042 import org.apache.hadoop.fs.Path;
043 import org.apache.hadoop.mapred.Counters;
044 import org.apache.hadoop.mapred.JobConf;
045 import org.apache.hadoop.mapred.Mapper;
046 import org.apache.hadoop.mapred.OutputCollector;
047 import org.apache.hadoop.mapred.Reporter;
048 import org.apache.hadoop.mapred.RunningJob;
049 import org.apache.oozie.service.HadoopAccessorException;
050 import org.apache.oozie.service.HadoopAccessorService;
051 import org.apache.oozie.service.Services;
052 import org.apache.oozie.util.XLog;
053
054 public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {
055
056 public static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class";
057
058 public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = "oozie.action.max.output.data";
059
060 private static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = "oozie.action.main.arg.count";
061 private static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = "oozie.action.main.arg.";
062 private static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = "oozie.external.stats.max.size";
063
064 private static final String COUNTER_GROUP = "oozie.launcher";
065 private static final String COUNTER_DO_ID_SWAP = "oozie.do.id.swap";
066 private static final String COUNTER_OUTPUT_DATA = "oozie.output.data";
067 private static final String COUNTER_STATS_DATA = "oozie.stats.data";
068 private static final String COUNTER_LAUNCHER_ERROR = "oozie.launcher.error";
069
070 private static final String OOZIE_JOB_ID = "oozie.job.id";
071 private static final String OOZIE_ACTION_ID = "oozie.action.id";
072
073 private static final String OOZIE_ACTION_DIR_PATH = "oozie.action.dir.path";
074 private static final String OOZIE_ACTION_RECOVERY_ID = "oozie.action.recovery.id";
075
076 public static final String ACTION_PREFIX = "oozie.action.";
077 public static final String EXTERNAL_CHILD_IDS = ACTION_PREFIX + "externalChildIDs.properties";
078 public static final String EXTERNAL_ACTION_STATS = ACTION_PREFIX + "stats.properties";
079
080 static final String ACTION_CONF_XML = "action.xml";
081 public static final String ACTION_PREPARE_XML = "oozie.action.prepare.xml";
082 private static final String ACTION_OUTPUT_PROPS = "output.properties";
083 private static final String ACTION_STATS_PROPS = "stats.properties";
084 private static final String ACTION_EXTERNAL_CHILD_IDS_PROPS = "externalChildIds.properties";
085 private static final String ACTION_NEW_ID_PROPS = "newId.properties";
086 private static final String ACTION_ERROR_PROPS = "error.properties";
087
088 private void setRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) throws LauncherException {
089 try {
090 FileSystem fs = FileSystem.get(launcherConf);
091 String jobId = launcherConf.get("mapred.job.id");
092 Path path = new Path(actionDir, recoveryId);
093 if (!fs.exists(path)) {
094 try {
095 Writer writer = new OutputStreamWriter(fs.create(path));
096 writer.write(jobId);
097 writer.close();
098 }
099 catch (IOException ex) {
100 failLauncher(0, "IO error", ex);
101 }
102 }
103 else {
104 InputStream is = fs.open(path);
105 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
106 String id = reader.readLine();
107 reader.close();
108 if (!jobId.equals(id)) {
109 failLauncher(0, MessageFormat.format(
110 "Hadoop job Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]", path, id,
111 jobId), null);
112 }
113
114 }
115 }
116 catch (IOException ex) {
117 failLauncher(0, "IO error", ex);
118 }
119 }
120
121 /**
122 * @param launcherConf
123 * @param actionDir
124 * @param recoveryId
125 * @return
126 * @throws HadoopAccessorException
127 * @throws IOException
128 */
129 public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
130 throws HadoopAccessorException, IOException {
131 String jobId = null;
132 Path recoveryFile = new Path(actionDir, recoveryId);
133 //FileSystem fs = FileSystem.get(launcherConf);
134 FileSystem fs = Services.get().get(HadoopAccessorService.class)
135 .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf);
136
137 if (fs.exists(recoveryFile)) {
138 InputStream is = fs.open(recoveryFile);
139 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
140 jobId = reader.readLine();
141 reader.close();
142 }
143 return jobId;
144
145 }
146
147 public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
148 launcherConf.set(CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
149 }
150
151 public static void setupMainArguments(Configuration launcherConf, String[] args) {
152 launcherConf.setInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
153 for (int i = 0; i < args.length; i++) {
154 launcherConf.set(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
155 }
156 }
157
158 public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
159 launcherConf.setInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
160 }
161
162 /**
163 * Set the maximum value of stats data
164 *
165 * @param launcherConf the oozie launcher configuration
166 * @param maxStatsData the maximum allowed size of stats data
167 */
168 public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){
169 launcherConf.setInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
170 }
171
172 /**
173 * @param launcherConf
174 * @param jobId
175 * @param actionId
176 * @param actionDir
177 * @param recoveryId
178 * @param actionConf
179 * @throws IOException
180 * @throws HadoopAccessorException
181 */
182 public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir,
183 String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
184
185 launcherConf.setMapperClass(LauncherMapper.class);
186 launcherConf.setSpeculativeExecution(false);
187 launcherConf.setNumMapTasks(1);
188 launcherConf.setNumReduceTasks(0);
189
190 launcherConf.set(OOZIE_JOB_ID, jobId);
191 launcherConf.set(OOZIE_ACTION_ID, actionId);
192 launcherConf.set(OOZIE_ACTION_DIR_PATH, actionDir.toString());
193 launcherConf.set(OOZIE_ACTION_RECOVERY_ID, recoveryId);
194 launcherConf.set(ACTION_PREPARE_XML, prepareXML);
195
196 actionConf.set(OOZIE_JOB_ID, jobId);
197 actionConf.set(OOZIE_ACTION_ID, actionId);
198
199 FileSystem fs =
200 Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
201 actionDir.toUri(), launcherConf);
202 fs.mkdirs(actionDir);
203
204 OutputStream os = fs.create(new Path(actionDir, ACTION_CONF_XML));
205 actionConf.writeXml(os);
206 os.close();
207
208 Path inputDir = new Path(actionDir, "input");
209 fs.mkdirs(inputDir);
210 Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "dummy.txt")));
211 writer.write("dummy");
212 writer.close();
213
214 launcherConf.set("mapred.input.dir", inputDir.toString());
215 launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString());
216 }
217
218 public static boolean isMainDone(RunningJob runningJob) throws IOException {
219 return runningJob.isComplete();
220 }
221
222 public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
223 boolean succeeded = runningJob.isSuccessful();
224 if (succeeded) {
225 Counters counters = runningJob.getCounters();
226 if (counters != null) {
227 Counters.Group group = counters.getGroup(COUNTER_GROUP);
228 if (group != null) {
229 succeeded = group.getCounter(COUNTER_LAUNCHER_ERROR) == 0;
230 }
231 }
232 }
233 return succeeded;
234 }
235
236 public static boolean hasOutputData(RunningJob runningJob) throws IOException {
237 boolean output = false;
238 Counters counters = runningJob.getCounters();
239 if (counters != null) {
240 Counters.Group group = counters.getGroup(COUNTER_GROUP);
241 if (group != null) {
242 output = group.getCounter(COUNTER_OUTPUT_DATA) == 1;
243 }
244 }
245 return output;
246 }
247
248 /**
249 * Check whether runningJob has stats data or not
250 *
251 * @param runningJob the runningJob
252 * @return returns whether the running Job has stats data or not
253 * @throws IOException
254 */
255 public static boolean hasStatsData(RunningJob runningJob) throws IOException{
256 boolean output = false;
257 Counters counters = runningJob.getCounters();
258 if (counters != null) {
259 Counters.Group group = counters.getGroup(COUNTER_GROUP);
260 if (group != null) {
261 output = group.getCounter(COUNTER_STATS_DATA) == 1;
262 }
263 }
264 return output;
265 }
266
267 /**
268 * @param runningJob
269 * @return
270 * @throws IOException
271 */
272 public static boolean hasIdSwap(RunningJob runningJob) throws IOException {
273 boolean swap = false;
274 Counters counters = runningJob.getCounters();
275 if (counters != null) {
276 Counters.Group group = counters.getGroup(COUNTER_GROUP);
277 if (group != null) {
278 swap = group.getCounter(COUNTER_DO_ID_SWAP) == 1;
279 }
280 }
281 return swap;
282 }
283
284 /**
285 * @param runningJob
286 * @param user
287 * @param group
288 * @param actionDir
289 * @return
290 * @throws IOException
291 * @throws HadoopAccessorException
292 */
293 public static boolean hasIdSwap(RunningJob runningJob, String user, String group, Path actionDir)
294 throws IOException, HadoopAccessorException {
295 boolean swap = false;
296
297 XLog log = XLog.getLog("org.apache.oozie.action.hadoop.LauncherMapper");
298
299 Counters counters = runningJob.getCounters();
300 if (counters != null) {
301 Counters.Group counterGroup = counters.getGroup(COUNTER_GROUP);
302 if (counterGroup != null) {
303 swap = counterGroup.getCounter(COUNTER_DO_ID_SWAP) == 1;
304 }
305 }
306 // additional check for swapped hadoop ID
307 // Can't rely on hadoop counters existing
308 // we'll check for the newID file in hdfs if the hadoop counters is null
309 else {
310
311 Path p = getIdSwapPath(actionDir);
312 // log.debug("Checking for newId file in: [{0}]", p);
313
314 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, p.toUri(),
315 new Configuration());
316 if (fs.exists(p)) {
317 log.debug("Hadoop Counters is null, but found newID file.");
318
319 swap = true;
320 }
321 else {
322 log.debug("Hadoop Counters is null, and newID file doesn't exist at: [{0}]", p);
323 }
324 }
325 return swap;
326 }
327
328 public static Path getOutputDataPath(Path actionDir) {
329 return new Path(actionDir, ACTION_OUTPUT_PROPS);
330 }
331
332 /**
333 * Get the location of stats file
334 *
335 * @param actionDir the action directory
336 * @return the hdfs location of the file
337 */
338 public static Path getActionStatsDataPath(Path actionDir){
339 return new Path(actionDir, ACTION_STATS_PROPS);
340 }
341
342 /**
343 * Get the location of external Child IDs file
344 *
345 * @param actionDir the action directory
346 * @return the hdfs location of the file
347 */
348 public static Path getExternalChildIDsDataPath(Path actionDir){
349 return new Path(actionDir, ACTION_EXTERNAL_CHILD_IDS_PROPS);
350 }
351
352 public static Path getErrorPath(Path actionDir) {
353 return new Path(actionDir, ACTION_ERROR_PROPS);
354 }
355
356 public static Path getIdSwapPath(Path actionDir) {
357 return new Path(actionDir, ACTION_NEW_ID_PROPS);
358 }
359
360 private JobConf jobConf;
361 private Path actionDir;
362 private ScheduledThreadPoolExecutor timer;
363
364 private boolean configFailure = false;
365 private LauncherException configureFailureEx;
366 public LauncherMapper() {
367 }
368
369 @Override
370 public void configure(JobConf jobConf) {
371 System.out.println();
372 System.out.println("Oozie Launcher starts");
373 System.out.println();
374 this.jobConf = jobConf;
375 actionDir = new Path(getJobConf().get(OOZIE_ACTION_DIR_PATH));
376 String recoveryId = jobConf.get(OOZIE_ACTION_RECOVERY_ID, null);
377 try {
378 setRecoveryId(jobConf, actionDir, recoveryId);
379 }
380 catch (LauncherException ex) {
381 System.out.println("Launcher config error "+ex.getMessage());
382 configureFailureEx = ex;
383 configFailure = true;
384 }
385 }
386
387 @Override
388 public void map(K1 key, V1 value, OutputCollector<K2, V2> collector, Reporter reporter) throws IOException {
389 try {
390 if (configFailure) {
391 throw configureFailureEx;
392 }
393 else {
394 String mainClass = getJobConf().get(CONF_OOZIE_ACTION_MAIN_CLASS);
395 String msgPrefix = "Main class [" + mainClass + "], ";
396 int errorCode = 0;
397 Throwable errorCause = null;
398 String errorMessage = null;
399
400 try {
401 new LauncherSecurityManager();
402 }
403 catch (SecurityException ex) {
404 errorMessage = "Could not set LauncherSecurityManager";
405 errorCause = ex;
406 }
407
408 try {
409 setupHeartBeater(reporter);
410
411 setupMainConfiguration();
412
413 try {
414 System.out.println("Starting the execution of prepare actions");
415 executePrepare();
416 System.out.println("Completed the execution of prepare actions successfully");
417 } catch (Exception ex) {
418 System.out.println("Prepare execution in the Launcher Mapper has failed");
419 throw new LauncherException(ex.getMessage(), ex);
420 }
421
422 String[] args = getMainArguments(getJobConf());
423
424 printContentsOfCurrentDir();
425
426 System.out.println();
427 System.out.println("Oozie Java/Map-Reduce/Pig action launcher-job configuration");
428 System.out.println("=================================================================");
429 System.out.println("Workflow job id : " + System.getProperty("oozie.job.id"));
430 System.out.println("Workflow action id: " + System.getProperty("oozie.action.id"));
431 System.out.println();
432 System.out.println("Classpath :");
433 System.out.println("------------------------");
434 StringTokenizer st = new StringTokenizer(System.getProperty("java.class.path"), ":");
435 while (st.hasMoreTokens()) {
436 System.out.println(" " + st.nextToken());
437 }
438 System.out.println("------------------------");
439 System.out.println();
440 System.out.println("Main class : " + mainClass);
441 System.out.println();
442 System.out.println("Maximum output : "
443 + getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024));
444 System.out.println();
445 System.out.println("Arguments :");
446 for (String arg : args) {
447 System.out.println(" " + arg);
448 }
449
450 System.out.println();
451 System.out.println("Java System Properties:");
452 System.out.println("------------------------");
453 System.getProperties().store(System.out, "");
454 System.out.flush();
455 System.out.println("------------------------");
456 System.out.println();
457
458 System.out.println("=================================================================");
459 System.out.println();
460 System.out.println(">>> Invoking Main class now >>>");
461 System.out.println();
462 System.out.flush();
463
464 try {
465 Class klass = getJobConf().getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
466 Method mainMethod = klass.getMethod("main", String[].class);
467 mainMethod.invoke(null, (Object) args);
468 }
469 catch (InvocationTargetException ex) {
470 if (LauncherMainException.class.isInstance(ex.getCause())) {
471 errorMessage = msgPrefix + "exit code [" +((LauncherMainException)ex.getCause()).getErrorCode()
472 + "]";
473 errorCause = null;
474 }
475 else if (SecurityException.class.isInstance(ex.getCause())) {
476 if (LauncherSecurityManager.getExitInvoked()) {
477 System.out.println("Intercepting System.exit(" + LauncherSecurityManager.getExitCode()
478 + ")");
479 System.err.println("Intercepting System.exit(" + LauncherSecurityManager.getExitCode()
480 + ")");
481 // if 0 main() method finished successfully
482 // ignoring
483 errorCode = LauncherSecurityManager.getExitCode();
484 if (errorCode != 0) {
485 errorMessage = msgPrefix + "exit code [" + errorCode + "]";
486 errorCause = null;
487 }
488 }
489 }
490 else {
491 throw ex;
492 }
493 }
494 finally {
495 System.out.println();
496 System.out.println("<<< Invocation of Main class completed <<<");
497 System.out.println();
498 }
499 if (errorMessage == null) {
500 File outputData = new File(System.getProperty("oozie.action.output.properties"));
501 FileSystem fs = FileSystem.get(getJobConf());
502 if (outputData.exists()) {
503
504 fs.copyFromLocalFile(new Path(outputData.toString()), new Path(actionDir,
505 ACTION_OUTPUT_PROPS));
506 reporter.incrCounter(COUNTER_GROUP, COUNTER_OUTPUT_DATA, 1);
507
508 int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);
509 if (outputData.length() > maxOutputData) {
510 String msg = MessageFormat.format("Output data size [{0}] exceeds maximum [{1}]",
511 outputData.length(), maxOutputData);
512 failLauncher(0, msg, null);
513 }
514 System.out.println();
515 System.out.println("Oozie Launcher, capturing output data:");
516 System.out.println("=======================");
517 Properties props = new Properties();
518 props.load(new FileReader(outputData));
519 props.store(System.out, "");
520 System.out.println();
521 System.out.println("=======================");
522 System.out.println();
523 }
524 handleActionStatsData(fs, reporter);
525 handleExternalChildIDs(fs, reporter);
526 File newId = new File(System.getProperty("oozie.action.newId.properties"));
527 if (newId.exists()) {
528 Properties props = new Properties();
529 props.load(new FileReader(newId));
530 if (props.getProperty("id") == null) {
531 throw new IllegalStateException("ID swap file does not have [id] property");
532 }
533 fs = FileSystem.get(getJobConf());
534 fs.copyFromLocalFile(new Path(newId.toString()), new Path(actionDir, ACTION_NEW_ID_PROPS));
535 reporter.incrCounter(COUNTER_GROUP, COUNTER_DO_ID_SWAP, 1);
536
537 System.out.println("Oozie Launcher, copying new Hadoop job id to file: "
538 + new Path(actionDir, ACTION_NEW_ID_PROPS).toUri());
539
540 System.out.println();
541 System.out.println("Oozie Launcher, propagating new Hadoop job id to Oozie");
542 System.out.println("=======================");
543 System.out.println("id: " + props.getProperty("id"));
544 System.out.println("=======================");
545 System.out.println();
546 }
547 }
548 }
549 catch (NoSuchMethodException ex) {
550 errorMessage = msgPrefix + "main() method not found";
551 errorCause = ex;
552 }
553 catch (InvocationTargetException ex) {
554 errorMessage = msgPrefix + "main() threw exception";
555 errorCause = ex.getTargetException();
556 }
557 catch (Throwable ex) {
558 errorMessage = msgPrefix + "exception invoking main()";
559 errorCause = ex;
560 }
561 finally {
562 destroyHeartBeater();
563 if (errorMessage != null) {
564 failLauncher(errorCode, errorMessage, errorCause);
565 }
566 }
567 }
568 }
569 catch (LauncherException ex) {
570 reporter.incrCounter(COUNTER_GROUP, COUNTER_LAUNCHER_ERROR, 1);
571 System.out.println();
572 System.out.println("Oozie Launcher failed, finishing Hadoop job gracefully");
573 System.out.println();
574 }
575 }
576
577 @Override
578 public void close() throws IOException {
579 System.out.println();
580 System.out.println("Oozie Launcher ends");
581 System.out.println();
582 }
583
584 protected JobConf getJobConf() {
585 return jobConf;
586 }
587
588 private void handleActionStatsData(FileSystem fs, Reporter reporter) throws IOException, LauncherException{
589 File actionStatsData = new File(System.getProperty(EXTERNAL_ACTION_STATS));
590 // If stats are stored by the action, then stats file should exist
591 if (actionStatsData.exists()) {
592 int statsMaxOutputData = getJobConf().getInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
593 Integer.MAX_VALUE);
594 reporter.incrCounter(COUNTER_GROUP, COUNTER_STATS_DATA, 1);
595 // fail the launcher if size of stats is greater than the maximum allowed size
596 if (actionStatsData.length() > statsMaxOutputData) {
597 String msg = MessageFormat.format("Output stats size [{0}] exceeds maximum [{1}]",
598 actionStatsData.length(), statsMaxOutputData);
599 failLauncher(0, msg, null);
600 }
601 // copy the stats file to hdfs path which can be accessed by Oozie server
602 fs.copyFromLocalFile(new Path(actionStatsData.toString()), new Path(actionDir,
603 ACTION_STATS_PROPS));
604 }
605 }
606
607 private void handleExternalChildIDs(FileSystem fs, Reporter reporter) throws IOException {
608 File externalChildIDs = new File(System.getProperty(EXTERNAL_CHILD_IDS));
609 // if external ChildIDs are stored by the action, then the file should exist
610 if (externalChildIDs.exists()) {
611 // copy the externalChildIDs file to hdfs path which can be accessed by Oozie server
612 fs.copyFromLocalFile(new Path(externalChildIDs.toString()), new Path(actionDir,
613 ACTION_EXTERNAL_CHILD_IDS_PROPS));
614 }
615 }
616
617 private void setupMainConfiguration() throws IOException {
618 FileSystem fs = FileSystem.get(getJobConf());
619 fs.copyToLocalFile(new Path(getJobConf().get(OOZIE_ACTION_DIR_PATH), ACTION_CONF_XML), new Path(new File(
620 ACTION_CONF_XML).getAbsolutePath()));
621
622 System.setProperty("oozie.launcher.job.id", getJobConf().get("mapred.job.id"));
623 System.setProperty("oozie.job.id", getJobConf().get(OOZIE_JOB_ID));
624 System.setProperty("oozie.action.id", getJobConf().get(OOZIE_ACTION_ID));
625 System.setProperty("oozie.action.conf.xml", new File(ACTION_CONF_XML).getAbsolutePath());
626 System.setProperty("oozie.action.output.properties", new File(ACTION_OUTPUT_PROPS).getAbsolutePath());
627 System.setProperty(EXTERNAL_ACTION_STATS, new File(ACTION_STATS_PROPS).getAbsolutePath());
628 System.setProperty(EXTERNAL_CHILD_IDS, new File(ACTION_EXTERNAL_CHILD_IDS_PROPS).getAbsolutePath());
629 System.setProperty("oozie.action.newId.properties", new File(ACTION_NEW_ID_PROPS).getAbsolutePath());
630 }
631
632 // Method to execute the prepare actions
633 private void executePrepare() throws IOException, LauncherException {
634 String prepareXML = getJobConf().get(ACTION_PREPARE_XML);
635 if (prepareXML != null) {
636 if (!prepareXML.equals("")) {
637 PrepareActionsDriver.doOperations(prepareXML);
638 } else {
639 System.out.println("There are no prepare actions to execute.");
640 }
641 }
642 }
643
644 public static String[] getMainArguments(Configuration conf) {
645 String[] args = new String[conf.getInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, 0)];
646 for (int i = 0; i < args.length; i++) {
647 args[i] = conf.get(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i);
648 }
649 return args;
650 }
651
652 private void setupHeartBeater(Reporter reporter) {
653 timer = new ScheduledThreadPoolExecutor(1);
654 timer.scheduleAtFixedRate(new LauncherMapper(reporter), 0, 30, TimeUnit.SECONDS);
655 }
656
657 private void destroyHeartBeater() {
658 timer.shutdownNow();
659 }
660
661 private Reporter reporter;
662
663 private LauncherMapper(Reporter reporter) {
664 this.reporter = reporter;
665 }
666
667 @Override
668 public void run() {
669 System.out.println("Heart beat");
670 reporter.progress();
671 }
672
673 private void failLauncher(int errorCode, String reason, Throwable ex) throws LauncherException {
674 try {
675 if (ex != null) {
676 reason += ", " + ex.getMessage();
677 }
678 Properties errorProps = new Properties();
679 errorProps.setProperty("error.code", Integer.toString(errorCode));
680 errorProps.setProperty("error.reason", reason);
681 if (ex != null) {
682 if (ex.getMessage() != null) {
683 errorProps.setProperty("exception.message", ex.getMessage());
684 }
685 StringWriter sw = new StringWriter();
686 PrintWriter pw = new PrintWriter(sw);
687 ex.printStackTrace(pw);
688 pw.close();
689 errorProps.setProperty("exception.stacktrace", sw.toString());
690 }
691 FileSystem fs = FileSystem.get(getJobConf());
692 OutputStream os = fs.create(new Path(actionDir, ACTION_ERROR_PROPS));
693 errorProps.store(os, "");
694 os.close();
695
696 System.out.print("Failing Oozie Launcher, " + reason + "\n");
697 System.err.print("Failing Oozie Launcher, " + reason + "\n");
698 if (ex != null) {
699 ex.printStackTrace(System.out);
700 ex.printStackTrace(System.err);
701 }
702 throw new LauncherException(reason, ex);
703 }
704 catch (IOException rex) {
705 throw new RuntimeException("Error while failing launcher, " + rex.getMessage(), rex);
706 }
707 }
708
709 /**
710 * Print files and directories in current directory. Will list files in the sub-directory (only 1 level deep)
711 */
712 protected void printContentsOfCurrentDir() {
713 File folder = new File(".");
714 System.out.println();
715 System.out.println("Files in current dir:" + folder.getAbsolutePath());
716 System.out.println("======================");
717
718 File[] listOfFiles = folder.listFiles();
719 for (File fileName : listOfFiles) {
720 if (fileName.isFile()) {
721 System.out.println("File: " + fileName.getName());
722 }
723 else if (fileName.isDirectory()) {
724 System.out.println("Dir: " + fileName.getName());
725 File subDir = new File(fileName.getName());
726 File[] moreFiles = subDir.listFiles();
727 for (File subFileName : moreFiles) {
728 if (subFileName.isFile()) {
729 System.out.println(" File: " + subFileName.getName());
730 }
731 else if (subFileName.isDirectory()) {
732 System.out.println(" Dir: " + subFileName.getName());
733 }
734 }
735 }
736 }
737 }
738
739 }
740
741 class LauncherSecurityManager extends SecurityManager {
742 private static boolean exitInvoked;
743 private static int exitCode;
744 private SecurityManager securityManager;
745
746 public LauncherSecurityManager() {
747 reset();
748 securityManager = System.getSecurityManager();
749 System.setSecurityManager(this);
750 }
751
752 @Override
753 public void checkPermission(Permission perm, Object context) {
754 if (securityManager != null) {
755 // check everything with the original SecurityManager
756 securityManager.checkPermission(perm, context);
757 }
758 }
759
760 @Override
761 public void checkPermission(Permission perm) {
762 if (securityManager != null) {
763 // check everything with the original SecurityManager
764 securityManager.checkPermission(perm);
765 }
766 }
767
768 @Override
769 public void checkExit(int status) throws SecurityException {
770 exitInvoked = true;
771 exitCode = status;
772 throw new SecurityException("Intercepted System.exit(" + status + ")");
773 }
774
775 public static boolean getExitInvoked() {
776 return exitInvoked;
777 }
778
779 public static int getExitCode() {
780 return exitCode;
781 }
782
783 public static void reset() {
784 exitInvoked = false;
785 exitCode = 0;
786 }
787 }
788
789 class LauncherException extends Exception {
790
791 LauncherException(String message) {
792 super(message);
793 }
794
795 LauncherException(String message, Throwable cause) {
796 super(message, cause);
797 }
798 }