001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.BufferedReader;
021    import java.io.File;
022    import java.io.FileReader;
023    import java.io.IOException;
024    import java.io.InputStream;
025    import java.io.InputStreamReader;
026    import java.io.OutputStream;
027    import java.io.OutputStreamWriter;
028    import java.io.PrintWriter;
029    import java.io.StringWriter;
030    import java.io.Writer;
031    import java.lang.reflect.InvocationTargetException;
032    import java.lang.reflect.Method;
033    import java.net.URI;
034    import java.security.Permission;
035    import java.text.MessageFormat;
036    import java.util.ArrayList;
037    import java.util.Collection;
038    import java.util.List;
039    import java.util.Properties;
040    import java.util.StringTokenizer;
041    import java.util.concurrent.ScheduledThreadPoolExecutor;
042    import java.util.concurrent.TimeUnit;
043    
044    import org.apache.hadoop.conf.Configuration;
045    import org.apache.hadoop.fs.FileSystem;
046    import org.apache.hadoop.fs.Path;
047    import org.apache.hadoop.mapred.Counters;
048    import org.apache.hadoop.mapred.JobConf;
049    import org.apache.hadoop.mapred.Mapper;
050    import org.apache.hadoop.mapred.OutputCollector;
051    import org.apache.hadoop.mapred.Reporter;
052    import org.apache.hadoop.mapred.RunningJob;
053    import org.apache.oozie.service.HadoopAccessorException;
054    import org.apache.oozie.service.HadoopAccessorService;
055    import org.apache.oozie.service.Services;
056    import org.apache.oozie.util.XLog;
057    
058    public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {
059    
/** Launcher configuration key holding the name of the main class that map() invokes. */
public static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class";
/** Launcher configuration key under which the supported file systems value is stored. */
public static final String CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS = "oozie.launcher.action.supported.filesystems";

/** Launcher configuration key for the maximum size of the output data file, compared against its length. */
public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = "oozie.action.max.output.data";

/** Launcher configuration key holding the number of main-class arguments. */
private static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = "oozie.action.main.arg.count";
/** Prefix of the launcher configuration keys holding each argument; the argument index is appended. */
private static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = "oozie.action.main.arg.";
/** Launcher configuration key for the maximum size of the stats file, compared against its length. */
private static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = "oozie.external.stats.max.size";

/** Hadoop counter group under which all launcher counters below are reported. */
private static final String COUNTER_GROUP = "oozie.launcher";
/** Counter incremented when a new-id file was copied to the action directory. */
private static final String COUNTER_DO_ID_SWAP = "oozie.do.id.swap";
/** Counter incremented when an output data file was copied to the action directory. */
private static final String COUNTER_OUTPUT_DATA = "oozie.output.data";
/** Counter incremented when a stats file was found. */
private static final String COUNTER_STATS_DATA = "oozie.stats.data";
/** Counter incremented when the launcher finishes with a LauncherException. */
private static final String COUNTER_LAUNCHER_ERROR = "oozie.launcher.error";

/** Configuration key for the workflow job id (set on both launcher and action configurations). */
private static final String OOZIE_JOB_ID = "oozie.job.id";
/** Configuration key for the workflow action id (set on both launcher and action configurations). */
private static final String OOZIE_ACTION_ID = "oozie.action.id";

/** Launcher configuration key holding the action directory path. */
private static final String OOZIE_ACTION_DIR_PATH = "oozie.action.dir.path";
/** Launcher configuration key holding the recovery id (name of the recovery file in the action directory). */
private static final String OOZIE_ACTION_RECOVERY_ID = "oozie.action.recovery.id";

/** Common prefix of the action-related property names below. */
public static final String ACTION_PREFIX = "oozie.action.";
/** System property name whose value is the local path of the external child IDs file. */
public static final String EXTERNAL_CHILD_IDS = ACTION_PREFIX + "externalChildIDs.properties";
/** System property name whose value is the local path of the action stats file. */
public static final String EXTERNAL_ACTION_STATS = ACTION_PREFIX + "stats.properties";

/** Name of the file, inside the action directory, that holds the action configuration XML. */
static final String ACTION_CONF_XML = "action.xml";
/** Launcher configuration key holding the prepare XML. */
public static final String ACTION_PREPARE_XML = "oozie.action.prepare.xml";
/** Name of the output data file inside the action directory. */
private static final String ACTION_OUTPUT_PROPS = "output.properties";
/** Name of the stats file inside the action directory. */
private static final String ACTION_STATS_PROPS = "stats.properties";
/** Name of the external child IDs file inside the action directory. */
private static final String ACTION_EXTERNAL_CHILD_IDS_PROPS = "externalChildIds.properties";
/** Name of the new-id (id swap) file inside the action directory. */
private static final String ACTION_NEW_ID_PROPS = "newId.properties";
/** Name of the error file inside the action directory. */
private static final String ACTION_ERROR_PROPS = "error.properties";
092    
093        private void setRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) throws LauncherException {
094            try {
095                String jobId = launcherConf.get("mapred.job.id");
096                Path path = new Path(actionDir, recoveryId);
097                FileSystem fs = FileSystem.get(path.toUri(), launcherConf);
098                if (!fs.exists(path)) {
099                    try {
100                        Writer writer = new OutputStreamWriter(fs.create(path));
101                        writer.write(jobId);
102                        writer.close();
103                    }
104                    catch (IOException ex) {
105                        failLauncher(0, "IO error", ex);
106                    }
107                }
108                else {
109                    InputStream is = fs.open(path);
110                    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
111                    String id = reader.readLine();
112                    reader.close();
113                    if (!jobId.equals(id)) {
114                        failLauncher(0, MessageFormat.format(
115                                "Hadoop job Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]", path, id,
116                                jobId), null);
117                    }
118    
119                }
120            }
121            catch (IOException ex) {
122                failLauncher(0, "IO error", ex);
123            }
124        }
125    
126        /**
127         * @param launcherConf
128         * @param actionDir
129         * @param recoveryId
130         * @return
131         * @throws HadoopAccessorException
132         * @throws IOException
133         */
134        public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
135                throws HadoopAccessorException, IOException {
136            String jobId = null;
137            Path recoveryFile = new Path(actionDir, recoveryId);
138            FileSystem fs = Services.get().get(HadoopAccessorService.class)
139                    .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf);
140    
141            if (fs.exists(recoveryFile)) {
142                InputStream is = fs.open(recoveryFile);
143                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
144                jobId = reader.readLine();
145                reader.close();
146            }
147            return jobId;
148    
149        }
150    
151        public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
152            launcherConf.set(CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
153        }
154    
155        public static void setupSupportedFileSystems(Configuration launcherConf, String supportedFileSystems) {
156            launcherConf.set(CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS, supportedFileSystems);
157        }
158    
159        public static void setupMainArguments(Configuration launcherConf, String[] args) {
160            launcherConf.setInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
161            for (int i = 0; i < args.length; i++) {
162                launcherConf.set(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
163            }
164        }
165    
166        public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
167            launcherConf.setInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
168        }
169    
170        /**
171         * Set the maximum value of stats data
172         *
173         * @param launcherConf the oozie launcher configuration
174         * @param maxStatsData the maximum allowed size of stats data
175         */
176        public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){
177            launcherConf.setInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
178        }
179    
180        /**
181         * @param launcherConf
182         * @param jobId
183         * @param actionId
184         * @param actionDir
185         * @param recoveryId
186         * @param actionConf
187         * @throws IOException
188         * @throws HadoopAccessorException
189         */
190        public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir,
191                String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
192    
193            launcherConf.setMapperClass(LauncherMapper.class);
194            launcherConf.setSpeculativeExecution(false);
195            launcherConf.setNumMapTasks(1);
196            launcherConf.setNumReduceTasks(0);
197    
198            launcherConf.set(OOZIE_JOB_ID, jobId);
199            launcherConf.set(OOZIE_ACTION_ID, actionId);
200            launcherConf.set(OOZIE_ACTION_DIR_PATH, actionDir.toString());
201            launcherConf.set(OOZIE_ACTION_RECOVERY_ID, recoveryId);
202            launcherConf.set(ACTION_PREPARE_XML, prepareXML);
203    
204            actionConf.set(OOZIE_JOB_ID, jobId);
205            actionConf.set(OOZIE_ACTION_ID, actionId);
206    
207            if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) {
208              List<String> purgedEntries = new ArrayList<String>();
209              Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files");
210              for (String entry : entries) {
211                if (entry.contains("#")) {
212                  purgedEntries.add(entry);
213                }
214              }
215              actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()]));
216              launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
217            }
218    
219            FileSystem fs =
220              Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
221                                                                               actionDir.toUri(), launcherConf);
222            fs.mkdirs(actionDir);
223    
224            OutputStream os = fs.create(new Path(actionDir, ACTION_CONF_XML));
225            actionConf.writeXml(os);
226            os.close();
227    
228            Path inputDir = new Path(actionDir, "input");
229            fs.mkdirs(inputDir);
230            Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "dummy.txt")));
231            writer.write("dummy");
232            writer.close();
233    
234            launcherConf.set("mapred.input.dir", inputDir.toString());
235            launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString());
236        }
237    
238        public static boolean isMainDone(RunningJob runningJob) throws IOException {
239            return runningJob.isComplete();
240        }
241    
242        public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
243            boolean succeeded = runningJob.isSuccessful();
244            if (succeeded) {
245                Counters counters = runningJob.getCounters();
246                if (counters != null) {
247                    Counters.Group group = counters.getGroup(COUNTER_GROUP);
248                    if (group != null) {
249                        succeeded = group.getCounter(COUNTER_LAUNCHER_ERROR) == 0;
250                    }
251                }
252            }
253            return succeeded;
254        }
255    
256        public static boolean hasOutputData(RunningJob runningJob) throws IOException {
257            boolean output = false;
258            Counters counters = runningJob.getCounters();
259            if (counters != null) {
260                Counters.Group group = counters.getGroup(COUNTER_GROUP);
261                if (group != null) {
262                    output = group.getCounter(COUNTER_OUTPUT_DATA) == 1;
263                }
264            }
265            return output;
266        }
267    
268        /**
269         * Check whether runningJob has stats data or not
270         *
271         * @param runningJob the runningJob
272         * @return returns whether the running Job has stats data or not
273         * @throws IOException
274         */
275        public static boolean hasStatsData(RunningJob runningJob) throws IOException{
276            boolean output = false;
277            Counters counters = runningJob.getCounters();
278            if (counters != null) {
279                Counters.Group group = counters.getGroup(COUNTER_GROUP);
280                if (group != null) {
281                    output = group.getCounter(COUNTER_STATS_DATA) == 1;
282                }
283            }
284            return output;
285        }
286    
287        /**
288         * @param runningJob
289         * @return
290         * @throws IOException
291         */
292        public static boolean hasIdSwap(RunningJob runningJob) throws IOException {
293            boolean swap = false;
294            Counters counters = runningJob.getCounters();
295            if (counters != null) {
296                Counters.Group group = counters.getGroup(COUNTER_GROUP);
297                if (group != null) {
298                    swap = group.getCounter(COUNTER_DO_ID_SWAP) == 1;
299                }
300            }
301            return swap;
302        }
303    
304        /**
305         * @param runningJob
306         * @param user
307         * @param group
308         * @param actionDir
309         * @return
310         * @throws IOException
311         * @throws HadoopAccessorException
312         */
313        public static boolean hasIdSwap(RunningJob runningJob, String user, String group, Path actionDir)
314                throws IOException, HadoopAccessorException {
315            boolean swap = false;
316    
317            XLog log = XLog.getLog("org.apache.oozie.action.hadoop.LauncherMapper");
318    
319            Counters counters = runningJob.getCounters();
320            if (counters != null) {
321                Counters.Group counterGroup = counters.getGroup(COUNTER_GROUP);
322                if (counterGroup != null) {
323                    swap = counterGroup.getCounter(COUNTER_DO_ID_SWAP) == 1;
324                }
325            }
326            // additional check for swapped hadoop ID
327            // Can't rely on hadoop counters existing
328            // we'll check for the newID file in hdfs if the hadoop counters is null
329            else {
330    
331                Path p = getIdSwapPath(actionDir);
332                // log.debug("Checking for newId file in: [{0}]", p);
333    
334                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
335                Configuration conf = has.createJobConf(p.toUri().getAuthority());
336                FileSystem fs = has.createFileSystem(user, p.toUri(), conf);
337                if (fs.exists(p)) {
338                    log.debug("Hadoop Counters is null, but found newID file.");
339    
340                    swap = true;
341                }
342                else {
343                    log.debug("Hadoop Counters is null, and newID file doesn't exist at: [{0}]", p);
344                }
345            }
346            return swap;
347        }
348    
349        public static Path getOutputDataPath(Path actionDir) {
350            return new Path(actionDir, ACTION_OUTPUT_PROPS);
351        }
352    
353        /**
354         * Get the location of stats file
355         *
356         * @param actionDir the action directory
357         * @return the hdfs location of the file
358         */
359        public static Path getActionStatsDataPath(Path actionDir){
360            return new Path(actionDir, ACTION_STATS_PROPS);
361        }
362    
363        /**
364         * Get the location of external Child IDs file
365         *
366         * @param actionDir the action directory
367         * @return the hdfs location of the file
368         */
369        public static Path getExternalChildIDsDataPath(Path actionDir){
370            return new Path(actionDir, ACTION_EXTERNAL_CHILD_IDS_PROPS);
371        }
372    
373        public static Path getErrorPath(Path actionDir) {
374            return new Path(actionDir, ACTION_ERROR_PROPS);
375        }
376    
377        public static Path getIdSwapPath(Path actionDir) {
378            return new Path(actionDir, ACTION_NEW_ID_PROPS);
379        }
380    
/** Job configuration handed to {@link #configure(JobConf)}; exposed through getJobConf(). */
private JobConf jobConf;
/** Action directory, built in {@link #configure(JobConf)} from the job configuration. */
private Path actionDir;
// NOTE(review): not assigned in the methods visible here; presumably the heartbeat
// scheduler managed by setupHeartBeater()/destroyHeartBeater() -- confirm.
private ScheduledThreadPoolExecutor timer;

/** Set to true when setRecoveryId() fails during {@link #configure(JobConf)}. */
private boolean configFailure = false;
/** The failure captured during {@link #configure(JobConf)}; rethrown at the start of map(). */
private LauncherException configureFailureEx;
/** Creates a launcher mapper; all setup happens in {@link #configure(JobConf)}. */
public LauncherMapper() {
}
389    
390        @Override
391        public void configure(JobConf jobConf) {
392            System.out.println();
393            System.out.println("Oozie Launcher starts");
394            System.out.println();
395            this.jobConf = jobConf;
396            actionDir = new Path(getJobConf().get(OOZIE_ACTION_DIR_PATH));
397            String recoveryId = jobConf.get(OOZIE_ACTION_RECOVERY_ID, null);
398            try {
399                setRecoveryId(jobConf, actionDir, recoveryId);
400            }
401            catch (LauncherException ex) {
402                System.out.println("Launcher config error "+ex.getMessage());
403                configureFailureEx = ex;
404                configFailure = true;
405            }
406        }
407    
408        @Override
409        public void map(K1 key, V1 value, OutputCollector<K2, V2> collector, Reporter reporter) throws IOException {
410            try {
411                if (configFailure) {
412                    throw configureFailureEx;
413                }
414                else {
415                    String mainClass = getJobConf().get(CONF_OOZIE_ACTION_MAIN_CLASS);
416                    if (getJobConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) {
417                      System.err.println("WARNING, workaround for Hadoop 2.0.2-alpha distributed cached issue (MAPREDUCE-4820) enabled");
418                    }
419                    String msgPrefix = "Main class [" + mainClass + "], ";
420                    int errorCode = 0;
421                    Throwable errorCause = null;
422                    String errorMessage = null;
423    
424                    try {
425                        new LauncherSecurityManager();
426                    }
427                    catch (SecurityException ex) {
428                        errorMessage = "Could not set LauncherSecurityManager";
429                        errorCause = ex;
430                    }
431    
432                    try {
433                        setupHeartBeater(reporter);
434    
435                        setupMainConfiguration();
436    
437                        try {
438                            System.out.println("Starting the execution of prepare actions");
439                            executePrepare();
440                            System.out.println("Completed the execution of prepare actions successfully");
441                        } catch (Exception ex) {
442                            System.out.println("Prepare execution in the Launcher Mapper has failed");
443                            throw new LauncherException(ex.getMessage(), ex);
444                        }
445    
446                        String[] args = getMainArguments(getJobConf());
447    
448                        printContentsOfCurrentDir();
449    
450                        System.out.println();
451                        System.out.println("Oozie Java/Map-Reduce/Pig action launcher-job configuration");
452                        System.out.println("=================================================================");
453                        System.out.println("Workflow job id   : " + System.getProperty("oozie.job.id"));
454                        System.out.println("Workflow action id: " + System.getProperty("oozie.action.id"));
455                        System.out.println();
456                        System.out.println("Classpath         :");
457                        System.out.println("------------------------");
458                        StringTokenizer st = new StringTokenizer(System.getProperty("java.class.path"), ":");
459                        while (st.hasMoreTokens()) {
460                            System.out.println("  " + st.nextToken());
461                        }
462                        System.out.println("------------------------");
463                        System.out.println();
464                        System.out.println("Main class        : " + mainClass);
465                        System.out.println();
466                        System.out.println("Maximum output    : "
467                                + getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024));
468                        System.out.println();
469                        System.out.println("Arguments         :");
470                        for (String arg : args) {
471                            System.out.println("                    " + arg);
472                        }
473    
474                        System.out.println();
475                        System.out.println("Java System Properties:");
476                        System.out.println("------------------------");
477                        System.getProperties().store(System.out, "");
478                        System.out.flush();
479                        System.out.println("------------------------");
480                        System.out.println();
481    
482                        System.out.println("=================================================================");
483                        System.out.println();
484                        System.out.println(">>> Invoking Main class now >>>");
485                        System.out.println();
486                        System.out.flush();
487    
488                        try {
489                            Class klass = getJobConf().getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
490                            Method mainMethod = klass.getMethod("main", String[].class);
491                            mainMethod.invoke(null, (Object) args);
492                        }
493                        catch (InvocationTargetException ex) {
494                            if (LauncherMainException.class.isInstance(ex.getCause())) {
495                                errorMessage = msgPrefix + "exit code [" +((LauncherMainException)ex.getCause()).getErrorCode()
496                                    + "]";
497                                errorCause = null;
498                            }
499                            else if (SecurityException.class.isInstance(ex.getCause())) {
500                                if (LauncherSecurityManager.getExitInvoked()) {
501                                    System.out.println("Intercepting System.exit(" + LauncherSecurityManager.getExitCode()
502                                            + ")");
503                                    System.err.println("Intercepting System.exit(" + LauncherSecurityManager.getExitCode()
504                                            + ")");
505                                    // if 0 main() method finished successfully
506                                    // ignoring
507                                    errorCode = LauncherSecurityManager.getExitCode();
508                                    if (errorCode != 0) {
509                                        errorMessage = msgPrefix + "exit code [" + errorCode + "]";
510                                        errorCause = null;
511                                    }
512                                }
513                            }
514                            else {
515                                throw ex;
516                            }
517                        }
518                        finally {
519                            System.out.println();
520                            System.out.println("<<< Invocation of Main class completed <<<");
521                            System.out.println();
522                        }
523                        if (errorMessage == null) {
524                            File outputData = new File(System.getProperty("oozie.action.output.properties"));
525                            FileSystem fs = null;
526                            if (outputData.exists()) {
527                                URI actionDirUri = new Path(actionDir, ACTION_OUTPUT_PROPS).toUri();
528                                fs = FileSystem.get(actionDirUri, getJobConf());
529                                fs.copyFromLocalFile(new Path(outputData.toString()), new Path(actionDir,
530                                                                                               ACTION_OUTPUT_PROPS));
531                                reporter.incrCounter(COUNTER_GROUP, COUNTER_OUTPUT_DATA, 1);
532    
533                                int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);
534                                if (outputData.length() > maxOutputData) {
535                                    String msg = MessageFormat.format("Output data size [{0}] exceeds maximum [{1}]",
536                                                                      outputData.length(), maxOutputData);
537                                    failLauncher(0, msg, null);
538                                }
539                                System.out.println();
540                                System.out.println("Oozie Launcher, capturing output data:");
541                                System.out.println("=======================");
542                                Properties props = new Properties();
543                                props.load(new FileReader(outputData));
544                                props.store(System.out, "");
545                                System.out.println();
546                                System.out.println("=======================");
547                                System.out.println();
548                            }
549                            handleActionStatsData(fs, reporter);
550                            handleExternalChildIDs(fs, reporter);
551                            File newId = new File(System.getProperty("oozie.action.newId.properties"));
552                            if (newId.exists()) {
553                                Properties props = new Properties();
554                                props.load(new FileReader(newId));
555                                if (props.getProperty("id") == null) {
556                                    throw new IllegalStateException("ID swap file does not have [id] property");
557                                }
558                                URI actionDirUri = new Path(actionDir, ACTION_NEW_ID_PROPS).toUri();
559                                fs = FileSystem.get(actionDirUri, getJobConf());
560                                fs.copyFromLocalFile(new Path(newId.toString()), new Path(actionDir, ACTION_NEW_ID_PROPS));
561                                reporter.incrCounter(COUNTER_GROUP, COUNTER_DO_ID_SWAP, 1);
562    
563                                System.out.println("Oozie Launcher, copying new Hadoop job id to file: "
564                                        + new Path(actionDir, ACTION_NEW_ID_PROPS).toUri());
565    
566                                System.out.println();
567                                System.out.println("Oozie Launcher, propagating new Hadoop job id to Oozie");
568                                System.out.println("=======================");
569                                System.out.println("id: " + props.getProperty("id"));
570                                System.out.println("=======================");
571                                System.out.println();
572                            }
573                        }
574                    }
575                    catch (NoSuchMethodException ex) {
576                        errorMessage = msgPrefix + "main() method not found";
577                        errorCause = ex;
578                    }
579                    catch (InvocationTargetException ex) {
580                        errorMessage = msgPrefix + "main() threw exception";
581                        errorCause = ex.getTargetException();
582                    }
583                    catch (Throwable ex) {
584                        errorMessage = msgPrefix + "exception invoking main()";
585                        errorCause = ex;
586                    }
587                    finally {
588                        destroyHeartBeater();
589                        if (errorMessage != null) {
590                            failLauncher(errorCode, errorMessage, errorCause);
591                        }
592                    }
593                }
594            }
595            catch (LauncherException ex) {
596                reporter.incrCounter(COUNTER_GROUP, COUNTER_LAUNCHER_ERROR, 1);
597                System.out.println();
598                System.out.println("Oozie Launcher failed, finishing Hadoop job gracefully");
599                System.out.println();
600            }
601        }
602    
603        @Override
604        public void close() throws IOException {
605            System.out.println();
606            System.out.println("Oozie Launcher ends");
607            System.out.println();
608        }
609    
610        protected JobConf getJobConf() {
611            return jobConf;
612        }
613    
614        private void handleActionStatsData(FileSystem fs, Reporter reporter) throws IOException, LauncherException {
615            File actionStatsData = new File(System.getProperty(EXTERNAL_ACTION_STATS));
616            // If stats are stored by the action, then stats file should exist
617            if (actionStatsData.exists()) {
618                int statsMaxOutputData = getJobConf().getInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
619                        Integer.MAX_VALUE);
620                reporter.incrCounter(COUNTER_GROUP, COUNTER_STATS_DATA, 1);
621                // fail the launcher if size of stats is greater than the maximum allowed size
622                if (actionStatsData.length() > statsMaxOutputData) {
623                    String msg = MessageFormat.format("Output stats size [{0}] exceeds maximum [{1}]",
624                            actionStatsData.length(), statsMaxOutputData);
625                    failLauncher(0, msg, null);
626                }
627                // copy the stats file to hdfs path which can be accessed by Oozie server
628                URI actionDirUri = new Path(actionDir, ACTION_STATS_PROPS).toUri();
629                fs = FileSystem.get(actionDirUri, getJobConf());
630                fs.copyFromLocalFile(new Path(actionStatsData.toString()), new Path(actionDir,
631                        ACTION_STATS_PROPS));
632            }
633        }
634    
635        private void handleExternalChildIDs(FileSystem fs, Reporter reporter) throws IOException {
636            File externalChildIDs = new File(System.getProperty(EXTERNAL_CHILD_IDS));
637            // if external ChildIDs are stored by the action, then the file should exist
638            if (externalChildIDs.exists()) {
639                // copy the externalChildIDs file to hdfs path which can be accessed by Oozie server
640                URI actionDirUri = new Path(actionDir, ACTION_EXTERNAL_CHILD_IDS_PROPS).toUri();
641                fs = FileSystem.get(actionDirUri, getJobConf());
642                fs.copyFromLocalFile(new Path(externalChildIDs.toString()), new Path(actionDir,
643                        ACTION_EXTERNAL_CHILD_IDS_PROPS));
644            }
645        }
646    
647        private void setupMainConfiguration() throws IOException, HadoopAccessorException {
648            Path pathNew = new Path(new Path(actionDir, ACTION_CONF_XML),
649                    new Path(new File(ACTION_CONF_XML).getAbsolutePath()));
650            FileSystem fs = FileSystem.get(pathNew.toUri(), getJobConf());
651            fs.copyToLocalFile(new Path(actionDir, ACTION_CONF_XML),
652                    new Path(new File(ACTION_CONF_XML).getAbsolutePath()));
653    
654            System.setProperty("oozie.launcher.job.id", getJobConf().get("mapred.job.id"));
655            System.setProperty("oozie.job.id", getJobConf().get(OOZIE_JOB_ID));
656            System.setProperty("oozie.action.id", getJobConf().get(OOZIE_ACTION_ID));
657            System.setProperty("oozie.action.conf.xml", new File(ACTION_CONF_XML).getAbsolutePath());
658            System.setProperty("oozie.action.output.properties", new File(ACTION_OUTPUT_PROPS).getAbsolutePath());
659            System.setProperty(EXTERNAL_ACTION_STATS, new File(ACTION_STATS_PROPS).getAbsolutePath());
660            System.setProperty(EXTERNAL_CHILD_IDS, new File(ACTION_EXTERNAL_CHILD_IDS_PROPS).getAbsolutePath());
661            System.setProperty("oozie.action.newId.properties", new File(ACTION_NEW_ID_PROPS).getAbsolutePath());
662        }
663    
664        // Method to execute the prepare actions
665        private void executePrepare() throws IOException, LauncherException {
666            String prepareXML = getJobConf().get(ACTION_PREPARE_XML);
667            if (prepareXML != null) {
668                 if (!prepareXML.equals("")) {
669                     PrepareActionsDriver.doOperations(
670                         getJobConf().getStringCollection(CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS), prepareXML);
671                 } else {
672                     System.out.println("There are no prepare actions to execute.");
673                 }
674            }
675        }
676    
677        public static String[] getMainArguments(Configuration conf) {
678            String[] args = new String[conf.getInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, 0)];
679            for (int i = 0; i < args.length; i++) {
680                args[i] = conf.get(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i);
681            }
682            return args;
683        }
684    
685        private void setupHeartBeater(Reporter reporter) {
686            timer = new ScheduledThreadPoolExecutor(1);
687            timer.scheduleAtFixedRate(new LauncherMapper(reporter), 0, 30, TimeUnit.SECONDS);
688        }
689    
690        private void destroyHeartBeater() {
691            timer.shutdownNow();
692        }
693    
694        private Reporter reporter;
695    
696        private LauncherMapper(Reporter reporter) {
697            this.reporter = reporter;
698        }
699    
700        @Override
701        public void run() {
702            System.out.println("Heart beat");
703            reporter.progress();
704        }
705    
706        private void failLauncher(int errorCode, String reason, Throwable ex) throws LauncherException {
707            try {
708                if (ex != null) {
709                    reason += ", " + ex.getMessage();
710                }
711                Properties errorProps = new Properties();
712                errorProps.setProperty("error.code", Integer.toString(errorCode));
713                errorProps.setProperty("error.reason", reason);
714                if (ex != null) {
715                    if (ex.getMessage() != null) {
716                        errorProps.setProperty("exception.message", ex.getMessage());
717                    }
718                    StringWriter sw = new StringWriter();
719                    PrintWriter pw = new PrintWriter(sw);
720                    ex.printStackTrace(pw);
721                    pw.close();
722                    errorProps.setProperty("exception.stacktrace", sw.toString());
723                }
724                FileSystem fs = FileSystem.get((new Path(actionDir, ACTION_ERROR_PROPS)).toUri(), getJobConf());
725                OutputStream os = fs.create(new Path(actionDir, ACTION_ERROR_PROPS));
726                errorProps.store(os, "");
727                os.close();
728    
729                System.out.print("Failing Oozie Launcher, " + reason + "\n");
730                System.err.print("Failing Oozie Launcher, " + reason + "\n");
731                if (ex != null) {
732                    ex.printStackTrace(System.out);
733                    ex.printStackTrace(System.err);
734                }
735                throw new LauncherException(reason, ex);
736            }
737            catch (IOException rex) {
738                throw new RuntimeException("Error while failing launcher, " + rex.getMessage(), rex);
739            }
740        }
741    
742        /**
743         * Print files and directories in current directory. Will list files in the sub-directory (only 1 level deep)
744         */
745        protected void printContentsOfCurrentDir() {
746            File folder = new File(".");
747            System.out.println();
748            System.out.println("Files in current dir:" + folder.getAbsolutePath());
749            System.out.println("======================");
750    
751            File[] listOfFiles = folder.listFiles();
752            for (File fileName : listOfFiles) {
753                if (fileName.isFile()) {
754                    System.out.println("File: " + fileName.getName());
755                }
756                else if (fileName.isDirectory()) {
757                    System.out.println("Dir: " + fileName.getName());
758                    File subDir = new File(fileName.getName());
759                    File[] moreFiles = subDir.listFiles();
760                    for (File subFileName : moreFiles) {
761                        if (subFileName.isFile()) {
762                            System.out.println("  File: " + subFileName.getName());
763                        }
764                        else if (subFileName.isDirectory()) {
765                            System.out.println("  Dir: " + subFileName.getName());
766                        }
767                    }
768                }
769            }
770        }
771    
772    }
773    
/**
 * Security manager that intercepts exit requests instead of letting them terminate the JVM.
 * <p>
 * Constructing an instance installs it as the system security manager and remembers the one that was
 * installed before. Permission checks are forwarded to that previous manager when there was one. An exit
 * check records the requested status in static state and is rejected with a {@link SecurityException}.
 */
class LauncherSecurityManager extends SecurityManager {
    private static boolean exitInvoked;
    private static int exitCode;
    private SecurityManager originalSecurityManager;

    /**
     * Clears the recorded exit state, remembers the current system security manager and installs this
     * instance in its place.
     */
    public LauncherSecurityManager() {
        reset();
        originalSecurityManager = System.getSecurityManager();
        System.setSecurityManager(this);
    }

    /**
     * Forwards the check to the previously installed security manager; permits everything when there was none.
     */
    @Override
    public void checkPermission(Permission perm, Object context) {
        if (originalSecurityManager == null) {
            return;
        }
        originalSecurityManager.checkPermission(perm, context);
    }

    /**
     * Forwards the check to the previously installed security manager; permits everything when there was none.
     */
    @Override
    public void checkPermission(Permission perm) {
        if (originalSecurityManager == null) {
            return;
        }
        originalSecurityManager.checkPermission(perm);
    }

    /**
     * Records the exit request and rejects it.
     *
     * @param status exit status that was requested
     * @throws SecurityException always, so that the exit does not proceed
     */
    @Override
    public void checkExit(int status) throws SecurityException {
        exitInvoked = true;
        exitCode = status;
        throw new SecurityException("Intercepted System.exit(" + status + ")");
    }

    /**
     * @return {@code true} if an exit request has been intercepted since the last {@link #reset()}
     */
    public static boolean getExitInvoked() {
        return exitInvoked;
    }

    /**
     * @return status of the most recently intercepted exit request, or 0 after a {@link #reset()}
     */
    public static int getExitCode() {
        return exitCode;
    }

    /**
     * Clears the recorded exit state: no exit intercepted, exit code 0.
     */
    public static void reset() {
        exitCode = 0;
        exitInvoked = false;
    }
}
821    
/**
 * Checked exception signalling that the launcher has failed.
 */
class LauncherException extends Exception {

    /**
     * Creates the exception with a description only.
     *
     * @param message description of the failure
     */
    LauncherException(String message) {
        super(message);
    }

    /**
     * Creates the exception with a description and the underlying cause.
     *
     * @param message description of the failure
     * @param cause underlying cause; may be {@code null}
     */
    LauncherException(String message, Throwable cause) {
        super(message, cause);
    }
}