/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import org.apache.pig.Main;
import org.apache.pig.PigRunner;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.OutputStream;
import java.io.FileOutputStream;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;
import java.net.URL;
import java.util.regex.Pattern;

public class PigMain extends LauncherMain {
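    // Pig command line options that may not be supplied through oozie.pig.args (checked in run())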
    private static final Set<String> DISALLOWED_PIG_OPTIONS = new HashSet<String>();
    public static final String ACTION_PREFIX = "oozie.action.";
    public static final String EXTERNAL_CHILD_IDS = ACTION_PREFIX + "externalChildIDs.properties";
    public static final String EXTERNAL_ACTION_STATS = ACTION_PREFIX + "stats.properties";
    public static final String EXTERNAL_STATS_WRITE = ACTION_PREFIX + "external.stats.write";
    public static final int STRING_BUFFER_SIZE = 100;

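    // patterns used to scrape Hadoop job IDs from the Pig log file when the PigStats API is unavailable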
    private static final Pattern[] PIG_JOB_IDS_PATTERNS = {
      Pattern.compile("HadoopJobId: (job_\\S*)")
    };

    static {
        DISALLOWED_PIG_OPTIONS.add("-4");
        DISALLOWED_PIG_OPTIONS.add("-log4jconf");
        DISALLOWED_PIG_OPTIONS.add("-e");
        DISALLOWED_PIG_OPTIONS.add("-execute");
        DISALLOWED_PIG_OPTIONS.add("-f");
        DISALLOWED_PIG_OPTIONS.add("-file");
        DISALLOWED_PIG_OPTIONS.add("-l");
        DISALLOWED_PIG_OPTIONS.add("-logfile");
        DISALLOWED_PIG_OPTIONS.add("-r");
        DISALLOWED_PIG_OPTIONS.add("-dryrun");
        DISALLOWED_PIG_OPTIONS.add("-x");
        DISALLOWED_PIG_OPTIONS.add("-exectype");
        DISALLOWED_PIG_OPTIONS.add("-P");
        DISALLOWED_PIG_OPTIONS.add("-propertyFile");
    }

    public static void main(String[] args) throws Exception {
        run(PigMain.class, args);
    }

    @Override
    protected void run(String[] args) throws Exception {
        System.out.println();
        System.out.println("Oozie Pig action configuration");
        System.out.println("=================================================================");

        // loading action conf prepared by Oozie
        Configuration actionConf = new Configuration(false);

        String actionXml = System.getProperty("oozie.action.conf.xml");

        if (actionXml == null) {
            throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
        }
        if (!new File(actionXml).exists()) {
            throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
        }

        actionConf.addResource(new Path("file:///", actionXml));

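        // copy the full action configuration into a Properties object; it is written out below as pig.properties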
        Properties pigProperties = new Properties();
        for (Map.Entry<String, String> entry : actionConf) {
            pigProperties.setProperty(entry.getKey(), entry.getValue());
        }

        // propagate delegation related props from launcher job to Pig job
        if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
            pigProperties.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
            System.out.println("------------------------");
            System.out.println("Setting env property for mapreduce.job.credentials.binary to:"
                    + System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
            System.out.println("------------------------");
            System.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
        }
        else {
            System.out.println("Non-Kerberos execution");
        }

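        // write the merged configuration to pig.properties in the working directory so Pig picks it up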
        OutputStream os = new FileOutputStream("pig.properties");
        pigProperties.store(os, "");
        os.close();

        logMasking("pig.properties:", Arrays.asList("password"), pigProperties.entrySet());

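        // build the Pig command line, starting with the script to execute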
        List<String> arguments = new ArrayList<String>();
        String script = actionConf.get("oozie.pig.script");

        if (script == null) {
            throw new RuntimeException("Action Configuration does not have [oozie.pig.script] property");
        }

        if (!new File(script).exists()) {
            throw new RuntimeException("Error: Pig script file [" + script + "] does not exist");
        }

        System.out.println("Pig script [" + script + "] content: ");
        System.out.println("------------------------");
        BufferedReader br = new BufferedReader(new FileReader(script));
        String line = br.readLine();
        while (line != null) {
            System.out.println(line);
            line = br.readLine();
        }
        br.close();
        System.out.println("------------------------");
        System.out.println();

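        // pass the script and any workflow-defined parameters to Pig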
        arguments.add("-file");
        arguments.add(script);
        String[] params = MapReduceMain.getStrings(actionConf, "oozie.pig.params");
        for (String param : params) {
            arguments.add("-param");
            arguments.add(param);
        }

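        // the launcher Hadoop job id is used to give the Pig log files unique names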
        String hadoopJobId = System.getProperty("oozie.launcher.job.id");
        if (hadoopJobId == null) {
            throw new RuntimeException("Launcher Hadoop Job ID system property not set");
        }

        String logFile = new File("pig-oozie-" + hadoopJobId + ".log").getAbsolutePath();

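        // if a log4j.properties file is available on the classpath, extend it so Pig logs both to the console and to a local file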
        URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties");
        if (log4jFile != null) {

            String pigLogLevel = actionConf.get("oozie.pig.log.level", "INFO");

            // append required Pig properties to the default hadoop log4j file
            Properties hadoopProps = new Properties();
            hadoopProps.load(log4jFile.openStream());
            hadoopProps.setProperty("log4j.rootLogger", pigLogLevel + ", A, B");
            hadoopProps.setProperty("log4j.logger.org.apache.pig", pigLogLevel + ", A, B");
            hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender");
            hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout");
            hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%d [%t] %-5p %c %x - %m%n");
            hadoopProps.setProperty("log4j.appender.B", "org.apache.log4j.FileAppender");
            hadoopProps.setProperty("log4j.appender.B.file", logFile);
            hadoopProps.setProperty("log4j.appender.B.layout", "org.apache.log4j.PatternLayout");
            hadoopProps.setProperty("log4j.appender.B.layout.ConversionPattern", "%d [%t] %-5p %c %x - %m%n");

            String localProps = new File("piglog4j.properties").getAbsolutePath();
            OutputStream os1 = new FileOutputStream(localProps);
            hadoopProps.store(os1, "");
            os1.close();

            arguments.add("-log4jconf");
            arguments.add(localProps);

            // print out current directory
            File localDir = new File(localProps).getParentFile();
            System.out.println("Current (local) dir = " + localDir.getAbsolutePath());
        }
        else {
            System.out.println("log4j.properties file not found on the classpath, skipping Pig log4j configuration");
        }

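        // Pig writes its execution log to this file; handleError() dumps it to stderr if the job fails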
        String pigLog = "pig-" + hadoopJobId + ".log";
        arguments.add("-logfile");
        arguments.add(pigLog);

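        // append user-supplied Pig arguments, rejecting options that Oozie manages itself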
        String[] pigArgs = MapReduceMain.getStrings(actionConf, "oozie.pig.args");
        for (String pigArg : pigArgs) {
            if (DISALLOWED_PIG_OPTIONS.contains(pigArg)) {
                throw new RuntimeException("Error: Pig argument " + pigArg + " is not supported");
            }
            arguments.add(pigArg);
        }

        System.out.println("Pig command arguments:");
        for (String arg : arguments) {
            System.out.println("             " + arg);
        }

        System.out.println("=================================================================");
        System.out.println();
        System.out.println(">>> Invoking Pig command line now >>>");
        System.out.println();
        System.out.flush();

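        // run "pig -version" first so the Pig version appears in the launcher output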
        System.out.println();
        runPigJob(new String[] { "-version" }, null, true, false);
        System.out.println();
        System.out.flush();
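        // write out Pig stats only when the workflow requested it via oozie.action.external.stats.write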
        boolean hasStats = Boolean.parseBoolean(actionConf.get(EXTERNAL_STATS_WRITE));
        runPigJob(arguments.toArray(new String[arguments.size()]), pigLog, false, hasStats);

        System.out.println();
        System.out.println("<<< Invocation of Pig command completed <<<");
        System.out.println();

        // Pig stats are not supported for embedded Python scripts or for Pig versions lower than 0.8,
        // so retrieve the Hadoop job IDs from the Pig log file here instead
        File file = new File(System.getProperty(EXTERNAL_CHILD_IDS));
        if (!file.exists()) {
            Properties props = getHadoopJobIds(logFile, PIG_JOB_IDS_PATTERNS);
            writeExternalData(props.getProperty(HADOOP_JOBS), file);
            System.out.println(" Hadoop Job IDs executed by Pig: " + props.getProperty(HADOOP_JOBS));
            System.out.println();
        }
    }

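    /**
     * Dumps the content of the Pig log file to stderr so the failure cause is visible in the launcher output.
     *
     * @param pigLog path of the Pig log file
     * @throws Exception if reading the log file fails
     */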
    private void handleError(String pigLog) throws Exception {
        System.err.println();
        System.err.println("Pig logfile dump:");
        System.err.println();
        try {
            BufferedReader reader = new BufferedReader(new FileReader(pigLog));
            String line = reader.readLine();
            while (line != null) {
                System.err.println(line);
                line = reader.readLine();
            }
            reader.close();
        }
        catch (FileNotFoundException e) {
            System.err.println("Pig log file: " + pigLog + " not found.");
        }
    }

    /**
     * Runs the Pig script using the PigRunner API when Pig 0.8 or above is available,
     * otherwise falls back to Main.main(). Embedded Pig within Python is also supported.
     *
     * @param args pig command line arguments
     * @param pigLog pig log file
     * @param resetSecurityManager whether the security manager should be reset after the run
     * @param retrieveStats whether Pig stats should be retrieved and written out
     * @throws Exception if the Pig job fails
     */
    protected void runPigJob(String[] args, String pigLog, boolean resetSecurityManager, boolean retrieveStats) throws Exception {
        // invoke Pig as if from the command line; first check whether the PigRunner API (Pig 0.8+) is available
        boolean pigRunnerExists = true;
        try {
            Class.forName("org.apache.pig.PigRunner");
        }
        catch (ClassNotFoundException ex) {
            pigRunnerExists = false;
        }

        if (pigRunnerExists) {
            System.out.println("Run pig script using PigRunner.run() for Pig version 0.8+");
            PigStats stats = PigRunner.run(args, null);
            String jobIds = getHadoopJobIds(stats);
            if (jobIds != null && !jobIds.isEmpty()) {
                System.out.println("Hadoop Job IDs executed by Pig: " + jobIds);
                File f = new File(System.getProperty(EXTERNAL_CHILD_IDS));
                writeExternalData(jobIds, f);
            }
            // isSuccessful is the API from 0.9 supported by both PigStats and
            // EmbeddedPigStats
            if (!stats.isSuccessful()) {
                if (pigLog != null) {
                    handleError(pigLog);
                }
                throw new LauncherMainException(PigRunner.ReturnCode.FAILURE);
            }
            else {
                // If the Pig command was run with just the "-version" option, then
                // return
                if (resetSecurityManager) {
                    return;
                }
                // Retrieve stats only if the user has requested them in the workflow
                // configuration
                if (retrieveStats) {
                    ActionStats pigStats;
                    String jsonString;
                    try {
                        pigStats = new OoziePigStats(stats);
                        jsonString = pigStats.toJSON();
                    } catch (UnsupportedOperationException uoe) {
                        throw new UnsupportedOperationException(
                                "Pig stats are not supported for this type of operation", uoe);
                    }
                    File f = new File(System.getProperty(EXTERNAL_ACTION_STATS));
                    writeExternalData(jsonString, f);
                }
            }
        }
        else {
            try {
                System.out.println("Run pig script using Main.main() for Pig version before 0.8");
                Main.main(args);
            }
            catch (SecurityException ex) {
                if (resetSecurityManager) {
                    LauncherSecurityManager.reset();
                }
                else {
                    if (LauncherSecurityManager.getExitInvoked()) {
                        if (LauncherSecurityManager.getExitCode() != 0) {
                            if (pigLog != null) {
                                handleError(pigLog);
                            }
                            throw ex;
                        }
                    }
                }
            }
        }
    }

    // Write external data (stats, Hadoop job IDs) to a file that will be read back by the LauncherMapper
    private static void writeExternalData(String data, File f) throws IOException {
        BufferedWriter out = null;
        try {
            out = new BufferedWriter(new FileWriter(f));
            out.write(data);
        }
        finally {
            if (out != null) {
                out.close();
            }
        }
    }

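    /**
     * Stores the Pig script name, parameters and extra arguments in the action configuration
     * so that PigMain can read them back when the launcher runs.
     *
     * @param conf action configuration
     * @param script Pig script path
     * @param params Pig parameters, passed to Pig as -param values
     * @param args additional Pig command line arguments
     */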
    public static void setPigScript(Configuration conf, String script, String[] params, String[] args) {
        conf.set("oozie.pig.script", script);
        MapReduceMain.setStrings(conf, "oozie.pig.params", params);
        MapReduceMain.setStrings(conf, "oozie.pig.args", args);
    }

    /**
     * Collects the IDs of the Hadoop jobs launched by Pig through the PigStats API.
     *
     * @param pigStats stats object obtained through the PigStats API
     * @return comma-separated list of Hadoop job IDs, or null if the PigStats API is not supported
     */
    protected String getHadoopJobIds(PigStats pigStats) {
        StringBuilder sb = new StringBuilder(STRING_BUFFER_SIZE);
        String separator = ",";
        // Collect Hadoop job IDs through Pig's JobGraph API and store them as a
        // comma-separated string
        try {
            PigStats.JobGraph jobGraph = pigStats.getJobGraph();
            for (JobStats jobStats : jobGraph) {
                String hadoopJobId = jobStats.getJobId();
                if (sb.length() > 0) {
                    sb.append(separator);
                }
                sb.append(hadoopJobId);
            }
        }
        // Return null if the Pig APIs are not supported
        catch (UnsupportedOperationException uoe) {
            return null;
        }
        return sb.toString();
    }

}