/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.cli.CliDriver;

/**
 * Launcher main class for the Oozie Hive action. It prepares the Hive
 * configuration ({@code hive-site.xml}), log4j configuration (used to capture
 * the Hadoop job IDs Hive spawns), resolves the user script parameters, runs
 * the script through Hive's {@link CliDriver}, and finally records the
 * harvested Hadoop job IDs in the action output properties file for Oozie.
 */
public class HiveMain extends LauncherMain {
    public static final String USER_HIVE_DEFAULT_FILE = "oozie-user-hive-default.xml";

    // Patterns matched against Hive's log output to harvest the IDs of the
    // Hadoop jobs Hive launched on our behalf.
    private static final Pattern[] HIVE_JOB_IDS_PATTERNS = {
        Pattern.compile("Ended Job = (job_\\S*)")
    };

    public static final String HIVE_L4J_PROPS = "hive-log4j.properties";
    public static final String HIVE_EXEC_L4J_PROPS = "hive-exec-log4j.properties";
    public static final String HIVE_SITE_CONF = "hive-site.xml";
    private static final String HIVE_SCRIPT = "oozie.hive.script";
    private static final String HIVE_PARAMS = "oozie.hive.params";

    public static void main(String[] args) throws Exception {
        run(HiveMain.class, args);
    }

    /**
     * Loads the action configuration prepared by Oozie (pointed at by the
     * {@code oozie.action.conf.xml} system property) and layers on the
     * settings the Hive invocation needs (delegation token propagation,
     * forced non-local execution, etc.).
     *
     * @return the configuration to hand to Hive.
     * @throws RuntimeException if the action configuration XML is missing.
     */
    private static Configuration initActionConf() {
        // Loading action conf prepared by Oozie
        Configuration hiveConf = new Configuration(false);

        String actionXml = System.getProperty("oozie.action.conf.xml");

        if (actionXml == null) {
            throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
        }
        if (!new File(actionXml).exists()) {
            throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
        }
        else {
            System.out.println("Using action configuration file " + actionXml);
        }

        hiveConf.addResource(new Path("file:///", actionXml));

        // Propagate delegation related props from launcher job to Hive job
        String delegationToken = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
        if (delegationToken != null) {
            hiveConf.set("mapreduce.job.credentials.binary", delegationToken);
            System.out.println("------------------------");
            System.out.println("Setting env property for mapreduce.job.credentials.binary to: " + delegationToken);
            System.out.println("------------------------");
            System.setProperty("mapreduce.job.credentials.binary", delegationToken);
        }
        else {
            System.out.println("Non-Kerberos execution");
        }

        // Have to explicitly unset this property or Hive will not set it.
        hiveConf.set("mapred.job.name", "");

        // See https://issues.apache.org/jira/browse/HIVE-1411
        hiveConf.set("datanucleus.plugin.pluginRegistryBundleCheck", "LOG");

        // to force hive to use the jobclient to submit the job, never using HADOOPBIN (to do localmode)
        hiveConf.setBoolean("hive.exec.mode.local.auto", false);

        return hiveConf;
    }

    /**
     * Writes the log4j configuration files Hive will pick up
     * ({@value #HIVE_L4J_PROPS} and {@value #HIVE_EXEC_L4J_PROPS}), routing
     * Hive's job-submission logging to a local file so the Hadoop job IDs can
     * be harvested from it afterwards.
     *
     * @param hiveConf the action configuration (read for {@code oozie.hive.log.level}).
     * @return the absolute path of the local log file capturing job IDs.
     * @throws IOException if a log4j properties file cannot be written.
     * @throws RuntimeException if the launcher job ID system property is not set.
     */
    public static String setUpHiveLog4J(Configuration hiveConf) throws IOException {
        //Logfile to capture job IDs
        String hadoopJobId = System.getProperty("oozie.launcher.job.id");
        if (hadoopJobId == null) {
            throw new RuntimeException("Launcher Hadoop Job ID system property not set");
        }

        String logFile = new File("hive-oozie-" + hadoopJobId + ".log").getAbsolutePath();

        Properties hadoopProps = new Properties();

        // Preparing log4j configuration
        URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties");
        if (log4jFile != null) {
            // getting hadoop log4j configuration; close the stream on all
            // paths (the original leaked it).
            InputStream is = log4jFile.openStream();
            try {
                hadoopProps.load(is);
            }
            finally {
                is.close();
            }
        }

        String logLevel = hiveConf.get("oozie.hive.log.level", "INFO");

        hadoopProps.setProperty("log4j.logger.org.apache.hadoop.hive", logLevel + ", A");
        hadoopProps.setProperty("log4j.logger.hive", logLevel + ", A");
        hadoopProps.setProperty("log4j.logger.DataNucleus", logLevel + ", A");
        hadoopProps.setProperty("log4j.logger.DataStore", logLevel + ", A");
        hadoopProps.setProperty("log4j.logger.JPOX", logLevel + ", A");
        hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender");
        hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout");
        hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");

        // The file appender below is what captures the job IDs harvested by
        // getHadoopJobIds() after the Hive invocation completes.
        hadoopProps.setProperty("log4j.appender.jobid", "org.apache.log4j.FileAppender");
        hadoopProps.setProperty("log4j.appender.jobid.file", logFile);
        hadoopProps.setProperty("log4j.appender.jobid.layout", "org.apache.log4j.PatternLayout");
        hadoopProps.setProperty("log4j.appender.jobid.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
        hadoopProps.setProperty("log4j.logger.org.apache.hadoop.hive.ql.exec", "INFO, jobid");

        writeLocalProperties(hadoopProps, HIVE_L4J_PROPS);
        writeLocalProperties(hadoopProps, HIVE_EXEC_L4J_PROPS);
        return logFile;
    }

    /**
     * Stores the given properties in a file in the current directory,
     * closing the stream on all paths.
     */
    private static void writeLocalProperties(Properties props, String fileName) throws IOException {
        String localProps = new File(fileName).getAbsolutePath();
        OutputStream os = new FileOutputStream(localProps);
        try {
            props.store(os, "");
        }
        finally {
            os.close();
        }
    }

    /**
     * Builds the action configuration, writes it out as
     * {@value #HIVE_SITE_CONF} for Hive to pick up, and echoes the effective
     * properties to stdout for debugging.
     *
     * @return the configuration that was written.
     * @throws Exception if the configuration cannot be created or written.
     */
    public static Configuration setUpHiveSite() throws Exception {
        Configuration hiveConf = initActionConf();

        // Write the action configuration out to hive-site.xml
        OutputStream os = new FileOutputStream(HIVE_SITE_CONF);
        try {
            hiveConf.writeXml(os);
        }
        finally {
            os.close();
        }

        System.out.println();
        System.out.println("Hive Configuration Properties:");
        System.out.println("------------------------");
        for (Entry<String, String> entry : hiveConf) {
            System.out.println(entry.getKey() + "=" + entry.getValue());
        }
        System.out.flush();
        System.out.println("------------------------");
        System.out.println();
        return hiveConf;
    }

    /**
     * Entry point invoked by {@link LauncherMain#run}: prepares configuration
     * and logging, resolves script parameters, runs the script through Hive's
     * CLI, and records the Hadoop job IDs Hive launched.
     *
     * @param args unused; the script and parameters come from the action configuration.
     * @throws Exception on any setup or execution failure.
     */
    protected void run(String[] args) throws Exception {
        System.out.println();
        System.out.println("Oozie Hive action configuration");
        System.out.println("=================================================================");

        Configuration hiveConf = setUpHiveSite();

        List<String> arguments = new ArrayList<String>();
        String scriptPath = hiveConf.get(HIVE_SCRIPT);

        if (scriptPath == null) {
            throw new RuntimeException("Action Configuration does not have [" + HIVE_SCRIPT + "] property");
        }

        if (!new File(scriptPath).exists()) {
            throw new RuntimeException("Hive script file [" + scriptPath + "] does not exist");
        }

        // check if hive-default.xml is in the classpath, if not look for oozie-user-hive-default.xml
        // in the current directory (it will be there if the Hive action has the 'oozie.hive.defaults'
        // property) and rename it to hive-default.xml
        if (Thread.currentThread().getContextClassLoader().getResource("hive-default.xml") == null) {
            File userProvidedDefault = new File(USER_HIVE_DEFAULT_FILE);
            if (userProvidedDefault.exists()) {
                if (!userProvidedDefault.renameTo(new File("hive-default.xml"))) {
                    throw new RuntimeException(
                            "Could not rename user provided Hive defaults file to 'hive-default.xml'");
                }
                System.out.println("Using 'hive-default.xml' defined in the Hive action");
            }
            else {
                throw new RuntimeException(
                        "Hive JAR does not bundle a 'hive-default.xml' and Hive action does not define one");
            }
        }
        else {
            System.out.println("Using 'hive-default.xml' defined in the Hive JAR");
            File userProvidedDefault = new File(USER_HIVE_DEFAULT_FILE);
            if (userProvidedDefault.exists()) {
                System.out.println("WARNING: Ignoring user provided Hive defaults");
            }
        }
        System.out.println();

        String logFile = setUpHiveLog4J(hiveConf);

        // print out current directory & its contents
        File localDir = new File("dummy").getAbsoluteFile().getParentFile();
        System.out.println("Current (local) dir = " + localDir.getAbsolutePath());
        System.out.println("------------------------");
        for (String file : localDir.list()) {
            System.out.println("  " + file);
        }
        System.out.println("------------------------");
        System.out.println();

        // Prepare the Hive Script
        String script = readStringFromFile(scriptPath);
        System.out.println();
        System.out.println("Original script [" + scriptPath + "] content: ");
        System.out.println("------------------------");
        System.out.println(script);
        System.out.println("------------------------");
        System.out.println();

        String[] params = MapReduceMain.getStrings(hiveConf, HIVE_PARAMS);
        if (params.length > 0) {
            Map<String, String> varMap = new HashMap<String, String>();
            System.out.println("Parameters:");
            System.out.println("------------------------");
            for (String param : params) {
                System.out.println("  " + param);

                int idx = param.indexOf('=');
                if (idx == -1) {
                    throw new RuntimeException("Parameter expression must contain an assignment: " + param);
                } else if (idx == 0) {
                    throw new RuntimeException("Parameter value not specified: " + param);
                }
                String var = param.substring(0, idx);
                String val = param.substring(idx + 1);
                varMap.put(var, val);
            }
            System.out.println("------------------------");
            System.out.println();

            // Resolve ${var} references and run the substituted copy instead.
            String resolvedScript = substitute(varMap, script);
            scriptPath = scriptPath + ".sub";
            writeStringToFile(scriptPath, resolvedScript);

            System.out.println("Resolved script [" + scriptPath + "] content: ");
            System.out.println("------------------------");
            System.out.println(resolvedScript);
            System.out.println("------------------------");
            System.out.println();
        }

        arguments.add("-f");
        arguments.add(scriptPath);


        System.out.println("Hive command arguments :");
        for (String arg : arguments) {
            System.out.println("             " + arg);
        }
        System.out.println();

        System.out.println("=================================================================");
        System.out.println();
        System.out.println(">>> Invoking Hive command line now >>>");
        System.out.println();
        System.out.flush();

        try {
            runHive(arguments.toArray(new String[arguments.size()]));
        }
        catch (SecurityException ex) {
            if (LauncherSecurityManager.getExitInvoked()) {
                // Hive called System.exit(); only a non-zero code is a failure.
                if (LauncherSecurityManager.getExitCode() != 0) {
                    throw ex;
                }
            }
            else {
                // A genuine security failure, not an intercepted exit():
                // do not swallow it (the original did, silently).
                throw ex;
            }
        }

        System.out.println("\n<<< Invocation of Hive command completed <<<\n");

        // harvesting and recording Hadoop Job IDs
        Properties jobIds = getHadoopJobIds(logFile, HIVE_JOB_IDS_PATTERNS);
        File file = new File(System.getProperty("oozie.action.output.properties"));
        OutputStream os = new FileOutputStream(file);
        try {
            jobIds.store(os, "");
        }
        finally {
            os.close();
        }
        System.out.println(" Hadoop Job IDs executed by Hive: " + jobIds.getProperty(HADOOP_JOBS));
        System.out.println();
    }

    /** Delegates the actual execution to Hive's command-line driver. */
    private void runHive(String[] args) throws Exception {
        CliDriver.main(args);
    }

    /**
     * Records the Hive script path and its parameters in the given
     * configuration (used by Oozie when materializing the action).
     */
    public static void setHiveScript(Configuration conf, String script, String[] params) {
        conf.set(HIVE_SCRIPT, script);
        MapReduceMain.setStrings(conf, HIVE_PARAMS, params);
    }

    /**
     * Reads a whole text file into a string, normalizing line endings to the
     * platform separator.
     */
    private static String readStringFromFile(String filePath) throws IOException {
        String line;
        BufferedReader br = null;
        try {
            br = new BufferedReader(new FileReader(filePath));
            StringBuilder sb = new StringBuilder();
            String sep = System.getProperty("line.separator");
            while ((line = br.readLine()) != null) {
                sb.append(line).append(sep);
            }
            return sb.toString();
        }
        finally {
            if (br != null) {
                br.close();
            }
        }
    }

    /** Writes the given string to a file, closing the writer on all paths. */
    private static void writeStringToFile(String filePath, String str) throws IOException {
        BufferedWriter out = null;
        try {
            out = new BufferedWriter(new FileWriter(filePath));
            out.write(str);
        }
        finally {
            if (out != null) {
                out.close();
            }
        }
    }

    /**
     * Replaces every occurrence of <code>${key}</code> in {@code expr} with
     * the corresponding value from {@code vars} (plain text, no recursion).
     */
    static String substitute(Map<String, String> vars, String expr) {
        for (Map.Entry<String, String> entry : vars.entrySet()) {
            String var = "${" + entry.getKey() + "}";
            String value = entry.getValue();
            expr = expr.replace(var, value);
        }
        return expr;
    }

}