/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import org.apache.pig.Main;
import org.apache.pig.PigRunner;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * Launcher main class for the Oozie Pig action. Reads the action configuration
 * prepared by Oozie, builds the Pig command line, invokes Pig (via
 * {@code PigRunner} for Pig 0.8+ or {@code Main.main} for older versions), and
 * harvests the Hadoop job IDs spawned by the Pig script from the Pig log file.
 */
public class PigMain extends LauncherMain {
    // Pig CLI options that would conflict with the options this launcher sets
    // itself (script file, log files, exec type, property file, ...); user
    // supplied args are rejected if they use any of these.
    private static final Set<String> DISALLOWED_PIG_OPTIONS = new HashSet<String>();

    static {
        DISALLOWED_PIG_OPTIONS.add("-4");
        DISALLOWED_PIG_OPTIONS.add("-log4jconf");
        DISALLOWED_PIG_OPTIONS.add("-e");
        DISALLOWED_PIG_OPTIONS.add("-execute");
        DISALLOWED_PIG_OPTIONS.add("-f");
        DISALLOWED_PIG_OPTIONS.add("-file");
        DISALLOWED_PIG_OPTIONS.add("-l");
        DISALLOWED_PIG_OPTIONS.add("-logfile");
        DISALLOWED_PIG_OPTIONS.add("-r");
        DISALLOWED_PIG_OPTIONS.add("-dryrun");
        DISALLOWED_PIG_OPTIONS.add("-x");
        DISALLOWED_PIG_OPTIONS.add("-exectype");
        DISALLOWED_PIG_OPTIONS.add("-P");
        DISALLOWED_PIG_OPTIONS.add("-propertyFile");
    }

    public static void main(String[] args) throws Exception {
        run(PigMain.class, args);
    }

    /**
     * Builds and runs the Pig invocation described by the Oozie action
     * configuration.
     *
     * @param args launcher arguments (unused; everything is driven by the
     *        {@code oozie.action.conf.xml} system property)
     * @throws Exception if the action configuration or Pig script is missing,
     *         or if the Pig job fails
     */
    @Override
    protected void run(String[] args) throws Exception {
        System.out.println();
        System.out.println("Oozie Pig action configuration");
        System.out.println("=================================================================");

        // loading action conf prepared by Oozie
        Configuration actionConf = new Configuration(false);

        String actionXml = System.getProperty("oozie.action.conf.xml");

        if (actionXml == null) {
            throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
        }
        if (!new File(actionXml).exists()) {
            throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
        }

        actionConf.addResource(new Path("file:///", actionXml));

        Properties pigProperties = new Properties();
        for (Map.Entry<String, String> entry : actionConf) {
            pigProperties.setProperty(entry.getKey(), entry.getValue());
        }

        // propagate delegation related props from launcher job to Pig job
        if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
            pigProperties.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
            System.out.println("------------------------");
            System.out.println("Setting env property for mapreduce.job.credentials.binary to:"
                    + System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
            System.out.println("------------------------");
            System.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
        }
        else {
            System.out.println("Non-kerberoes execution");
        }

        // FIX: close in finally so a store() failure does not leak the stream
        OutputStream propsOs = new FileOutputStream("pig.properties");
        try {
            pigProperties.store(propsOs, "");
        }
        finally {
            propsOs.close();
        }

        logMasking("pig.properties:", Arrays.asList("password"), pigProperties.entrySet());

        List<String> arguments = new ArrayList<String>();
        String script = actionConf.get("oozie.pig.script");

        if (script == null) {
            throw new RuntimeException("Action Configuration does not have [oozie.pig.script] property");
        }

        if (!new File(script).exists()) {
            throw new RuntimeException("Error: Pig script file [" + script + "] does not exist");
        }

        // echo the script into the launcher stdout for debuggability
        System.out.println("Pig script [" + script + "] content: ");
        System.out.println("------------------------");
        BufferedReader scriptReader = new BufferedReader(new FileReader(script));
        try {
            String line = scriptReader.readLine();
            while (line != null) {
                System.out.println(line);
                line = scriptReader.readLine();
            }
        }
        finally {
            scriptReader.close();
        }
        System.out.println("------------------------");
        System.out.println();

        arguments.add("-file");
        arguments.add(script);
        // NOTE(review): assumes getStrings never returns null — verify against
        // MapReduceMain.getStrings before relying on an unset property here
        String[] params = MapReduceMain.getStrings(actionConf, "oozie.pig.params");
        for (String param : params) {
            arguments.add("-param");
            arguments.add(param);
        }

        String hadoopJobId = System.getProperty("oozie.launcher.job.id");
        if (hadoopJobId == null) {
            throw new RuntimeException("Launcher Hadoop Job ID system property not set");
        }

        // log file later scanned by getHadoopJobIds() for spawned job IDs
        String logFile = new File("pig-oozie-" + hadoopJobId + ".log").getAbsolutePath();

        URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties");
        if (log4jFile != null) {

            String pigLogLevel = actionConf.get("oozie.pig.log.level", "INFO");

            // append required PIG properties to the default hadoop log4j file
            Properties hadoopProps = new Properties();
            // FIX: close the classpath resource stream (was leaked)
            InputStream log4jStream = log4jFile.openStream();
            try {
                hadoopProps.load(log4jStream);
            }
            finally {
                log4jStream.close();
            }
            // appender A: console; appender B: the file getHadoopJobIds() reads
            hadoopProps.setProperty("log4j.logger.org.apache.pig", pigLogLevel + ", A, B");
            hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender");
            hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout");
            hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
            hadoopProps.setProperty("log4j.appender.B", "org.apache.log4j.FileAppender");
            hadoopProps.setProperty("log4j.appender.B.file", logFile);
            hadoopProps.setProperty("log4j.appender.B.layout", "org.apache.log4j.PatternLayout");
            hadoopProps.setProperty("log4j.appender.B.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");

            String localProps = new File("piglog4j.properties").getAbsolutePath();
            OutputStream log4jOs = new FileOutputStream(localProps);
            try {
                hadoopProps.store(log4jOs, "");
            }
            finally {
                log4jOs.close();
            }

            arguments.add("-log4jconf");
            arguments.add(localProps);

            // print out current directory
            File localDir = new File(localProps).getParentFile();
            System.out.println("Current (local) dir = " + localDir.getAbsolutePath());
        }
        else {
            System.out.println("log4jfile is null");
        }

        String pigLog = "pig-" + hadoopJobId + ".log";
        arguments.add("-logfile");
        arguments.add(pigLog);

        String[] pigArgs = MapReduceMain.getStrings(actionConf, "oozie.pig.args");
        for (String pigArg : pigArgs) {
            if (DISALLOWED_PIG_OPTIONS.contains(pigArg)) {
                throw new RuntimeException("Error: Pig argument " + pigArg + " is not supported");
            }
            arguments.add(pigArg);
        }

        System.out.println("Pig command arguments :");
        for (String arg : arguments) {
            System.out.println("             " + arg);
        }

        System.out.println("=================================================================");
        System.out.println();
        System.out.println(">>> Invoking Pig command line now >>>");
        System.out.println();
        System.out.flush();

        // print the Pig version first; reset the security manager afterwards
        // because old Pig Mains call System.exit()
        System.out.println();
        runPigJob(new String[] { "-version" }, null, true);
        System.out.println();
        System.out.flush();

        runPigJob(arguments.toArray(new String[arguments.size()]), pigLog, false);

        System.out.println();
        System.out.println("<<< Invocation of Pig command completed <<<");
        System.out.println();

        // harvesting and recording Hadoop Job IDs
        Properties jobIds = getHadoopJobIds(logFile);
        File file = new File(System.getProperty("oozie.action.output.properties"));
        OutputStream jobIdsOs = new FileOutputStream(file);
        try {
            jobIds.store(jobIdsOs, "");
        }
        finally {
            jobIdsOs.close();
        }
        System.out.println(" Hadoop Job IDs executed by Pig: " + jobIds.getProperty("hadoopJobs"));
        System.out.println();
    }

    /**
     * Dumps the Pig log file to stderr so a failure is diagnosable from the
     * launcher output. Missing log file is reported but not fatal.
     *
     * @param pigLog path of the Pig log file
     * @throws Exception on read errors other than a missing file
     */
    private void handleError(String pigLog) throws Exception {
        System.err.println();
        System.err.println("Pig logfile dump:");
        System.err.println();
        try {
            // FIX: close in finally so a read failure does not leak the reader
            BufferedReader reader = new BufferedReader(new FileReader(pigLog));
            try {
                String line = reader.readLine();
                while (line != null) {
                    System.err.println(line);
                    line = reader.readLine();
                }
            }
            finally {
                reader.close();
            }
        }
        catch (FileNotFoundException e) {
            System.err.println("pig log file: " + pigLog + " not found.");
        }
    }

    /**
     * Runs the pig script using PigRunner API if version 0.8 or above. Embedded
     * pig within python is also supported.
     *
     * @param args pig command line arguments
     * @param pigLog pig log file (may be null; then no log dump on failure)
     * @param resetSecurityManager specify if need to reset security manager
     * @throws Exception if the Pig invocation fails
     */
    protected void runPigJob(String[] args, String pigLog, boolean resetSecurityManager) throws Exception {
        // running as from the command line
        boolean pigRunnerExists = true;
        try {
            // existence probe only: PigRunner is available from Pig 0.8 onwards
            // (FIX: dropped the raw-typed, otherwise-unused Class variable)
            Class.forName("org.apache.pig.PigRunner");
        }
        catch (ClassNotFoundException ex) {
            pigRunnerExists = false;
        }

        if (pigRunnerExists) {
            System.out.println("Run pig script using PigRunner.run() for Pig version 0.8+");
            PigStats stats = PigRunner.run(args, null);
            // isSuccessful is the API from 0.9 supported by both PigStats and
            // EmbeddedPigStats
            if (!stats.isSuccessful()) {
                if (pigLog != null) {
                    handleError(pigLog);
                }
                throw new LauncherMainException(PigRunner.ReturnCode.FAILURE);
            }
        }
        else {
            try {
                System.out.println("Run pig script using Main.main() for Pig version before 0.8");
                Main.main(args);
            }
            catch (SecurityException ex) {
                // old Pig Mains call System.exit(); the launcher's security
                // manager turns that into a SecurityException we inspect here
                if (resetSecurityManager) {
                    LauncherSecurityManager.reset();
                }
                else {
                    if (LauncherSecurityManager.getExitInvoked()) {
                        if (LauncherSecurityManager.getExitCode() != 0) {
                            if (pigLog != null) {
                                handleError(pigLog);
                            }
                            throw ex;
                        }
                        // exit(0) means success: swallow the exception on purpose
                    }
                }
            }
        }
    }

    /**
     * Stores the Pig script path, parameters and arguments into the given
     * configuration under the {@code oozie.pig.*} keys read by {@link #run}.
     *
     * @param conf configuration to populate
     * @param script path of the Pig script
     * @param params Pig parameters ({@code -param} values)
     * @param args additional Pig command line arguments
     */
    public static void setPigScript(Configuration conf, String script, String[] params, String[] args) {
        conf.set("oozie.pig.script", script);
        MapReduceMain.setStrings(conf, "oozie.pig.params", params);
        MapReduceMain.setStrings(conf, "oozie.pig.args", args);
    }

    // marker written by the log4j FileAppender lines that carry a job ID
    private static final String JOB_ID_LOG_PREFIX = "HadoopJobId: ";

    /**
     * Scans the Pig log file for Hadoop job IDs and returns them as a
     * comma-separated list in the {@code hadoopJobs} property.
     *
     * @param logFile path of the Pig log file to scan
     * @return properties with a {@code hadoopJobs} entry (empty string if the
     *         log file is missing or contains no IDs)
     * @throws IOException if reading the log file fails
     */
    protected Properties getHadoopJobIds(String logFile) throws IOException {
        Properties props = new Properties();
        // FIX: StringBuilder (no shared mutation); removed unused jobCount local
        StringBuilder sb = new StringBuilder(100);
        if (!new File(logFile).exists()) {
            System.err.println("pig log file: " + logFile + " not present. Therefore no Hadoop jobids found");
            props.setProperty("hadoopJobs", "");
        }
        else {
            BufferedReader br = new BufferedReader(new FileReader(logFile));
            try {
                String line = br.readLine();
                String separator = "";
                while (line != null) {
                    if (line.contains(JOB_ID_LOG_PREFIX)) {
                        int jobIdStarts = line.indexOf(JOB_ID_LOG_PREFIX) + JOB_ID_LOG_PREFIX.length();
                        String jobId = line.substring(jobIdStarts);
                        int jobIdEnds = jobId.indexOf(" ");
                        if (jobIdEnds > -1) {
                            // FIX: reuse the index already computed above
                            jobId = jobId.substring(0, jobIdEnds);
                        }
                        sb.append(separator).append(jobId);
                        separator = ",";
                    }
                    line = br.readLine();
                }
            }
            finally {
                br.close();
            }
            props.setProperty("hadoopJobs", sb.toString());
        }
        return props;
    }

}