/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import org.apache.pig.Main;
import org.apache.pig.PigRunner;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.FileNotFoundException;
import java.io.OutputStream;
import java.io.FileOutputStream;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;
import java.net.URL;

public class PigMain extends LauncherMain {
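    // Pig command line options that users may not pass via the action configuration;
    // this class sets some of them itself (-file, -logfile, -log4jconf) and the others
    // would conflict with how Oozie runs the script.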
    private static final Set<String> DISALLOWED_PIG_OPTIONS = new HashSet<String>();

    static {
        DISALLOWED_PIG_OPTIONS.add("-4");
        DISALLOWED_PIG_OPTIONS.add("-log4jconf");
        DISALLOWED_PIG_OPTIONS.add("-e");
        DISALLOWED_PIG_OPTIONS.add("-execute");
        DISALLOWED_PIG_OPTIONS.add("-f");
        DISALLOWED_PIG_OPTIONS.add("-file");
        DISALLOWED_PIG_OPTIONS.add("-l");
        DISALLOWED_PIG_OPTIONS.add("-logfile");
        DISALLOWED_PIG_OPTIONS.add("-r");
        DISALLOWED_PIG_OPTIONS.add("-dryrun");
        DISALLOWED_PIG_OPTIONS.add("-x");
        DISALLOWED_PIG_OPTIONS.add("-exectype");
        DISALLOWED_PIG_OPTIONS.add("-P");
        DISALLOWED_PIG_OPTIONS.add("-propertyFile");
    }

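    /**
     * Launcher entry point; delegates to {@code LauncherMain.run(Class, String[])},
     * which runs the Pig action via {@link #run(String[])}.
     *
     * @param args command line arguments passed by the launcher
     * @throws Exception if the Pig action fails
     */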
    public static void main(String[] args) throws Exception {
        run(PigMain.class, args);
    }

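    /**
     * Prepares and runs the Pig action: loads the action configuration prepared by Oozie,
     * writes it out as <code>pig.properties</code>, builds the Pig command line (script,
     * parameters, log files and user arguments), invokes Pig, and finally records the
     * Hadoop job IDs found in the Pig log into the action output properties file.
     *
     * @param args command line arguments passed by the launcher (not used directly)
     * @throws Exception if the action configuration is invalid or the Pig job fails
     */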
    @Override
    protected void run(String[] args) throws Exception {
        System.out.println();
        System.out.println("Oozie Pig action configuration");
        System.out.println("=================================================================");

        // load the action configuration prepared by Oozie
        Configuration actionConf = new Configuration(false);

        String actionXml = System.getProperty("oozie.action.conf.xml");

        if (actionXml == null) {
            throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
        }
        if (!new File(actionXml).exists()) {
            throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
        }

        actionConf.addResource(new Path("file:///", actionXml));

        Properties pigProperties = new Properties();
        for (Map.Entry<String, String> entry : actionConf) {
            pigProperties.setProperty(entry.getKey(), entry.getValue());
        }

        // propagate delegation-token-related properties from the launcher job to the Pig job
        if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
            pigProperties.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
            System.out.println("------------------------");
            System.out.println("Setting env property for mapreduce.job.credentials.binary to: "
                    + System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
            System.out.println("------------------------");
            System.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
        }
        else {
            System.out.println("Non-Kerberos execution");
        }

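        // save the action configuration (plus any credentials setting) to a local pig.properties file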
        OutputStream os = new FileOutputStream("pig.properties");
        pigProperties.store(os, "");
        os.close();

        logMasking("pig.properties:", Arrays.asList("password"), pigProperties.entrySet());

        List<String> arguments = new ArrayList<String>();
        String script = actionConf.get("oozie.pig.script");

        if (script == null) {
            throw new RuntimeException("Action Configuration does not have [oozie.pig.script] property");
        }

        if (!new File(script).exists()) {
            throw new RuntimeException("Error: Pig script file [" + script + "] does not exist");
        }

        System.out.println("Pig script [" + script + "] content: ");
        System.out.println("------------------------");
        BufferedReader br = new BufferedReader(new FileReader(script));
        String line = br.readLine();
        while (line != null) {
            System.out.println(line);
            line = br.readLine();
        }
        br.close();
        System.out.println("------------------------");
        System.out.println();

        arguments.add("-file");
        arguments.add(script);
        String[] params = MapReduceMain.getStrings(actionConf, "oozie.pig.params");
        for (String param : params) {
            arguments.add("-param");
            arguments.add(param);
        }

        String hadoopJobId = System.getProperty("oozie.launcher.job.id");
        if (hadoopJobId == null) {
            throw new RuntimeException("Launcher Hadoop Job ID system property not set");
        }

        String logFile = new File("pig-oozie-" + hadoopJobId + ".log").getAbsolutePath();

        URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties");
        if (log4jFile != null) {

            String pigLogLevel = actionConf.get("oozie.pig.log.level", "INFO");

            // append the required Pig logging properties to the default Hadoop log4j configuration
            Properties hadoopProps = new Properties();
            hadoopProps.load(log4jFile.openStream());
            hadoopProps.setProperty("log4j.logger.org.apache.pig", pigLogLevel + ", A, B");
            hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender");
            hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout");
            hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
            hadoopProps.setProperty("log4j.appender.B", "org.apache.log4j.FileAppender");
            hadoopProps.setProperty("log4j.appender.B.file", logFile);
            hadoopProps.setProperty("log4j.appender.B.layout", "org.apache.log4j.PatternLayout");
            hadoopProps.setProperty("log4j.appender.B.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");

            String localProps = new File("piglog4j.properties").getAbsolutePath();
            OutputStream os1 = new FileOutputStream(localProps);
            hadoopProps.store(os1, "");
            os1.close();

            arguments.add("-log4jconf");
            arguments.add(localProps);

            // print the current (local) working directory
            File localDir = new File(localProps).getParentFile();
            System.out.println("Current (local) dir = " + localDir.getAbsolutePath());
        }
        else {
            System.out.println("log4j.properties file not found on the classpath");
        }

        String pigLog = "pig-" + hadoopJobId + ".log";
        arguments.add("-logfile");
        arguments.add(pigLog);

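        // pass through user-supplied Pig arguments, rejecting options that Oozie manages itself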
        String[] pigArgs = MapReduceMain.getStrings(actionConf, "oozie.pig.args");
        for (String pigArg : pigArgs) {
            if (DISALLOWED_PIG_OPTIONS.contains(pigArg)) {
                throw new RuntimeException("Error: Pig argument " + pigArg + " is not supported");
            }
            arguments.add(pigArg);
        }

        System.out.println("Pig command arguments:");
        for (String arg : arguments) {
            System.out.println("             " + arg);
        }

        System.out.println("=================================================================");
        System.out.println();
        System.out.println(">>> Invoking Pig command line now >>>");
        System.out.println();
        System.out.flush();

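        // print the Pig version before running the actual script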
        System.out.println();
        runPigJob(new String[] { "-version" }, null, true);
        System.out.println();
        System.out.flush();

        runPigJob(arguments.toArray(new String[arguments.size()]), pigLog, false);

        System.out.println();
        System.out.println("<<< Invocation of Pig command completed <<<");
        System.out.println();

        // harvest and record the Hadoop job IDs launched by Pig
        Properties jobIds = getHadoopJobIds(logFile);
        File file = new File(System.getProperty("oozie.action.output.properties"));
        os = new FileOutputStream(file);
        jobIds.store(os, "");
        os.close();
        System.out.println(" Hadoop Job IDs executed by Pig: " + jobIds.getProperty("hadoopJobs"));
        System.out.println();
    }

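    /**
     * Dumps the contents of the Pig log file to standard error.
     * A missing log file is reported but does not cause a failure.
     *
     * @param pigLog path of the Pig log file
     * @throws Exception if reading the log file fails
     */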
    private void handleError(String pigLog) throws Exception {
        System.err.println();
        System.err.println("Pig logfile dump:");
        System.err.println();
        try {
            BufferedReader reader = new BufferedReader(new FileReader(pigLog));
            String line = reader.readLine();
            while (line != null) {
                System.err.println(line);
                line = reader.readLine();
            }
            reader.close();
        }
        catch (FileNotFoundException e) {
            System.err.println("Pig log file: " + pigLog + " not found.");
        }
    }

    /**
     * Runs the Pig script, using the PigRunner API when Pig 0.8 or later is available.
     * Embedded Pig within Python is also supported.
     *
     * @param args Pig command line arguments
     * @param pigLog Pig log file
     * @param resetSecurityManager whether to reset the security manager instead of failing when Pig exits
     * @throws Exception if running the Pig script fails
     */
    protected void runPigJob(String[] args, String pigLog, boolean resetSecurityManager) throws Exception {
        // run Pig as if invoked from the command line
        boolean pigRunnerExists = true;
        try {
            Class.forName("org.apache.pig.PigRunner");
        }
        catch (ClassNotFoundException ex) {
            pigRunnerExists = false;
        }

        if (pigRunnerExists) {
            System.out.println("Run pig script using PigRunner.run() for Pig version 0.8+");
            PigStats stats = PigRunner.run(args, null);
            // isSuccessful is the API from 0.9 supported by both PigStats and
            // EmbeddedPigStats
            if (!stats.isSuccessful()) {
                if (pigLog != null) {
                    handleError(pigLog);
                }
                throw new LauncherMainException(PigRunner.ReturnCode.FAILURE);
            }
        }
        else {
            try {
                System.out.println("Run pig script using Main.main() for Pig version before 0.8");
                Main.main(args);
            }
            catch (SecurityException ex) {
                if (resetSecurityManager) {
                    LauncherSecurityManager.reset();
                }
                else {
                    if (LauncherSecurityManager.getExitInvoked()) {
                        if (LauncherSecurityManager.getExitCode() != 0) {
                            if (pigLog != null) {
                                handleError(pigLog);
                            }
                            throw ex;
                        }
                    }
                }
            }
        }
    }

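    /**
     * Stores the Pig script name, parameters and arguments in the given configuration,
     * from where {@link #run(String[])} reads them when the action executes.
     *
     * @param conf action configuration to update
     * @param script Pig script name
     * @param params Pig script parameters
     * @param args Pig command line arguments
     */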
    public static void setPigScript(Configuration conf, String script, String[] params, String[] args) {
        conf.set("oozie.pig.script", script);
        MapReduceMain.setStrings(conf, "oozie.pig.params", params);
        MapReduceMain.setStrings(conf, "oozie.pig.args", args);
    }

    private static final String JOB_ID_LOG_PREFIX = "HadoopJobId: ";

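    /**
     * Parses the Pig log file for lines containing the "HadoopJobId: " marker and
     * returns the harvested job IDs as a comma-separated list in the
     * <code>hadoopJobs</code> property. If the log file does not exist, the
     * property is set to an empty string.
     *
     * @param logFile path of the Pig log file
     * @return properties containing the <code>hadoopJobs</code> entry
     * @throws IOException if the log file cannot be read
     */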
    protected Properties getHadoopJobIds(String logFile) throws IOException {
        Properties props = new Properties();
        StringBuilder sb = new StringBuilder(100);
        if (!new File(logFile).exists()) {
            System.err.println("Pig log file: " + logFile + " not present. Therefore no Hadoop job IDs found.");
            props.setProperty("hadoopJobs", "");
        }
        else {
            BufferedReader br = new BufferedReader(new FileReader(logFile));
            String line = br.readLine();
            String separator = "";
            while (line != null) {
                if (line.contains(JOB_ID_LOG_PREFIX)) {
                    int jobIdStarts = line.indexOf(JOB_ID_LOG_PREFIX) + JOB_ID_LOG_PREFIX.length();
                    String jobId = line.substring(jobIdStarts);
                    int jobIdEnds = jobId.indexOf(" ");
                    if (jobIdEnds > -1) {
                        jobId = jobId.substring(0, jobIdEnds);
                    }
                    sb.append(separator).append(jobId);
                    separator = ",";
                }
                line = br.readLine();
            }
            br.close();
            props.setProperty("hadoopJobs", sb.toString());
        }
        return props;
    }

}