001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.hadoop.fs.Path;
022    import org.apache.hadoop.fs.FileSystem;
023    import org.apache.hadoop.mapred.JobClient;
024    import org.apache.hadoop.mapred.JobConf;
025    import org.apache.hadoop.mapred.RunningJob;
026    import org.apache.hadoop.security.UserGroupInformation;
027    
028    import java.util.HashSet;
029    import java.util.Map;
030    import java.util.Properties;
031    import java.io.ByteArrayOutputStream;
032    import java.io.IOException;
033    import java.io.FileOutputStream;
034    import java.io.OutputStream;
035    import java.io.File;
036    
037    public class MapReduceMain extends LauncherMain {
038    
039        public static void main(String[] args) throws Exception {
040            run(MapReduceMain.class, args);
041        }
042    
043        protected void run(String[] args) throws Exception {
044            System.out.println();
045            System.out.println("Oozie Map-Reduce action configuration");
046            System.out.println("=======================");
047    
048            // loading action conf prepared by Oozie
049            Configuration actionConf = new Configuration(false);
050            actionConf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml")));
051    
052            logMasking("Map-Reduce job configuration:", new HashSet<String>(), actionConf);
053    
054            System.out.println("Submitting Oozie action Map-Reduce job");
055            System.out.println();
056            // submitting job
057            RunningJob runningJob = submitJob(actionConf);
058    
059            // propagating job id back to Oozie
060            String jobId = runningJob.getID().toString();
061            Properties props = new Properties();
062            props.setProperty("id", jobId);
063            File idFile = new File(System.getProperty("oozie.action.newId.properties"));
064            OutputStream os = new FileOutputStream(idFile);
065            props.store(os, "");
066            os.close();
067    
068            System.out.println("=======================");
069            System.out.println();
070        }
071    
072        protected void addActionConf(JobConf jobConf, Configuration actionConf) {
073            for (Map.Entry<String, String> entry : actionConf) {
074                jobConf.set(entry.getKey(), entry.getValue());
075            }
076        }
077    
078        protected RunningJob submitJob(Configuration actionConf) throws Exception {
079            JobConf jobConf = new JobConf();
080            addActionConf(jobConf, actionConf);
081    
082            // propagate delegation related props from launcher job to MR job
083            if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
084                jobConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
085            }
086            JobClient jobClient = null;
087            RunningJob runJob = null;
088            boolean exception = false;
089            try {
090                jobClient = createJobClient(jobConf);
091                runJob = jobClient.submitJob(jobConf);
092            }
093            catch (Exception ex) {
094                exception = true;
095                throw ex;
096            }
097            finally {
098                try {
099                    if (jobClient != null) {
100                        jobClient.close();
101                    }
102                }
103                catch (Exception ex) {
104                    if (exception) {
105                        System.out.println("JobClient Error: " + ex);
106                    }
107                    else {
108                        throw ex;
109                    }
110                }
111            }
112            return runJob;
113        }
114    
115        @SuppressWarnings("unchecked")
116        protected JobClient createJobClient(JobConf jobConf) throws IOException {
117            return new JobClient(jobConf);
118        }
119    
120        // allows any character in the value, the conf.setStrings() does not allow
121        // commas
122        public static void setStrings(Configuration conf, String key, String[] values) {
123            if (values != null) {
124                conf.setInt(key + ".size", values.length);
125                for (int i = 0; i < values.length; i++) {
126                    conf.set(key + "." + i, values[i]);
127                }
128            }
129        }
130    
131        public static String[] getStrings(Configuration conf, String key) {
132            String[] values = new String[conf.getInt(key + ".size", 0)];
133            for (int i = 0; i < values.length; i++) {
134                values[i] = conf.get(key + "." + i);
135                if (values[i] == null) {
136                    values[i] = "";
137                }
138            }
139            return values;
140        }
141    
142    }