001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.BufferedReader;
021    import java.io.File;
022    import java.io.FileReader;
023    import java.io.IOException;
024    import java.io.InputStream;
025    import java.io.InputStreamReader;
026    import java.io.OutputStream;
027    import java.io.OutputStreamWriter;
028    import java.io.PrintWriter;
029    import java.io.StringWriter;
030    import java.io.Writer;
031    import java.lang.reflect.InvocationTargetException;
032    import java.lang.reflect.Method;
033    import java.net.URI;
034    import java.security.Permission;
035    import java.text.MessageFormat;
036    import java.util.ArrayList;
037    import java.util.Collection;
038    import java.util.List;
039    import java.util.Properties;
040    import java.util.StringTokenizer;
041    import java.util.concurrent.ScheduledThreadPoolExecutor;
042    import java.util.concurrent.TimeUnit;
043    
044    import org.apache.hadoop.conf.Configuration;
045    import org.apache.hadoop.fs.FileSystem;
046    import org.apache.hadoop.fs.Path;
047    import org.apache.hadoop.mapred.Counters;
048    import org.apache.hadoop.mapred.JobConf;
049    import org.apache.hadoop.mapred.Mapper;
050    import org.apache.hadoop.mapred.OutputCollector;
051    import org.apache.hadoop.mapred.Reporter;
052    import org.apache.hadoop.mapred.RunningJob;
053    import org.apache.oozie.service.HadoopAccessorException;
054    import org.apache.oozie.service.HadoopAccessorService;
055    import org.apache.oozie.service.Services;
056    import org.apache.oozie.util.XLog;
057    
058    public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {
059    
    // Launcher configuration key holding the main class that map() loads and invokes.
    public static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class";
    // Launcher configuration key written by setupSupportedFileSystems().
    public static final String CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS = "oozie.launcher.action.supported.filesystems";

    // Launcher configuration key for the maximum size of captured output data;
    // map() falls back to 2 * 1024 when the key is not set.
    public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = "oozie.action.max.output.data";

    // Launcher configuration keys used by setupMainArguments(): the number of arguments and
    // the prefix that, followed by the argument index, keys each individual argument.
    private static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = "oozie.action.main.arg.count";
    private static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = "oozie.action.main.arg.";
    // Launcher configuration key for the maximum size of the stats file;
    // handleActionStatsData() falls back to Integer.MAX_VALUE when the key is not set.
    private static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = "oozie.external.stats.max.size";

    // Hadoop counter group and counter names the launcher increments through the Reporter
    // and that the static has*()/isMainSuccessful() methods read back from a RunningJob.
    private static final String COUNTER_GROUP = "oozie.launcher";
    private static final String COUNTER_DO_ID_SWAP = "oozie.do.id.swap";
    private static final String COUNTER_OUTPUT_DATA = "oozie.output.data";
    private static final String COUNTER_STATS_DATA = "oozie.stats.data";
    private static final String COUNTER_LAUNCHER_ERROR = "oozie.launcher.error";

    // Configuration keys under which setupLauncherInfo() stores the workflow job id and action id.
    private static final String OOZIE_JOB_ID = "oozie.job.id";
    private static final String OOZIE_ACTION_ID = "oozie.action.id";

    // Launcher configuration keys for the action directory and the recovery id;
    // configure() reads both and uses the recovery id as a file name under the action directory.
    private static final String OOZIE_ACTION_DIR_PATH = "oozie.action.dir.path";
    private static final String OOZIE_ACTION_RECOVERY_ID = "oozie.action.recovery.id";

    // System property names; the handle*() methods read them to locate the local files
    // holding the external child ids and the action stats.
    public static final String ACTION_PREFIX = "oozie.action.";
    public static final String EXTERNAL_CHILD_IDS = ACTION_PREFIX + "externalChildIDs.properties";
    public static final String EXTERNAL_ACTION_STATS = ACTION_PREFIX + "stats.properties";

    // Name of the action configuration file written into the action directory by setupLauncherInfo().
    static final String ACTION_CONF_XML = "action.xml";
    // Launcher configuration key under which setupLauncherInfo() stores the prepare XML.
    public static final String ACTION_PREPARE_XML = "oozie.action.prepare.xml";
    // File names, relative to the action directory, used for the launcher's result files.
    private static final String ACTION_OUTPUT_PROPS = "output.properties";
    private static final String ACTION_STATS_PROPS = "stats.properties";
    private static final String ACTION_EXTERNAL_CHILD_IDS_PROPS = "externalChildIds.properties";
    private static final String ACTION_NEW_ID_PROPS = "newId.properties";
    private static final String ACTION_ERROR_PROPS = "error.properties";
092    
093        private void setRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) throws LauncherException {
094            try {
095                String jobId = launcherConf.get("mapred.job.id");
096                Path path = new Path(actionDir, recoveryId);
097                FileSystem fs = FileSystem.get(path.toUri(), launcherConf);
098                if (!fs.exists(path)) {
099                    try {
100                        Writer writer = new OutputStreamWriter(fs.create(path));
101                        writer.write(jobId);
102                        writer.close();
103                    }
104                    catch (IOException ex) {
105                        failLauncher(0, "IO error", ex);
106                    }
107                }
108                else {
109                    InputStream is = fs.open(path);
110                    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
111                    String id = reader.readLine();
112                    reader.close();
113                    if (!jobId.equals(id)) {
114                        failLauncher(0, MessageFormat.format(
115                                "Hadoop job Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]", path, id,
116                                jobId), null);
117                    }
118    
119                }
120            }
121            catch (IOException ex) {
122                failLauncher(0, "IO error", ex);
123            }
124        }
125    
126        /**
127         * @param launcherConf
128         * @param actionDir
129         * @param recoveryId
130         * @return
131         * @throws HadoopAccessorException
132         * @throws IOException
133         */
134        public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
135                throws HadoopAccessorException, IOException {
136            String jobId = null;
137            Path recoveryFile = new Path(actionDir, recoveryId);
138            FileSystem fs = Services.get().get(HadoopAccessorService.class)
139                    .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf);
140    
141            if (fs.exists(recoveryFile)) {
142                InputStream is = fs.open(recoveryFile);
143                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
144                jobId = reader.readLine();
145                reader.close();
146            }
147            return jobId;
148    
149        }
150    
151        public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
152            launcherConf.set(CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
153        }
154    
155        public static void setupSupportedFileSystems(Configuration launcherConf, String supportedFileSystems) {
156            launcherConf.set(CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS, supportedFileSystems);
157        }
158    
159        public static void setupMainArguments(Configuration launcherConf, String[] args) {
160            launcherConf.setInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
161            for (int i = 0; i < args.length; i++) {
162                launcherConf.set(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
163            }
164        }
165    
166        public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
167            launcherConf.setInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
168        }
169    
170        /**
171         * Set the maximum value of stats data
172         *
173         * @param launcherConf the oozie launcher configuration
174         * @param maxStatsData the maximum allowed size of stats data
175         */
176        public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){
177            launcherConf.setInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
178        }
179    
180        /**
181         * @param launcherConf
182         * @param jobId
183         * @param actionId
184         * @param actionDir
185         * @param recoveryId
186         * @param actionConf
187         * @throws IOException
188         * @throws HadoopAccessorException
189         */
190        public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir,
191                String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
192    
193            launcherConf.setMapperClass(LauncherMapper.class);
194            launcherConf.setSpeculativeExecution(false);
195            launcherConf.setNumMapTasks(1);
196            launcherConf.setNumReduceTasks(0);
197    
198            launcherConf.set(OOZIE_JOB_ID, jobId);
199            launcherConf.set(OOZIE_ACTION_ID, actionId);
200            launcherConf.set(OOZIE_ACTION_DIR_PATH, actionDir.toString());
201            launcherConf.set(OOZIE_ACTION_RECOVERY_ID, recoveryId);
202            launcherConf.set(ACTION_PREPARE_XML, prepareXML);
203    
204            actionConf.set(OOZIE_JOB_ID, jobId);
205            actionConf.set(OOZIE_ACTION_ID, actionId);
206    
207            if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) {
208              List<String> purgedEntries = new ArrayList<String>();
209              Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files");
210              for (String entry : entries) {
211                if (entry.contains("#")) {
212                  purgedEntries.add(entry);
213                }
214              }
215              actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()]));
216              launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
217            }
218    
219            FileSystem fs =
220              Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
221                                                                               actionDir.toUri(), launcherConf);
222            fs.mkdirs(actionDir);
223    
224            OutputStream os = fs.create(new Path(actionDir, ACTION_CONF_XML));
225            actionConf.writeXml(os);
226            os.close();
227    
228            Path inputDir = new Path(actionDir, "input");
229            fs.mkdirs(inputDir);
230            Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "dummy.txt")));
231            writer.write("dummy");
232            writer.close();
233    
234            launcherConf.set("mapred.input.dir", inputDir.toString());
235            launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString());
236        }
237    
238        public static boolean isMainDone(RunningJob runningJob) throws IOException {
239            return runningJob.isComplete();
240        }
241    
242        public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
243            boolean succeeded = runningJob.isSuccessful();
244            if (succeeded) {
245                Counters counters = runningJob.getCounters();
246                if (counters != null) {
247                    Counters.Group group = counters.getGroup(COUNTER_GROUP);
248                    if (group != null) {
249                        succeeded = group.getCounter(COUNTER_LAUNCHER_ERROR) == 0;
250                    }
251                }
252            }
253            return succeeded;
254        }
255    
256        public static boolean hasOutputData(RunningJob runningJob) throws IOException {
257            boolean output = false;
258            Counters counters = runningJob.getCounters();
259            if (counters != null) {
260                Counters.Group group = counters.getGroup(COUNTER_GROUP);
261                if (group != null) {
262                    output = group.getCounter(COUNTER_OUTPUT_DATA) == 1;
263                }
264            }
265            return output;
266        }
267    
268        /**
269         * Check whether runningJob has stats data or not
270         *
271         * @param runningJob the runningJob
272         * @return returns whether the running Job has stats data or not
273         * @throws IOException
274         */
275        public static boolean hasStatsData(RunningJob runningJob) throws IOException{
276            boolean output = false;
277            Counters counters = runningJob.getCounters();
278            if (counters != null) {
279                Counters.Group group = counters.getGroup(COUNTER_GROUP);
280                if (group != null) {
281                    output = group.getCounter(COUNTER_STATS_DATA) == 1;
282                }
283            }
284            return output;
285        }
286    
287        /**
288         * @param runningJob
289         * @return
290         * @throws IOException
291         */
292        public static boolean hasIdSwap(RunningJob runningJob) throws IOException {
293            boolean swap = false;
294            Counters counters = runningJob.getCounters();
295            if (counters != null) {
296                Counters.Group group = counters.getGroup(COUNTER_GROUP);
297                if (group != null) {
298                    swap = group.getCounter(COUNTER_DO_ID_SWAP) == 1;
299                }
300            }
301            return swap;
302        }
303    
304        /**
305         * @param runningJob
306         * @param user
307         * @param group
308         * @param actionDir
309         * @return
310         * @throws IOException
311         * @throws HadoopAccessorException
312         */
313        public static boolean hasIdSwap(RunningJob runningJob, String user, String group, Path actionDir)
314                throws IOException, HadoopAccessorException {
315            boolean swap = false;
316    
317            XLog log = XLog.getLog("org.apache.oozie.action.hadoop.LauncherMapper");
318    
319            Counters counters = runningJob.getCounters();
320            if (counters != null) {
321                Counters.Group counterGroup = counters.getGroup(COUNTER_GROUP);
322                if (counterGroup != null) {
323                    swap = counterGroup.getCounter(COUNTER_DO_ID_SWAP) == 1;
324                }
325            }
326            // additional check for swapped hadoop ID
327            // Can't rely on hadoop counters existing
328            // we'll check for the newID file in hdfs if the hadoop counters is null
329            else {
330    
331                Path p = getIdSwapPath(actionDir);
332                // log.debug("Checking for newId file in: [{0}]", p);
333    
334                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
335                Configuration conf = has.createJobConf(p.toUri().getAuthority());
336                FileSystem fs = has.createFileSystem(user, p.toUri(), conf);
337                if (fs.exists(p)) {
338                    log.debug("Hadoop Counters is null, but found newID file.");
339    
340                    swap = true;
341                }
342                else {
343                    log.debug("Hadoop Counters is null, and newID file doesn't exist at: [{0}]", p);
344                }
345            }
346            return swap;
347        }
348    
349        public static Path getOutputDataPath(Path actionDir) {
350            return new Path(actionDir, ACTION_OUTPUT_PROPS);
351        }
352    
353        /**
354         * Get the location of stats file
355         *
356         * @param actionDir the action directory
357         * @return the hdfs location of the file
358         */
359        public static Path getActionStatsDataPath(Path actionDir){
360            return new Path(actionDir, ACTION_STATS_PROPS);
361        }
362    
363        /**
364         * Get the location of external Child IDs file
365         *
366         * @param actionDir the action directory
367         * @return the hdfs location of the file
368         */
369        public static Path getExternalChildIDsDataPath(Path actionDir){
370            return new Path(actionDir, ACTION_EXTERNAL_CHILD_IDS_PROPS);
371        }
372    
373        public static Path getErrorPath(Path actionDir) {
374            return new Path(actionDir, ACTION_ERROR_PROPS);
375        }
376    
377        public static Path getIdSwapPath(Path actionDir) {
378            return new Path(actionDir, ACTION_NEW_ID_PROPS);
379        }
380    
    // Job configuration handed to configure(); exposed to the rest of the class via getJobConf().
    private JobConf jobConf;
    // Action directory, resolved in configure() from the launcher configuration.
    private Path actionDir;
    // Not referenced in the visible part of this class; presumably used by the heartbeat
    // helpers (setupHeartBeater/destroyHeartBeater) -- TODO confirm.
    private ScheduledThreadPoolExecutor timer;

    // Set by configure() when setRecoveryId() fails; map() then rethrows configureFailureEx.
    private boolean configFailure = false;
    private LauncherException configureFailureEx;

    /**
     * Creates a launcher mapper; all state is initialized later in configure().
     */
    public LauncherMapper() {
    }
389    
390        @Override
391        public void configure(JobConf jobConf) {
392            System.out.println();
393            System.out.println("Oozie Launcher starts");
394            System.out.println();
395            this.jobConf = jobConf;
396            actionDir = new Path(getJobConf().get(OOZIE_ACTION_DIR_PATH));
397            String recoveryId = jobConf.get(OOZIE_ACTION_RECOVERY_ID, null);
398            try {
399                setRecoveryId(jobConf, actionDir, recoveryId);
400            }
401            catch (LauncherException ex) {
402                System.out.println("Launcher config error "+ex.getMessage());
403                configureFailureEx = ex;
404                configFailure = true;
405            }
406        }
407    
408        @Override
409        public void map(K1 key, V1 value, OutputCollector<K2, V2> collector, Reporter reporter) throws IOException {
410            try {
411                if (configFailure) {
412                    throw configureFailureEx;
413                }
414                else {
415                    String mainClass = getJobConf().get(CONF_OOZIE_ACTION_MAIN_CLASS);
416                    if (getJobConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) {
417                      System.err.println("WARNING, workaround for Hadoop 2.0.2-alpha distributed cached issue (MAPREDUCE-4820) enabled");
418                    }
419                    String msgPrefix = "Main class [" + mainClass + "], ";
420                    int errorCode = 0;
421                    Throwable errorCause = null;
422                    String errorMessage = null;
423    
424                    try {
425                        new LauncherSecurityManager();
426                    }
427                    catch (SecurityException ex) {
428                        errorMessage = "Could not set LauncherSecurityManager";
429                        errorCause = ex;
430                    }
431    
432                    try {
433                        setupHeartBeater(reporter);
434    
435                        setupMainConfiguration();
436    
437                        try {
438                            System.out.println("Starting the execution of prepare actions");
439                            executePrepare();
440                            System.out.println("Completed the execution of prepare actions successfully");
441                        } catch (Exception ex) {
442                            System.out.println("Prepare execution in the Launcher Mapper has failed");
443                            throw new LauncherException(ex.getMessage(), ex);
444                        }
445    
446                        String[] args = getMainArguments(getJobConf());
447    
448                        printContentsOfCurrentDir();
449    
450                        System.out.println();
451                        System.out.println("Oozie Java/Map-Reduce/Pig action launcher-job configuration");
452                        System.out.println("=================================================================");
453                        System.out.println("Workflow job id   : " + System.getProperty("oozie.job.id"));
454                        System.out.println("Workflow action id: " + System.getProperty("oozie.action.id"));
455                        System.out.println();
456                        System.out.println("Classpath         :");
457                        System.out.println("------------------------");
458                        StringTokenizer st = new StringTokenizer(System.getProperty("java.class.path"), ":");
459                        while (st.hasMoreTokens()) {
460                            System.out.println("  " + st.nextToken());
461                        }
462                        System.out.println("------------------------");
463                        System.out.println();
464                        System.out.println("Main class        : " + mainClass);
465                        System.out.println();
466                        System.out.println("Maximum output    : "
467                                + getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024));
468                        System.out.println();
469                        System.out.println("Arguments         :");
470                        for (String arg : args) {
471                            System.out.println("                    " + arg);
472                        }
473    
474                        System.out.println();
475                        System.out.println("Java System Properties:");
476                        System.out.println("------------------------");
477                        System.getProperties().store(System.out, "");
478                        System.out.flush();
479                        System.out.println("------------------------");
480                        System.out.println();
481    
482                        System.out.println("=================================================================");
483                        System.out.println();
484                        System.out.println(">>> Invoking Main class now >>>");
485                        System.out.println();
486                        System.out.flush();
487    
488                        try {
489                            Class klass = getJobConf().getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
490                            Method mainMethod = klass.getMethod("main", String[].class);
491                            mainMethod.invoke(null, (Object) args);
492                        }
493                        catch (InvocationTargetException ex) {
494                            if (LauncherMainException.class.isInstance(ex.getCause())) {
495                                errorMessage = msgPrefix + "exit code [" +((LauncherMainException)ex.getCause()).getErrorCode()
496                                    + "]";
497                                errorCause = null;
498                            }
499                            else if (SecurityException.class.isInstance(ex.getCause())) {
500                                if (LauncherSecurityManager.getExitInvoked()) {
501                                    System.out.println("Intercepting System.exit(" + LauncherSecurityManager.getExitCode()
502                                            + ")");
503                                    System.err.println("Intercepting System.exit(" + LauncherSecurityManager.getExitCode()
504                                            + ")");
505                                    // if 0 main() method finished successfully
506                                    // ignoring
507                                    errorCode = LauncherSecurityManager.getExitCode();
508                                    if (errorCode != 0) {
509                                        errorMessage = msgPrefix + "exit code [" + errorCode + "]";
510                                        errorCause = null;
511                                    }
512                                }
513                            }
514                            else {
515                                throw ex;
516                            }
517                        }
518                        finally {
519                            System.out.println();
520                            System.out.println("<<< Invocation of Main class completed <<<");
521                            System.out.println();
522                            handleExternalChildIDs(reporter);
523                        }
524                        if (errorMessage == null) {
525                            File outputData = new File(System.getProperty("oozie.action.output.properties"));
526                            if (outputData.exists()) {
527                                URI actionDirUri = new Path(actionDir, ACTION_OUTPUT_PROPS).toUri();
528                                FileSystem fs = FileSystem.get(actionDirUri, getJobConf());
529                                fs.copyFromLocalFile(new Path(outputData.toString()), new Path(actionDir,
530                                                                                               ACTION_OUTPUT_PROPS));
531                                reporter.incrCounter(COUNTER_GROUP, COUNTER_OUTPUT_DATA, 1);
532    
533                                int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);
534                                if (outputData.length() > maxOutputData) {
535                                    String msg = MessageFormat.format("Output data size [{0}] exceeds maximum [{1}]",
536                                                                      outputData.length(), maxOutputData);
537                                    failLauncher(0, msg, null);
538                                }
539                                System.out.println();
540                                System.out.println("Oozie Launcher, capturing output data:");
541                                System.out.println("=======================");
542                                Properties props = new Properties();
543                                props.load(new FileReader(outputData));
544                                props.store(System.out, "");
545                                System.out.println();
546                                System.out.println("=======================");
547                                System.out.println();
548                            }
549                            handleActionStatsData(reporter);
550                            File newId = new File(System.getProperty("oozie.action.newId.properties"));
551                            if (newId.exists()) {
552                                Properties props = new Properties();
553                                props.load(new FileReader(newId));
554                                if (props.getProperty("id") == null) {
555                                    throw new IllegalStateException("ID swap file does not have [id] property");
556                                }
557                                URI actionDirUri = new Path(actionDir, ACTION_NEW_ID_PROPS).toUri();
558                                FileSystem fs = FileSystem.get(actionDirUri, getJobConf());
559                                fs.copyFromLocalFile(new Path(newId.toString()), new Path(actionDir, ACTION_NEW_ID_PROPS));
560                                reporter.incrCounter(COUNTER_GROUP, COUNTER_DO_ID_SWAP, 1);
561    
562                                System.out.println("Oozie Launcher, copying new Hadoop job id to file: "
563                                        + new Path(actionDir, ACTION_NEW_ID_PROPS).toUri());
564    
565                                System.out.println();
566                                System.out.println("Oozie Launcher, propagating new Hadoop job id to Oozie");
567                                System.out.println("=======================");
568                                System.out.println("id: " + props.getProperty("id"));
569                                System.out.println("=======================");
570                                System.out.println();
571                            }
572                        }
573                    }
574                    catch (NoSuchMethodException ex) {
575                        errorMessage = msgPrefix + "main() method not found";
576                        errorCause = ex;
577                    }
578                    catch (InvocationTargetException ex) {
579                        errorMessage = msgPrefix + "main() threw exception";
580                        errorCause = ex.getTargetException();
581                    }
582                    catch (Throwable ex) {
583                        errorMessage = msgPrefix + "exception invoking main()";
584                        errorCause = ex;
585                    }
586                    finally {
587                        destroyHeartBeater();
588                        if (errorMessage != null) {
589                            failLauncher(errorCode, errorMessage, errorCause);
590                        }
591                    }
592                }
593            }
594            catch (LauncherException ex) {
595                reporter.incrCounter(COUNTER_GROUP, COUNTER_LAUNCHER_ERROR, 1);
596                System.out.println();
597                System.out.println("Oozie Launcher failed, finishing Hadoop job gracefully");
598                System.out.println();
599            }
600        }
601    
602        @Override
603        public void close() throws IOException {
604            System.out.println();
605            System.out.println("Oozie Launcher ends");
606            System.out.println();
607        }
608    
609        protected JobConf getJobConf() {
610            return jobConf;
611        }
612    
613        private void handleActionStatsData(Reporter reporter) throws IOException, LauncherException {
614            File actionStatsData = new File(System.getProperty(EXTERNAL_ACTION_STATS));
615            // If stats are stored by the action, then stats file should exist
616            if (actionStatsData.exists()) {
617                int statsMaxOutputData = getJobConf().getInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
618                        Integer.MAX_VALUE);
619                reporter.incrCounter(COUNTER_GROUP, COUNTER_STATS_DATA, 1);
620                // fail the launcher if size of stats is greater than the maximum allowed size
621                if (actionStatsData.length() > statsMaxOutputData) {
622                    String msg = MessageFormat.format("Output stats size [{0}] exceeds maximum [{1}]",
623                            actionStatsData.length(), statsMaxOutputData);
624                    failLauncher(0, msg, null);
625                }
626                // copy the stats file to hdfs path which can be accessed by Oozie server
627                URI actionDirUri = new Path(actionDir, ACTION_STATS_PROPS).toUri();
628                FileSystem fs = FileSystem.get(actionDirUri, getJobConf());
629                fs.copyFromLocalFile(new Path(actionStatsData.toString()), new Path(actionDir,
630                        ACTION_STATS_PROPS));
631            }
632        }
633    
634        private void handleExternalChildIDs(Reporter reporter) throws IOException {
635            File externalChildIDs = new File(System.getProperty(EXTERNAL_CHILD_IDS));
636            // if external ChildIDs are stored by the action, then the file should exist
637            if (externalChildIDs.exists()) {
638                // copy the externalChildIDs file to hdfs path which can be accessed by Oozie server
639                URI actionDirUri = new Path(actionDir, ACTION_EXTERNAL_CHILD_IDS_PROPS).toUri();
640                FileSystem fs = FileSystem.get(actionDirUri, getJobConf());
641                fs.copyFromLocalFile(new Path(externalChildIDs.toString()), new Path(actionDir,
642                        ACTION_EXTERNAL_CHILD_IDS_PROPS));
643            }
644        }
645    
646        private void setupMainConfiguration() throws IOException, HadoopAccessorException {
647            Path pathNew = new Path(new Path(actionDir, ACTION_CONF_XML),
648                    new Path(new File(ACTION_CONF_XML).getAbsolutePath()));
649            FileSystem fs = FileSystem.get(pathNew.toUri(), getJobConf());
650            fs.copyToLocalFile(new Path(actionDir, ACTION_CONF_XML),
651                    new Path(new File(ACTION_CONF_XML).getAbsolutePath()));
652    
653            System.setProperty("oozie.launcher.job.id", getJobConf().get("mapred.job.id"));
654            System.setProperty("oozie.job.id", getJobConf().get(OOZIE_JOB_ID));
655            System.setProperty("oozie.action.id", getJobConf().get(OOZIE_ACTION_ID));
656            System.setProperty("oozie.action.conf.xml", new File(ACTION_CONF_XML).getAbsolutePath());
657            System.setProperty("oozie.action.output.properties", new File(ACTION_OUTPUT_PROPS).getAbsolutePath());
658            System.setProperty(EXTERNAL_ACTION_STATS, new File(ACTION_STATS_PROPS).getAbsolutePath());
659            System.setProperty(EXTERNAL_CHILD_IDS, new File(ACTION_EXTERNAL_CHILD_IDS_PROPS).getAbsolutePath());
660            System.setProperty("oozie.action.newId.properties", new File(ACTION_NEW_ID_PROPS).getAbsolutePath());
661        }
662    
663        // Method to execute the prepare actions
664        private void executePrepare() throws IOException, LauncherException {
665            String prepareXML = getJobConf().get(ACTION_PREPARE_XML);
666            if (prepareXML != null) {
667                 if (!prepareXML.equals("")) {
668                     PrepareActionsDriver.doOperations(
669                         getJobConf().getStringCollection(CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS), prepareXML);
670                 } else {
671                     System.out.println("There are no prepare actions to execute.");
672                 }
673            }
674        }
675    
676        public static String[] getMainArguments(Configuration conf) {
677            String[] args = new String[conf.getInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, 0)];
678            for (int i = 0; i < args.length; i++) {
679                args[i] = conf.get(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i);
680            }
681            return args;
682        }
683    
684        private void setupHeartBeater(Reporter reporter) {
685            timer = new ScheduledThreadPoolExecutor(1);
686            timer.scheduleAtFixedRate(new LauncherMapper(reporter), 0, 30, TimeUnit.SECONDS);
687        }
688    
689        private void destroyHeartBeater() {
690            timer.shutdownNow();
691        }
692    
693        private Reporter reporter;
694    
695        private LauncherMapper(Reporter reporter) {
696            this.reporter = reporter;
697        }
698    
699        @Override
700        public void run() {
701            System.out.println("Heart beat");
702            reporter.progress();
703        }
704    
705        private void failLauncher(int errorCode, String reason, Throwable ex) throws LauncherException {
706            try {
707                if (ex != null) {
708                    reason += ", " + ex.getMessage();
709                }
710                Properties errorProps = new Properties();
711                errorProps.setProperty("error.code", Integer.toString(errorCode));
712                errorProps.setProperty("error.reason", reason);
713                if (ex != null) {
714                    if (ex.getMessage() != null) {
715                        errorProps.setProperty("exception.message", ex.getMessage());
716                    }
717                    StringWriter sw = new StringWriter();
718                    PrintWriter pw = new PrintWriter(sw);
719                    ex.printStackTrace(pw);
720                    pw.close();
721                    errorProps.setProperty("exception.stacktrace", sw.toString());
722                }
723                FileSystem fs = FileSystem.get((new Path(actionDir, ACTION_ERROR_PROPS)).toUri(), getJobConf());
724                OutputStream os = fs.create(new Path(actionDir, ACTION_ERROR_PROPS));
725                errorProps.store(os, "");
726                os.close();
727    
728                System.out.print("Failing Oozie Launcher, " + reason + "\n");
729                System.err.print("Failing Oozie Launcher, " + reason + "\n");
730                if (ex != null) {
731                    ex.printStackTrace(System.out);
732                    ex.printStackTrace(System.err);
733                }
734                throw new LauncherException(reason, ex);
735            }
736            catch (IOException rex) {
737                throw new RuntimeException("Error while failing launcher, " + rex.getMessage(), rex);
738            }
739        }
740    
741        /**
742         * Print files and directories in current directory. Will list files in the sub-directory (only 1 level deep)
743         */
744        protected void printContentsOfCurrentDir() {
745            File folder = new File(".");
746            System.out.println();
747            System.out.println("Files in current dir:" + folder.getAbsolutePath());
748            System.out.println("======================");
749    
750            File[] listOfFiles = folder.listFiles();
751            for (File fileName : listOfFiles) {
752                if (fileName.isFile()) {
753                    System.out.println("File: " + fileName.getName());
754                }
755                else if (fileName.isDirectory()) {
756                    System.out.println("Dir: " + fileName.getName());
757                    File subDir = new File(fileName.getName());
758                    File[] moreFiles = subDir.listFiles();
759                    for (File subFileName : moreFiles) {
760                        if (subFileName.isFile()) {
761                            System.out.println("  File: " + subFileName.getName());
762                        }
763                        else if (subFileName.isDirectory()) {
764                            System.out.println("  Dir: " + subFileName.getName());
765                        }
766                    }
767                }
768            }
769        }
770    
771    }
772    
/**
 * Security manager that vetoes JVM exits and remembers the requested exit status.
 * <p>
 * Creating an instance resets the recorded exit state, remembers the security manager that was installed at
 * that moment and installs the new instance in its place. Permission checks are forwarded to the remembered
 * manager when there was one and are otherwise allowed. The exit state is kept in static fields and is
 * therefore shared by all instances.
 */
class LauncherSecurityManager extends SecurityManager {
    private static boolean exitInvoked;
    private static int exitCode;

    // manager that was installed before this one took over; null when there was none
    private SecurityManager originalManager;

    /**
     * Clears the recorded exit state and installs this instance as the system security manager, keeping a
     * reference to the previously installed one.
     */
    public LauncherSecurityManager() {
        reset();
        originalManager = System.getSecurityManager();
        System.setSecurityManager(this);
    }

    /**
     * Forwards the check to the previously installed security manager; allows everything if there was none.
     */
    @Override
    public void checkPermission(Permission perm, Object context) {
        if (originalManager == null) {
            return;
        }
        originalManager.checkPermission(perm, context);
    }

    /**
     * Forwards the check to the previously installed security manager; allows everything if there was none.
     */
    @Override
    public void checkPermission(Permission perm) {
        if (originalManager == null) {
            return;
        }
        originalManager.checkPermission(perm);
    }

    /**
     * Records that an exit was requested together with its status, then vetoes it.
     *
     * @param status the requested exit status
     * @throws SecurityException always
     */
    @Override
    public void checkExit(int status) throws SecurityException {
        exitCode = status;
        exitInvoked = true;
        throw new SecurityException("Intercepted System.exit(" + status + ")");
    }

    /**
     * @return {@code true} if an exit was intercepted since the last {@link #reset()}
     */
    public static boolean getExitInvoked() {
        return exitInvoked;
    }

    /**
     * @return the status of the most recently intercepted exit, or 0 after a {@link #reset()}
     */
    public static int getExitCode() {
        return exitCode;
    }

    /**
     * Clears the recorded exit state: no exit invoked, exit code 0.
     */
    public static void reset() {
        exitCode = 0;
        exitInvoked = false;
    }
}
820    
/**
 * Checked exception used to signal a launcher failure.
 */
class LauncherException extends Exception {

    /**
     * Creates an exception without a cause.
     *
     * @param message description of the failure
     */
    LauncherException(final String message) {
        super(message);
    }

    /**
     * Creates an exception that wraps an underlying cause.
     *
     * @param message description of the failure
     * @param cause the underlying cause, may be {@code null}
     */
    LauncherException(final String message, final Throwable cause) {
        super(message, cause);
    }
}