This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.hadoop.mapred.JobClient;
022 import org.apache.hadoop.mapred.JobConf;
023 import org.apache.hadoop.mapred.RunningJob;
024
025 public class StreamingMain extends MapReduceMain {
026
027 public static void main(String[] args) throws Exception {
028 run(StreamingMain.class, args);
029 }
030
031 protected RunningJob submitJob(Configuration actionConf) throws Exception {
032 JobConf jobConf = new JobConf();
033
034 jobConf.set("mapred.mapper.class", "org.apache.hadoop.streaming.PipeMapper");
035 jobConf.set("mapred.reducer.class", "org.apache.hadoop.streaming.PipeReducer");
036 jobConf.set("mapred.map.runner.class", "org.apache.hadoop.streaming.PipeMapRunner");
037
038 jobConf.set("mapred.input.format.class", "org.apache.hadoop.mapred.TextInputFormat");
039 jobConf.set("mapred.output.format.class", "org.apache.hadoop.mapred.TextOutputFormat");
040 jobConf.set("mapred.output.value.class", "org.apache.hadoop.io.Text");
041 jobConf.set("mapred.output.key.class", "org.apache.hadoop.io.Text");
042
043 jobConf.set("mapred.create.symlink", "yes");
044 jobConf.set("mapred.used.genericoptionsparser", "true");
045
046 jobConf.set("stream.addenvironment", "");
047
048 String value = actionConf.get("oozie.streaming.mapper");
049 if (value != null) {
050 jobConf.set("stream.map.streamprocessor", value);
051 }
052 value = actionConf.get("oozie.streaming.reducer");
053 if (value != null) {
054 jobConf.set("stream.reduce.streamprocessor", value);
055 }
056 value = actionConf.get("oozie.streaming.record-reader");
057 if (value != null) {
058 jobConf.set("stream.recordreader.class", value);
059 }
060 String[] values = getStrings(actionConf, "oozie.streaming.record-reader-mapping");
061 for (String s : values) {
062 String[] kv = s.split("=");
063 jobConf.set("stream.recordreader." + kv[0], kv[1]);
064 }
065 values = getStrings(actionConf, "oozie.streaming.env");
066 value = jobConf.get("stream.addenvironment", "");
067 if (value.length() > 0) {
068 value = value + " ";
069 }
070 for (String s : values) {
071 value = value + s + " ";
072 }
073 jobConf.set("stream.addenvironment", value);
074
075 addActionConf(jobConf, actionConf);
076
077 // propagate delegation related props from launcher job to MR job
078 if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
079 jobConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
080 }
081
082 JobClient jobClient = null;
083 RunningJob runJob = null;
084 boolean exception = false;
085 try {
086 jobClient = createJobClient(jobConf);
087 runJob = jobClient.submitJob(jobConf);
088 }
089 catch (Exception ex) {
090 exception = true;
091 throw ex;
092 }
093 finally {
094 try {
095 if (jobClient != null) {
096 jobClient.close();
097 }
098 }
099 catch (Exception ex) {
100 if (exception) {
101 System.out.println("JobClient Error: " + ex);
102 }
103 else {
104 throw ex;
105 }
106 }
107 }
108 return runJob;
109 }
110
111 public static void setStreaming(Configuration conf, String mapper, String reducer, String recordReader,
112 String[] recordReaderMapping, String[] env) {
113 if (mapper != null) {
114 conf.set("oozie.streaming.mapper", mapper);
115 }
116 if (reducer != null) {
117 conf.set("oozie.streaming.reducer", reducer);
118 }
119 if (recordReader != null) {
120 conf.set("oozie.streaming.record-reader", recordReader);
121 }
122 setStrings(conf, "oozie.streaming.record-reader-mapping", recordReaderMapping);
123 setStrings(conf, "oozie.streaming.env", env);
124 }
125
126 }