This project has been retired. For details, please refer to its Apache Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.pig.Main;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
039
040 public class PigMainWithOldAPI extends LauncherMain {
041 private static final Set<String> DISALLOWED_PIG_OPTIONS = new HashSet<String>();
042
043 static {
044 DISALLOWED_PIG_OPTIONS.add("-4");
045 DISALLOWED_PIG_OPTIONS.add("-log4jconf");
046 DISALLOWED_PIG_OPTIONS.add("-e");
047 DISALLOWED_PIG_OPTIONS.add("-execute");
048 DISALLOWED_PIG_OPTIONS.add("-f");
049 DISALLOWED_PIG_OPTIONS.add("-file");
050 DISALLOWED_PIG_OPTIONS.add("-l");
051 DISALLOWED_PIG_OPTIONS.add("-logfile");
052 DISALLOWED_PIG_OPTIONS.add("-r");
053 DISALLOWED_PIG_OPTIONS.add("-dryrun");
054 DISALLOWED_PIG_OPTIONS.add("-x");
055 DISALLOWED_PIG_OPTIONS.add("-exectype");
056 DISALLOWED_PIG_OPTIONS.add("-P");
057 DISALLOWED_PIG_OPTIONS.add("-propertyFile");
058 }
059
060 public static void main(String[] args) throws Exception {
061 run(PigMainWithOldAPI.class, args);
062 }
063
064 protected void run(String[] args) throws Exception {
065 System.out.println();
066 System.out.println("Oozie Pig action configuration");
067 System.out.println("=================================================================");
068
069 // loading action conf prepared by Oozie
070 Configuration actionConf = new Configuration(false);
071
072 String actionXml = System.getProperty("oozie.action.conf.xml");
073
074 if (actionXml == null) {
075 throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
076 }
077 if (!new File(actionXml).exists()) {
078 throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
079 }
080
081 actionConf.addResource(new Path("file:///", actionXml));
082
083 Properties pigProperties = new Properties();
084 for (Map.Entry<String, String> entry : actionConf) {
085 pigProperties.setProperty(entry.getKey(), entry.getValue());
086 }
087
088 //propagate delegation related props from launcher job to Pig job
089 if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
090 pigProperties.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
091 System.out.println("------------------------");
092 System.out.println("Setting env property for mapreduce.job.credentials.binary to:"
093 + System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
094 System.out.println("------------------------");
095 System.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
096 }
097 else {
098 System.out.println("Non-kerberoes execution");
099 }
100
101 OutputStream os = new FileOutputStream("pig.properties");
102 pigProperties.store(os, "");
103 os.close();
104
105 System.out.println();
106 System.out.println("pig.properties content:");
107 System.out.println("------------------------");
108 pigProperties.store(System.out, "");
109 System.out.flush();
110 System.out.println("------------------------");
111 System.out.println();
112
113 List<String> arguments = new ArrayList<String>();
114 String script = actionConf.get("oozie.pig.script");
115
116 if (script == null) {
117 throw new RuntimeException("Action Configuration does not have [oozie.pig.script] property");
118 }
119
120 if (!new File(script).exists()) {
121 throw new RuntimeException("Error: Pig script file [" + script + "] does not exist");
122 }
123
124 System.out.println("Pig script [" + script + "] content: ");
125 System.out.println("------------------------");
126 BufferedReader br = new BufferedReader(new FileReader(script));
127 String line = br.readLine();
128 while (line != null) {
129 System.out.println(line);
130 line = br.readLine();
131 }
132 br.close();
133 System.out.println("------------------------");
134 System.out.println();
135
136 arguments.add("-file");
137 arguments.add(script);
138 String[] params = MapReduceMain.getStrings(actionConf, "oozie.pig.params");
139 for (String param : params) {
140 arguments.add("-param");
141 arguments.add(param);
142 }
143
144 String hadoopJobId = System.getProperty("oozie.launcher.job.id");
145 if (hadoopJobId == null) {
146 throw new RuntimeException("Launcher Hadoop Job ID system property not set");
147 }
148
149 String logFile = new File("pig-oozie-" + hadoopJobId + ".log").getAbsolutePath();
150
151 URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties");
152 if (log4jFile != null) {
153
154 String pigLogLevel = actionConf.get("oozie.pig.log.level", "INFO");
155
156 // append required PIG properties to the default hadoop log4j file
157 Properties hadoopProps = new Properties();
158 hadoopProps.load(log4jFile.openStream());
159 hadoopProps.setProperty("log4j.logger.org.apache.pig", pigLogLevel + ", A, B");
160 hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender");
161 hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout");
162 hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
163 hadoopProps.setProperty("log4j.appender.B", "org.apache.log4j.FileAppender");
164 hadoopProps.setProperty("log4j.appender.B.file", logFile);
165 hadoopProps.setProperty("log4j.appender.B.layout", "org.apache.log4j.PatternLayout");
166 hadoopProps.setProperty("log4j.appender.B.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
167
168 String localProps = new File("piglog4j.properties").getAbsolutePath();
169 OutputStream os1 = new FileOutputStream(localProps);
170 hadoopProps.store(os1, "");
171 os1.close();
172
173 arguments.add("-log4jconf");
174 arguments.add(localProps);
175
176 // print out current directory
177 File localDir = new File(localProps).getParentFile();
178 System.out.println("Current (local) dir = " + localDir.getAbsolutePath());
179 }
180 else {
181 System.out.println("log4jfile is null");
182 }
183
184 String pigLog = "pig-" + hadoopJobId + ".log";
185 arguments.add("-logfile");
186 arguments.add(pigLog);
187
188 String[] pigArgs = MapReduceMain.getStrings(actionConf, "oozie.pig.args");
189 for (String pigArg : pigArgs) {
190 if (DISALLOWED_PIG_OPTIONS.contains(pigArg)) {
191 throw new RuntimeException("Error: Pig argument " + pigArg + " is not supported");
192 }
193 arguments.add(pigArg);
194 }
195
196 System.out.println("Pig command arguments :");
197 for (String arg : arguments) {
198 System.out.println(" " + arg);
199 }
200
201 System.out.println("=================================================================");
202 System.out.println();
203 System.out.println(">>> Invoking Pig command line now >>>");
204 System.out.println();
205 System.out.flush();
206
207 try {
208 System.out.println();
209 runPigJob(new String[] { "-version" });
210 }
211 catch (SecurityException ex) {
212 LauncherSecurityManager.reset();
213 }
214 System.out.println();
215 System.out.flush();
216 try {
217 runPigJob(arguments.toArray(new String[arguments.size()]));
218 }
219 catch (SecurityException ex) {
220 if (LauncherSecurityManager.getExitInvoked()) {
221 if (LauncherSecurityManager.getExitCode() != 0) {
222 System.err.println();
223 System.err.println("Pig logfile dump:");
224 System.err.println();
225 try {
226 BufferedReader reader = new BufferedReader(new FileReader(pigLog));
227 line = reader.readLine();
228 while (line != null) {
229 System.err.println(line);
230 line = reader.readLine();
231 }
232 reader.close();
233 }
234 catch (FileNotFoundException e) {
235 System.err.println("pig log file: " + pigLog + " not found.");
236 }
237 throw ex;
238 }
239 }
240 }
241
242 System.out.println();
243 System.out.println("<<< Invocation of Pig command completed <<<");
244 System.out.println();
245
246 // harvesting and recording Hadoop Job IDs
247 Properties jobIds = getHadoopJobIds(logFile);
248 File file = new File(System.getProperty("oozie.action.output.properties"));
249 os = new FileOutputStream(file);
250 jobIds.store(os, "");
251 os.close();
252 System.out.println(" Hadoop Job IDs executed by Pig: " + jobIds.getProperty(HADOOP_JOBS));
253 System.out.println();
254 }
255
256 protected void runPigJob(String[] args) throws Exception {
257 // running as from the command line
258 Main.main(args);
259 }
260
261 public static void setPigScript(Configuration conf, String script, String[] params, String[] args) {
262 conf.set("oozie.pig.script", script);
263 MapReduceMain.setStrings(conf, "oozie.pig.params", params);
264 MapReduceMain.setStrings(conf, "oozie.pig.args", args);
265 }
266
267 private static final String JOB_ID_LOG_PREFIX = "HadoopJobId: ";
268
269 protected Properties getHadoopJobIds(String logFile) throws IOException {
270 int jobCount = 0;
271 Properties props = new Properties();
272 StringBuffer sb = new StringBuffer(100);
273 if (new File(logFile).exists() == false) {
274 System.err.println("pig log file: " + logFile + " not present. Therefore no Hadoop jobids found");
275 props.setProperty(HADOOP_JOBS, "");
276 }
277 else {
278 BufferedReader br = new BufferedReader(new FileReader(logFile));
279 String line = br.readLine();
280 String separator = "";
281 while (line != null) {
282 if (line.contains(JOB_ID_LOG_PREFIX)) {
283 int jobIdStarts = line.indexOf(JOB_ID_LOG_PREFIX) + JOB_ID_LOG_PREFIX.length();
284 String jobId = line.substring(jobIdStarts);
285 int jobIdEnds = jobId.indexOf(" ");
286 if (jobIdEnds > -1) {
287 jobId = jobId.substring(0, jobId.indexOf(" "));
288 }
289 sb.append(separator).append(jobId);
290 separator = ",";
291 }
292 line = br.readLine();
293 }
294 br.close();
295 props.setProperty(HADOOP_JOBS, sb.toString());
296 }
297 return props;
298 }
299
300 }