001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.hadoop.fs.Path;
022    import org.apache.hadoop.fs.FileSystem;
023    import org.apache.hadoop.mapred.JobClient;
024    import org.apache.hadoop.mapred.JobConf;
025    import org.apache.hadoop.mapred.RunningJob;
026    import org.apache.hadoop.security.UserGroupInformation;
027    
028    import java.util.HashSet;
029    import java.util.Map;
030    import java.util.Properties;
031    import java.io.ByteArrayOutputStream;
032    import java.io.IOException;
033    import java.io.FileOutputStream;
034    import java.io.OutputStream;
035    import java.io.File;
036    
037    public class MapReduceMain extends LauncherMain {
038    
039        public static void main(String[] args) throws Exception {
040            run(MapReduceMain.class, args);
041        }
042    
043        protected void run(String[] args) throws Exception {
044            System.out.println();
045            System.out.println("Oozie Map-Reduce action configuration");
046            System.out.println("=======================");
047    
048            // loading action conf prepared by Oozie
049            Configuration actionConf = new Configuration(false);
050            actionConf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml")));
051    
052            logMasking("Map-Reduce job configuration:", new HashSet<String>(), actionConf);
053    
054            System.out.println("Submitting Oozie action Map-Reduce job");
055            System.out.println();
056            // submitting job
057            RunningJob runningJob = submitJob(actionConf);
058    
059            // propagating job id back to Oozie
060            String jobId = runningJob.getID().toString();
061            Properties props = new Properties();
062            props.setProperty("id", jobId);
063            File idFile = new File(System.getProperty("oozie.action.newId.properties"));
064            OutputStream os = new FileOutputStream(idFile);
065            props.store(os, "");
066            os.close();
067    
068            System.out.println("=======================");
069            System.out.println();
070        }
071    
072        protected void addActionConf(JobConf jobConf, Configuration actionConf) {
073            for (Map.Entry<String, String> entry : actionConf) {
074                jobConf.set(entry.getKey(), entry.getValue());
075            }
076        }
077    
078        protected RunningJob submitJob(Configuration actionConf) throws Exception {
079            JobConf jobConf = new JobConf();
080            addActionConf(jobConf, actionConf);
081    
082            // Set for uber jar
083            String uberJar = actionConf.get(MapReduceActionExecutor.OOZIE_MAPREDUCE_UBER_JAR);
084            if (uberJar != null && uberJar.trim().length() > 0) {
085                jobConf.setJar(uberJar);
086            }
087    
088            // propagate delegation related props from launcher job to MR job
089            if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
090                jobConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
091            }
092            JobClient jobClient = null;
093            RunningJob runJob = null;
094            boolean exception = false;
095            try {
096                jobClient = createJobClient(jobConf);
097                runJob = jobClient.submitJob(jobConf);
098            }
099            catch (Exception ex) {
100                exception = true;
101                throw ex;
102            }
103            finally {
104                try {
105                    if (jobClient != null) {
106                        jobClient.close();
107                    }
108                }
109                catch (Exception ex) {
110                    if (exception) {
111                        System.out.println("JobClient Error: " + ex);
112                    }
113                    else {
114                        throw ex;
115                    }
116                }
117            }
118            return runJob;
119        }
120    
121        @SuppressWarnings("unchecked")
122        protected JobClient createJobClient(JobConf jobConf) throws IOException {
123            return new JobClient(jobConf);
124        }
125    
126        // allows any character in the value, the conf.setStrings() does not allow
127        // commas
128        public static void setStrings(Configuration conf, String key, String[] values) {
129            if (values != null) {
130                conf.setInt(key + ".size", values.length);
131                for (int i = 0; i < values.length; i++) {
132                    conf.set(key + "." + i, values[i]);
133                }
134            }
135        }
136    
137        public static String[] getStrings(Configuration conf, String key) {
138            String[] values = new String[conf.getInt(key + ".size", 0)];
139            for (int i = 0; i < values.length; i++) {
140                values[i] = conf.get(key + "." + i);
141                if (values[i] == null) {
142                    values[i] = "";
143                }
144            }
145            return values;
146        }
147    
148    }