/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.security.UserGroupInformation;

import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.io.File;

/**
 * Launcher entry point for the Oozie Map-Reduce action.
 * <p>
 * Loads the action configuration prepared by Oozie, submits it as a
 * Map-Reduce job and writes the resulting job id to a properties file so
 * that it can be propagated back to Oozie.
 */
public class MapReduceMain extends LauncherMain {

    /**
     * Command-line entry point; delegates to the two-argument {@code run}
     * (not defined in this class, presumably inherited from
     * {@link LauncherMain}), passing this class and the arguments.
     *
     * @param args command-line arguments, passed through unchanged.
     * @throws Exception if the delegated call fails.
     */
    public static void main(String[] args) throws Exception {
        run(MapReduceMain.class, args);
    }

    /**
     * Loads the action configuration, submits the Map-Reduce job and stores
     * the job id.
     * <p>
     * The configuration is read from the file named by the
     * {@code oozie.action.conf.xml} system property. The id of the submitted
     * job is written under the key {@code id} to the file named by the
     * {@code oozie.action.newId.properties} system property.
     *
     * @param args command-line arguments; not used by this implementation.
     * @throws Exception if the job cannot be submitted or the id file cannot
     *         be written.
     */
    protected void run(String[] args) throws Exception {
        System.out.println();
        System.out.println("Oozie Map-Reduce action configuration");
        System.out.println("=======================");

        // loading action conf prepared by Oozie
        Configuration actionConf = new Configuration(false);
        actionConf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml")));

        // NOTE(review): logMasking is not defined in this class (presumably
        // inherited from LauncherMain); an empty set is passed as its second
        // argument - confirm the meaning of that argument against LauncherMain.
        logMasking("Map-Reduce job configuration:", new HashSet<String>(), actionConf);

        System.out.println("Submitting Oozie action Map-Reduce job");
        System.out.println();
        // submitting job
        RunningJob runningJob = submitJob(actionConf);

        // propagating job id back to Oozie
        String jobId = runningJob.getID().toString();
        Properties props = new Properties();
        props.setProperty("id", jobId);
        File idFile = new File(System.getProperty("oozie.action.newId.properties"));
        OutputStream os = new FileOutputStream(idFile);
        try {
            props.store(os, "");
        }
        finally {
            // always release the file handle, even when store() fails
            os.close();
        }

        System.out.println("=======================");
        System.out.println();
    }

    /**
     * Copies every entry of the action configuration into the job
     * configuration, overwriting entries with the same key.
     *
     * @param jobConf job configuration to populate.
     * @param actionConf action configuration to copy from.
     */
    protected void addActionConf(JobConf jobConf, Configuration actionConf) {
        for (Map.Entry<String, String> entry : actionConf) {
            jobConf.set(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Builds a {@link JobConf} from the action configuration and submits it.
     * <p>
     * The job client is always closed before returning. If closing fails
     * after the submission itself already failed, the close failure is only
     * printed and the submission failure is the one propagated; if the
     * submission succeeded, the close failure is thrown.
     *
     * @param actionConf action configuration to submit.
     * @return handle of the submitted job.
     * @throws Exception if creating the client, submitting the job or closing
     *         the client fails.
     */
    protected RunningJob submitJob(Configuration actionConf) throws Exception {
        JobConf jobConf = new JobConf();
        addActionConf(jobConf, actionConf);

        // Set for uber jar
        String uberJar = actionConf.get(MapReduceActionExecutor.OOZIE_MAPREDUCE_UBER_JAR);
        if (uberJar != null && uberJar.trim().length() > 0) {
            jobConf.setJar(uberJar);
        }

        // propagate delegation related props from launcher job to MR job
        String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
        if (tokenFile != null) {
            jobConf.set("mapreduce.job.credentials.binary", tokenFile);
        }
        JobClient jobClient = null;
        RunningJob runJob = null;
        boolean exception = false;
        try {
            jobClient = createJobClient(jobConf);
            runJob = jobClient.submitJob(jobConf);
        }
        catch (Exception ex) {
            exception = true;
            throw ex;
        }
        finally {
            try {
                if (jobClient != null) {
                    jobClient.close();
                }
            }
            catch (Exception ex) {
                if (exception) {
                    // do not mask the original submission failure
                    System.out.println("JobClient Error: " + ex);
                }
                else {
                    throw ex;
                }
            }
        }
        return runJob;
    }

    /**
     * Creates the {@link JobClient} used to submit the job. Kept as a
     * separate protected method, presumably so that subclasses can supply a
     * different client.
     *
     * @param jobConf job configuration the client is created for.
     * @return a new job client.
     * @throws IOException if the client cannot be created.
     */
    @SuppressWarnings("unchecked")
    protected JobClient createJobClient(JobConf jobConf) throws IOException {
        return new JobClient(jobConf);
    }

    /**
     * Stores a string array in a configuration as {@code key.size} plus one
     * {@code key.<index>} entry per element.
     * <p>
     * Allows any character in the values; {@code conf.setStrings()} does not
     * allow commas. Nothing is written when {@code values} is {@code null}.
     *
     * @param conf configuration to write to.
     * @param key base key for the entries.
     * @param values values to store; may be {@code null}.
     */
    public static void setStrings(Configuration conf, String key, String[] values) {
        if (values != null) {
            conf.setInt(key + ".size", values.length);
            for (int i = 0; i < values.length; i++) {
                conf.set(key + "." + i, values[i]);
            }
        }
    }

    /**
     * Reads back a string array stored by
     * {@link #setStrings(Configuration, String, String[])}.
     *
     * @param conf configuration to read from.
     * @param key base key for the entries.
     * @return the stored values, with missing elements replaced by the empty
     *         string; an empty array if {@code key.size} is not set.
     */
    public static String[] getStrings(Configuration conf, String key) {
        String[] values = new String[conf.getInt(key + ".size", 0)];
        for (int i = 0; i < values.length; i++) {
            values[i] = conf.get(key + "." + i);
            if (values[i] == null) {
                values[i] = "";
            }
        }
        return values;
    }

}