001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import java.io.BufferedReader; 021 import java.io.File; 022 import java.io.FileOutputStream; 023 import java.io.FileReader; 024 import java.io.IOException; 025 import java.io.OutputStream; 026 import java.net.URL; 027 import java.util.ArrayList; 028 import java.util.List; 029 import java.util.Map.Entry; 030 import java.util.Properties; 031 import java.util.regex.Pattern; 032 033 import org.apache.hadoop.conf.Configuration; 034 import org.apache.hadoop.fs.Path; 035 import org.apache.hadoop.hive.cli.CliDriver; 036 037 public class HiveMain extends LauncherMain { 038 private static final Pattern[] HIVE_JOB_IDS_PATTERNS = { 039 Pattern.compile("Ended Job = (job_\\S*)") 040 }; 041 042 public static final String HIVE_L4J_PROPS = "hive-log4j.properties"; 043 public static final String HIVE_EXEC_L4J_PROPS = "hive-exec-log4j.properties"; 044 public static final String HIVE_SITE_CONF = "hive-site.xml"; 045 private static final String HIVE_SCRIPT = "oozie.hive.script"; 046 private static final String HIVE_PARAMS = "oozie.hive.params"; 047 048 public static void main(String[] args) throws Exception { 049 run(HiveMain.class, args); 050 } 051 052 private static Configuration initActionConf() { 053 // Loading action conf prepared by Oozie 054 Configuration hiveConf = new Configuration(false); 055 056 String actionXml = System.getProperty("oozie.action.conf.xml"); 057 058 if (actionXml == null) { 059 throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]"); 060 } 061 if (!new File(actionXml).exists()) { 062 throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist"); 063 } else { 064 System.out.println("Using action configuration file " + actionXml); 065 } 066 067 hiveConf.addResource(new Path("file:///", actionXml)); 068 069 // Propagate delegation related props from launcher job to Hive job 070 String delegationToken = System.getenv("HADOOP_TOKEN_FILE_LOCATION"); 071 if (delegationToken != null) { 072 hiveConf.set("mapreduce.job.credentials.binary", delegationToken); 073 System.out.println("------------------------"); 074 System.out.println("Setting env property for mapreduce.job.credentials.binary to: " + delegationToken); 075 System.out.println("------------------------"); 076 System.setProperty("mapreduce.job.credentials.binary", delegationToken); 077 } else { 078 System.out.println("Non-Kerberos execution"); 079 } 080 081 // Have to explicitly unset this property or Hive will not set it. 082 hiveConf.set("mapred.job.name", ""); 083 084 // See https://issues.apache.org/jira/browse/HIVE-1411 085 hiveConf.set("datanucleus.plugin.pluginRegistryBundleCheck", "LOG"); 086 087 // to force hive to use the jobclient to submit the job, never using HADOOPBIN (to do localmode) 088 hiveConf.setBoolean("hive.exec.mode.local.auto", false); 089 090 return hiveConf; 091 } 092 093 public static String setUpHiveLog4J(Configuration hiveConf) throws IOException { 094 //Logfile to capture job IDs 095 String hadoopJobId = System.getProperty("oozie.launcher.job.id"); 096 if (hadoopJobId == null) { 097 throw new RuntimeException("Launcher Hadoop Job ID system property not set"); 098 } 099 100 String logFile = new File("hive-oozie-" + hadoopJobId + ".log").getAbsolutePath(); 101 102 Properties hadoopProps = new Properties(); 103 104 // Preparing log4j configuration 105 URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties"); 106 if (log4jFile != null) { 107 // getting hadoop log4j configuration 108 hadoopProps.load(log4jFile.openStream()); 109 } 110 111 String logLevel = hiveConf.get("oozie.hive.log.level", "INFO"); 112 113 hadoopProps.setProperty("log4j.logger.org.apache.hadoop.hive", logLevel + ", A"); 114 hadoopProps.setProperty("log4j.logger.hive", logLevel + ", A"); 115 hadoopProps.setProperty("log4j.logger.DataNucleus", logLevel + ", A"); 116 hadoopProps.setProperty("log4j.logger.DataStore", logLevel + ", A"); 117 hadoopProps.setProperty("log4j.logger.JPOX", logLevel + ", A"); 118 hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender"); 119 hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout"); 120 hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n"); 121 122 hadoopProps.setProperty("log4j.appender.jobid", "org.apache.log4j.FileAppender"); 123 hadoopProps.setProperty("log4j.appender.jobid.file", logFile); 124 hadoopProps.setProperty("log4j.appender.jobid.layout", "org.apache.log4j.PatternLayout"); 125 hadoopProps.setProperty("log4j.appender.jobid.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n"); 126 hadoopProps.setProperty("log4j.logger.org.apache.hadoop.hive.ql.exec", "INFO, jobid"); 127 128 String localProps = new File(HIVE_L4J_PROPS).getAbsolutePath(); 129 OutputStream os1 = new FileOutputStream(localProps); 130 hadoopProps.store(os1, ""); 131 os1.close(); 132 133 localProps = new File(HIVE_EXEC_L4J_PROPS).getAbsolutePath(); 134 os1 = new FileOutputStream(localProps); 135 hadoopProps.store(os1, ""); 136 os1.close(); 137 return logFile; 138 } 139 140 public static Configuration setUpHiveSite() throws Exception { 141 Configuration hiveConf = initActionConf(); 142 143 // Write the action configuration out to hive-site.xml 144 OutputStream os = new FileOutputStream(HIVE_SITE_CONF); 145 hiveConf.writeXml(os); 146 os.close(); 147 148 System.out.println(); 149 System.out.println("Hive Configuration Properties:"); 150 System.out.println("------------------------"); 151 for (Entry<String, String> entry : hiveConf) { 152 System.out.println(entry.getKey() + "=" + entry.getValue()); 153 } 154 System.out.flush(); 155 System.out.println("------------------------"); 156 System.out.println(); 157 return hiveConf; 158 } 159 160 protected void run(String[] args) throws Exception { 161 System.out.println(); 162 System.out.println("Oozie Hive action configuration"); 163 System.out.println("================================================================="); 164 System.out.println(); 165 166 Configuration hiveConf = setUpHiveSite(); 167 168 List<String> arguments = new ArrayList<String>(); 169 String scriptPath = hiveConf.get(HIVE_SCRIPT); 170 171 if (scriptPath == null) { 172 throw new RuntimeException("Action Configuration does not have [" + HIVE_SCRIPT + "] property"); 173 } 174 175 if (!new File(scriptPath).exists()) { 176 throw new RuntimeException("Hive script file [" + scriptPath + "] does not exist"); 177 } 178 179 String logFile = setUpHiveLog4J(hiveConf); 180 181 // print out current directory & its contents 182 File localDir = new File("dummy").getAbsoluteFile().getParentFile(); 183 System.out.println("Current (local) dir = " + localDir.getAbsolutePath()); 184 System.out.println("------------------------"); 185 for (String file : localDir.list()) { 186 System.out.println(" " + file); 187 } 188 System.out.println("------------------------"); 189 System.out.println(); 190 191 // Prepare the Hive Script 192 String script = readStringFromFile(scriptPath); 193 System.out.println(); 194 System.out.println("Script [" + scriptPath + "] content: "); 195 System.out.println("------------------------"); 196 System.out.println(script); 197 System.out.println("------------------------"); 198 System.out.println(); 199 200 // Pass any parameters to Hive via arguments 201 String[] params = MapReduceMain.getStrings(hiveConf, HIVE_PARAMS); 202 if (params.length > 0) { 203 System.out.println("Parameters:"); 204 System.out.println("------------------------"); 205 for (String param : params) { 206 System.out.println(" " + param); 207 208 int idx = param.indexOf('='); 209 if (idx == -1) { 210 throw new RuntimeException("Parameter expression must contain an assignment: " + param); 211 } else if (idx == 0) { 212 throw new RuntimeException("Parameter value not specified: " + param); 213 } 214 arguments.add("--hivevar"); 215 arguments.add(param); 216 } 217 System.out.println("------------------------"); 218 System.out.println(); 219 } 220 221 arguments.add("-f"); 222 arguments.add(scriptPath); 223 224 225 System.out.println("Hive command arguments :"); 226 for (String arg : arguments) { 227 System.out.println(" " + arg); 228 } 229 System.out.println(); 230 231 System.out.println("================================================================="); 232 System.out.println(); 233 System.out.println(">>> Invoking Hive command line now >>>"); 234 System.out.println(); 235 System.out.flush(); 236 237 try { 238 runHive(arguments.toArray(new String[arguments.size()])); 239 } 240 catch (SecurityException ex) { 241 if (LauncherSecurityManager.getExitInvoked()) { 242 if (LauncherSecurityManager.getExitCode() != 0) { 243 throw ex; 244 } 245 } 246 } 247 248 System.out.println("\n<<< Invocation of Hive command completed <<<\n"); 249 250 // harvesting and recording Hadoop Job IDs 251 Properties jobIds = getHadoopJobIds(logFile, HIVE_JOB_IDS_PATTERNS); 252 File file = new File(System.getProperty("oozie.action.output.properties")); 253 OutputStream os = new FileOutputStream(file); 254 jobIds.store(os, ""); 255 os.close(); 256 System.out.println(" Hadoop Job IDs executed by Hive: " + jobIds.getProperty(HADOOP_JOBS)); 257 System.out.println(); 258 } 259 260 private void runHive(String[] args) throws Exception { 261 CliDriver.main(args); 262 } 263 264 public static void setHiveScript(Configuration conf, String script, String[] params) { 265 conf.set(HIVE_SCRIPT, script); 266 MapReduceMain.setStrings(conf, HIVE_PARAMS, params); 267 } 268 269 private static String readStringFromFile(String filePath) throws IOException { 270 String line; 271 BufferedReader br = null; 272 try { 273 br = new BufferedReader(new FileReader(filePath)); 274 StringBuilder sb = new StringBuilder(); 275 String sep = System.getProperty("line.separator"); 276 while ((line = br.readLine()) != null) { 277 sb.append(line).append(sep); 278 } 279 return sb.toString(); 280 } 281 finally { 282 if (br != null) { 283 br.close(); 284 } 285 } 286 } 287 }