This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
package org.apache.oozie.action.hadoop;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.pig.Main;
import org.apache.pig.PigRunner;
import org.apache.pig.tools.pigstats.PigStats;

042 public class PigMain extends LauncherMain {
043 private static final Set<String> DISALLOWED_PIG_OPTIONS = new HashSet<String>();
044
045 static {
046 DISALLOWED_PIG_OPTIONS.add("-4");
047 DISALLOWED_PIG_OPTIONS.add("-log4jconf");
048 DISALLOWED_PIG_OPTIONS.add("-e");
049 DISALLOWED_PIG_OPTIONS.add("-execute");
050 DISALLOWED_PIG_OPTIONS.add("-f");
051 DISALLOWED_PIG_OPTIONS.add("-file");
052 DISALLOWED_PIG_OPTIONS.add("-l");
053 DISALLOWED_PIG_OPTIONS.add("-logfile");
054 DISALLOWED_PIG_OPTIONS.add("-r");
055 DISALLOWED_PIG_OPTIONS.add("-dryrun");
056 DISALLOWED_PIG_OPTIONS.add("-x");
057 DISALLOWED_PIG_OPTIONS.add("-exectype");
058 DISALLOWED_PIG_OPTIONS.add("-P");
059 DISALLOWED_PIG_OPTIONS.add("-propertyFile");
060 }
061
062 public static void main(String[] args) throws Exception {
063 run(PigMain.class, args);
064 }
065
066 @Override
067 protected void run(String[] args) throws Exception {
068 System.out.println();
069 System.out.println("Oozie Pig action configuration");
070 System.out.println("=================================================================");
071
072 // loading action conf prepared by Oozie
073 Configuration actionConf = new Configuration(false);
074
075 String actionXml = System.getProperty("oozie.action.conf.xml");
076
077 if (actionXml == null) {
078 throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
079 }
080 if (!new File(actionXml).exists()) {
081 throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
082 }
083
084 actionConf.addResource(new Path("file:///", actionXml));
085
086 Properties pigProperties = new Properties();
087 for (Map.Entry<String, String> entry : actionConf) {
088 pigProperties.setProperty(entry.getKey(), entry.getValue());
089 }
090
091 // propagate delegation related props from launcher job to Pig job
092 if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
093 pigProperties.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
094 System.out.println("------------------------");
095 System.out.println("Setting env property for mapreduce.job.credentials.binary to:"
096 + System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
097 System.out.println("------------------------");
098 System.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
099 }
100 else {
101 System.out.println("Non-kerberoes execution");
102 }
103
104 OutputStream os = new FileOutputStream("pig.properties");
105 pigProperties.store(os, "");
106 os.close();
107
108 logMasking("pig.properties:", Arrays.asList("password"), pigProperties.entrySet());
109
110 List<String> arguments = new ArrayList<String>();
111 String script = actionConf.get("oozie.pig.script");
112
113 if (script == null) {
114 throw new RuntimeException("Action Configuration does not have [oozie.pig.script] property");
115 }
116
117 if (!new File(script).exists()) {
118 throw new RuntimeException("Error: Pig script file [" + script + "] does not exist");
119 }
120
121 System.out.println("Pig script [" + script + "] content: ");
122 System.out.println("------------------------");
123 BufferedReader br = new BufferedReader(new FileReader(script));
124 String line = br.readLine();
125 while (line != null) {
126 System.out.println(line);
127 line = br.readLine();
128 }
129 br.close();
130 System.out.println("------------------------");
131 System.out.println();
132
133 arguments.add("-file");
134 arguments.add(script);
135 String[] params = MapReduceMain.getStrings(actionConf, "oozie.pig.params");
136 for (String param : params) {
137 arguments.add("-param");
138 arguments.add(param);
139 }
140
141 String hadoopJobId = System.getProperty("oozie.launcher.job.id");
142 if (hadoopJobId == null) {
143 throw new RuntimeException("Launcher Hadoop Job ID system property not set");
144 }
145
146 String logFile = new File("pig-oozie-" + hadoopJobId + ".log").getAbsolutePath();
147
148 URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties");
149 if (log4jFile != null) {
150
151 String pigLogLevel = actionConf.get("oozie.pig.log.level", "INFO");
152
153 // append required PIG properties to the default hadoop log4j file
154 Properties hadoopProps = new Properties();
155 hadoopProps.load(log4jFile.openStream());
156 hadoopProps.setProperty("log4j.logger.org.apache.pig", pigLogLevel + ", A, B");
157 hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender");
158 hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout");
159 hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
160 hadoopProps.setProperty("log4j.appender.B", "org.apache.log4j.FileAppender");
161 hadoopProps.setProperty("log4j.appender.B.file", logFile);
162 hadoopProps.setProperty("log4j.appender.B.layout", "org.apache.log4j.PatternLayout");
163 hadoopProps.setProperty("log4j.appender.B.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
164
165 String localProps = new File("piglog4j.properties").getAbsolutePath();
166 OutputStream os1 = new FileOutputStream(localProps);
167 hadoopProps.store(os1, "");
168 os1.close();
169
170 arguments.add("-log4jconf");
171 arguments.add(localProps);
172
173 // print out current directory
174 File localDir = new File(localProps).getParentFile();
175 System.out.println("Current (local) dir = " + localDir.getAbsolutePath());
176 }
177 else {
178 System.out.println("log4jfile is null");
179 }
180
181 String pigLog = "pig-" + hadoopJobId + ".log";
182 arguments.add("-logfile");
183 arguments.add(pigLog);
184
185 String[] pigArgs = MapReduceMain.getStrings(actionConf, "oozie.pig.args");
186 for (String pigArg : pigArgs) {
187 if (DISALLOWED_PIG_OPTIONS.contains(pigArg)) {
188 throw new RuntimeException("Error: Pig argument " + pigArg + " is not supported");
189 }
190 arguments.add(pigArg);
191 }
192
193 System.out.println("Pig command arguments :");
194 for (String arg : arguments) {
195 System.out.println(" " + arg);
196 }
197
198 System.out.println("=================================================================");
199 System.out.println();
200 System.out.println(">>> Invoking Pig command line now >>>");
201 System.out.println();
202 System.out.flush();
203
204 System.out.println();
205 runPigJob(new String[] { "-version" }, null, true);
206 System.out.println();
207 System.out.flush();
208
209 runPigJob(arguments.toArray(new String[arguments.size()]), pigLog, false);
210
211 System.out.println();
212 System.out.println("<<< Invocation of Pig command completed <<<");
213 System.out.println();
214
215 // harvesting and recording Hadoop Job IDs
216 Properties jobIds = getHadoopJobIds(logFile);
217 File file = new File(System.getProperty("oozie.action.output.properties"));
218 os = new FileOutputStream(file);
219 jobIds.store(os, "");
220 os.close();
221 System.out.println(" Hadoop Job IDs executed by Pig: " + jobIds.getProperty("hadoopJobs"));
222 System.out.println();
223 }
224
225 private void handleError(String pigLog) throws Exception {
226 System.err.println();
227 System.err.println("Pig logfile dump:");
228 System.err.println();
229 try {
230 BufferedReader reader = new BufferedReader(new FileReader(pigLog));
231 String line = reader.readLine();
232 while (line != null) {
233 System.err.println(line);
234 line = reader.readLine();
235 }
236 reader.close();
237 }
238 catch (FileNotFoundException e) {
239 System.err.println("pig log file: " + pigLog + " not found.");
240 }
241 }
242
243 /**
244 * Runs the pig script using PigRunner API if version 0.8 or above. Embedded
245 * pig within python is also supported.
246 *
247 * @param args pig command line arguments
248 * @param pigLog pig log file
249 * @param resetSecurityManager specify if need to reset security manager
250 * @throws Exception
251 */
252 protected void runPigJob(String[] args, String pigLog, boolean resetSecurityManager) throws Exception {
253 // running as from the command line
254 boolean pigRunnerExists = true;
255 Class klass;
256 try {
257 klass = Class.forName("org.apache.pig.PigRunner");
258 }
259 catch (ClassNotFoundException ex) {
260 pigRunnerExists = false;
261 }
262
263 if (pigRunnerExists) {
264 System.out.println("Run pig script using PigRunner.run() for Pig version 0.8+");
265 PigStats stats = PigRunner.run(args, null);
266 // isSuccessful is the API from 0.9 supported by both PigStats and
267 // EmbeddedPigStats
268 if (!stats.isSuccessful()) {
269 if (pigLog != null) {
270 handleError(pigLog);
271 }
272 throw new LauncherMainException(PigRunner.ReturnCode.FAILURE);
273 }
274 }
275 else {
276 try {
277 System.out.println("Run pig script using Main.main() for Pig version before 0.8");
278 Main.main(args);
279 }
280 catch (SecurityException ex) {
281 if (resetSecurityManager) {
282 LauncherSecurityManager.reset();
283 }
284 else {
285 if (LauncherSecurityManager.getExitInvoked()) {
286 if (LauncherSecurityManager.getExitCode() != 0) {
287 if (pigLog != null) {
288 handleError(pigLog);
289 }
290 throw ex;
291 }
292 }
293 }
294 }
295 }
296 }
297
298 public static void setPigScript(Configuration conf, String script, String[] params, String[] args) {
299 conf.set("oozie.pig.script", script);
300 MapReduceMain.setStrings(conf, "oozie.pig.params", params);
301 MapReduceMain.setStrings(conf, "oozie.pig.args", args);
302 }
303
304 private static final String JOB_ID_LOG_PREFIX = "HadoopJobId: ";
305
306 protected Properties getHadoopJobIds(String logFile) throws IOException {
307 int jobCount = 0;
308 Properties props = new Properties();
309 StringBuffer sb = new StringBuffer(100);
310 if (new File(logFile).exists() == false) {
311 System.err.println("pig log file: " + logFile + " not present. Therefore no Hadoop jobids found");
312 props.setProperty("hadoopJobs", "");
313 }
314 else {
315 BufferedReader br = new BufferedReader(new FileReader(logFile));
316 String line = br.readLine();
317 String separator = "";
318 while (line != null) {
319 if (line.contains(JOB_ID_LOG_PREFIX)) {
320 int jobIdStarts = line.indexOf(JOB_ID_LOG_PREFIX) + JOB_ID_LOG_PREFIX.length();
321 String jobId = line.substring(jobIdStarts);
322 int jobIdEnds = jobId.indexOf(" ");
323 if (jobIdEnds > -1) {
324 jobId = jobId.substring(0, jobId.indexOf(" "));
325 }
326 sb.append(separator).append(jobId);
327 separator = ",";
328 }
329 line = br.readLine();
330 }
331 br.close();
332 props.setProperty("hadoopJobs", sb.toString());
333 }
334 return props;
335 }
336
337 }