This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.hadoop.fs.Path;
022 import org.apache.hadoop.fs.FileSystem;
023 import org.apache.hadoop.mapred.JobClient;
024 import org.apache.hadoop.mapred.JobConf;
025 import org.apache.hadoop.mapred.RunningJob;
026 import org.apache.hadoop.security.UserGroupInformation;
027
028 import java.util.HashSet;
029 import java.util.Map;
030 import java.util.Properties;
031 import java.io.ByteArrayOutputStream;
032 import java.io.IOException;
033 import java.io.FileOutputStream;
034 import java.io.OutputStream;
035 import java.io.File;
036
037 public class MapReduceMain extends LauncherMain {
038
039 public static void main(String[] args) throws Exception {
040 run(MapReduceMain.class, args);
041 }
042
043 protected void run(String[] args) throws Exception {
044 System.out.println();
045 System.out.println("Oozie Map-Reduce action configuration");
046 System.out.println("=======================");
047
048 // loading action conf prepared by Oozie
049 Configuration actionConf = new Configuration(false);
050 actionConf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml")));
051
052 logMasking("Map-Reduce job configuration:", new HashSet<String>(), actionConf);
053
054 System.out.println("Submitting Oozie action Map-Reduce job");
055 System.out.println();
056 // submitting job
057 RunningJob runningJob = submitJob(actionConf);
058
059 // propagating job id back to Oozie
060 String jobId = runningJob.getID().toString();
061 Properties props = new Properties();
062 props.setProperty("id", jobId);
063 File idFile = new File(System.getProperty("oozie.action.newId.properties"));
064 OutputStream os = new FileOutputStream(idFile);
065 props.store(os, "");
066 os.close();
067
068 System.out.println("=======================");
069 System.out.println();
070 }
071
072 protected void addActionConf(JobConf jobConf, Configuration actionConf) {
073 for (Map.Entry<String, String> entry : actionConf) {
074 jobConf.set(entry.getKey(), entry.getValue());
075 }
076 }
077
078 protected RunningJob submitJob(Configuration actionConf) throws Exception {
079 JobConf jobConf = new JobConf();
080 addActionConf(jobConf, actionConf);
081
082 // propagate delegation related props from launcher job to MR job
083 if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
084 jobConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
085 }
086 JobClient jobClient = null;
087 RunningJob runJob = null;
088 boolean exception = false;
089 try {
090 jobClient = createJobClient(jobConf);
091 runJob = jobClient.submitJob(jobConf);
092 }
093 catch (Exception ex) {
094 exception = true;
095 throw ex;
096 }
097 finally {
098 try {
099 if (jobClient != null) {
100 jobClient.close();
101 }
102 }
103 catch (Exception ex) {
104 if (exception) {
105 System.out.println("JobClient Error: " + ex);
106 }
107 else {
108 throw ex;
109 }
110 }
111 }
112 return runJob;
113 }
114
115 @SuppressWarnings("unchecked")
116 protected JobClient createJobClient(JobConf jobConf) throws IOException {
117 return new JobClient(jobConf);
118 }
119
120 // allows any character in the value, the conf.setStrings() does not allow
121 // commas
122 public static void setStrings(Configuration conf, String key, String[] values) {
123 if (values != null) {
124 conf.setInt(key + ".size", values.length);
125 for (int i = 0; i < values.length; i++) {
126 conf.set(key + "." + i, values[i]);
127 }
128 }
129 }
130
131 public static String[] getStrings(Configuration conf, String key) {
132 String[] values = new String[conf.getInt(key + ".size", 0)];
133 for (int i = 0; i < values.length; i++) {
134 values[i] = conf.get(key + "." + i);
135 }
136 return values;
137 }
138
139 }