001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.hadoop.mapred.JobConf; 022 import org.apache.hadoop.mapred.RunningJob; 023 import org.apache.hadoop.mapred.pipes.Submitter; 024 import org.apache.hadoop.filecache.DistributedCache; 025 import org.apache.hadoop.fs.Path; 026 027 public class PipesMain extends MapReduceMain { 028 029 public static void main(String[] args) throws Exception { 030 run(PipesMain.class, args); 031 } 032 033 @Override 034 protected RunningJob submitJob(Configuration actionConf) throws Exception { 035 JobConf jobConf = new JobConf(); 036 037 String value = actionConf.get("oozie.pipes.map"); 038 if (value != null) { 039 jobConf.setBoolean("hadoop.pipes.java.mapper", true); 040 jobConf.set("mapred.mapper.class", value); 041 } 042 value = actionConf.get("oozie.pipes.reduce"); 043 if (value != null) { 044 jobConf.setBoolean("hadoop.pipes.java.reducer", true); 045 jobConf.set("mapred.reducer.class", value); 046 } 047 value = actionConf.get("oozie.pipes.inputformat"); 048 if (value != null) { 049 jobConf.setBoolean("hadoop.pipes.java.recordreader", true); 050 jobConf.set("mapred.input.format.class", value); 051 } 052 value = actionConf.get("oozie.pipes.partitioner"); 053 if (value != null) { 054 jobConf.set("mapred.partitioner.class", value); 055 } 056 value = actionConf.get("oozie.pipes.writer"); 057 if (value != null) { 058 jobConf.setBoolean("hadoop.pipes.java.recordwriter", true); 059 jobConf.set("mapred.output.format.class", value); 060 } 061 value = actionConf.get("oozie.pipes.program"); 062 if (value != null) { 063 jobConf.set("hadoop.pipes.executable", value); 064 if (value.contains("#")) { 065 DistributedCache.createSymlink(jobConf); 066 } 067 } 068 069 addActionConf(jobConf, actionConf); 070 071 //propagate delegation related props from launcher job to MR job 072 if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) { 073 jobConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION")); 074 } 075 076 return Submitter.jobSubmit(jobConf); 077 } 078 079 public static void setPipes(Configuration conf, String map, String reduce, String inputFormat, String partitioner, 080 String writer, String program, Path appPath) { 081 if (map != null) { 082 conf.set("oozie.pipes.map", map); 083 } 084 if (reduce != null) { 085 conf.set("oozie.pipes.reduce", reduce); 086 } 087 if (inputFormat != null) { 088 conf.set("oozie.pipes.inputformat", inputFormat); 089 } 090 if (partitioner != null) { 091 conf.set("oozie.pipes.partitioner", partitioner); 092 } 093 if (writer != null) { 094 conf.set("oozie.pipes.writer", writer); 095 } 096 if (program != null) { 097 Path path = null; 098 if (!program.startsWith("/")) { 099 path = new Path(appPath, program); 100 program = path.toString(); 101 } 102 conf.set("oozie.pipes.program", program); 103 104 } 105 } 106 107 }