001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.cli.CliDriver;
036    
037    public class HiveMain extends LauncherMain {
038        private static final Pattern[] HIVE_JOB_IDS_PATTERNS = {
039          Pattern.compile("Ended Job = (job_\\S*)")
040        };
041    
042        public static final String HIVE_L4J_PROPS = "hive-log4j.properties";
043        public static final String HIVE_EXEC_L4J_PROPS = "hive-exec-log4j.properties";
044        public static final String HIVE_SITE_CONF = "hive-site.xml";
045        private static final String HIVE_SCRIPT = "oozie.hive.script";
046        private static final String HIVE_PARAMS = "oozie.hive.params";
047    
048        public static void main(String[] args) throws Exception {
049            run(HiveMain.class, args);
050        }
051    
052        private static Configuration initActionConf() {
053            // Loading action conf prepared by Oozie
054            Configuration hiveConf = new Configuration(false);
055    
056            String actionXml = System.getProperty("oozie.action.conf.xml");
057    
058            if (actionXml == null) {
059                throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
060            }
061            if (!new File(actionXml).exists()) {
062                throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
063            } else {
064                System.out.println("Using action configuration file " + actionXml);
065            }
066    
067            hiveConf.addResource(new Path("file:///", actionXml));
068    
069            // Propagate delegation related props from launcher job to Hive job
070            String delegationToken = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
071            if (delegationToken != null) {
072                hiveConf.set("mapreduce.job.credentials.binary", delegationToken);
073                System.out.println("------------------------");
074                System.out.println("Setting env property for mapreduce.job.credentials.binary to: " + delegationToken);
075                System.out.println("------------------------");
076                System.setProperty("mapreduce.job.credentials.binary", delegationToken);
077            } else {
078                System.out.println("Non-Kerberos execution");
079            }
080    
081            // Have to explicitly unset this property or Hive will not set it.
082            hiveConf.set("mapred.job.name", "");
083    
084            // See https://issues.apache.org/jira/browse/HIVE-1411
085            hiveConf.set("datanucleus.plugin.pluginRegistryBundleCheck", "LOG");
086    
087            // to force hive to use the jobclient to submit the job, never using HADOOPBIN (to do localmode)
088            hiveConf.setBoolean("hive.exec.mode.local.auto", false);
089    
090            return hiveConf;
091        }
092    
093        public static String setUpHiveLog4J(Configuration hiveConf) throws IOException {
094            //Logfile to capture job IDs
095            String hadoopJobId = System.getProperty("oozie.launcher.job.id");
096            if (hadoopJobId == null) {
097                throw new RuntimeException("Launcher Hadoop Job ID system property not set");
098            }
099    
100            String logFile = new File("hive-oozie-" + hadoopJobId + ".log").getAbsolutePath();
101    
102            Properties hadoopProps = new Properties();
103    
104            // Preparing log4j configuration
105            URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties");
106            if (log4jFile != null) {
107                // getting hadoop log4j configuration
108                hadoopProps.load(log4jFile.openStream());
109            }
110    
111            String logLevel = hiveConf.get("oozie.hive.log.level", "INFO");
112    
113            hadoopProps.setProperty("log4j.logger.org.apache.hadoop.hive", logLevel + ", A");
114            hadoopProps.setProperty("log4j.logger.hive", logLevel + ", A");
115            hadoopProps.setProperty("log4j.logger.DataNucleus", logLevel + ", A");
116            hadoopProps.setProperty("log4j.logger.DataStore", logLevel + ", A");
117            hadoopProps.setProperty("log4j.logger.JPOX", logLevel + ", A");
118            hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender");
119            hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout");
120            hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
121    
122            hadoopProps.setProperty("log4j.appender.jobid", "org.apache.log4j.FileAppender");
123            hadoopProps.setProperty("log4j.appender.jobid.file", logFile);
124            hadoopProps.setProperty("log4j.appender.jobid.layout", "org.apache.log4j.PatternLayout");
125            hadoopProps.setProperty("log4j.appender.jobid.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
126            hadoopProps.setProperty("log4j.logger.org.apache.hadoop.hive.ql.exec", "INFO, jobid");
127    
128            String localProps = new File(HIVE_L4J_PROPS).getAbsolutePath();
129            OutputStream os1 = new FileOutputStream(localProps);
130            hadoopProps.store(os1, "");
131            os1.close();
132    
133            localProps = new File(HIVE_EXEC_L4J_PROPS).getAbsolutePath();
134            os1 = new FileOutputStream(localProps);
135            hadoopProps.store(os1, "");
136            os1.close();
137            return logFile;
138        }
139    
140        public static Configuration setUpHiveSite() throws Exception {
141            Configuration hiveConf = initActionConf();
142    
143            // Write the action configuration out to hive-site.xml
144            OutputStream os = new FileOutputStream(HIVE_SITE_CONF);
145            hiveConf.writeXml(os);
146            os.close();
147    
148            System.out.println();
149            System.out.println("Hive Configuration Properties:");
150            System.out.println("------------------------");
151            for (Entry<String, String> entry : hiveConf) {
152                System.out.println(entry.getKey() + "=" + entry.getValue());
153            }
154            System.out.flush();
155            System.out.println("------------------------");
156            System.out.println();
157            return hiveConf;
158        }
159    
160        protected void run(String[] args) throws Exception {
161            System.out.println();
162            System.out.println("Oozie Hive action configuration");
163            System.out.println("=================================================================");
164            System.out.println();
165    
166            Configuration hiveConf = setUpHiveSite();
167    
168            List<String> arguments = new ArrayList<String>();
169            String scriptPath = hiveConf.get(HIVE_SCRIPT);
170    
171            if (scriptPath == null) {
172                throw new RuntimeException("Action Configuration does not have [" +  HIVE_SCRIPT + "] property");
173            }
174    
175            if (!new File(scriptPath).exists()) {
176                throw new RuntimeException("Hive script file [" + scriptPath + "] does not exist");
177            }
178    
179            String logFile = setUpHiveLog4J(hiveConf);
180    
181            // print out current directory & its contents
182            File localDir = new File("dummy").getAbsoluteFile().getParentFile();
183            System.out.println("Current (local) dir = " + localDir.getAbsolutePath());
184            System.out.println("------------------------");
185            for (String file : localDir.list()) {
186                System.out.println("  " + file);
187            }
188            System.out.println("------------------------");
189            System.out.println();
190    
191            // Prepare the Hive Script
192            String script = readStringFromFile(scriptPath);
193            System.out.println();
194            System.out.println("Script [" + scriptPath + "] content: ");
195            System.out.println("------------------------");
196            System.out.println(script);
197            System.out.println("------------------------");
198            System.out.println();
199    
200            // Pass any parameters to Hive via arguments
201            String[] params = MapReduceMain.getStrings(hiveConf, HIVE_PARAMS);
202            if (params.length > 0) {
203                System.out.println("Parameters:");
204                System.out.println("------------------------");
205                for (String param : params) {
206                    System.out.println("  " + param);
207    
208                    int idx = param.indexOf('=');
209                    if (idx == -1) {
210                        throw new RuntimeException("Parameter expression must contain an assignment: " + param);
211                    } else if (idx == 0) {
212                        throw new RuntimeException("Parameter value not specified: " + param);
213                    }
214                    arguments.add("--hivevar");
215                    arguments.add(param);
216                }
217                System.out.println("------------------------");
218                System.out.println();
219            }
220    
221            arguments.add("-f");
222            arguments.add(scriptPath);
223    
224    
225            System.out.println("Hive command arguments :");
226            for (String arg : arguments) {
227                System.out.println("             " + arg);
228            }
229            System.out.println();
230    
231            System.out.println("=================================================================");
232            System.out.println();
233            System.out.println(">>> Invoking Hive command line now >>>");
234            System.out.println();
235            System.out.flush();
236    
237            try {
238                runHive(arguments.toArray(new String[arguments.size()]));
239            }
240            catch (SecurityException ex) {
241                if (LauncherSecurityManager.getExitInvoked()) {
242                    if (LauncherSecurityManager.getExitCode() != 0) {
243                        throw ex;
244                    }
245                }
246            }
247    
248            System.out.println("\n<<< Invocation of Hive command completed <<<\n");
249    
250            // harvesting and recording Hadoop Job IDs
251            Properties jobIds = getHadoopJobIds(logFile, HIVE_JOB_IDS_PATTERNS);
252            File file = new File(System.getProperty("oozie.action.output.properties"));
253            OutputStream os = new FileOutputStream(file);
254            jobIds.store(os, "");
255            os.close();
256            System.out.println(" Hadoop Job IDs executed by Hive: " + jobIds.getProperty(HADOOP_JOBS));
257            System.out.println();
258        }
259    
260        private void runHive(String[] args) throws Exception {
261            CliDriver.main(args);
262        }
263    
264        public static void setHiveScript(Configuration conf, String script, String[] params) {
265            conf.set(HIVE_SCRIPT, script);
266            MapReduceMain.setStrings(conf, HIVE_PARAMS, params);
267        }
268    
269        private static String readStringFromFile(String filePath) throws IOException {
270            String line;
271            BufferedReader br = null;
272            try {
273                br = new BufferedReader(new FileReader(filePath));
274                StringBuilder sb = new StringBuilder();
275                String sep = System.getProperty("line.separator");
276                while ((line = br.readLine()) != null) {
277                    sb.append(line).append(sep);
278                }
279                return sb.toString();
280            }
281            finally {
282                if (br != null) {
283                    br.close();
284                }
285            }
286         }
287    }