/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import org.apache.pig.Main;
import org.apache.pig.PigRunner;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.OutputStream;
import java.io.FileOutputStream;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;
import java.net.URL;
import java.util.regex.Pattern;

public class PigMain extends LauncherMain {
    private static final Set<String> DISALLOWED_PIG_OPTIONS = new HashSet<String>();
    public static final String ACTION_PREFIX = "oozie.action.";
    public static final String EXTERNAL_CHILD_IDS = ACTION_PREFIX + "externalChildIDs.properties";
    public static final String EXTERNAL_ACTION_STATS = ACTION_PREFIX + "stats.properties";
    public static final String EXTERNAL_STATS_WRITE = ACTION_PREFIX + "external.stats.write";
    public static final int STRING_BUFFER_SIZE = 100;

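    // Patterns used to scrape Hadoop job IDs from the Pig log file when the
    // PigStats API is not available (embedded Python scripts or Pig versions below 0.8).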
    private static final Pattern[] PIG_JOB_IDS_PATTERNS = {
            Pattern.compile("HadoopJobId: (job_\\S*)")
    };

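    // Pig command-line options that may not be passed through the action configuration:
    // Oozie supplies the script (-file), the log4j configuration (-log4jconf), the log
    // file (-logfile) and the action properties itself, so these cannot be overridden.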
    static {
        DISALLOWED_PIG_OPTIONS.add("-4");
        DISALLOWED_PIG_OPTIONS.add("-log4jconf");
        DISALLOWED_PIG_OPTIONS.add("-e");
        DISALLOWED_PIG_OPTIONS.add("-execute");
        DISALLOWED_PIG_OPTIONS.add("-f");
        DISALLOWED_PIG_OPTIONS.add("-file");
        DISALLOWED_PIG_OPTIONS.add("-l");
        DISALLOWED_PIG_OPTIONS.add("-logfile");
        DISALLOWED_PIG_OPTIONS.add("-r");
        DISALLOWED_PIG_OPTIONS.add("-dryrun");
        DISALLOWED_PIG_OPTIONS.add("-x");
        DISALLOWED_PIG_OPTIONS.add("-exectype");
        DISALLOWED_PIG_OPTIONS.add("-P");
        DISALLOWED_PIG_OPTIONS.add("-propertyFile");
    }

    public static void main(String[] args) throws Exception {
        run(PigMain.class, args);
    }

    @Override
    protected void run(String[] args) throws Exception {
        System.out.println();
        System.out.println("Oozie Pig action configuration");
        System.out.println("=================================================================");

        // loading action conf prepared by Oozie
        Configuration actionConf = new Configuration(false);

        String actionXml = System.getProperty("oozie.action.conf.xml");

        if (actionXml == null) {
            throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
        }
        if (!new File(actionXml).exists()) {
            throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
        }

        actionConf.addResource(new Path("file:///", actionXml));

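        // copy the action configuration into Pig properties; these are written to
        // pig.properties below so the Pig job can pick them up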
        Properties pigProperties = new Properties();
        for (Map.Entry<String, String> entry : actionConf) {
            pigProperties.setProperty(entry.getKey(), entry.getValue());
        }

        // propagate delegation related props from launcher job to Pig job
        if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
            pigProperties.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
            System.out.println("------------------------");
            System.out.println("Setting property mapreduce.job.credentials.binary to: "
                    + System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
            System.out.println("------------------------");
            System.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
        }
        else {
            System.out.println("Non-Kerberos execution");
        }

        OutputStream os = new FileOutputStream("pig.properties");
        pigProperties.store(os, "");
        os.close();

        logMasking("pig.properties:", Arrays.asList("password"), pigProperties.entrySet());

        List<String> arguments = new ArrayList<String>();
        String script = actionConf.get("oozie.pig.script");

        if (script == null) {
            throw new RuntimeException("Action Configuration does not have [oozie.pig.script] property");
        }

        if (!new File(script).exists()) {
            throw new RuntimeException("Error: Pig script file [" + script + "] does not exist");
        }

        System.out.println("Pig script [" + script + "] content: ");
        System.out.println("------------------------");
        BufferedReader br = new BufferedReader(new FileReader(script));
        String line = br.readLine();
        while (line != null) {
            System.out.println(line);
            line = br.readLine();
        }
        br.close();
        System.out.println("------------------------");
        System.out.println();

        arguments.add("-file");
        arguments.add(script);
        String[] params = MapReduceMain.getStrings(actionConf, "oozie.pig.params");
        for (String param : params) {
            arguments.add("-param");
            arguments.add(param);
        }

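        // the launcher Hadoop job ID is used to give the Pig log files unique names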
        String hadoopJobId = System.getProperty("oozie.launcher.job.id");
        if (hadoopJobId == null) {
            throw new RuntimeException("Launcher Hadoop Job ID system property not set");
        }

        String logFile = new File("pig-oozie-" + hadoopJobId + ".log").getAbsolutePath();

        URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties");
        if (log4jFile != null) {

            String pigLogLevel = actionConf.get("oozie.pig.log.level", "INFO");

            // append required Pig properties to the default Hadoop log4j file
            Properties hadoopProps = new Properties();
            hadoopProps.load(log4jFile.openStream());
            hadoopProps.setProperty("log4j.rootLogger", pigLogLevel + ", A, B");
            hadoopProps.setProperty("log4j.logger.org.apache.pig", pigLogLevel + ", A, B");
            hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender");
            hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout");
            hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%d [%t] %-5p %c %x - %m%n");
            hadoopProps.setProperty("log4j.appender.B", "org.apache.log4j.FileAppender");
            hadoopProps.setProperty("log4j.appender.B.file", logFile);
            hadoopProps.setProperty("log4j.appender.B.layout", "org.apache.log4j.PatternLayout");
            hadoopProps.setProperty("log4j.appender.B.layout.ConversionPattern", "%d [%t] %-5p %c %x - %m%n");

            String localProps = new File("piglog4j.properties").getAbsolutePath();
            OutputStream os1 = new FileOutputStream(localProps);
            hadoopProps.store(os1, "");
            os1.close();

            arguments.add("-log4jconf");
            arguments.add(localProps);

            // print out current directory
            File localDir = new File(localProps).getParentFile();
            System.out.println("Current (local) dir = " + localDir.getAbsolutePath());
        }
        else {
            System.out.println("log4j.properties is not in the classpath");
        }

        String pigLog = "pig-" + hadoopJobId + ".log";
        arguments.add("-logfile");
        arguments.add(pigLog);

        String[] pigArgs = MapReduceMain.getStrings(actionConf, "oozie.pig.args");
        for (String pigArg : pigArgs) {
            if (DISALLOWED_PIG_OPTIONS.contains(pigArg)) {
                throw new RuntimeException("Error: Pig argument " + pigArg + " is not supported");
            }
            arguments.add(pigArg);
        }

        System.out.println("Pig command arguments :");
        for (String arg : arguments) {
            System.out.println(" " + arg);
        }

        System.out.println("=================================================================");
        System.out.println();
        System.out.println(">>> Invoking Pig command line now >>>");
        System.out.println();
        System.out.flush();

        // print the Pig version before running the actual script
        System.out.println();
        runPigJob(new String[] { "-version" }, null, true, false);
        System.out.println();
        System.out.flush();
        boolean hasStats = Boolean.parseBoolean(actionConf.get(EXTERNAL_STATS_WRITE));
        runPigJob(arguments.toArray(new String[arguments.size()]), pigLog, false, hasStats);

        System.out.println();
        System.out.println("<<< Invocation of Pig command completed <<<");
        System.out.println();

        // Pig stats are not supported for embedded Python scripts or for Pig versions
        // below 0.8, so the Hadoop job IDs are scraped from the log file here.
        File file = new File(System.getProperty(EXTERNAL_CHILD_IDS));
        if (!file.exists()) {
            Properties props = getHadoopJobIds(logFile, PIG_JOB_IDS_PATTERNS);
            writeExternalData(props.getProperty(HADOOP_JOBS), file);
            System.out.println(" Hadoop Job IDs executed by Pig: " + props.getProperty(HADOOP_JOBS));
            System.out.println();
        }
    }
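    /**
     * Dumps the Pig log file to standard error so that it is captured in the
     * launcher output when the Pig invocation fails.
     *
     * @param pigLog path of the Pig log file to dump
     */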
    private void handleError(String pigLog) throws Exception {
        System.err.println();
        System.err.println("Pig logfile dump:");
        System.err.println();
        try {
            BufferedReader reader = new BufferedReader(new FileReader(pigLog));
            String line = reader.readLine();
            while (line != null) {
                System.err.println(line);
                line = reader.readLine();
            }
            reader.close();
        }
        catch (FileNotFoundException e) {
            System.err.println("pig log file: " + pigLog + " not found.");
        }
    }

    /**
     * Runs the Pig script using the PigRunner API for Pig 0.8 or above.
     * Embedded Pig within Python is also supported.
     *
     * @param args pig command line arguments
     * @param pigLog pig log file
     * @param resetSecurityManager whether the security manager should be reset
     * @param retrieveStats whether Pig stats should be retrieved
     * @throws Exception if the Pig job fails
     */
    protected void runPigJob(String[] args, String pigLog, boolean resetSecurityManager, boolean retrieveStats)
            throws Exception {
        // running as if from the command line;
        // check whether the PigRunner API (Pig 0.8+) is available on the classpath
        boolean pigRunnerExists = true;
        try {
            Class.forName("org.apache.pig.PigRunner");
        }
        catch (ClassNotFoundException ex) {
            pigRunnerExists = false;
        }

        if (pigRunnerExists) {
            System.out.println("Run pig script using PigRunner.run() for Pig version 0.8+");
            PigStats stats = PigRunner.run(args, null);
            String jobIds = getHadoopJobIds(stats);
            if (jobIds != null && !jobIds.isEmpty()) {
                System.out.println("Hadoop Job IDs executed by Pig: " + jobIds);
                File f = new File(System.getProperty(EXTERNAL_CHILD_IDS));
                writeExternalData(jobIds, f);
            }
            // isSuccessful is the API from 0.9 supported by both PigStats and EmbeddedPigStats
            if (!stats.isSuccessful()) {
                if (pigLog != null) {
                    handleError(pigLog);
                }
                throw new LauncherMainException(PigRunner.ReturnCode.FAILURE);
            }
            else {
                // If the Pig command was run with just the "-version" option, return
                if (resetSecurityManager) {
                    return;
                }
                // Retrieve stats only if the user has requested them in the workflow configuration
                if (retrieveStats) {
                    ActionStats pigStats;
                    String jsonString;
                    try {
                        pigStats = new OoziePigStats(stats);
                        jsonString = pigStats.toJSON();
                    }
                    catch (UnsupportedOperationException uoe) {
                        throw new UnsupportedOperationException(
                                "Pig stats are not supported for this type of operation", uoe);
                    }
                    File f = new File(System.getProperty(EXTERNAL_ACTION_STATS));
                    writeExternalData(jsonString, f);
                }
            }
        }
        else {
            try {
                System.out.println("Run pig script using Main.main() for Pig version before 0.8");
                Main.main(args);
            }
            catch (SecurityException ex) {
                if (resetSecurityManager) {
                    LauncherSecurityManager.reset();
                }
                else {
                    if (LauncherSecurityManager.getExitInvoked()) {
                        if (LauncherSecurityManager.getExitCode() != 0) {
                            if (pigLog != null) {
                                handleError(pigLog);
                            }
                            throw ex;
                        }
                    }
                }
            }
        }
    }

    // Write external data (stats, Hadoop job IDs) to a file that will be read by the LauncherMapper
    private static void writeExternalData(String data, File f) throws IOException {
        BufferedWriter out = null;
        try {
            out = new BufferedWriter(new FileWriter(f));
            out.write(data);
        }
        finally {
            if (out != null) {
                out.close();
            }
        }
    }

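    /**
     * Stores the Pig script path, parameters and arguments in the action configuration,
     * where they are read back by {@link #run(String[])}.
     */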
    public static void setPigScript(Configuration conf, String script, String[] params, String[] args) {
        conf.set("oozie.pig.script", script);
        MapReduceMain.setStrings(conf, "oozie.pig.params", params);
        MapReduceMain.setStrings(conf, "oozie.pig.args", args);
    }

    /**
     * Get Hadoop job IDs through the PigStats API.
     *
     * @param pigStats stats object obtained through the PigStats API
     * @return Hadoop job IDs as a comma-separated String, or null if the PigStats API is not supported
     */
    protected String getHadoopJobIds(PigStats pigStats) {
        StringBuilder sb = new StringBuilder(STRING_BUFFER_SIZE);
        String separator = ",";
        // Collect Hadoop job IDs through the JobGraph API of Pig and store them as a
        // comma-separated string
        try {
            PigStats.JobGraph jobGraph = pigStats.getJobGraph();
            for (JobStats jobStats : jobGraph) {
                String hadoopJobId = jobStats.getJobId();
                if (sb.length() > 0) {
                    sb.append(separator);
                }
                sb.append(hadoopJobId);
            }
        }
        // Return null if the Pig APIs are not supported
        catch (UnsupportedOperationException uoe) {
            return null;
        }
        return sb.toString();
    }

}