001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.pig.Main;
039    
040    public class PigMainWithOldAPI extends LauncherMain {
041        private static final Set<String> DISALLOWED_PIG_OPTIONS = new HashSet<String>();
042    
043        static {
044            DISALLOWED_PIG_OPTIONS.add("-4");
045            DISALLOWED_PIG_OPTIONS.add("-log4jconf");
046            DISALLOWED_PIG_OPTIONS.add("-e");
047            DISALLOWED_PIG_OPTIONS.add("-execute");
048            DISALLOWED_PIG_OPTIONS.add("-f");
049            DISALLOWED_PIG_OPTIONS.add("-file");
050            DISALLOWED_PIG_OPTIONS.add("-l");
051            DISALLOWED_PIG_OPTIONS.add("-logfile");
052            DISALLOWED_PIG_OPTIONS.add("-r");
053            DISALLOWED_PIG_OPTIONS.add("-dryrun");
054            DISALLOWED_PIG_OPTIONS.add("-x");
055            DISALLOWED_PIG_OPTIONS.add("-exectype");
056            DISALLOWED_PIG_OPTIONS.add("-P");
057            DISALLOWED_PIG_OPTIONS.add("-propertyFile");
058        }
059    
060        public static void main(String[] args) throws Exception {
061            run(PigMainWithOldAPI.class, args);
062        }
063    
064        protected void run(String[] args) throws Exception {
065            System.out.println();
066            System.out.println("Oozie Pig action configuration");
067            System.out.println("=================================================================");
068    
069            // loading action conf prepared by Oozie
070            Configuration actionConf = new Configuration(false);
071    
072            String actionXml = System.getProperty("oozie.action.conf.xml");
073    
074            if (actionXml == null) {
075                throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
076            }
077            if (!new File(actionXml).exists()) {
078                throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
079            }
080    
081            actionConf.addResource(new Path("file:///", actionXml));
082    
083            Properties pigProperties = new Properties();
084            for (Map.Entry<String, String> entry : actionConf) {
085                pigProperties.setProperty(entry.getKey(), entry.getValue());
086            }
087    
088            //propagate delegation related props from launcher job to Pig job
089            if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
090                pigProperties.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
091                System.out.println("------------------------");
092                System.out.println("Setting env property for mapreduce.job.credentials.binary to:"
093                        + System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
094                System.out.println("------------------------");
095                System.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
096            }
097            else {
098                System.out.println("Non-kerberoes execution");
099            }
100    
101            OutputStream os = new FileOutputStream("pig.properties");
102            pigProperties.store(os, "");
103            os.close();
104    
105            System.out.println();
106            System.out.println("pig.properties content:");
107            System.out.println("------------------------");
108            pigProperties.store(System.out, "");
109            System.out.flush();
110            System.out.println("------------------------");
111            System.out.println();
112    
113            List<String> arguments = new ArrayList<String>();
114            String script = actionConf.get("oozie.pig.script");
115    
116            if (script == null) {
117                throw new RuntimeException("Action Configuration does not have [oozie.pig.script] property");
118            }
119    
120            if (!new File(script).exists()) {
121                throw new RuntimeException("Error: Pig script file [" + script + "] does not exist");
122            }
123    
124            System.out.println("Pig script [" + script + "] content: ");
125            System.out.println("------------------------");
126            BufferedReader br = new BufferedReader(new FileReader(script));
127            String line = br.readLine();
128            while (line != null) {
129                System.out.println(line);
130                line = br.readLine();
131            }
132            br.close();
133            System.out.println("------------------------");
134            System.out.println();
135    
136            arguments.add("-file");
137            arguments.add(script);
138            String[] params = MapReduceMain.getStrings(actionConf, "oozie.pig.params");
139            for (String param : params) {
140                arguments.add("-param");
141                arguments.add(param);
142            }
143    
144            String hadoopJobId = System.getProperty("oozie.launcher.job.id");
145            if (hadoopJobId == null) {
146                throw new RuntimeException("Launcher Hadoop Job ID system property not set");
147            }
148    
149            String logFile = new File("pig-oozie-" + hadoopJobId + ".log").getAbsolutePath();
150    
151            URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties");
152            if (log4jFile != null) {
153    
154                String pigLogLevel = actionConf.get("oozie.pig.log.level", "INFO");
155    
156                // append required PIG properties to the default hadoop log4j file
157                Properties hadoopProps = new Properties();
158                hadoopProps.load(log4jFile.openStream());
159                hadoopProps.setProperty("log4j.logger.org.apache.pig", pigLogLevel + ", A, B");
160                hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender");
161                hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout");
162                hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
163                hadoopProps.setProperty("log4j.appender.B", "org.apache.log4j.FileAppender");
164                hadoopProps.setProperty("log4j.appender.B.file", logFile);
165                hadoopProps.setProperty("log4j.appender.B.layout", "org.apache.log4j.PatternLayout");
166                hadoopProps.setProperty("log4j.appender.B.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
167    
168                String localProps = new File("piglog4j.properties").getAbsolutePath();
169                OutputStream os1 = new FileOutputStream(localProps);
170                hadoopProps.store(os1, "");
171                os1.close();
172    
173                arguments.add("-log4jconf");
174                arguments.add(localProps);
175    
176                // print out current directory
177                File localDir = new File(localProps).getParentFile();
178                System.out.println("Current (local) dir = " + localDir.getAbsolutePath());
179            }
180            else {
181                System.out.println("log4jfile is null");
182            }
183    
184            String pigLog = "pig-" + hadoopJobId + ".log";
185            arguments.add("-logfile");
186            arguments.add(pigLog);
187    
188            String[] pigArgs = MapReduceMain.getStrings(actionConf, "oozie.pig.args");
189            for (String pigArg : pigArgs) {
190                if (DISALLOWED_PIG_OPTIONS.contains(pigArg)) {
191                    throw new RuntimeException("Error: Pig argument " + pigArg + " is not supported");
192                }
193                arguments.add(pigArg);
194            }
195    
196            System.out.println("Pig command arguments :");
197            for (String arg : arguments) {
198                System.out.println("             " + arg);
199            }
200    
201            System.out.println("=================================================================");
202            System.out.println();
203            System.out.println(">>> Invoking Pig command line now >>>");
204            System.out.println();
205            System.out.flush();
206    
207            try {
208                System.out.println();
209                runPigJob(new String[] { "-version" });
210            }
211            catch (SecurityException ex) {
212                LauncherSecurityManager.reset();
213            }
214            System.out.println();
215            System.out.flush();
216            try {
217                runPigJob(arguments.toArray(new String[arguments.size()]));
218            }
219            catch (SecurityException ex) {
220                if (LauncherSecurityManager.getExitInvoked()) {
221                    if (LauncherSecurityManager.getExitCode() != 0) {
222                        System.err.println();
223                        System.err.println("Pig logfile dump:");
224                        System.err.println();
225                        try {
226                            BufferedReader reader = new BufferedReader(new FileReader(pigLog));
227                            line = reader.readLine();
228                            while (line != null) {
229                                System.err.println(line);
230                                line = reader.readLine();
231                            }
232                            reader.close();
233                        }
234                        catch (FileNotFoundException e) {
235                            System.err.println("pig log file: " + pigLog + "  not found.");
236                        }
237                        throw ex;
238                    }
239                }
240            }
241    
242            System.out.println();
243            System.out.println("<<< Invocation of Pig command completed <<<");
244            System.out.println();
245    
246            // harvesting and recording Hadoop Job IDs
247            Properties jobIds = getHadoopJobIds(logFile);
248            File file = new File(System.getProperty("oozie.action.output.properties"));
249            os = new FileOutputStream(file);
250            jobIds.store(os, "");
251            os.close();
252            System.out.println(" Hadoop Job IDs executed by Pig: " + jobIds.getProperty(HADOOP_JOBS));
253            System.out.println();
254        }
255    
256        protected void runPigJob(String[] args) throws Exception {
257            // running as from the command line
258            Main.main(args);
259        }
260    
261        public static void setPigScript(Configuration conf, String script, String[] params, String[] args) {
262            conf.set("oozie.pig.script", script);
263            MapReduceMain.setStrings(conf, "oozie.pig.params", params);
264            MapReduceMain.setStrings(conf, "oozie.pig.args", args);
265        }
266    
267        private static final String JOB_ID_LOG_PREFIX = "HadoopJobId: ";
268    
269        protected Properties getHadoopJobIds(String logFile) throws IOException {
270            int jobCount = 0;
271            Properties props = new Properties();
272            StringBuffer sb = new StringBuffer(100);
273            if (new File(logFile).exists() == false) {
274                System.err.println("pig log file: " + logFile + "  not present. Therefore no Hadoop jobids found");
275                props.setProperty(HADOOP_JOBS, "");
276            }
277            else {
278                BufferedReader br = new BufferedReader(new FileReader(logFile));
279                String line = br.readLine();
280                String separator = "";
281                while (line != null) {
282                    if (line.contains(JOB_ID_LOG_PREFIX)) {
283                        int jobIdStarts = line.indexOf(JOB_ID_LOG_PREFIX) + JOB_ID_LOG_PREFIX.length();
284                        String jobId = line.substring(jobIdStarts);
285                        int jobIdEnds = jobId.indexOf(" ");
286                        if (jobIdEnds > -1) {
287                            jobId = jobId.substring(0, jobId.indexOf(" "));
288                        }
289                        sb.append(separator).append(jobId);
290                        separator = ",";
291                    }
292                    line = br.readLine();
293                }
294                br.close();
295                props.setProperty(HADOOP_JOBS, sb.toString());
296            }
297            return props;
298        }
299    
300    }