001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.cli.CliDriver;
040    
041    public class HiveMain extends LauncherMain {
042        public static final String USER_HIVE_DEFAULT_FILE = "oozie-user-hive-default.xml";
043    
044        private static final Pattern[] HIVE_JOB_IDS_PATTERNS = {
045          Pattern.compile("Ended Job = (job_\\S*)")
046        };
047    
048        public static final String HIVE_L4J_PROPS = "hive-log4j.properties";
049        public static final String HIVE_EXEC_L4J_PROPS = "hive-exec-log4j.properties";
050        public static final String HIVE_SITE_CONF = "hive-site.xml";
051        private static final String HIVE_SCRIPT = "oozie.hive.script";
052        private static final String HIVE_PARAMS = "oozie.hive.params";
053    
054        public static void main(String[] args) throws Exception {
055            run(HiveMain.class, args);
056        }
057    
058        private static Configuration initActionConf() {
059            // Loading action conf prepared by Oozie
060            Configuration hiveConf = new Configuration(false);
061    
062            String actionXml = System.getProperty("oozie.action.conf.xml");
063    
064            if (actionXml == null) {
065                throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
066            }
067            if (!new File(actionXml).exists()) {
068                throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
069            } else {
070                System.out.println("Using action configuration file " + actionXml);
071            }
072    
073            hiveConf.addResource(new Path("file:///", actionXml));
074    
075            // Propagate delegation related props from launcher job to Hive job
076            String delegationToken = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
077            if (delegationToken != null) {
078                hiveConf.set("mapreduce.job.credentials.binary", delegationToken);
079                System.out.println("------------------------");
080                System.out.println("Setting env property for mapreduce.job.credentials.binary to: " + delegationToken);
081                System.out.println("------------------------");
082                System.setProperty("mapreduce.job.credentials.binary", delegationToken);
083            } else {
084                System.out.println("Non-Kerberos execution");
085            }
086    
087            // Have to explicitly unset this property or Hive will not set it.
088            hiveConf.set("mapred.job.name", "");
089    
090            // See https://issues.apache.org/jira/browse/HIVE-1411
091            hiveConf.set("datanucleus.plugin.pluginRegistryBundleCheck", "LOG");
092    
093            // to force hive to use the jobclient to submit the job, never using HADOOPBIN (to do localmode)
094            hiveConf.setBoolean("hive.exec.mode.local.auto", false);
095    
096            return hiveConf;
097        }
098    
099        public static String setUpHiveLog4J(Configuration hiveConf) throws IOException {
100            //Logfile to capture job IDs
101            String hadoopJobId = System.getProperty("oozie.launcher.job.id");
102            if (hadoopJobId == null) {
103                throw new RuntimeException("Launcher Hadoop Job ID system property not set");
104            }
105    
106            String logFile = new File("hive-oozie-" + hadoopJobId + ".log").getAbsolutePath();
107    
108            Properties hadoopProps = new Properties();
109    
110            // Preparing log4j configuration
111            URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties");
112            if (log4jFile != null) {
113                // getting hadoop log4j configuration
114                hadoopProps.load(log4jFile.openStream());
115            }
116    
117            String logLevel = hiveConf.get("oozie.hive.log.level", "INFO");
118    
119            hadoopProps.setProperty("log4j.logger.org.apache.hadoop.hive", logLevel + ", A");
120            hadoopProps.setProperty("log4j.logger.hive", logLevel + ", A");
121            hadoopProps.setProperty("log4j.logger.DataNucleus", logLevel + ", A");
122            hadoopProps.setProperty("log4j.logger.DataStore", logLevel + ", A");
123            hadoopProps.setProperty("log4j.logger.JPOX", logLevel + ", A");
124            hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender");
125            hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout");
126            hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
127    
128            hadoopProps.setProperty("log4j.appender.jobid", "org.apache.log4j.FileAppender");
129            hadoopProps.setProperty("log4j.appender.jobid.file", logFile);
130            hadoopProps.setProperty("log4j.appender.jobid.layout", "org.apache.log4j.PatternLayout");
131            hadoopProps.setProperty("log4j.appender.jobid.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
132            hadoopProps.setProperty("log4j.logger.org.apache.hadoop.hive.ql.exec", "INFO, jobid");
133    
134            String localProps = new File(HIVE_L4J_PROPS).getAbsolutePath();
135            OutputStream os1 = new FileOutputStream(localProps);
136            hadoopProps.store(os1, "");
137            os1.close();
138    
139            localProps = new File(HIVE_EXEC_L4J_PROPS).getAbsolutePath();
140            os1 = new FileOutputStream(localProps);
141            hadoopProps.store(os1, "");
142            os1.close();
143            return logFile;
144        }
145    
146        public static Configuration setUpHiveSite() throws Exception {
147            Configuration hiveConf = initActionConf();
148    
149            // Write the action configuration out to hive-site.xml
150            OutputStream os = new FileOutputStream(HIVE_SITE_CONF);
151            hiveConf.writeXml(os);
152            os.close();
153    
154            System.out.println();
155            System.out.println("Hive Configuration Properties:");
156            System.out.println("------------------------");
157            for (Entry<String, String> entry : hiveConf) {
158                System.out.println(entry.getKey() + "=" + entry.getValue());
159            }
160            System.out.flush();
161            System.out.println("------------------------");
162            System.out.println();
163            return hiveConf;
164        }
165    
166        protected void run(String[] args) throws Exception {
167            System.out.println();
168            System.out.println("Oozie Hive action configuration");
169            System.out.println("=================================================================");
170    
171            Configuration hiveConf = setUpHiveSite();
172    
173            List<String> arguments = new ArrayList<String>();
174            String scriptPath = hiveConf.get(HIVE_SCRIPT);
175    
176            if (scriptPath == null) {
177                throw new RuntimeException("Action Configuration does not have [" +  HIVE_SCRIPT + "] property");
178            }
179    
180            if (!new File(scriptPath).exists()) {
181                throw new RuntimeException("Hive script file [" + scriptPath + "] does not exist");
182            }
183    
184            // check if hive-default.xml is in the classpath, if not look for oozie-user-hive-default.xml
185            // in the current directory (it will be there if the Hive action has the 'oozie.hive.defaults'
186            // property) and rename it to hive-default.xml
187            if (Thread.currentThread().getContextClassLoader().getResource("hive-default.xml") == null) {
188                File userProvidedDefault = new File(USER_HIVE_DEFAULT_FILE);
189                if (userProvidedDefault.exists()) {
190                    if (!userProvidedDefault.renameTo(new File("hive-default.xml"))) {
191                        throw new RuntimeException(
192                                "Could not rename user provided Hive defaults file to 'hive-default.xml'");
193                    }
194                    System.out.println("Using 'hive-default.xml' defined in the Hive action");
195                }
196                else {
197                    throw new RuntimeException(
198                            "Hive JAR does not bundle a 'hive-default.xml' and Hive action does not define one");
199                }
200            }
201            else {
202                System.out.println("Using 'hive-default.xml' defined in the Hive JAR");
203                File userProvidedDefault = new File(USER_HIVE_DEFAULT_FILE);
204                if (userProvidedDefault.exists()) {
205                    System.out.println("WARNING: Ignoring user provided Hive defaults");
206                }
207            }
208            System.out.println();
209    
210            String logFile = setUpHiveLog4J(hiveConf);
211    
212            // print out current directory & its contents
213            File localDir = new File("dummy").getAbsoluteFile().getParentFile();
214            System.out.println("Current (local) dir = " + localDir.getAbsolutePath());
215            System.out.println("------------------------");
216            for (String file : localDir.list()) {
217                System.out.println("  " + file);
218            }
219            System.out.println("------------------------");
220            System.out.println();
221    
222            // Prepare the Hive Script
223            String script = readStringFromFile(scriptPath);
224            System.out.println();
225            System.out.println("Script [" + scriptPath + "] content: ");
226            System.out.println("------------------------");
227            System.out.println(script);
228            System.out.println("------------------------");
229            System.out.println();
230    
231            // Pass any parameters to Hive via arguments
232            String[] params = MapReduceMain.getStrings(hiveConf, HIVE_PARAMS);
233            if (params.length > 0) {
234                System.out.println("Parameters:");
235                System.out.println("------------------------");
236                for (String param : params) {
237                    System.out.println("  " + param);
238    
239                    int idx = param.indexOf('=');
240                    if (idx == -1) {
241                        throw new RuntimeException("Parameter expression must contain an assignment: " + param);
242                    } else if (idx == 0) {
243                        throw new RuntimeException("Parameter value not specified: " + param);
244                    }
245                    arguments.add("--hivevar");
246                    arguments.add(param);
247                }
248                System.out.println("------------------------");
249                System.out.println();
250            }
251    
252            arguments.add("-f");
253            arguments.add(scriptPath);
254    
255    
256            System.out.println("Hive command arguments :");
257            for (String arg : arguments) {
258                System.out.println("             " + arg);
259            }
260            System.out.println();
261    
262            System.out.println("=================================================================");
263            System.out.println();
264            System.out.println(">>> Invoking Hive command line now >>>");
265            System.out.println();
266            System.out.flush();
267    
268            try {
269                runHive(arguments.toArray(new String[arguments.size()]));
270            }
271            catch (SecurityException ex) {
272                if (LauncherSecurityManager.getExitInvoked()) {
273                    if (LauncherSecurityManager.getExitCode() != 0) {
274                        throw ex;
275                    }
276                }
277            }
278    
279            System.out.println("\n<<< Invocation of Hive command completed <<<\n");
280    
281            // harvesting and recording Hadoop Job IDs
282            Properties jobIds = getHadoopJobIds(logFile, HIVE_JOB_IDS_PATTERNS);
283            File file = new File(System.getProperty("oozie.action.output.properties"));
284            OutputStream os = new FileOutputStream(file);
285            jobIds.store(os, "");
286            os.close();
287            System.out.println(" Hadoop Job IDs executed by Hive: " + jobIds.getProperty(HADOOP_JOBS));
288            System.out.println();
289        }
290    
291        private void runHive(String[] args) throws Exception {
292            CliDriver.main(args);
293        }
294    
295        public static void setHiveScript(Configuration conf, String script, String[] params) {
296            conf.set(HIVE_SCRIPT, script);
297            MapReduceMain.setStrings(conf, HIVE_PARAMS, params);
298        }
299    
300        private static String readStringFromFile(String filePath) throws IOException {
301            String line;
302            BufferedReader br = null;
303            try {
304                br = new BufferedReader(new FileReader(filePath));
305                StringBuilder sb = new StringBuilder();
306                String sep = System.getProperty("line.separator");
307                while ((line = br.readLine()) != null) {
308                    sb.append(line).append(sep);
309                }
310                return sb.toString();
311            }
312            finally {
313                if (br != null) {
314                    br.close();
315                }
316            }
317         }
318    }