001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.BufferedReader;
021    import java.io.File;
022    import java.io.FileReader;
023    import java.io.IOException;
024    import java.io.InputStream;
025    import java.io.InputStreamReader;
026    import java.io.OutputStream;
027    import java.io.OutputStreamWriter;
028    import java.io.PrintWriter;
029    import java.io.StringWriter;
030    import java.io.Writer;
031    import java.lang.reflect.InvocationTargetException;
032    import java.lang.reflect.Method;
033    import java.security.Permission;
034    import java.text.MessageFormat;
035    import java.util.Properties;
036    import java.util.StringTokenizer;
037    import java.util.concurrent.ScheduledThreadPoolExecutor;
038    import java.util.concurrent.TimeUnit;
039    
040    import org.apache.hadoop.conf.Configuration;
041    import org.apache.hadoop.fs.FileSystem;
042    import org.apache.hadoop.fs.Path;
043    import org.apache.hadoop.mapred.Counters;
044    import org.apache.hadoop.mapred.JobConf;
045    import org.apache.hadoop.mapred.Mapper;
046    import org.apache.hadoop.mapred.OutputCollector;
047    import org.apache.hadoop.mapred.Reporter;
048    import org.apache.hadoop.mapred.RunningJob;
049    import org.apache.oozie.service.HadoopAccessorException;
050    import org.apache.oozie.service.HadoopAccessorService;
051    import org.apache.oozie.service.Services;
052    import org.apache.oozie.util.XLog;
053    
054    public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {
055    
056        public static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class";
057    
058        public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = "oozie.action.max.output.data";
059    
060        private static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = "oozie.action.main.arg.count";
061        private static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = "oozie.action.main.arg.";
062        private static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = "oozie.external.stats.max.size";
063    
064        private static final String COUNTER_GROUP = "oozie.launcher";
065        private static final String COUNTER_DO_ID_SWAP = "oozie.do.id.swap";
066        private static final String COUNTER_OUTPUT_DATA = "oozie.output.data";
067        private static final String COUNTER_STATS_DATA = "oozie.stats.data";
068        private static final String COUNTER_LAUNCHER_ERROR = "oozie.launcher.error";
069    
070        private static final String OOZIE_JOB_ID = "oozie.job.id";
071        private static final String OOZIE_ACTION_ID = "oozie.action.id";
072    
073        private static final String OOZIE_ACTION_DIR_PATH = "oozie.action.dir.path";
074        private static final String OOZIE_ACTION_RECOVERY_ID = "oozie.action.recovery.id";
075    
076        public static final String ACTION_PREFIX = "oozie.action.";
077        public static final String EXTERNAL_CHILD_IDS = ACTION_PREFIX + "externalChildIDs.properties";
078        public static final String EXTERNAL_ACTION_STATS = ACTION_PREFIX + "stats.properties";
079    
080        static final String ACTION_CONF_XML = "action.xml";
081        public static final String ACTION_PREPARE_XML = "oozie.action.prepare.xml";
082        private static final String ACTION_OUTPUT_PROPS = "output.properties";
083        private static final String ACTION_STATS_PROPS = "stats.properties";
084        private static final String ACTION_EXTERNAL_CHILD_IDS_PROPS = "externalChildIds.properties";
085        private static final String ACTION_NEW_ID_PROPS = "newId.properties";
086        private static final String ACTION_ERROR_PROPS = "error.properties";
087    
088        private void setRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) throws LauncherException {
089            try {
090                FileSystem fs = FileSystem.get(launcherConf);
091                String jobId = launcherConf.get("mapred.job.id");
092                Path path = new Path(actionDir, recoveryId);
093                if (!fs.exists(path)) {
094                    try {
095                        Writer writer = new OutputStreamWriter(fs.create(path));
096                        writer.write(jobId);
097                        writer.close();
098                    }
099                    catch (IOException ex) {
100                        failLauncher(0, "IO error", ex);
101                    }
102                }
103                else {
104                    InputStream is = fs.open(path);
105                    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
106                    String id = reader.readLine();
107                    reader.close();
108                    if (!jobId.equals(id)) {
109                        failLauncher(0, MessageFormat.format(
110                                "Hadoop job Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]", path, id,
111                                jobId), null);
112                    }
113    
114                }
115            }
116            catch (IOException ex) {
117                failLauncher(0, "IO error", ex);
118            }
119        }
120    
121        /**
122         * @param launcherConf
123         * @param actionDir
124         * @param recoveryId
125         * @return
126         * @throws HadoopAccessorException
127         * @throws IOException
128         */
129        public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
130                throws HadoopAccessorException, IOException {
131            String jobId = null;
132            Path recoveryFile = new Path(actionDir, recoveryId);
133            //FileSystem fs = FileSystem.get(launcherConf);
134            FileSystem fs = Services.get().get(HadoopAccessorService.class)
135                    .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf);
136    
137            if (fs.exists(recoveryFile)) {
138                InputStream is = fs.open(recoveryFile);
139                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
140                jobId = reader.readLine();
141                reader.close();
142            }
143            return jobId;
144    
145        }
146    
147        public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
148            launcherConf.set(CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
149        }
150    
151        public static void setupMainArguments(Configuration launcherConf, String[] args) {
152            launcherConf.setInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
153            for (int i = 0; i < args.length; i++) {
154                launcherConf.set(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
155            }
156        }
157    
158        public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
159            launcherConf.setInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
160        }
161    
162        /**
163         * Set the maximum value of stats data
164         *
165         * @param launcherConf the oozie launcher configuration
166         * @param maxStatsData the maximum allowed size of stats data
167         */
168        public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){
169            launcherConf.setInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
170        }
171    
172        /**
173         * @param launcherConf
174         * @param jobId
175         * @param actionId
176         * @param actionDir
177         * @param recoveryId
178         * @param actionConf
179         * @throws IOException
180         * @throws HadoopAccessorException
181         */
182        public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir,
183                String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
184    
185            launcherConf.setMapperClass(LauncherMapper.class);
186            launcherConf.setSpeculativeExecution(false);
187            launcherConf.setNumMapTasks(1);
188            launcherConf.setNumReduceTasks(0);
189    
190            launcherConf.set(OOZIE_JOB_ID, jobId);
191            launcherConf.set(OOZIE_ACTION_ID, actionId);
192            launcherConf.set(OOZIE_ACTION_DIR_PATH, actionDir.toString());
193            launcherConf.set(OOZIE_ACTION_RECOVERY_ID, recoveryId);
194            launcherConf.set(ACTION_PREPARE_XML, prepareXML);
195    
196            actionConf.set(OOZIE_JOB_ID, jobId);
197            actionConf.set(OOZIE_ACTION_ID, actionId);
198    
199            FileSystem fs =
200              Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
201                                                                               actionDir.toUri(), launcherConf);
202            fs.mkdirs(actionDir);
203    
204            OutputStream os = fs.create(new Path(actionDir, ACTION_CONF_XML));
205            actionConf.writeXml(os);
206            os.close();
207    
208            Path inputDir = new Path(actionDir, "input");
209            fs.mkdirs(inputDir);
210            Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "dummy.txt")));
211            writer.write("dummy");
212            writer.close();
213    
214            launcherConf.set("mapred.input.dir", inputDir.toString());
215            launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString());
216        }
217    
218        public static boolean isMainDone(RunningJob runningJob) throws IOException {
219            return runningJob.isComplete();
220        }
221    
222        public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
223            boolean succeeded = runningJob.isSuccessful();
224            if (succeeded) {
225                Counters counters = runningJob.getCounters();
226                if (counters != null) {
227                    Counters.Group group = counters.getGroup(COUNTER_GROUP);
228                    if (group != null) {
229                        succeeded = group.getCounter(COUNTER_LAUNCHER_ERROR) == 0;
230                    }
231                }
232            }
233            return succeeded;
234        }
235    
236        public static boolean hasOutputData(RunningJob runningJob) throws IOException {
237            boolean output = false;
238            Counters counters = runningJob.getCounters();
239            if (counters != null) {
240                Counters.Group group = counters.getGroup(COUNTER_GROUP);
241                if (group != null) {
242                    output = group.getCounter(COUNTER_OUTPUT_DATA) == 1;
243                }
244            }
245            return output;
246        }
247    
248        /**
249         * Check whether runningJob has stats data or not
250         *
251         * @param runningJob the runningJob
252         * @return returns whether the running Job has stats data or not
253         * @throws IOException
254         */
255        public static boolean hasStatsData(RunningJob runningJob) throws IOException{
256            boolean output = false;
257            Counters counters = runningJob.getCounters();
258            if (counters != null) {
259                Counters.Group group = counters.getGroup(COUNTER_GROUP);
260                if (group != null) {
261                    output = group.getCounter(COUNTER_STATS_DATA) == 1;
262                }
263            }
264            return output;
265        }
266    
267        /**
268         * @param runningJob
269         * @return
270         * @throws IOException
271         */
272        public static boolean hasIdSwap(RunningJob runningJob) throws IOException {
273            boolean swap = false;
274            Counters counters = runningJob.getCounters();
275            if (counters != null) {
276                Counters.Group group = counters.getGroup(COUNTER_GROUP);
277                if (group != null) {
278                    swap = group.getCounter(COUNTER_DO_ID_SWAP) == 1;
279                }
280            }
281            return swap;
282        }
283    
284        /**
285         * @param runningJob
286         * @param user
287         * @param group
288         * @param actionDir
289         * @return
290         * @throws IOException
291         * @throws HadoopAccessorException
292         */
293        public static boolean hasIdSwap(RunningJob runningJob, String user, String group, Path actionDir)
294                throws IOException, HadoopAccessorException {
295            boolean swap = false;
296    
297            XLog log = XLog.getLog("org.apache.oozie.action.hadoop.LauncherMapper");
298    
299            Counters counters = runningJob.getCounters();
300            if (counters != null) {
301                Counters.Group counterGroup = counters.getGroup(COUNTER_GROUP);
302                if (counterGroup != null) {
303                    swap = counterGroup.getCounter(COUNTER_DO_ID_SWAP) == 1;
304                }
305            }
306            // additional check for swapped hadoop ID
307            // Can't rely on hadoop counters existing
308            // we'll check for the newID file in hdfs if the hadoop counters is null
309            else {
310    
311                Path p = getIdSwapPath(actionDir);
312                // log.debug("Checking for newId file in: [{0}]", p);
313    
314                FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, p.toUri(),
315                                                                                                 new Configuration());
316                if (fs.exists(p)) {
317                    log.debug("Hadoop Counters is null, but found newID file.");
318    
319                    swap = true;
320                }
321                else {
322                    log.debug("Hadoop Counters is null, and newID file doesn't exist at: [{0}]", p);
323                }
324            }
325            return swap;
326        }
327    
328        public static Path getOutputDataPath(Path actionDir) {
329            return new Path(actionDir, ACTION_OUTPUT_PROPS);
330        }
331    
332        /**
333         * Get the location of stats file
334         *
335         * @param actionDir the action directory
336         * @return the hdfs location of the file
337         */
338        public static Path getActionStatsDataPath(Path actionDir){
339            return new Path(actionDir, ACTION_STATS_PROPS);
340        }
341    
342        /**
343         * Get the location of external Child IDs file
344         *
345         * @param actionDir the action directory
346         * @return the hdfs location of the file
347         */
348        public static Path getExternalChildIDsDataPath(Path actionDir){
349            return new Path(actionDir, ACTION_EXTERNAL_CHILD_IDS_PROPS);
350        }
351    
352        public static Path getErrorPath(Path actionDir) {
353            return new Path(actionDir, ACTION_ERROR_PROPS);
354        }
355    
356        public static Path getIdSwapPath(Path actionDir) {
357            return new Path(actionDir, ACTION_NEW_ID_PROPS);
358        }
359    
360        private JobConf jobConf;
361        private Path actionDir;
362        private ScheduledThreadPoolExecutor timer;
363    
364        private boolean configFailure = false;
365        private LauncherException configureFailureEx;
366        public LauncherMapper() {
367        }
368    
369        @Override
370        public void configure(JobConf jobConf) {
371            System.out.println();
372            System.out.println("Oozie Launcher starts");
373            System.out.println();
374            this.jobConf = jobConf;
375            actionDir = new Path(getJobConf().get(OOZIE_ACTION_DIR_PATH));
376            String recoveryId = jobConf.get(OOZIE_ACTION_RECOVERY_ID, null);
377            try {
378                setRecoveryId(jobConf, actionDir, recoveryId);
379            }
380            catch (LauncherException ex) {
381                System.out.println("Launcher config error "+ex.getMessage());
382                configureFailureEx = ex;
383                configFailure = true;
384            }
385        }
386    
387        @Override
388        public void map(K1 key, V1 value, OutputCollector<K2, V2> collector, Reporter reporter) throws IOException {
389            try {
390                if (configFailure) {
391                    throw configureFailureEx;
392                }
393                else {
394                    String mainClass = getJobConf().get(CONF_OOZIE_ACTION_MAIN_CLASS);
395                    String msgPrefix = "Main class [" + mainClass + "], ";
396                    int errorCode = 0;
397                    Throwable errorCause = null;
398                    String errorMessage = null;
399    
400                    try {
401                        new LauncherSecurityManager();
402                    }
403                    catch (SecurityException ex) {
404                        errorMessage = "Could not set LauncherSecurityManager";
405                        errorCause = ex;
406                    }
407    
408                    try {
409                        setupHeartBeater(reporter);
410    
411                        setupMainConfiguration();
412    
413                        try {
414                            System.out.println("Starting the execution of prepare actions");
415                            executePrepare();
416                            System.out.println("Completed the execution of prepare actions successfully");
417                        } catch (Exception ex) {
418                            System.out.println("Prepare execution in the Launcher Mapper has failed");
419                            throw new LauncherException(ex.getMessage(), ex);
420                        }
421    
422                        String[] args = getMainArguments(getJobConf());
423    
424                        printContentsOfCurrentDir();
425    
426                        System.out.println();
427                        System.out.println("Oozie Java/Map-Reduce/Pig action launcher-job configuration");
428                        System.out.println("=================================================================");
429                        System.out.println("Workflow job id   : " + System.getProperty("oozie.job.id"));
430                        System.out.println("Workflow action id: " + System.getProperty("oozie.action.id"));
431                        System.out.println();
432                        System.out.println("Classpath         :");
433                        System.out.println("------------------------");
434                        StringTokenizer st = new StringTokenizer(System.getProperty("java.class.path"), ":");
435                        while (st.hasMoreTokens()) {
436                            System.out.println("  " + st.nextToken());
437                        }
438                        System.out.println("------------------------");
439                        System.out.println();
440                        System.out.println("Main class        : " + mainClass);
441                        System.out.println();
442                        System.out.println("Maximum output    : "
443                                + getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024));
444                        System.out.println();
445                        System.out.println("Arguments         :");
446                        for (String arg : args) {
447                            System.out.println("                    " + arg);
448                        }
449    
450                        System.out.println();
451                        System.out.println("Java System Properties:");
452                        System.out.println("------------------------");
453                        System.getProperties().store(System.out, "");
454                        System.out.flush();
455                        System.out.println("------------------------");
456                        System.out.println();
457    
458                        System.out.println("=================================================================");
459                        System.out.println();
460                        System.out.println(">>> Invoking Main class now >>>");
461                        System.out.println();
462                        System.out.flush();
463    
464                        try {
465                            Class klass = getJobConf().getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
466                            Method mainMethod = klass.getMethod("main", String[].class);
467                            mainMethod.invoke(null, (Object) args);
468                        }
469                        catch (InvocationTargetException ex) {
470                            if (LauncherMainException.class.isInstance(ex.getCause())) {
471                                errorMessage = msgPrefix + "exit code [" +((LauncherMainException)ex.getCause()).getErrorCode()
472                                    + "]";
473                                errorCause = null;
474                            }
475                            else if (SecurityException.class.isInstance(ex.getCause())) {
476                                if (LauncherSecurityManager.getExitInvoked()) {
477                                    System.out.println("Intercepting System.exit(" + LauncherSecurityManager.getExitCode()
478                                            + ")");
479                                    System.err.println("Intercepting System.exit(" + LauncherSecurityManager.getExitCode()
480                                            + ")");
481                                    // if 0 main() method finished successfully
482                                    // ignoring
483                                    errorCode = LauncherSecurityManager.getExitCode();
484                                    if (errorCode != 0) {
485                                        errorMessage = msgPrefix + "exit code [" + errorCode + "]";
486                                        errorCause = null;
487                                    }
488                                }
489                            }
490                            else {
491                                throw ex;
492                            }
493                        }
494                        finally {
495                            System.out.println();
496                            System.out.println("<<< Invocation of Main class completed <<<");
497                            System.out.println();
498                        }
499                        if (errorMessage == null) {
500                            File outputData = new File(System.getProperty("oozie.action.output.properties"));
501                            FileSystem fs = FileSystem.get(getJobConf());
502                            if (outputData.exists()) {
503    
504                                fs.copyFromLocalFile(new Path(outputData.toString()), new Path(actionDir,
505                                                                                               ACTION_OUTPUT_PROPS));
506                                reporter.incrCounter(COUNTER_GROUP, COUNTER_OUTPUT_DATA, 1);
507    
508                                int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);
509                                if (outputData.length() > maxOutputData) {
510                                    String msg = MessageFormat.format("Output data size [{0}] exceeds maximum [{1}]",
511                                                                      outputData.length(), maxOutputData);
512                                    failLauncher(0, msg, null);
513                                }
514                                System.out.println();
515                                System.out.println("Oozie Launcher, capturing output data:");
516                                System.out.println("=======================");
517                                Properties props = new Properties();
518                                props.load(new FileReader(outputData));
519                                props.store(System.out, "");
520                                System.out.println();
521                                System.out.println("=======================");
522                                System.out.println();
523                            }
524                            handleActionStatsData(fs, reporter);
525                            handleExternalChildIDs(fs, reporter);
526                            File newId = new File(System.getProperty("oozie.action.newId.properties"));
527                            if (newId.exists()) {
528                                Properties props = new Properties();
529                                props.load(new FileReader(newId));
530                                if (props.getProperty("id") == null) {
531                                    throw new IllegalStateException("ID swap file does not have [id] property");
532                                }
533                                fs = FileSystem.get(getJobConf());
534                                fs.copyFromLocalFile(new Path(newId.toString()), new Path(actionDir, ACTION_NEW_ID_PROPS));
535                                reporter.incrCounter(COUNTER_GROUP, COUNTER_DO_ID_SWAP, 1);
536    
537                                System.out.println("Oozie Launcher, copying new Hadoop job id to file: "
538                                        + new Path(actionDir, ACTION_NEW_ID_PROPS).toUri());
539    
540                                System.out.println();
541                                System.out.println("Oozie Launcher, propagating new Hadoop job id to Oozie");
542                                System.out.println("=======================");
543                                System.out.println("id: " + props.getProperty("id"));
544                                System.out.println("=======================");
545                                System.out.println();
546                            }
547                        }
548                    }
549                    catch (NoSuchMethodException ex) {
550                        errorMessage = msgPrefix + "main() method not found";
551                        errorCause = ex;
552                    }
553                    catch (InvocationTargetException ex) {
554                        errorMessage = msgPrefix + "main() threw exception";
555                        errorCause = ex.getTargetException();
556                    }
557                    catch (Throwable ex) {
558                        errorMessage = msgPrefix + "exception invoking main()";
559                        errorCause = ex;
560                    }
561                    finally {
562                        destroyHeartBeater();
563                        if (errorMessage != null) {
564                            failLauncher(errorCode, errorMessage, errorCause);
565                        }
566                    }
567                }
568            }
569            catch (LauncherException ex) {
570                reporter.incrCounter(COUNTER_GROUP, COUNTER_LAUNCHER_ERROR, 1);
571                System.out.println();
572                System.out.println("Oozie Launcher failed, finishing Hadoop job gracefully");
573                System.out.println();
574            }
575        }
576    
577        @Override
578        public void close() throws IOException {
579            System.out.println();
580            System.out.println("Oozie Launcher ends");
581            System.out.println();
582        }
583    
584        protected JobConf getJobConf() {
585            return jobConf;
586        }
587    
588        private void handleActionStatsData(FileSystem fs, Reporter reporter) throws IOException, LauncherException{
589            File actionStatsData = new File(System.getProperty(EXTERNAL_ACTION_STATS));
590            // If stats are stored by the action, then stats file should exist
591            if (actionStatsData.exists()) {
592                int statsMaxOutputData = getJobConf().getInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
593                        Integer.MAX_VALUE);
594                reporter.incrCounter(COUNTER_GROUP, COUNTER_STATS_DATA, 1);
595                // fail the launcher if size of stats is greater than the maximum allowed size
596                if (actionStatsData.length() > statsMaxOutputData) {
597                    String msg = MessageFormat.format("Output stats size [{0}] exceeds maximum [{1}]",
598                            actionStatsData.length(), statsMaxOutputData);
599                    failLauncher(0, msg, null);
600                }
601                // copy the stats file to hdfs path which can be accessed by Oozie server
602                fs.copyFromLocalFile(new Path(actionStatsData.toString()), new Path(actionDir,
603                        ACTION_STATS_PROPS));
604            }
605        }
606    
607        private void handleExternalChildIDs(FileSystem fs, Reporter reporter) throws IOException {
608            File externalChildIDs = new File(System.getProperty(EXTERNAL_CHILD_IDS));
609            // if external ChildIDs are stored by the action, then the file should exist
610            if (externalChildIDs.exists()) {
611                // copy the externalChildIDs file to hdfs path which can be accessed by Oozie server
612                fs.copyFromLocalFile(new Path(externalChildIDs.toString()), new Path(actionDir,
613                        ACTION_EXTERNAL_CHILD_IDS_PROPS));
614            }
615        }
616    
617        private void setupMainConfiguration() throws IOException {
618            FileSystem fs = FileSystem.get(getJobConf());
619            fs.copyToLocalFile(new Path(getJobConf().get(OOZIE_ACTION_DIR_PATH), ACTION_CONF_XML), new Path(new File(
620                    ACTION_CONF_XML).getAbsolutePath()));
621    
622            System.setProperty("oozie.launcher.job.id", getJobConf().get("mapred.job.id"));
623            System.setProperty("oozie.job.id", getJobConf().get(OOZIE_JOB_ID));
624            System.setProperty("oozie.action.id", getJobConf().get(OOZIE_ACTION_ID));
625            System.setProperty("oozie.action.conf.xml", new File(ACTION_CONF_XML).getAbsolutePath());
626            System.setProperty("oozie.action.output.properties", new File(ACTION_OUTPUT_PROPS).getAbsolutePath());
627            System.setProperty(EXTERNAL_ACTION_STATS, new File(ACTION_STATS_PROPS).getAbsolutePath());
628            System.setProperty(EXTERNAL_CHILD_IDS, new File(ACTION_EXTERNAL_CHILD_IDS_PROPS).getAbsolutePath());
629            System.setProperty("oozie.action.newId.properties", new File(ACTION_NEW_ID_PROPS).getAbsolutePath());
630        }
631    
632        // Method to execute the prepare actions
633        private void executePrepare() throws IOException, LauncherException {
634            String prepareXML = getJobConf().get(ACTION_PREPARE_XML);
635            if (prepareXML != null) {
636                 if (!prepareXML.equals("")) {
637                     PrepareActionsDriver.doOperations(prepareXML);
638                 } else {
639                     System.out.println("There are no prepare actions to execute.");
640                 }
641            }
642        }
643    
644        public static String[] getMainArguments(Configuration conf) {
645            String[] args = new String[conf.getInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, 0)];
646            for (int i = 0; i < args.length; i++) {
647                args[i] = conf.get(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i);
648            }
649            return args;
650        }
651    
652        private void setupHeartBeater(Reporter reporter) {
653            timer = new ScheduledThreadPoolExecutor(1);
654            timer.scheduleAtFixedRate(new LauncherMapper(reporter), 0, 30, TimeUnit.SECONDS);
655        }
656    
657        private void destroyHeartBeater() {
658            timer.shutdownNow();
659        }
660    
661        private Reporter reporter;
662    
663        private LauncherMapper(Reporter reporter) {
664            this.reporter = reporter;
665        }
666    
667        @Override
668        public void run() {
669            System.out.println("Heart beat");
670            reporter.progress();
671        }
672    
673        private void failLauncher(int errorCode, String reason, Throwable ex) throws LauncherException {
674            try {
675                if (ex != null) {
676                    reason += ", " + ex.getMessage();
677                }
678                Properties errorProps = new Properties();
679                errorProps.setProperty("error.code", Integer.toString(errorCode));
680                errorProps.setProperty("error.reason", reason);
681                if (ex != null) {
682                    if (ex.getMessage() != null) {
683                        errorProps.setProperty("exception.message", ex.getMessage());
684                    }
685                    StringWriter sw = new StringWriter();
686                    PrintWriter pw = new PrintWriter(sw);
687                    ex.printStackTrace(pw);
688                    pw.close();
689                    errorProps.setProperty("exception.stacktrace", sw.toString());
690                }
691                FileSystem fs = FileSystem.get(getJobConf());
692                OutputStream os = fs.create(new Path(actionDir, ACTION_ERROR_PROPS));
693                errorProps.store(os, "");
694                os.close();
695    
696                System.out.print("Failing Oozie Launcher, " + reason + "\n");
697                System.err.print("Failing Oozie Launcher, " + reason + "\n");
698                if (ex != null) {
699                    ex.printStackTrace(System.out);
700                    ex.printStackTrace(System.err);
701                }
702                throw new LauncherException(reason, ex);
703            }
704            catch (IOException rex) {
705                throw new RuntimeException("Error while failing launcher, " + rex.getMessage(), rex);
706            }
707        }
708    
709        /**
710         * Print files and directories in current directory. Will list files in the sub-directory (only 1 level deep)
711         */
712        protected void printContentsOfCurrentDir() {
713            File folder = new File(".");
714            System.out.println();
715            System.out.println("Files in current dir:" + folder.getAbsolutePath());
716            System.out.println("======================");
717    
718            File[] listOfFiles = folder.listFiles();
719            for (File fileName : listOfFiles) {
720                if (fileName.isFile()) {
721                    System.out.println("File: " + fileName.getName());
722                }
723                else if (fileName.isDirectory()) {
724                    System.out.println("Dir: " + fileName.getName());
725                    File subDir = new File(fileName.getName());
726                    File[] moreFiles = subDir.listFiles();
727                    for (File subFileName : moreFiles) {
728                        if (subFileName.isFile()) {
729                            System.out.println("  File: " + subFileName.getName());
730                        }
731                        else if (subFileName.isDirectory()) {
732                            System.out.println("  Dir: " + subFileName.getName());
733                        }
734                    }
735                }
736            }
737        }
738    
739    }
740    
741    class LauncherSecurityManager extends SecurityManager {
742        private static boolean exitInvoked;
743        private static int exitCode;
744        private SecurityManager securityManager;
745    
746        public LauncherSecurityManager() {
747            reset();
748            securityManager = System.getSecurityManager();
749            System.setSecurityManager(this);
750        }
751    
752        @Override
753        public void checkPermission(Permission perm, Object context) {
754            if (securityManager != null) {
755                // check everything with the original SecurityManager
756                securityManager.checkPermission(perm, context);
757            }
758        }
759    
760        @Override
761        public void checkPermission(Permission perm) {
762            if (securityManager != null) {
763                // check everything with the original SecurityManager
764                securityManager.checkPermission(perm);
765            }
766        }
767    
768        @Override
769        public void checkExit(int status) throws SecurityException {
770            exitInvoked = true;
771            exitCode = status;
772            throw new SecurityException("Intercepted System.exit(" + status + ")");
773        }
774    
775        public static boolean getExitInvoked() {
776            return exitInvoked;
777        }
778    
779        public static int getExitCode() {
780            return exitCode;
781        }
782    
783        public static void reset() {
784            exitInvoked = false;
785            exitCode = 0;
786        }
787    }
788    
789    class LauncherException extends Exception {
790    
791        LauncherException(String message) {
792            super(message);
793        }
794    
795        LauncherException(String message, Throwable cause) {
796            super(message, cause);
797        }
798    }