001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import java.io.BufferedReader; 021 import java.io.File; 022 import java.io.FileReader; 023 import java.io.IOException; 024 import java.io.InputStream; 025 import java.io.InputStreamReader; 026 import java.io.OutputStream; 027 import java.io.OutputStreamWriter; 028 import java.io.PrintWriter; 029 import java.io.StringWriter; 030 import java.io.Writer; 031 import java.lang.reflect.InvocationTargetException; 032 import java.lang.reflect.Method; 033 import java.net.URI; 034 import java.security.Permission; 035 import java.text.MessageFormat; 036 import java.util.ArrayList; 037 import java.util.Collection; 038 import java.util.List; 039 import java.util.Properties; 040 import java.util.StringTokenizer; 041 import java.util.concurrent.ScheduledThreadPoolExecutor; 042 import java.util.concurrent.TimeUnit; 043 044 import org.apache.hadoop.conf.Configuration; 045 import org.apache.hadoop.fs.FileSystem; 046 import org.apache.hadoop.fs.Path; 047 import org.apache.hadoop.mapred.Counters; 048 import org.apache.hadoop.mapred.JobConf; 049 import org.apache.hadoop.mapred.Mapper; 050 import org.apache.hadoop.mapred.OutputCollector; 051 import org.apache.hadoop.mapred.Reporter; 052 import org.apache.hadoop.mapred.RunningJob; 053 import org.apache.oozie.service.HadoopAccessorException; 054 import org.apache.oozie.service.HadoopAccessorService; 055 import org.apache.oozie.service.Services; 056 import org.apache.oozie.util.XLog; 057 058 public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable { 059 060 public static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class"; 061 public static final String CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS = "oozie.launcher.action.supported.filesystems"; 062 063 public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = "oozie.action.max.output.data"; 064 065 private static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = "oozie.action.main.arg.count"; 066 private static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = "oozie.action.main.arg."; 067 private static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = "oozie.external.stats.max.size"; 068 069 private static final String COUNTER_GROUP = "oozie.launcher"; 070 private static final String COUNTER_DO_ID_SWAP = "oozie.do.id.swap"; 071 private static final String COUNTER_OUTPUT_DATA = "oozie.output.data"; 072 private static final String COUNTER_STATS_DATA = "oozie.stats.data"; 073 private static final String COUNTER_LAUNCHER_ERROR = "oozie.launcher.error"; 074 075 private static final String OOZIE_JOB_ID = "oozie.job.id"; 076 private static final String OOZIE_ACTION_ID = "oozie.action.id"; 077 078 private static final String OOZIE_ACTION_DIR_PATH = "oozie.action.dir.path"; 079 private static final String OOZIE_ACTION_RECOVERY_ID = "oozie.action.recovery.id"; 080 081 public static final String ACTION_PREFIX = "oozie.action."; 082 public static final String EXTERNAL_CHILD_IDS = ACTION_PREFIX + "externalChildIDs.properties"; 083 public static final String EXTERNAL_ACTION_STATS = ACTION_PREFIX + "stats.properties"; 084 085 static final String ACTION_CONF_XML = "action.xml"; 086 public static final String ACTION_PREPARE_XML = "oozie.action.prepare.xml"; 087 private static final String ACTION_OUTPUT_PROPS = "output.properties"; 088 private static final String ACTION_STATS_PROPS = "stats.properties"; 089 private static final String ACTION_EXTERNAL_CHILD_IDS_PROPS = "externalChildIds.properties"; 090 private static final String ACTION_NEW_ID_PROPS = "newId.properties"; 091 private static final String ACTION_ERROR_PROPS = "error.properties"; 092 093 private void setRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) throws LauncherException { 094 try { 095 String jobId = launcherConf.get("mapred.job.id"); 096 Path path = new Path(actionDir, recoveryId); 097 FileSystem fs = FileSystem.get(path.toUri(), launcherConf); 098 if (!fs.exists(path)) { 099 try { 100 Writer writer = new OutputStreamWriter(fs.create(path)); 101 writer.write(jobId); 102 writer.close(); 103 } 104 catch (IOException ex) { 105 failLauncher(0, "IO error", ex); 106 } 107 } 108 else { 109 InputStream is = fs.open(path); 110 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 111 String id = reader.readLine(); 112 reader.close(); 113 if (!jobId.equals(id)) { 114 failLauncher(0, MessageFormat.format( 115 "Hadoop job Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]", path, id, 116 jobId), null); 117 } 118 119 } 120 } 121 catch (IOException ex) { 122 failLauncher(0, "IO error", ex); 123 } 124 } 125 126 /** 127 * @param launcherConf 128 * @param actionDir 129 * @param recoveryId 130 * @return 131 * @throws HadoopAccessorException 132 * @throws IOException 133 */ 134 public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) 135 throws HadoopAccessorException, IOException { 136 String jobId = null; 137 Path recoveryFile = new Path(actionDir, recoveryId); 138 FileSystem fs = Services.get().get(HadoopAccessorService.class) 139 .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf); 140 141 if (fs.exists(recoveryFile)) { 142 InputStream is = fs.open(recoveryFile); 143 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 144 jobId = reader.readLine(); 145 reader.close(); 146 } 147 return jobId; 148 149 } 150 151 public static void setupMainClass(Configuration launcherConf, String javaMainClass) { 152 launcherConf.set(CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass); 153 } 154 155 public static void setupSupportedFileSystems(Configuration launcherConf, String supportedFileSystems) { 156 launcherConf.set(CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS, supportedFileSystems); 157 } 158 159 public static void setupMainArguments(Configuration launcherConf, String[] args) { 160 launcherConf.setInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length); 161 for (int i = 0; i < args.length; i++) { 162 launcherConf.set(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]); 163 } 164 } 165 166 public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) { 167 launcherConf.setInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData); 168 } 169 170 /** 171 * Set the maximum value of stats data 172 * 173 * @param launcherConf the oozie launcher configuration 174 * @param maxStatsData the maximum allowed size of stats data 175 */ 176 public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){ 177 launcherConf.setInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData); 178 } 179 180 /** 181 * @param launcherConf 182 * @param jobId 183 * @param actionId 184 * @param actionDir 185 * @param recoveryId 186 * @param actionConf 187 * @throws IOException 188 * @throws HadoopAccessorException 189 */ 190 public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir, 191 String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException { 192 193 launcherConf.setMapperClass(LauncherMapper.class); 194 launcherConf.setSpeculativeExecution(false); 195 launcherConf.setNumMapTasks(1); 196 launcherConf.setNumReduceTasks(0); 197 198 launcherConf.set(OOZIE_JOB_ID, jobId); 199 launcherConf.set(OOZIE_ACTION_ID, actionId); 200 launcherConf.set(OOZIE_ACTION_DIR_PATH, actionDir.toString()); 201 launcherConf.set(OOZIE_ACTION_RECOVERY_ID, recoveryId); 202 launcherConf.set(ACTION_PREPARE_XML, prepareXML); 203 204 actionConf.set(OOZIE_JOB_ID, jobId); 205 actionConf.set(OOZIE_ACTION_ID, actionId); 206 207 if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) { 208 List<String> purgedEntries = new ArrayList<String>(); 209 Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files"); 210 for (String entry : entries) { 211 if (entry.contains("#")) { 212 purgedEntries.add(entry); 213 } 214 } 215 actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()])); 216 launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true); 217 } 218 219 FileSystem fs = 220 Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"), 221 actionDir.toUri(), launcherConf); 222 fs.mkdirs(actionDir); 223 224 OutputStream os = fs.create(new Path(actionDir, ACTION_CONF_XML)); 225 actionConf.writeXml(os); 226 os.close(); 227 228 Path inputDir = new Path(actionDir, "input"); 229 fs.mkdirs(inputDir); 230 Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "dummy.txt"))); 231 writer.write("dummy"); 232 writer.close(); 233 234 launcherConf.set("mapred.input.dir", inputDir.toString()); 235 launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString()); 236 } 237 238 public static boolean isMainDone(RunningJob runningJob) throws IOException { 239 return runningJob.isComplete(); 240 } 241 242 public static boolean isMainSuccessful(RunningJob runningJob) throws IOException { 243 boolean succeeded = runningJob.isSuccessful(); 244 if (succeeded) { 245 Counters counters = runningJob.getCounters(); 246 if (counters != null) { 247 Counters.Group group = counters.getGroup(COUNTER_GROUP); 248 if (group != null) { 249 succeeded = group.getCounter(COUNTER_LAUNCHER_ERROR) == 0; 250 } 251 } 252 } 253 return succeeded; 254 } 255 256 public static boolean hasOutputData(RunningJob runningJob) throws IOException { 257 boolean output = false; 258 Counters counters = runningJob.getCounters(); 259 if (counters != null) { 260 Counters.Group group = counters.getGroup(COUNTER_GROUP); 261 if (group != null) { 262 output = group.getCounter(COUNTER_OUTPUT_DATA) == 1; 263 } 264 } 265 return output; 266 } 267 268 /** 269 * Check whether runningJob has stats data or not 270 * 271 * @param runningJob the runningJob 272 * @return returns whether the running Job has stats data or not 273 * @throws IOException 274 */ 275 public static boolean hasStatsData(RunningJob runningJob) throws IOException{ 276 boolean output = false; 277 Counters counters = runningJob.getCounters(); 278 if (counters != null) { 279 Counters.Group group = counters.getGroup(COUNTER_GROUP); 280 if (group != null) { 281 output = group.getCounter(COUNTER_STATS_DATA) == 1; 282 } 283 } 284 return output; 285 } 286 287 /** 288 * @param runningJob 289 * @return 290 * @throws IOException 291 */ 292 public static boolean hasIdSwap(RunningJob runningJob) throws IOException { 293 boolean swap = false; 294 Counters counters = runningJob.getCounters(); 295 if (counters != null) { 296 Counters.Group group = counters.getGroup(COUNTER_GROUP); 297 if (group != null) { 298 swap = group.getCounter(COUNTER_DO_ID_SWAP) == 1; 299 } 300 } 301 return swap; 302 } 303 304 /** 305 * @param runningJob 306 * @param user 307 * @param group 308 * @param actionDir 309 * @return 310 * @throws IOException 311 * @throws HadoopAccessorException 312 */ 313 public static boolean hasIdSwap(RunningJob runningJob, String user, String group, Path actionDir) 314 throws IOException, HadoopAccessorException { 315 boolean swap = false; 316 317 XLog log = XLog.getLog("org.apache.oozie.action.hadoop.LauncherMapper"); 318 319 Counters counters = runningJob.getCounters(); 320 if (counters != null) { 321 Counters.Group counterGroup = counters.getGroup(COUNTER_GROUP); 322 if (counterGroup != null) { 323 swap = counterGroup.getCounter(COUNTER_DO_ID_SWAP) == 1; 324 } 325 } 326 // additional check for swapped hadoop ID 327 // Can't rely on hadoop counters existing 328 // we'll check for the newID file in hdfs if the hadoop counters is null 329 else { 330 331 Path p = getIdSwapPath(actionDir); 332 // log.debug("Checking for newId file in: [{0}]", p); 333 334 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 335 Configuration conf = has.createJobConf(p.toUri().getAuthority()); 336 FileSystem fs = has.createFileSystem(user, p.toUri(), conf); 337 if (fs.exists(p)) { 338 log.debug("Hadoop Counters is null, but found newID file."); 339 340 swap = true; 341 } 342 else { 343 log.debug("Hadoop Counters is null, and newID file doesn't exist at: [{0}]", p); 344 } 345 } 346 return swap; 347 } 348 349 public static Path getOutputDataPath(Path actionDir) { 350 return new Path(actionDir, ACTION_OUTPUT_PROPS); 351 } 352 353 /** 354 * Get the location of stats file 355 * 356 * @param actionDir the action directory 357 * @return the hdfs location of the file 358 */ 359 public static Path getActionStatsDataPath(Path actionDir){ 360 return new Path(actionDir, ACTION_STATS_PROPS); 361 } 362 363 /** 364 * Get the location of external Child IDs file 365 * 366 * @param actionDir the action directory 367 * @return the hdfs location of the file 368 */ 369 public static Path getExternalChildIDsDataPath(Path actionDir){ 370 return new Path(actionDir, ACTION_EXTERNAL_CHILD_IDS_PROPS); 371 } 372 373 public static Path getErrorPath(Path actionDir) { 374 return new Path(actionDir, ACTION_ERROR_PROPS); 375 } 376 377 public static Path getIdSwapPath(Path actionDir) { 378 return new Path(actionDir, ACTION_NEW_ID_PROPS); 379 } 380 381 private JobConf jobConf; 382 private Path actionDir; 383 private ScheduledThreadPoolExecutor timer; 384 385 private boolean configFailure = false; 386 private LauncherException configureFailureEx; 387 public LauncherMapper() { 388 } 389 390 @Override 391 public void configure(JobConf jobConf) { 392 System.out.println(); 393 System.out.println("Oozie Launcher starts"); 394 System.out.println(); 395 this.jobConf = jobConf; 396 actionDir = new Path(getJobConf().get(OOZIE_ACTION_DIR_PATH)); 397 String recoveryId = jobConf.get(OOZIE_ACTION_RECOVERY_ID, null); 398 try { 399 setRecoveryId(jobConf, actionDir, recoveryId); 400 } 401 catch (LauncherException ex) { 402 System.out.println("Launcher config error "+ex.getMessage()); 403 configureFailureEx = ex; 404 configFailure = true; 405 } 406 } 407 408 @Override 409 public void map(K1 key, V1 value, OutputCollector<K2, V2> collector, Reporter reporter) throws IOException { 410 try { 411 if (configFailure) { 412 throw configureFailureEx; 413 } 414 else { 415 String mainClass = getJobConf().get(CONF_OOZIE_ACTION_MAIN_CLASS); 416 if (getJobConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) { 417 System.err.println("WARNING, workaround for Hadoop 2.0.2-alpha distributed cached issue (MAPREDUCE-4820) enabled"); 418 } 419 String msgPrefix = "Main class [" + mainClass + "], "; 420 int errorCode = 0; 421 Throwable errorCause = null; 422 String errorMessage = null; 423 424 try { 425 new LauncherSecurityManager(); 426 } 427 catch (SecurityException ex) { 428 errorMessage = "Could not set LauncherSecurityManager"; 429 errorCause = ex; 430 } 431 432 try { 433 setupHeartBeater(reporter); 434 435 setupMainConfiguration(); 436 437 try { 438 System.out.println("Starting the execution of prepare actions"); 439 executePrepare(); 440 System.out.println("Completed the execution of prepare actions successfully"); 441 } catch (Exception ex) { 442 System.out.println("Prepare execution in the Launcher Mapper has failed"); 443 throw new LauncherException(ex.getMessage(), ex); 444 } 445 446 String[] args = getMainArguments(getJobConf()); 447 448 printContentsOfCurrentDir(); 449 450 System.out.println(); 451 System.out.println("Oozie Java/Map-Reduce/Pig action launcher-job configuration"); 452 System.out.println("================================================================="); 453 System.out.println("Workflow job id : " + System.getProperty("oozie.job.id")); 454 System.out.println("Workflow action id: " + System.getProperty("oozie.action.id")); 455 System.out.println(); 456 System.out.println("Classpath :"); 457 System.out.println("------------------------"); 458 StringTokenizer st = new StringTokenizer(System.getProperty("java.class.path"), ":"); 459 while (st.hasMoreTokens()) { 460 System.out.println(" " + st.nextToken()); 461 } 462 System.out.println("------------------------"); 463 System.out.println(); 464 System.out.println("Main class : " + mainClass); 465 System.out.println(); 466 System.out.println("Maximum output : " 467 + getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024)); 468 System.out.println(); 469 System.out.println("Arguments :"); 470 for (String arg : args) { 471 System.out.println(" " + arg); 472 } 473 474 System.out.println(); 475 System.out.println("Java System Properties:"); 476 System.out.println("------------------------"); 477 System.getProperties().store(System.out, ""); 478 System.out.flush(); 479 System.out.println("------------------------"); 480 System.out.println(); 481 482 System.out.println("================================================================="); 483 System.out.println(); 484 System.out.println(">>> Invoking Main class now >>>"); 485 System.out.println(); 486 System.out.flush(); 487 488 try { 489 Class klass = getJobConf().getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class); 490 Method mainMethod = klass.getMethod("main", String[].class); 491 mainMethod.invoke(null, (Object) args); 492 } 493 catch (InvocationTargetException ex) { 494 if (LauncherMainException.class.isInstance(ex.getCause())) { 495 errorMessage = msgPrefix + "exit code [" +((LauncherMainException)ex.getCause()).getErrorCode() 496 + "]"; 497 errorCause = null; 498 } 499 else if (SecurityException.class.isInstance(ex.getCause())) { 500 if (LauncherSecurityManager.getExitInvoked()) { 501 System.out.println("Intercepting System.exit(" + LauncherSecurityManager.getExitCode() 502 + ")"); 503 System.err.println("Intercepting System.exit(" + LauncherSecurityManager.getExitCode() 504 + ")"); 505 // if 0 main() method finished successfully 506 // ignoring 507 errorCode = LauncherSecurityManager.getExitCode(); 508 if (errorCode != 0) { 509 errorMessage = msgPrefix + "exit code [" + errorCode + "]"; 510 errorCause = null; 511 } 512 } 513 } 514 else { 515 throw ex; 516 } 517 } 518 finally { 519 System.out.println(); 520 System.out.println("<<< Invocation of Main class completed <<<"); 521 System.out.println(); 522 handleExternalChildIDs(reporter); 523 } 524 if (errorMessage == null) { 525 File outputData = new File(System.getProperty("oozie.action.output.properties")); 526 if (outputData.exists()) { 527 URI actionDirUri = new Path(actionDir, ACTION_OUTPUT_PROPS).toUri(); 528 FileSystem fs = FileSystem.get(actionDirUri, getJobConf()); 529 fs.copyFromLocalFile(new Path(outputData.toString()), new Path(actionDir, 530 ACTION_OUTPUT_PROPS)); 531 reporter.incrCounter(COUNTER_GROUP, COUNTER_OUTPUT_DATA, 1); 532 533 int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024); 534 if (outputData.length() > maxOutputData) { 535 String msg = MessageFormat.format("Output data size [{0}] exceeds maximum [{1}]", 536 outputData.length(), maxOutputData); 537 failLauncher(0, msg, null); 538 } 539 System.out.println(); 540 System.out.println("Oozie Launcher, capturing output data:"); 541 System.out.println("======================="); 542 Properties props = new Properties(); 543 props.load(new FileReader(outputData)); 544 props.store(System.out, ""); 545 System.out.println(); 546 System.out.println("======================="); 547 System.out.println(); 548 } 549 handleActionStatsData(reporter); 550 File newId = new File(System.getProperty("oozie.action.newId.properties")); 551 if (newId.exists()) { 552 Properties props = new Properties(); 553 props.load(new FileReader(newId)); 554 if (props.getProperty("id") == null) { 555 throw new IllegalStateException("ID swap file does not have [id] property"); 556 } 557 URI actionDirUri = new Path(actionDir, ACTION_NEW_ID_PROPS).toUri(); 558 FileSystem fs = FileSystem.get(actionDirUri, getJobConf()); 559 fs.copyFromLocalFile(new Path(newId.toString()), new Path(actionDir, ACTION_NEW_ID_PROPS)); 560 reporter.incrCounter(COUNTER_GROUP, COUNTER_DO_ID_SWAP, 1); 561 562 System.out.println("Oozie Launcher, copying new Hadoop job id to file: " 563 + new Path(actionDir, ACTION_NEW_ID_PROPS).toUri()); 564 565 System.out.println(); 566 System.out.println("Oozie Launcher, propagating new Hadoop job id to Oozie"); 567 System.out.println("======================="); 568 System.out.println("id: " + props.getProperty("id")); 569 System.out.println("======================="); 570 System.out.println(); 571 } 572 } 573 } 574 catch (NoSuchMethodException ex) { 575 errorMessage = msgPrefix + "main() method not found"; 576 errorCause = ex; 577 } 578 catch (InvocationTargetException ex) { 579 errorMessage = msgPrefix + "main() threw exception"; 580 errorCause = ex.getTargetException(); 581 } 582 catch (Throwable ex) { 583 errorMessage = msgPrefix + "exception invoking main()"; 584 errorCause = ex; 585 } 586 finally { 587 destroyHeartBeater(); 588 if (errorMessage != null) { 589 failLauncher(errorCode, errorMessage, errorCause); 590 } 591 } 592 } 593 } 594 catch (LauncherException ex) { 595 reporter.incrCounter(COUNTER_GROUP, COUNTER_LAUNCHER_ERROR, 1); 596 System.out.println(); 597 System.out.println("Oozie Launcher failed, finishing Hadoop job gracefully"); 598 System.out.println(); 599 } 600 } 601 602 @Override 603 public void close() throws IOException { 604 System.out.println(); 605 System.out.println("Oozie Launcher ends"); 606 System.out.println(); 607 } 608 609 protected JobConf getJobConf() { 610 return jobConf; 611 } 612 613 private void handleActionStatsData(Reporter reporter) throws IOException, LauncherException { 614 File actionStatsData = new File(System.getProperty(EXTERNAL_ACTION_STATS)); 615 // If stats are stored by the action, then stats file should exist 616 if (actionStatsData.exists()) { 617 int statsMaxOutputData = getJobConf().getInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, 618 Integer.MAX_VALUE); 619 reporter.incrCounter(COUNTER_GROUP, COUNTER_STATS_DATA, 1); 620 // fail the launcher if size of stats is greater than the maximum allowed size 621 if (actionStatsData.length() > statsMaxOutputData) { 622 String msg = MessageFormat.format("Output stats size [{0}] exceeds maximum [{1}]", 623 actionStatsData.length(), statsMaxOutputData); 624 failLauncher(0, msg, null); 625 } 626 // copy the stats file to hdfs path which can be accessed by Oozie server 627 URI actionDirUri = new Path(actionDir, ACTION_STATS_PROPS).toUri(); 628 FileSystem fs = FileSystem.get(actionDirUri, getJobConf()); 629 fs.copyFromLocalFile(new Path(actionStatsData.toString()), new Path(actionDir, 630 ACTION_STATS_PROPS)); 631 } 632 } 633 634 private void handleExternalChildIDs(Reporter reporter) throws IOException { 635 File externalChildIDs = new File(System.getProperty(EXTERNAL_CHILD_IDS)); 636 // if external ChildIDs are stored by the action, then the file should exist 637 if (externalChildIDs.exists()) { 638 // copy the externalChildIDs file to hdfs path which can be accessed by Oozie server 639 URI actionDirUri = new Path(actionDir, ACTION_EXTERNAL_CHILD_IDS_PROPS).toUri(); 640 FileSystem fs = FileSystem.get(actionDirUri, getJobConf()); 641 fs.copyFromLocalFile(new Path(externalChildIDs.toString()), new Path(actionDir, 642 ACTION_EXTERNAL_CHILD_IDS_PROPS)); 643 } 644 } 645 646 private void setupMainConfiguration() throws IOException, HadoopAccessorException { 647 Path pathNew = new Path(new Path(actionDir, ACTION_CONF_XML), 648 new Path(new File(ACTION_CONF_XML).getAbsolutePath())); 649 FileSystem fs = FileSystem.get(pathNew.toUri(), getJobConf()); 650 fs.copyToLocalFile(new Path(actionDir, ACTION_CONF_XML), 651 new Path(new File(ACTION_CONF_XML).getAbsolutePath())); 652 653 System.setProperty("oozie.launcher.job.id", getJobConf().get("mapred.job.id")); 654 System.setProperty("oozie.job.id", getJobConf().get(OOZIE_JOB_ID)); 655 System.setProperty("oozie.action.id", getJobConf().get(OOZIE_ACTION_ID)); 656 System.setProperty("oozie.action.conf.xml", new File(ACTION_CONF_XML).getAbsolutePath()); 657 System.setProperty("oozie.action.output.properties", new File(ACTION_OUTPUT_PROPS).getAbsolutePath()); 658 System.setProperty(EXTERNAL_ACTION_STATS, new File(ACTION_STATS_PROPS).getAbsolutePath()); 659 System.setProperty(EXTERNAL_CHILD_IDS, new File(ACTION_EXTERNAL_CHILD_IDS_PROPS).getAbsolutePath()); 660 System.setProperty("oozie.action.newId.properties", new File(ACTION_NEW_ID_PROPS).getAbsolutePath()); 661 } 662 663 // Method to execute the prepare actions 664 private void executePrepare() throws IOException, LauncherException { 665 String prepareXML = getJobConf().get(ACTION_PREPARE_XML); 666 if (prepareXML != null) { 667 if (!prepareXML.equals("")) { 668 PrepareActionsDriver.doOperations( 669 getJobConf().getStringCollection(CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS), prepareXML); 670 } else { 671 System.out.println("There are no prepare actions to execute."); 672 } 673 } 674 } 675 676 public static String[] getMainArguments(Configuration conf) { 677 String[] args = new String[conf.getInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, 0)]; 678 for (int i = 0; i < args.length; i++) { 679 args[i] = conf.get(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i); 680 } 681 return args; 682 } 683 684 private void setupHeartBeater(Reporter reporter) { 685 timer = new ScheduledThreadPoolExecutor(1); 686 timer.scheduleAtFixedRate(new LauncherMapper(reporter), 0, 30, TimeUnit.SECONDS); 687 } 688 689 private void destroyHeartBeater() { 690 timer.shutdownNow(); 691 } 692 693 private Reporter reporter; 694 695 private LauncherMapper(Reporter reporter) { 696 this.reporter = reporter; 697 } 698 699 @Override 700 public void run() { 701 System.out.println("Heart beat"); 702 reporter.progress(); 703 } 704 705 private void failLauncher(int errorCode, String reason, Throwable ex) throws LauncherException { 706 try { 707 if (ex != null) { 708 reason += ", " + ex.getMessage(); 709 } 710 Properties errorProps = new Properties(); 711 errorProps.setProperty("error.code", Integer.toString(errorCode)); 712 errorProps.setProperty("error.reason", reason); 713 if (ex != null) { 714 if (ex.getMessage() != null) { 715 errorProps.setProperty("exception.message", ex.getMessage()); 716 } 717 StringWriter sw = new StringWriter(); 718 PrintWriter pw = new PrintWriter(sw); 719 ex.printStackTrace(pw); 720 pw.close(); 721 errorProps.setProperty("exception.stacktrace", sw.toString()); 722 } 723 FileSystem fs = FileSystem.get((new Path(actionDir, ACTION_ERROR_PROPS)).toUri(), getJobConf()); 724 OutputStream os = fs.create(new Path(actionDir, ACTION_ERROR_PROPS)); 725 errorProps.store(os, ""); 726 os.close(); 727 728 System.out.print("Failing Oozie Launcher, " + reason + "\n"); 729 System.err.print("Failing Oozie Launcher, " + reason + "\n"); 730 if (ex != null) { 731 ex.printStackTrace(System.out); 732 ex.printStackTrace(System.err); 733 } 734 throw new LauncherException(reason, ex); 735 } 736 catch (IOException rex) { 737 throw new RuntimeException("Error while failing launcher, " + rex.getMessage(), rex); 738 } 739 } 740 741 /** 742 * Print files and directories in current directory. Will list files in the sub-directory (only 1 level deep) 743 */ 744 protected void printContentsOfCurrentDir() { 745 File folder = new File("."); 746 System.out.println(); 747 System.out.println("Files in current dir:" + folder.getAbsolutePath()); 748 System.out.println("======================"); 749 750 File[] listOfFiles = folder.listFiles(); 751 for (File fileName : listOfFiles) { 752 if (fileName.isFile()) { 753 System.out.println("File: " + fileName.getName()); 754 } 755 else if (fileName.isDirectory()) { 756 System.out.println("Dir: " + fileName.getName()); 757 File subDir = new File(fileName.getName()); 758 File[] moreFiles = subDir.listFiles(); 759 for (File subFileName : moreFiles) { 760 if (subFileName.isFile()) { 761 System.out.println(" File: " + subFileName.getName()); 762 } 763 else if (subFileName.isDirectory()) { 764 System.out.println(" Dir: " + subFileName.getName()); 765 } 766 } 767 } 768 } 769 } 770 771 } 772 773 class LauncherSecurityManager extends SecurityManager { 774 private static boolean exitInvoked; 775 private static int exitCode; 776 private SecurityManager securityManager; 777 778 public LauncherSecurityManager() { 779 reset(); 780 securityManager = System.getSecurityManager(); 781 System.setSecurityManager(this); 782 } 783 784 @Override 785 public void checkPermission(Permission perm, Object context) { 786 if (securityManager != null) { 787 // check everything with the original SecurityManager 788 securityManager.checkPermission(perm, context); 789 } 790 } 791 792 @Override 793 public void checkPermission(Permission perm) { 794 if (securityManager != null) { 795 // check everything with the original SecurityManager 796 securityManager.checkPermission(perm); 797 } 798 } 799 800 @Override 801 public void checkExit(int status) throws SecurityException { 802 exitInvoked = true; 803 exitCode = status; 804 throw new SecurityException("Intercepted System.exit(" + status + ")"); 805 } 806 807 public static boolean getExitInvoked() { 808 return exitInvoked; 809 } 810 811 public static int getExitCode() { 812 return exitCode; 813 } 814 815 public static void reset() { 816 exitInvoked = false; 817 exitCode = 0; 818 } 819 } 820 821 class LauncherException extends Exception { 822 823 LauncherException(String message) { 824 super(message); 825 } 826 827 LauncherException(String message, Throwable cause) { 828 super(message, cause); 829 } 830 }