/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import org.apache.pig.Main;
import org.apache.pig.PigRunner;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.FileOutputStream;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;
import java.net.URL;
import java.util.regex.Pattern;

/**
 * Launcher main class for the Oozie Pig action. Translates the action
 * configuration prepared by Oozie into a Pig command line, invokes Pig
 * (via {@code PigRunner} for Pig 0.8+ or {@code Main.main} for older
 * versions), and records the Hadoop job IDs and optional Pig stats for
 * the launcher to pick up.
 */
public class PigMain extends LauncherMain {
    // Pig CLI options Oozie manages itself; user-supplied args may not override them.
    private static final Set<String> DISALLOWED_PIG_OPTIONS = new HashSet<String>();
    public static final String ACTION_PREFIX = "oozie.action.";
    // System property naming the file where executed child (Hadoop) job IDs are written.
    public static final String EXTERNAL_CHILD_IDS = ACTION_PREFIX + "externalChildIDs.properties";
    // System property naming the file where Pig stats JSON is written.
    public static final String EXTERNAL_ACTION_STATS = ACTION_PREFIX + "stats.properties";
    // Action-conf flag: when true, Pig stats are captured and externalized.
    public static final String EXTERNAL_STATS_WRITE = ACTION_PREFIX + "external.stats.write";
    public static final int STRING_BUFFER_SIZE = 100;

    // Patterns used to scrape Hadoop job IDs out of the Pig log file when
    // the PigStats API is unavailable (embedded python, Pig < 0.8).
    private static final Pattern[] PIG_JOB_IDS_PATTERNS = {
        Pattern.compile("HadoopJobId: (job_\\S*)")
    };

    static {
        DISALLOWED_PIG_OPTIONS.add("-4");
        DISALLOWED_PIG_OPTIONS.add("-log4jconf");
        DISALLOWED_PIG_OPTIONS.add("-e");
        DISALLOWED_PIG_OPTIONS.add("-execute");
        DISALLOWED_PIG_OPTIONS.add("-f");
        DISALLOWED_PIG_OPTIONS.add("-file");
        DISALLOWED_PIG_OPTIONS.add("-l");
        DISALLOWED_PIG_OPTIONS.add("-logfile");
        DISALLOWED_PIG_OPTIONS.add("-r");
        DISALLOWED_PIG_OPTIONS.add("-dryrun");
        DISALLOWED_PIG_OPTIONS.add("-x");
        DISALLOWED_PIG_OPTIONS.add("-exectype");
        DISALLOWED_PIG_OPTIONS.add("-P");
        DISALLOWED_PIG_OPTIONS.add("-propertyFile");
    }

    public static void main(String[] args) throws Exception {
        run(PigMain.class, args);
    }

    /**
     * Builds the Pig command line from the Oozie action configuration and
     * runs the Pig script.
     *
     * @param args launcher arguments (unused; the command line is built from the action conf)
     * @throws Exception if the action configuration is missing/invalid or the Pig job fails
     */
    @Override
    protected void run(String[] args) throws Exception {
        System.out.println();
        System.out.println("Oozie Pig action configuration");
        System.out.println("=================================================================");

        // loading action conf prepared by Oozie
        Configuration actionConf = new Configuration(false);

        String actionXml = System.getProperty("oozie.action.conf.xml");

        if (actionXml == null) {
            throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
        }
        if (!new File(actionXml).exists()) {
            throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
        }

        actionConf.addResource(new Path("file:///", actionXml));

        Properties pigProperties = new Properties();
        for (Map.Entry<String, String> entry : actionConf) {
            pigProperties.setProperty(entry.getKey(), entry.getValue());
        }

        // propagate delegation related props from launcher job to Pig job
        if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
            pigProperties.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
            System.out.println("------------------------");
            System.out.println("Setting env property for mapreduce.job.credentials.binary to:"
                    + System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
            System.out.println("------------------------");
            System.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
        }
        else {
            System.out.println("Non-kerberoes execution");
        }

        // Pig reads its properties from this file in the working directory.
        // Close in finally so the stream is not leaked if store() fails.
        OutputStream os = new FileOutputStream("pig.properties");
        try {
            pigProperties.store(os, "");
        }
        finally {
            os.close();
        }

        logMasking("pig.properties:", Arrays.asList("password"), pigProperties.entrySet());

        List<String> arguments = new ArrayList<String>();
        String script = actionConf.get("oozie.pig.script");

        if (script == null) {
            throw new RuntimeException("Action Configuration does not have [oozie.pig.script] property");
        }

        if (!new File(script).exists()) {
            throw new RuntimeException("Error: Pig script file [" + script + "] does not exist");
        }

        // Echo the script into the launcher log for debuggability.
        System.out.println("Pig script [" + script + "] content: ");
        System.out.println("------------------------");
        BufferedReader br = new BufferedReader(new FileReader(script));
        try {
            String line = br.readLine();
            while (line != null) {
                System.out.println(line);
                line = br.readLine();
            }
        }
        finally {
            br.close();
        }
        System.out.println("------------------------");
        System.out.println();

        arguments.add("-file");
        arguments.add(script);
        String[] params = MapReduceMain.getStrings(actionConf, "oozie.pig.params");
        for (String param : params) {
            arguments.add("-param");
            arguments.add(param);
        }

        String hadoopJobId = System.getProperty("oozie.launcher.job.id");
        if (hadoopJobId == null) {
            throw new RuntimeException("Launcher Hadoop Job ID system property not set");
        }

        String logFile = new File("pig-oozie-" + hadoopJobId + ".log").getAbsolutePath();

        URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties");
        if (log4jFile != null) {

            String pigLogLevel = actionConf.get("oozie.pig.log.level", "INFO");

            // append required PIG properties to the default hadoop log4j file
            Properties hadoopProps = new Properties();
            InputStream log4jStream = log4jFile.openStream();
            try {
                hadoopProps.load(log4jStream);
            }
            finally {
                log4jStream.close();
            }
            // Appender A mirrors Pig output to the console; appender B writes
            // the file that getHadoopJobIds() later scrapes for job IDs.
            hadoopProps.setProperty("log4j.logger.org.apache.pig", pigLogLevel + ", A, B");
            hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender");
            hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout");
            hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
            hadoopProps.setProperty("log4j.appender.B", "org.apache.log4j.FileAppender");
            hadoopProps.setProperty("log4j.appender.B.file", logFile);
            hadoopProps.setProperty("log4j.appender.B.layout", "org.apache.log4j.PatternLayout");
            hadoopProps.setProperty("log4j.appender.B.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");

            String localProps = new File("piglog4j.properties").getAbsolutePath();
            OutputStream os1 = new FileOutputStream(localProps);
            try {
                hadoopProps.store(os1, "");
            }
            finally {
                os1.close();
            }

            arguments.add("-log4jconf");
            arguments.add(localProps);

            // print out current directory
            File localDir = new File(localProps).getParentFile();
            System.out.println("Current (local) dir = " + localDir.getAbsolutePath());
        }
        else {
            System.out.println("log4jfile is null");
        }

        String pigLog = "pig-" + hadoopJobId + ".log";
        arguments.add("-logfile");
        arguments.add(pigLog);

        String[] pigArgs = MapReduceMain.getStrings(actionConf, "oozie.pig.args");
        for (String pigArg : pigArgs) {
            if (DISALLOWED_PIG_OPTIONS.contains(pigArg)) {
                throw new RuntimeException("Error: Pig argument " + pigArg + " is not supported");
            }
            arguments.add(pigArg);
        }

        System.out.println("Pig command arguments :");
        for (String arg : arguments) {
            System.out.println(" " + arg);
        }

        System.out.println("=================================================================");
        System.out.println();
        System.out.println(">>> Invoking Pig command line now >>>");
        System.out.println();
        System.out.flush();

        System.out.println();
        // First a "-version" run so the Pig version appears in the launcher log.
        runPigJob(new String[] { "-version" }, null, true, false);
        System.out.println();
        System.out.flush();
        boolean hasStats = Boolean.parseBoolean(actionConf.get(EXTERNAL_STATS_WRITE));
        runPigJob(arguments.toArray(new String[arguments.size()]), pigLog, false, hasStats);

        System.out.println();
        System.out.println("<<< Invocation of Pig command completed <<<");
        System.out.println();

        // For embedded python or for version of pig lower than 0.8, pig stats are not supported.
        // So retrieving hadoop Ids here
        File file = new File(System.getProperty(EXTERNAL_CHILD_IDS));
        if (!file.exists()) {
            Properties props = getHadoopJobIds(logFile, PIG_JOB_IDS_PATTERNS);
            writeExternalData(props.getProperty(HADOOP_JOBS), file);
            System.out.println(" Hadoop Job IDs executed by Pig: " + props.getProperty(HADOOP_JOBS));
            System.out.println();
        }
    }

    /**
     * Dumps the Pig log file to stderr so the failure cause is visible in the
     * launcher output.
     *
     * @param pigLog path of the Pig log file
     * @throws Exception if reading the log file fails for a reason other than absence
     */
    private void handleError(String pigLog) throws Exception {
        System.err.println();
        System.err.println("Pig logfile dump:");
        System.err.println();
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new FileReader(pigLog));
            String line = reader.readLine();
            while (line != null) {
                System.err.println(line);
                line = reader.readLine();
            }
        }
        catch (FileNotFoundException e) {
            System.err.println("pig log file: " + pigLog + " not found.");
        }
        finally {
            if (reader != null) {
                reader.close();
            }
        }
    }

    /**
     * Runs the pig script using PigRunner API if version 0.8 or above. Embedded
     * pig within python is also supported.
     *
     * @param args pig command line arguments
     * @param pigLog pig log file
     * @param resetSecurityManager specify if need to reset security manager
     * @param retrieveStats specify if stats are to be retrieved
     * @throws Exception if the Pig job fails or its results cannot be externalized
     */
    protected void runPigJob(String[] args, String pigLog, boolean resetSecurityManager, boolean retrieveStats) throws Exception {
        // running as from the command line
        boolean pigRunnerExists = true;
        try {
            // Probe only: availability of PigRunner indicates Pig 0.8+.
            Class.forName("org.apache.pig.PigRunner");
        }
        catch (ClassNotFoundException ex) {
            pigRunnerExists = false;
        }

        if (pigRunnerExists) {
            System.out.println("Run pig script using PigRunner.run() for Pig version 0.8+");
            PigStats stats = PigRunner.run(args, null);
            // isSuccessful is the API from 0.9 supported by both PigStats and
            // EmbeddedPigStats
            if (!stats.isSuccessful()) {
                if (pigLog != null) {
                    handleError(pigLog);
                }
                throw new LauncherMainException(PigRunner.ReturnCode.FAILURE);
            }
            else {
                // If pig command is ran with just the "version" option, then
                // return
                if (resetSecurityManager) {
                    return;
                }
                String jobIds = getHadoopJobIds(stats);
                if (jobIds != null) {
                    System.out.println(" Hadoop Job IDs executed by Pig: " + jobIds);
                    File f = new File(System.getProperty(EXTERNAL_CHILD_IDS));
                    writeExternalData(jobIds, f);
                }
                // Retrieve stats only if user has specified in workflow
                // configuration
                if (retrieveStats) {
                    ActionStats pigStats;
                    String jsonString;
                    try {
                        pigStats = new OoziePigStats(stats);
                        jsonString = pigStats.toJSON();
                    } catch (UnsupportedOperationException uoe) {
                        throw new UnsupportedOperationException(
                                "Pig stats are not supported for this type of operation", uoe);
                    }
                    File f = new File(System.getProperty(EXTERNAL_ACTION_STATS));
                    writeExternalData(jsonString, f);
                }
            }
        }
        else {
            try {
                System.out.println("Run pig script using Main.main() for Pig version before 0.8");
                Main.main(args);
            }
            catch (SecurityException ex) {
                if (resetSecurityManager) {
                    LauncherSecurityManager.reset();
                }
                else {
                    if (LauncherSecurityManager.getExitInvoked()) {
                        if (LauncherSecurityManager.getExitCode() != 0) {
                            if (pigLog != null) {
                                handleError(pigLog);
                            }
                            throw ex;
                        }
                    }
                }
            }
        }
    }

    // write external data(stats, hadoopIds) to the file which will be read by the LauncherMapper
    private static void writeExternalData(String data, File f) throws IOException {
        BufferedWriter out = null;
        try {
            out = new BufferedWriter(new FileWriter(f));
            out.write(data);
        }
        finally {
            if (out != null) {
                out.close();
            }
        }
    }

    /**
     * Stores the Pig script path, parameters and arguments in the action
     * configuration for later retrieval by {@link #run(String[])}.
     *
     * @param conf action configuration to populate
     * @param script path of the Pig script
     * @param params Pig parameters ({@code -param} values)
     * @param args additional Pig command line arguments
     */
    public static void setPigScript(Configuration conf, String script, String[] params, String[] args) {
        conf.set("oozie.pig.script", script);
        MapReduceMain.setStrings(conf, "oozie.pig.params", params);
        MapReduceMain.setStrings(conf, "oozie.pig.args", args);
    }

    /**
     * Get Hadoop Ids through PigStats API
     *
     * @param pigStats stats object obtained through PigStats API
     * @return comma-separated String of Hadoop job IDs, or null if the Pig
     *         API's are not supported
     */
    protected String getHadoopJobIds(PigStats pigStats) {
        StringBuilder sb = new StringBuilder(STRING_BUFFER_SIZE);
        String separator = ",";
        // Collect Hadoop Ids through JobGraph API of Pig and store them as
        // comma separated string
        try {
            PigStats.JobGraph jobGraph = pigStats.getJobGraph();
            for (JobStats jobStats : jobGraph) {
                String hadoopJobId = jobStats.getJobId();
                if (sb.length() > 0) {
                    sb.append(separator);
                }
                sb.append(hadoopJobId);
            }
        }
        // Return null if Pig API's are not supported
        catch (UnsupportedOperationException uoe) {
            return null;
        }
        return sb.toString();
    }

}