001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import java.io.BufferedReader; 021 import java.io.File; 022 import java.io.FileReader; 023 import java.io.IOException; 024 import java.io.InputStream; 025 import java.io.InputStreamReader; 026 import java.io.OutputStream; 027 import java.io.OutputStreamWriter; 028 import java.io.PrintWriter; 029 import java.io.StringWriter; 030 import java.io.Writer; 031 import java.lang.reflect.InvocationTargetException; 032 import java.lang.reflect.Method; 033 import java.security.Permission; 034 import java.text.MessageFormat; 035 import java.util.Properties; 036 import java.util.StringTokenizer; 037 import java.util.concurrent.ScheduledThreadPoolExecutor; 038 import java.util.concurrent.TimeUnit; 039 040 import org.apache.hadoop.conf.Configuration; 041 import org.apache.hadoop.fs.FileSystem; 042 import org.apache.hadoop.fs.Path; 043 import org.apache.hadoop.mapred.Counters; 044 import org.apache.hadoop.mapred.JobConf; 045 import org.apache.hadoop.mapred.Mapper; 046 import org.apache.hadoop.mapred.OutputCollector; 047 import org.apache.hadoop.mapred.Reporter; 048 import org.apache.hadoop.mapred.RunningJob; 049 import org.apache.oozie.service.HadoopAccessorException; 050 import org.apache.oozie.service.HadoopAccessorService; 051 import org.apache.oozie.service.Services; 052 import org.apache.oozie.util.XLog; 053 054 public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable { 055 056 public static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class"; 057 058 public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = "oozie.action.max.output.data"; 059 060 private static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = "oozie.action.main.arg.count"; 061 private static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = "oozie.action.main.arg."; 062 private static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = "oozie.external.stats.max.size"; 063 064 private static final String COUNTER_GROUP = "oozie.launcher"; 065 private static final String COUNTER_DO_ID_SWAP = "oozie.do.id.swap"; 066 private static final String COUNTER_OUTPUT_DATA = "oozie.output.data"; 067 private static final String COUNTER_STATS_DATA = "oozie.stats.data"; 068 private static final String COUNTER_LAUNCHER_ERROR = "oozie.launcher.error"; 069 070 private static final String OOZIE_JOB_ID = "oozie.job.id"; 071 private static final String OOZIE_ACTION_ID = "oozie.action.id"; 072 073 private static final String OOZIE_ACTION_DIR_PATH = "oozie.action.dir.path"; 074 private static final String OOZIE_ACTION_RECOVERY_ID = "oozie.action.recovery.id"; 075 076 public static final String ACTION_PREFIX = "oozie.action."; 077 public static final String EXTERNAL_CHILD_IDS = ACTION_PREFIX + "externalChildIDs.properties"; 078 public static final String EXTERNAL_ACTION_STATS = ACTION_PREFIX + "stats.properties"; 079 080 static final String ACTION_CONF_XML = "action.xml"; 081 public static final String ACTION_PREPARE_XML = "oozie.action.prepare.xml"; 082 private static final String ACTION_OUTPUT_PROPS = "output.properties"; 083 private static final String ACTION_STATS_PROPS = "stats.properties"; 084 private static final String ACTION_EXTERNAL_CHILD_IDS_PROPS = "externalChildIds.properties"; 085 private static final String ACTION_NEW_ID_PROPS = "newId.properties"; 086 private static final String ACTION_ERROR_PROPS = "error.properties"; 087 088 private void setRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) throws LauncherException { 089 try { 090 FileSystem fs = FileSystem.get(launcherConf); 091 String jobId = launcherConf.get("mapred.job.id"); 092 Path path = new Path(actionDir, recoveryId); 093 if (!fs.exists(path)) { 094 try { 095 Writer writer = new OutputStreamWriter(fs.create(path)); 096 writer.write(jobId); 097 writer.close(); 098 } 099 catch (IOException ex) { 100 failLauncher(0, "IO error", ex); 101 } 102 } 103 else { 104 InputStream is = fs.open(path); 105 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 106 String id = reader.readLine(); 107 reader.close(); 108 if (!jobId.equals(id)) { 109 failLauncher(0, MessageFormat.format( 110 "Hadoop job Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]", path, id, 111 jobId), null); 112 } 113 114 } 115 } 116 catch (IOException ex) { 117 failLauncher(0, "IO error", ex); 118 } 119 } 120 121 /** 122 * @param launcherConf 123 * @param actionDir 124 * @param recoveryId 125 * @return 126 * @throws HadoopAccessorException 127 * @throws IOException 128 */ 129 public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) 130 throws HadoopAccessorException, IOException { 131 String jobId = null; 132 Path recoveryFile = new Path(actionDir, recoveryId); 133 //FileSystem fs = FileSystem.get(launcherConf); 134 FileSystem fs = Services.get().get(HadoopAccessorService.class) 135 .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf); 136 137 if (fs.exists(recoveryFile)) { 138 InputStream is = fs.open(recoveryFile); 139 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 140 jobId = reader.readLine(); 141 reader.close(); 142 } 143 return jobId; 144 145 } 146 147 public static void setupMainClass(Configuration launcherConf, String javaMainClass) { 148 launcherConf.set(CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass); 149 } 150 151 public static void setupMainArguments(Configuration launcherConf, String[] args) { 152 launcherConf.setInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length); 153 for (int i = 0; i < args.length; i++) { 154 launcherConf.set(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]); 155 } 156 } 157 158 public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) { 159 launcherConf.setInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData); 160 } 161 162 /** 163 * Set the maximum value of stats data 164 * 165 * @param launcherConf the oozie launcher configuration 166 * @param maxStatsData the maximum allowed size of stats data 167 */ 168 public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){ 169 launcherConf.setInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData); 170 } 171 172 /** 173 * @param launcherConf 174 * @param jobId 175 * @param actionId 176 * @param actionDir 177 * @param recoveryId 178 * @param actionConf 179 * @throws IOException 180 * @throws HadoopAccessorException 181 */ 182 public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir, 183 String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException { 184 185 launcherConf.setMapperClass(LauncherMapper.class); 186 launcherConf.setSpeculativeExecution(false); 187 launcherConf.setNumMapTasks(1); 188 launcherConf.setNumReduceTasks(0); 189 190 launcherConf.set(OOZIE_JOB_ID, jobId); 191 launcherConf.set(OOZIE_ACTION_ID, actionId); 192 launcherConf.set(OOZIE_ACTION_DIR_PATH, actionDir.toString()); 193 launcherConf.set(OOZIE_ACTION_RECOVERY_ID, recoveryId); 194 launcherConf.set(ACTION_PREPARE_XML, prepareXML); 195 196 actionConf.set(OOZIE_JOB_ID, jobId); 197 actionConf.set(OOZIE_ACTION_ID, actionId); 198 199 FileSystem fs = 200 Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"), 201 actionDir.toUri(), launcherConf); 202 fs.mkdirs(actionDir); 203 204 OutputStream os = fs.create(new Path(actionDir, ACTION_CONF_XML)); 205 actionConf.writeXml(os); 206 os.close(); 207 208 Path inputDir = new Path(actionDir, "input"); 209 fs.mkdirs(inputDir); 210 Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "dummy.txt"))); 211 writer.write("dummy"); 212 writer.close(); 213 214 launcherConf.set("mapred.input.dir", inputDir.toString()); 215 launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString()); 216 } 217 218 public static boolean isMainDone(RunningJob runningJob) throws IOException { 219 return runningJob.isComplete(); 220 } 221 222 public static boolean isMainSuccessful(RunningJob runningJob) throws IOException { 223 boolean succeeded = runningJob.isSuccessful(); 224 if (succeeded) { 225 Counters counters = runningJob.getCounters(); 226 if (counters != null) { 227 Counters.Group group = counters.getGroup(COUNTER_GROUP); 228 if (group != null) { 229 succeeded = group.getCounter(COUNTER_LAUNCHER_ERROR) == 0; 230 } 231 } 232 } 233 return succeeded; 234 } 235 236 public static boolean hasOutputData(RunningJob runningJob) throws IOException { 237 boolean output = false; 238 Counters counters = runningJob.getCounters(); 239 if (counters != null) { 240 Counters.Group group = counters.getGroup(COUNTER_GROUP); 241 if (group != null) { 242 output = group.getCounter(COUNTER_OUTPUT_DATA) == 1; 243 } 244 } 245 return output; 246 } 247 248 /** 249 * Check whether runningJob has stats data or not 250 * 251 * @param runningJob the runningJob 252 * @return returns whether the running Job has stats data or not 253 * @throws IOException 254 */ 255 public static boolean hasStatsData(RunningJob runningJob) throws IOException{ 256 boolean output = false; 257 Counters counters = runningJob.getCounters(); 258 if (counters != null) { 259 Counters.Group group = counters.getGroup(COUNTER_GROUP); 260 if (group != null) { 261 output = group.getCounter(COUNTER_STATS_DATA) == 1; 262 } 263 } 264 return output; 265 } 266 267 /** 268 * @param runningJob 269 * @return 270 * @throws IOException 271 */ 272 public static boolean hasIdSwap(RunningJob runningJob) throws IOException { 273 boolean swap = false; 274 Counters counters = runningJob.getCounters(); 275 if (counters != null) { 276 Counters.Group group = counters.getGroup(COUNTER_GROUP); 277 if (group != null) { 278 swap = group.getCounter(COUNTER_DO_ID_SWAP) == 1; 279 } 280 } 281 return swap; 282 } 283 284 /** 285 * @param runningJob 286 * @param user 287 * @param group 288 * @param actionDir 289 * @return 290 * @throws IOException 291 * @throws HadoopAccessorException 292 */ 293 public static boolean hasIdSwap(RunningJob runningJob, String user, String group, Path actionDir) 294 throws IOException, HadoopAccessorException { 295 boolean swap = false; 296 297 XLog log = XLog.getLog("org.apache.oozie.action.hadoop.LauncherMapper"); 298 299 Counters counters = runningJob.getCounters(); 300 if (counters != null) { 301 Counters.Group counterGroup = counters.getGroup(COUNTER_GROUP); 302 if (counterGroup != null) { 303 swap = counterGroup.getCounter(COUNTER_DO_ID_SWAP) == 1; 304 } 305 } 306 // additional check for swapped hadoop ID 307 // Can't rely on hadoop counters existing 308 // we'll check for the newID file in hdfs if the hadoop counters is null 309 else { 310 311 Path p = getIdSwapPath(actionDir); 312 // log.debug("Checking for newId file in: [{0}]", p); 313 314 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, p.toUri(), 315 new Configuration()); 316 if (fs.exists(p)) { 317 log.debug("Hadoop Counters is null, but found newID file."); 318 319 swap = true; 320 } 321 else { 322 log.debug("Hadoop Counters is null, and newID file doesn't exist at: [{0}]", p); 323 } 324 } 325 return swap; 326 } 327 328 public static Path getOutputDataPath(Path actionDir) { 329 return new Path(actionDir, ACTION_OUTPUT_PROPS); 330 } 331 332 /** 333 * Get the location of stats file 334 * 335 * @param actionDir the action directory 336 * @return the hdfs location of the file 337 */ 338 public static Path getActionStatsDataPath(Path actionDir){ 339 return new Path(actionDir, ACTION_STATS_PROPS); 340 } 341 342 /** 343 * Get the location of external Child IDs file 344 * 345 * @param actionDir the action directory 346 * @return the hdfs location of the file 347 */ 348 public static Path getExternalChildIDsDataPath(Path actionDir){ 349 return new Path(actionDir, ACTION_EXTERNAL_CHILD_IDS_PROPS); 350 } 351 352 public static Path getErrorPath(Path actionDir) { 353 return new Path(actionDir, ACTION_ERROR_PROPS); 354 } 355 356 public static Path getIdSwapPath(Path actionDir) { 357 return new Path(actionDir, ACTION_NEW_ID_PROPS); 358 } 359 360 private JobConf jobConf; 361 private Path actionDir; 362 private ScheduledThreadPoolExecutor timer; 363 364 private boolean configFailure = false; 365 private LauncherException configureFailureEx; 366 public LauncherMapper() { 367 } 368 369 @Override 370 public void configure(JobConf jobConf) { 371 System.out.println(); 372 System.out.println("Oozie Launcher starts"); 373 System.out.println(); 374 this.jobConf = jobConf; 375 actionDir = new Path(getJobConf().get(OOZIE_ACTION_DIR_PATH)); 376 String recoveryId = jobConf.get(OOZIE_ACTION_RECOVERY_ID, null); 377 try { 378 setRecoveryId(jobConf, actionDir, recoveryId); 379 } 380 catch (LauncherException ex) { 381 System.out.println("Launcher config error "+ex.getMessage()); 382 configureFailureEx = ex; 383 configFailure = true; 384 } 385 } 386 387 @Override 388 public void map(K1 key, V1 value, OutputCollector<K2, V2> collector, Reporter reporter) throws IOException { 389 try { 390 if (configFailure) { 391 throw configureFailureEx; 392 } 393 else { 394 String mainClass = getJobConf().get(CONF_OOZIE_ACTION_MAIN_CLASS); 395 String msgPrefix = "Main class [" + mainClass + "], "; 396 int errorCode = 0; 397 Throwable errorCause = null; 398 String errorMessage = null; 399 400 try { 401 new LauncherSecurityManager(); 402 } 403 catch (SecurityException ex) { 404 errorMessage = "Could not set LauncherSecurityManager"; 405 errorCause = ex; 406 } 407 408 try { 409 setupHeartBeater(reporter); 410 411 setupMainConfiguration(); 412 413 try { 414 System.out.println("Starting the execution of prepare actions"); 415 executePrepare(); 416 System.out.println("Completed the execution of prepare actions successfully"); 417 } catch (Exception ex) { 418 System.out.println("Prepare execution in the Launcher Mapper has failed"); 419 throw new LauncherException(ex.getMessage(), ex); 420 } 421 422 String[] args = getMainArguments(getJobConf()); 423 424 printContentsOfCurrentDir(); 425 426 System.out.println(); 427 System.out.println("Oozie Java/Map-Reduce/Pig action launcher-job configuration"); 428 System.out.println("================================================================="); 429 System.out.println("Workflow job id : " + System.getProperty("oozie.job.id")); 430 System.out.println("Workflow action id: " + System.getProperty("oozie.action.id")); 431 System.out.println(); 432 System.out.println("Classpath :"); 433 System.out.println("------------------------"); 434 StringTokenizer st = new StringTokenizer(System.getProperty("java.class.path"), ":"); 435 while (st.hasMoreTokens()) { 436 System.out.println(" " + st.nextToken()); 437 } 438 System.out.println("------------------------"); 439 System.out.println(); 440 System.out.println("Main class : " + mainClass); 441 System.out.println(); 442 System.out.println("Maximum output : " 443 + getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024)); 444 System.out.println(); 445 System.out.println("Arguments :"); 446 for (String arg : args) { 447 System.out.println(" " + arg); 448 } 449 450 System.out.println(); 451 System.out.println("Java System Properties:"); 452 System.out.println("------------------------"); 453 System.getProperties().store(System.out, ""); 454 System.out.flush(); 455 System.out.println("------------------------"); 456 System.out.println(); 457 458 System.out.println("================================================================="); 459 System.out.println(); 460 System.out.println(">>> Invoking Main class now >>>"); 461 System.out.println(); 462 System.out.flush(); 463 464 try { 465 Class klass = getJobConf().getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class); 466 Method mainMethod = klass.getMethod("main", String[].class); 467 mainMethod.invoke(null, (Object) args); 468 } 469 catch (InvocationTargetException ex) { 470 if (LauncherMainException.class.isInstance(ex.getCause())) { 471 errorMessage = msgPrefix + "exit code [" +((LauncherMainException)ex.getCause()).getErrorCode() 472 + "]"; 473 errorCause = null; 474 } 475 else if (SecurityException.class.isInstance(ex.getCause())) { 476 if (LauncherSecurityManager.getExitInvoked()) { 477 System.out.println("Intercepting System.exit(" + LauncherSecurityManager.getExitCode() 478 + ")"); 479 System.err.println("Intercepting System.exit(" + LauncherSecurityManager.getExitCode() 480 + ")"); 481 // if 0 main() method finished successfully 482 // ignoring 483 errorCode = LauncherSecurityManager.getExitCode(); 484 if (errorCode != 0) { 485 errorMessage = msgPrefix + "exit code [" + errorCode + "]"; 486 errorCause = null; 487 } 488 } 489 } 490 else { 491 throw ex; 492 } 493 } 494 finally { 495 System.out.println(); 496 System.out.println("<<< Invocation of Main class completed <<<"); 497 System.out.println(); 498 } 499 if (errorMessage == null) { 500 File outputData = new File(System.getProperty("oozie.action.output.properties")); 501 FileSystem fs = FileSystem.get(getJobConf()); 502 if (outputData.exists()) { 503 504 fs.copyFromLocalFile(new Path(outputData.toString()), new Path(actionDir, 505 ACTION_OUTPUT_PROPS)); 506 reporter.incrCounter(COUNTER_GROUP, COUNTER_OUTPUT_DATA, 1); 507 508 int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024); 509 if (outputData.length() > maxOutputData) { 510 String msg = MessageFormat.format("Output data size [{0}] exceeds maximum [{1}]", 511 outputData.length(), maxOutputData); 512 failLauncher(0, msg, null); 513 } 514 System.out.println(); 515 System.out.println("Oozie Launcher, capturing output data:"); 516 System.out.println("======================="); 517 Properties props = new Properties(); 518 props.load(new FileReader(outputData)); 519 props.store(System.out, ""); 520 System.out.println(); 521 System.out.println("======================="); 522 System.out.println(); 523 } 524 handleActionStatsData(fs, reporter); 525 handleExternalChildIDs(fs, reporter); 526 File newId = new File(System.getProperty("oozie.action.newId.properties")); 527 if (newId.exists()) { 528 Properties props = new Properties(); 529 props.load(new FileReader(newId)); 530 if (props.getProperty("id") == null) { 531 throw new IllegalStateException("ID swap file does not have [id] property"); 532 } 533 fs = FileSystem.get(getJobConf()); 534 fs.copyFromLocalFile(new Path(newId.toString()), new Path(actionDir, ACTION_NEW_ID_PROPS)); 535 reporter.incrCounter(COUNTER_GROUP, COUNTER_DO_ID_SWAP, 1); 536 537 System.out.println("Oozie Launcher, copying new Hadoop job id to file: " 538 + new Path(actionDir, ACTION_NEW_ID_PROPS).toUri()); 539 540 System.out.println(); 541 System.out.println("Oozie Launcher, propagating new Hadoop job id to Oozie"); 542 System.out.println("======================="); 543 System.out.println("id: " + props.getProperty("id")); 544 System.out.println("======================="); 545 System.out.println(); 546 } 547 } 548 } 549 catch (NoSuchMethodException ex) { 550 errorMessage = msgPrefix + "main() method not found"; 551 errorCause = ex; 552 } 553 catch (InvocationTargetException ex) { 554 errorMessage = msgPrefix + "main() threw exception"; 555 errorCause = ex.getTargetException(); 556 } 557 catch (Throwable ex) { 558 errorMessage = msgPrefix + "exception invoking main()"; 559 errorCause = ex; 560 } 561 finally { 562 destroyHeartBeater(); 563 if (errorMessage != null) { 564 failLauncher(errorCode, errorMessage, errorCause); 565 } 566 } 567 } 568 } 569 catch (LauncherException ex) { 570 reporter.incrCounter(COUNTER_GROUP, COUNTER_LAUNCHER_ERROR, 1); 571 System.out.println(); 572 System.out.println("Oozie Launcher failed, finishing Hadoop job gracefully"); 573 System.out.println(); 574 } 575 } 576 577 @Override 578 public void close() throws IOException { 579 System.out.println(); 580 System.out.println("Oozie Launcher ends"); 581 System.out.println(); 582 } 583 584 protected JobConf getJobConf() { 585 return jobConf; 586 } 587 588 private void handleActionStatsData(FileSystem fs, Reporter reporter) throws IOException, LauncherException{ 589 File actionStatsData = new File(System.getProperty(EXTERNAL_ACTION_STATS)); 590 // If stats are stored by the action, then stats file should exist 591 if (actionStatsData.exists()) { 592 int statsMaxOutputData = getJobConf().getInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, 593 Integer.MAX_VALUE); 594 reporter.incrCounter(COUNTER_GROUP, COUNTER_STATS_DATA, 1); 595 // fail the launcher if size of stats is greater than the maximum allowed size 596 if (actionStatsData.length() > statsMaxOutputData) { 597 String msg = MessageFormat.format("Output stats size [{0}] exceeds maximum [{1}]", 598 actionStatsData.length(), statsMaxOutputData); 599 failLauncher(0, msg, null); 600 } 601 // copy the stats file to hdfs path which can be accessed by Oozie server 602 fs.copyFromLocalFile(new Path(actionStatsData.toString()), new Path(actionDir, 603 ACTION_STATS_PROPS)); 604 } 605 } 606 607 private void handleExternalChildIDs(FileSystem fs, Reporter reporter) throws IOException { 608 File externalChildIDs = new File(System.getProperty(EXTERNAL_CHILD_IDS)); 609 // if external ChildIDs are stored by the action, then the file should exist 610 if (externalChildIDs.exists()) { 611 // copy the externalChildIDs file to hdfs path which can be accessed by Oozie server 612 fs.copyFromLocalFile(new Path(externalChildIDs.toString()), new Path(actionDir, 613 ACTION_EXTERNAL_CHILD_IDS_PROPS)); 614 } 615 } 616 617 private void setupMainConfiguration() throws IOException { 618 FileSystem fs = FileSystem.get(getJobConf()); 619 fs.copyToLocalFile(new Path(getJobConf().get(OOZIE_ACTION_DIR_PATH), ACTION_CONF_XML), new Path(new File( 620 ACTION_CONF_XML).getAbsolutePath())); 621 622 System.setProperty("oozie.launcher.job.id", getJobConf().get("mapred.job.id")); 623 System.setProperty("oozie.job.id", getJobConf().get(OOZIE_JOB_ID)); 624 System.setProperty("oozie.action.id", getJobConf().get(OOZIE_ACTION_ID)); 625 System.setProperty("oozie.action.conf.xml", new File(ACTION_CONF_XML).getAbsolutePath()); 626 System.setProperty("oozie.action.output.properties", new File(ACTION_OUTPUT_PROPS).getAbsolutePath()); 627 System.setProperty(EXTERNAL_ACTION_STATS, new File(ACTION_STATS_PROPS).getAbsolutePath()); 628 System.setProperty(EXTERNAL_CHILD_IDS, new File(ACTION_EXTERNAL_CHILD_IDS_PROPS).getAbsolutePath()); 629 System.setProperty("oozie.action.newId.properties", new File(ACTION_NEW_ID_PROPS).getAbsolutePath()); 630 } 631 632 // Method to execute the prepare actions 633 private void executePrepare() throws IOException, LauncherException { 634 String prepareXML = getJobConf().get(ACTION_PREPARE_XML); 635 if (prepareXML != null) { 636 if (!prepareXML.equals("")) { 637 PrepareActionsDriver.doOperations(prepareXML); 638 } else { 639 System.out.println("There are no prepare actions to execute."); 640 } 641 } 642 } 643 644 public static String[] getMainArguments(Configuration conf) { 645 String[] args = new String[conf.getInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, 0)]; 646 for (int i = 0; i < args.length; i++) { 647 args[i] = conf.get(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i); 648 } 649 return args; 650 } 651 652 private void setupHeartBeater(Reporter reporter) { 653 timer = new ScheduledThreadPoolExecutor(1); 654 timer.scheduleAtFixedRate(new LauncherMapper(reporter), 0, 30, TimeUnit.SECONDS); 655 } 656 657 private void destroyHeartBeater() { 658 timer.shutdownNow(); 659 } 660 661 private Reporter reporter; 662 663 private LauncherMapper(Reporter reporter) { 664 this.reporter = reporter; 665 } 666 667 @Override 668 public void run() { 669 System.out.println("Heart beat"); 670 reporter.progress(); 671 } 672 673 private void failLauncher(int errorCode, String reason, Throwable ex) throws LauncherException { 674 try { 675 if (ex != null) { 676 reason += ", " + ex.getMessage(); 677 } 678 Properties errorProps = new Properties(); 679 errorProps.setProperty("error.code", Integer.toString(errorCode)); 680 errorProps.setProperty("error.reason", reason); 681 if (ex != null) { 682 if (ex.getMessage() != null) { 683 errorProps.setProperty("exception.message", ex.getMessage()); 684 } 685 StringWriter sw = new StringWriter(); 686 PrintWriter pw = new PrintWriter(sw); 687 ex.printStackTrace(pw); 688 pw.close(); 689 errorProps.setProperty("exception.stacktrace", sw.toString()); 690 } 691 FileSystem fs = FileSystem.get(getJobConf()); 692 OutputStream os = fs.create(new Path(actionDir, ACTION_ERROR_PROPS)); 693 errorProps.store(os, ""); 694 os.close(); 695 696 System.out.print("Failing Oozie Launcher, " + reason + "\n"); 697 System.err.print("Failing Oozie Launcher, " + reason + "\n"); 698 if (ex != null) { 699 ex.printStackTrace(System.out); 700 ex.printStackTrace(System.err); 701 } 702 throw new LauncherException(reason, ex); 703 } 704 catch (IOException rex) { 705 throw new RuntimeException("Error while failing launcher, " + rex.getMessage(), rex); 706 } 707 } 708 709 /** 710 * Print files and directories in current directory. Will list files in the sub-directory (only 1 level deep) 711 */ 712 protected void printContentsOfCurrentDir() { 713 File folder = new File("."); 714 System.out.println(); 715 System.out.println("Files in current dir:" + folder.getAbsolutePath()); 716 System.out.println("======================"); 717 718 File[] listOfFiles = folder.listFiles(); 719 for (File fileName : listOfFiles) { 720 if (fileName.isFile()) { 721 System.out.println("File: " + fileName.getName()); 722 } 723 else if (fileName.isDirectory()) { 724 System.out.println("Dir: " + fileName.getName()); 725 File subDir = new File(fileName.getName()); 726 File[] moreFiles = subDir.listFiles(); 727 for (File subFileName : moreFiles) { 728 if (subFileName.isFile()) { 729 System.out.println(" File: " + subFileName.getName()); 730 } 731 else if (subFileName.isDirectory()) { 732 System.out.println(" Dir: " + subFileName.getName()); 733 } 734 } 735 } 736 } 737 } 738 739 } 740 741 class LauncherSecurityManager extends SecurityManager { 742 private static boolean exitInvoked; 743 private static int exitCode; 744 private SecurityManager securityManager; 745 746 public LauncherSecurityManager() { 747 reset(); 748 securityManager = System.getSecurityManager(); 749 System.setSecurityManager(this); 750 } 751 752 @Override 753 public void checkPermission(Permission perm, Object context) { 754 if (securityManager != null) { 755 // check everything with the original SecurityManager 756 securityManager.checkPermission(perm, context); 757 } 758 } 759 760 @Override 761 public void checkPermission(Permission perm) { 762 if (securityManager != null) { 763 // check everything with the original SecurityManager 764 securityManager.checkPermission(perm); 765 } 766 } 767 768 @Override 769 public void checkExit(int status) throws SecurityException { 770 exitInvoked = true; 771 exitCode = status; 772 throw new SecurityException("Intercepted System.exit(" + status + ")"); 773 } 774 775 public static boolean getExitInvoked() { 776 return exitInvoked; 777 } 778 779 public static int getExitCode() { 780 return exitCode; 781 } 782 783 public static void reset() { 784 exitInvoked = false; 785 exitCode = 0; 786 } 787 } 788 789 class LauncherException extends Exception { 790 791 LauncherException(String message) { 792 super(message); 793 } 794 795 LauncherException(String message, Throwable cause) { 796 super(message, cause); 797 } 798 }