001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.hadoop.fs.Path; 022 import org.apache.hadoop.fs.FileSystem; 023 import org.apache.hadoop.mapred.JobClient; 024 import org.apache.hadoop.mapred.JobConf; 025 import org.apache.hadoop.mapred.RunningJob; 026 import org.apache.hadoop.security.UserGroupInformation; 027 028 import java.util.HashSet; 029 import java.util.Map; 030 import java.util.Properties; 031 import java.io.ByteArrayOutputStream; 032 import java.io.IOException; 033 import java.io.FileOutputStream; 034 import java.io.OutputStream; 035 import java.io.File; 036 037 public class MapReduceMain extends LauncherMain { 038 039 public static void main(String[] args) throws Exception { 040 run(MapReduceMain.class, args); 041 } 042 043 protected void run(String[] args) throws Exception { 044 System.out.println(); 045 System.out.println("Oozie Map-Reduce action configuration"); 046 System.out.println("======================="); 047 048 // loading action conf prepared by Oozie 049 Configuration actionConf = new Configuration(false); 050 actionConf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml"))); 051 052 logMasking("Map-Reduce job configuration:", new HashSet<String>(), actionConf); 053 054 System.out.println("Submitting Oozie action Map-Reduce job"); 055 System.out.println(); 056 // submitting job 057 RunningJob runningJob = submitJob(actionConf); 058 059 // propagating job id back to Oozie 060 String jobId = runningJob.getID().toString(); 061 Properties props = new Properties(); 062 props.setProperty("id", jobId); 063 File idFile = new File(System.getProperty("oozie.action.newId.properties")); 064 OutputStream os = new FileOutputStream(idFile); 065 props.store(os, ""); 066 os.close(); 067 068 System.out.println("======================="); 069 System.out.println(); 070 } 071 072 protected void addActionConf(JobConf jobConf, Configuration actionConf) { 073 for (Map.Entry<String, String> entry : actionConf) { 074 jobConf.set(entry.getKey(), entry.getValue()); 075 } 076 } 077 078 protected RunningJob submitJob(Configuration actionConf) throws Exception { 079 JobConf jobConf = new JobConf(); 080 addActionConf(jobConf, actionConf); 081 082 // propagate delegation related props from launcher job to MR job 083 if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) { 084 jobConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION")); 085 } 086 JobClient jobClient = null; 087 RunningJob runJob = null; 088 boolean exception = false; 089 try { 090 jobClient = createJobClient(jobConf); 091 runJob = jobClient.submitJob(jobConf); 092 } 093 catch (Exception ex) { 094 exception = true; 095 throw ex; 096 } 097 finally { 098 try { 099 if (jobClient != null) { 100 jobClient.close(); 101 } 102 } 103 catch (Exception ex) { 104 if (exception) { 105 System.out.println("JobClient Error: " + ex); 106 } 107 else { 108 throw ex; 109 } 110 } 111 } 112 return runJob; 113 } 114 115 @SuppressWarnings("unchecked") 116 protected JobClient createJobClient(JobConf jobConf) throws IOException { 117 return new JobClient(jobConf); 118 } 119 120 // allows any character in the value, the conf.setStrings() does not allow 121 // commas 122 public static void setStrings(Configuration conf, String key, String[] values) { 123 if (values != null) { 124 conf.setInt(key + ".size", values.length); 125 for (int i = 0; i < values.length; i++) { 126 conf.set(key + "." + i, values[i]); 127 } 128 } 129 } 130 131 public static String[] getStrings(Configuration conf, String key) { 132 String[] values = new String[conf.getInt(key + ".size", 0)]; 133 for (int i = 0; i < values.length; i++) { 134 values[i] = conf.get(key + "." + i); 135 } 136 return values; 137 } 138 139 }