/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import org.apache.pig.Main;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * Launcher for the Pig action that invokes Pig through its command-line
 * {@code org.apache.pig.Main} class (the "old" API).
 */
public class PigMainWithOldAPI extends LauncherMain {

    // Pig command-line options users may not pass through the action
    // configuration because Oozie sets them itself (script file, log file,
    // log4j configuration, execution type, property file, etc.)
    private static final Set<String> DISALLOWED_PIG_OPTIONS = new HashSet<String>();

    static {
        DISALLOWED_PIG_OPTIONS.add("-4");
        DISALLOWED_PIG_OPTIONS.add("-log4jconf");
        DISALLOWED_PIG_OPTIONS.add("-e");
        DISALLOWED_PIG_OPTIONS.add("-execute");
        DISALLOWED_PIG_OPTIONS.add("-f");
        DISALLOWED_PIG_OPTIONS.add("-file");
        DISALLOWED_PIG_OPTIONS.add("-l");
        DISALLOWED_PIG_OPTIONS.add("-logfile");
        DISALLOWED_PIG_OPTIONS.add("-r");
        DISALLOWED_PIG_OPTIONS.add("-dryrun");
        DISALLOWED_PIG_OPTIONS.add("-x");
        DISALLOWED_PIG_OPTIONS.add("-exectype");
        DISALLOWED_PIG_OPTIONS.add("-P");
        DISALLOWED_PIG_OPTIONS.add("-propertyFile");
    }

    public static void main(String[] args) throws Exception {
        run(PigMainWithOldAPI.class, args);
    }

    protected void run(String[] args) throws Exception {
        System.out.println();
        System.out.println("Oozie Pig action configuration");
        System.out.println("=================================================================");

        // load the action configuration prepared by Oozie
        Configuration actionConf = new Configuration(false);

        String actionXml = System.getProperty("oozie.action.conf.xml");

        if (actionXml == null) {
            throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
        }
        if (!new File(actionXml).exists()) {
            throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
        }

        actionConf.addResource(new Path("file:///", actionXml));

        Properties pigProperties = new Properties();
        for (Map.Entry<String, String> entry : actionConf) {
            pigProperties.setProperty(entry.getKey(), entry.getValue());
        }

        // propagate delegation-token-related properties from the launcher job to the Pig job
        if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
            pigProperties.setProperty("mapreduce.job.credentials.binary",
                    System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
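            // HADOOP_TOKEN_FILE_LOCATION points at the credentials file holding the
            // launcher task's delegation tokens; forwarding it as
            // mapreduce.job.credentials.binary lets the jobs Pig submits reuse them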
System.out.println("------------------------"); 092 System.out.println("Setting env property for mapreduce.job.credentials.binary to:" 093 + System.getenv("HADOOP_TOKEN_FILE_LOCATION")); 094 System.out.println("------------------------"); 095 System.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION")); 096 } 097 else { 098 System.out.println("Non-kerberoes execution"); 099 } 100 101 OutputStream os = new FileOutputStream("pig.properties"); 102 pigProperties.store(os, ""); 103 os.close(); 104 105 System.out.println(); 106 System.out.println("pig.properties content:"); 107 System.out.println("------------------------"); 108 pigProperties.store(System.out, ""); 109 System.out.flush(); 110 System.out.println("------------------------"); 111 System.out.println(); 112 113 List<String> arguments = new ArrayList<String>(); 114 String script = actionConf.get("oozie.pig.script"); 115 116 if (script == null) { 117 throw new RuntimeException("Action Configuration does not have [oozie.pig.script] property"); 118 } 119 120 if (!new File(script).exists()) { 121 throw new RuntimeException("Error: Pig script file [" + script + "] does not exist"); 122 } 123 124 System.out.println("Pig script [" + script + "] content: "); 125 System.out.println("------------------------"); 126 BufferedReader br = new BufferedReader(new FileReader(script)); 127 String line = br.readLine(); 128 while (line != null) { 129 System.out.println(line); 130 line = br.readLine(); 131 } 132 br.close(); 133 System.out.println("------------------------"); 134 System.out.println(); 135 136 arguments.add("-file"); 137 arguments.add(script); 138 String[] params = MapReduceMain.getStrings(actionConf, "oozie.pig.params"); 139 for (String param : params) { 140 arguments.add("-param"); 141 arguments.add(param); 142 } 143 144 String hadoopJobId = System.getProperty("oozie.launcher.job.id"); 145 if (hadoopJobId == null) { 146 throw new RuntimeException("Launcher Hadoop Job ID system property not set"); 147 } 148 149 String logFile = new File("pig-oozie-" + hadoopJobId + ".log").getAbsolutePath(); 150 151 URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties"); 152 if (log4jFile != null) { 153 154 String pigLogLevel = actionConf.get("oozie.pig.log.level", "INFO"); 155 156 // append required PIG properties to the default hadoop log4j file 157 Properties hadoopProps = new Properties(); 158 hadoopProps.load(log4jFile.openStream()); 159 hadoopProps.setProperty("log4j.logger.org.apache.pig", pigLogLevel + ", A, B"); 160 hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender"); 161 hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout"); 162 hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n"); 163 hadoopProps.setProperty("log4j.appender.B", "org.apache.log4j.FileAppender"); 164 hadoopProps.setProperty("log4j.appender.B.file", logFile); 165 hadoopProps.setProperty("log4j.appender.B.layout", "org.apache.log4j.PatternLayout"); 166 hadoopProps.setProperty("log4j.appender.B.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n"); 167 168 String localProps = new File("piglog4j.properties").getAbsolutePath(); 169 OutputStream os1 = new FileOutputStream(localProps); 170 hadoopProps.store(os1, ""); 171 os1.close(); 172 173 arguments.add("-log4jconf"); 174 arguments.add(localProps); 175 176 // print out current directory 177 File localDir = new File(localProps).getParentFile(); 178 
System.out.println("Current (local) dir = " + localDir.getAbsolutePath()); 179 } 180 else { 181 System.out.println("log4jfile is null"); 182 } 183 184 String pigLog = "pig-" + hadoopJobId + ".log"; 185 arguments.add("-logfile"); 186 arguments.add(pigLog); 187 188 String[] pigArgs = MapReduceMain.getStrings(actionConf, "oozie.pig.args"); 189 for (String pigArg : pigArgs) { 190 if (DISALLOWED_PIG_OPTIONS.contains(pigArg)) { 191 throw new RuntimeException("Error: Pig argument " + pigArg + " is not supported"); 192 } 193 arguments.add(pigArg); 194 } 195 196 System.out.println("Pig command arguments :"); 197 for (String arg : arguments) { 198 System.out.println(" " + arg); 199 } 200 201 System.out.println("================================================================="); 202 System.out.println(); 203 System.out.println(">>> Invoking Pig command line now >>>"); 204 System.out.println(); 205 System.out.flush(); 206 207 try { 208 System.out.println(); 209 runPigJob(new String[] { "-version" }); 210 } 211 catch (SecurityException ex) { 212 LauncherSecurityManager.reset(); 213 } 214 System.out.println(); 215 System.out.flush(); 216 try { 217 runPigJob(arguments.toArray(new String[arguments.size()])); 218 } 219 catch (SecurityException ex) { 220 if (LauncherSecurityManager.getExitInvoked()) { 221 if (LauncherSecurityManager.getExitCode() != 0) { 222 System.err.println(); 223 System.err.println("Pig logfile dump:"); 224 System.err.println(); 225 try { 226 BufferedReader reader = new BufferedReader(new FileReader(pigLog)); 227 line = reader.readLine(); 228 while (line != null) { 229 System.err.println(line); 230 line = reader.readLine(); 231 } 232 reader.close(); 233 } 234 catch (FileNotFoundException e) { 235 System.err.println("pig log file: " + pigLog + " not found."); 236 } 237 throw ex; 238 } 239 } 240 } 241 242 System.out.println(); 243 System.out.println("<<< Invocation of Pig command completed <<<"); 244 System.out.println(); 245 246 // harvesting and recording Hadoop Job IDs 247 Properties jobIds = getHadoopJobIds(logFile); 248 File file = new File(System.getProperty("oozie.action.output.properties")); 249 os = new FileOutputStream(file); 250 jobIds.store(os, ""); 251 os.close(); 252 System.out.println(" Hadoop Job IDs executed by Pig: " + jobIds.getProperty(HADOOP_JOBS)); 253 System.out.println(); 254 } 255 256 protected void runPigJob(String[] args) throws Exception { 257 // running as from the command line 258 Main.main(args); 259 } 260 261 public static void setPigScript(Configuration conf, String script, String[] params, String[] args) { 262 conf.set("oozie.pig.script", script); 263 MapReduceMain.setStrings(conf, "oozie.pig.params", params); 264 MapReduceMain.setStrings(conf, "oozie.pig.args", args); 265 } 266 267 private static final String JOB_ID_LOG_PREFIX = "HadoopJobId: "; 268 269 protected Properties getHadoopJobIds(String logFile) throws IOException { 270 int jobCount = 0; 271 Properties props = new Properties(); 272 StringBuffer sb = new StringBuffer(100); 273 if (new File(logFile).exists() == false) { 274 System.err.println("pig log file: " + logFile + " not present. 
            props.setProperty(HADOOP_JOBS, "");
        }
        else {
            BufferedReader br = new BufferedReader(new FileReader(logFile));
            String line = br.readLine();
            String separator = "";
            while (line != null) {
                if (line.contains(JOB_ID_LOG_PREFIX)) {
                    // the job ID follows the prefix and runs until the next space, if any
                    int jobIdStarts = line.indexOf(JOB_ID_LOG_PREFIX) + JOB_ID_LOG_PREFIX.length();
                    String jobId = line.substring(jobIdStarts);
                    int jobIdEnds = jobId.indexOf(" ");
                    if (jobIdEnds > -1) {
                        jobId = jobId.substring(0, jobIdEnds);
                    }
                    sb.append(separator).append(jobId);
                    separator = ",";
                }
                line = br.readLine();
            }
            br.close();
            props.setProperty(HADOOP_JOBS, sb.toString());
        }
        return props;
    }

}
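// A minimal usage sketch (values are hypothetical): the caller side, typically
// Oozie's Pig action executor, populates the action configuration that this
// launcher later reads back in run():
//
//     Configuration conf = new Configuration(false);
//     PigMainWithOldAPI.setPigScript(conf, "script.pig",
//             new String[] { "INPUT=/user/demo/input" }, new String[] { "-v" });
//
// This sets oozie.pig.script, oozie.pig.params and oozie.pig.args, which run()
// turns into the Pig command line.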