001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.hadoop.mapred.JobClient; 022 import org.apache.hadoop.mapred.JobConf; 023 import org.apache.hadoop.mapred.RunningJob; 024 025 public class StreamingMain extends MapReduceMain { 026 027 public static void main(String[] args) throws Exception { 028 run(StreamingMain.class, args); 029 } 030 031 protected RunningJob submitJob(Configuration actionConf) throws Exception { 032 JobConf jobConf = new JobConf(); 033 034 jobConf.set("mapred.mapper.class", "org.apache.hadoop.streaming.PipeMapper"); 035 jobConf.set("mapred.reducer.class", "org.apache.hadoop.streaming.PipeReducer"); 036 jobConf.set("mapred.map.runner.class", "org.apache.hadoop.streaming.PipeMapRunner"); 037 038 jobConf.set("mapred.input.format.class", "org.apache.hadoop.mapred.TextInputFormat"); 039 jobConf.set("mapred.output.format.class", "org.apache.hadoop.mapred.TextOutputFormat"); 040 jobConf.set("mapred.output.value.class", "org.apache.hadoop.io.Text"); 041 jobConf.set("mapred.output.key.class", "org.apache.hadoop.io.Text"); 042 043 jobConf.set("mapred.create.symlink", "yes"); 044 jobConf.set("mapred.used.genericoptionsparser", "true"); 045 046 jobConf.set("stream.addenvironment", ""); 047 048 String value = actionConf.get("oozie.streaming.mapper"); 049 if (value != null) { 050 jobConf.set("stream.map.streamprocessor", value); 051 } 052 value = actionConf.get("oozie.streaming.reducer"); 053 if (value != null) { 054 jobConf.set("stream.reduce.streamprocessor", value); 055 } 056 value = actionConf.get("oozie.streaming.record-reader"); 057 if (value != null) { 058 jobConf.set("stream.recordreader.class", value); 059 } 060 String[] values = getStrings(actionConf, "oozie.streaming.record-reader-mapping"); 061 for (String s : values) { 062 String[] kv = s.split("="); 063 jobConf.set("stream.recordreader." + kv[0], kv[1]); 064 } 065 values = getStrings(actionConf, "oozie.streaming.env"); 066 value = jobConf.get("stream.addenvironment", ""); 067 if (value.length() > 0) { 068 value = value + " "; 069 } 070 for (String s : values) { 071 value = value + s + " "; 072 } 073 jobConf.set("stream.addenvironment", value); 074 075 addActionConf(jobConf, actionConf); 076 077 // propagate delegation related props from launcher job to MR job 078 if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) { 079 jobConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION")); 080 } 081 082 JobClient jobClient = null; 083 RunningJob runJob = null; 084 boolean exception = false; 085 try { 086 jobClient = createJobClient(jobConf); 087 runJob = jobClient.submitJob(jobConf); 088 } 089 catch (Exception ex) { 090 exception = true; 091 throw ex; 092 } 093 finally { 094 try { 095 if (jobClient != null) { 096 jobClient.close(); 097 } 098 } 099 catch (Exception ex) { 100 if (exception) { 101 System.out.println("JobClient Error: " + ex); 102 } 103 else { 104 throw ex; 105 } 106 } 107 } 108 return runJob; 109 } 110 111 public static void setStreaming(Configuration conf, String mapper, String reducer, String recordReader, 112 String[] recordReaderMapping, String[] env) { 113 if (mapper != null) { 114 conf.set("oozie.streaming.mapper", mapper); 115 } 116 if (reducer != null) { 117 conf.set("oozie.streaming.reducer", reducer); 118 } 119 if (recordReader != null) { 120 conf.set("oozie.streaming.record-reader", recordReader); 121 } 122 setStrings(conf, "oozie.streaming.record-reader-mapping", recordReaderMapping); 123 setStrings(conf, "oozie.streaming.env", env); 124 } 125 126 }