001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.hadoop.mapred.JobClient;
022    import org.apache.hadoop.mapred.JobConf;
023    import org.apache.hadoop.mapred.RunningJob;
024    
025    public class StreamingMain extends MapReduceMain {
026    
027        public static void main(String[] args) throws Exception {
028            run(StreamingMain.class, args);
029        }
030    
031        protected RunningJob submitJob(Configuration actionConf) throws Exception {
032            JobConf jobConf = new JobConf();
033    
034            jobConf.set("mapred.mapper.class", "org.apache.hadoop.streaming.PipeMapper");
035            jobConf.set("mapred.reducer.class", "org.apache.hadoop.streaming.PipeReducer");
036            jobConf.set("mapred.map.runner.class", "org.apache.hadoop.streaming.PipeMapRunner");
037    
038            jobConf.set("mapred.input.format.class", "org.apache.hadoop.mapred.TextInputFormat");
039            jobConf.set("mapred.output.format.class", "org.apache.hadoop.mapred.TextOutputFormat");
040            jobConf.set("mapred.output.value.class", "org.apache.hadoop.io.Text");
041            jobConf.set("mapred.output.key.class", "org.apache.hadoop.io.Text");
042    
043            jobConf.set("mapred.create.symlink", "yes");
044            jobConf.set("mapred.used.genericoptionsparser", "true");
045    
046            jobConf.set("stream.addenvironment", "");
047    
048            String value = actionConf.get("oozie.streaming.mapper");
049            if (value != null) {
050                jobConf.set("stream.map.streamprocessor", value);
051            }
052            value = actionConf.get("oozie.streaming.reducer");
053            if (value != null) {
054                jobConf.set("stream.reduce.streamprocessor", value);
055            }
056            value = actionConf.get("oozie.streaming.record-reader");
057            if (value != null) {
058                jobConf.set("stream.recordreader.class", value);
059            }
060            String[] values = getStrings(actionConf, "oozie.streaming.record-reader-mapping");
061            for (String s : values) {
062                String[] kv = s.split("=");
063                jobConf.set("stream.recordreader." + kv[0], kv[1]);
064            }
065            values = getStrings(actionConf, "oozie.streaming.env");
066            value = jobConf.get("stream.addenvironment", "");
067            if (value.length() > 0) {
068                value = value + " ";
069            }
070            for (String s : values) {
071                value = value + s + " ";
072            }
073            jobConf.set("stream.addenvironment", value);
074    
075            addActionConf(jobConf, actionConf);
076    
077            // propagate delegation related props from launcher job to MR job
078            if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
079                jobConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
080            }
081    
082            JobClient jobClient = null;
083            RunningJob runJob = null;
084            boolean exception = false;
085            try {
086                jobClient = createJobClient(jobConf);
087                runJob = jobClient.submitJob(jobConf);
088            }
089            catch (Exception ex) {
090                exception = true;
091                throw ex;
092            }
093            finally {
094                try {
095                    if (jobClient != null) {
096                        jobClient.close();
097                    }
098                }
099                catch (Exception ex) {
100                    if (exception) {
101                        System.out.println("JobClient Error: " + ex);
102                    }
103                    else {
104                        throw ex;
105                    }
106                }
107            }
108            return runJob;
109        }
110    
111        public static void setStreaming(Configuration conf, String mapper, String reducer, String recordReader,
112                                        String[] recordReaderMapping, String[] env) {
113            if (mapper != null) {
114                conf.set("oozie.streaming.mapper", mapper);
115            }
116            if (reducer != null) {
117                conf.set("oozie.streaming.reducer", reducer);
118            }
119            if (recordReader != null) {
120                conf.set("oozie.streaming.record-reader", recordReader);
121            }
122            setStrings(conf, "oozie.streaming.record-reader-mapping", recordReaderMapping);
123            setStrings(conf, "oozie.streaming.env", env);
124        }
125    
126    }