001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import java.io.File; 021 import java.io.FileOutputStream; 022 import java.io.IOException; 023 import java.io.OutputStream; 024 import java.net.URL; 025 import java.util.Map; 026 import java.util.Properties; 027 import java.util.regex.Pattern; 028 029 import org.apache.hadoop.conf.Configuration; 030 import org.apache.hadoop.fs.Path; 031 import org.apache.log4j.PropertyConfigurator; 032 import org.apache.sqoop.Sqoop; 033 034 public class SqoopMain extends LauncherMain { 035 036 private static final String SQOOP_ARGS = "oozie.sqoop.args"; 037 038 public static final String SQOOP_SITE_CONF = "sqoop-site.xml"; 039 040 private static final Pattern[] SQOOP_JOB_IDS_PATTERNS = { 041 Pattern.compile("Job complete: (job_\\S*)"), Pattern.compile("Job (job_\\S*) completed successfully") 042 }; 043 044 private static final String SQOOP_LOG4J_PROPS = "sqoop-log4j.properties"; 045 046 public static void main(String[] args) throws Exception { 047 run(SqoopMain.class, args); 048 } 049 050 private static Configuration initActionConf() { 051 // loading action conf prepared by Oozie 052 Configuration sqoopConf = new Configuration(false); 053 054 String actionXml = System.getProperty("oozie.action.conf.xml"); 055 056 if (actionXml == null) { 057 throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]"); 058 } 059 if (!new File(actionXml).exists()) { 060 throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist"); 061 } 062 063 sqoopConf.addResource(new Path("file:///", actionXml)); 064 065 String delegationToken = System.getenv("HADOOP_TOKEN_FILE_LOCATION"); 066 if (delegationToken != null) { 067 sqoopConf.set("mapreduce.job.credentials.binary", delegationToken); 068 System.out.println("------------------------"); 069 System.out.println("Setting env property for mapreduce.job.credentials.binary to: " + delegationToken); 070 System.out.println("------------------------"); 071 System.setProperty("mapreduce.job.credentials.binary", delegationToken); 072 } else { 073 System.out.println("Non-Kerberos execution"); 074 } 075 076 return sqoopConf; 077 } 078 079 public static Configuration setUpSqoopSite() throws Exception { 080 Configuration sqoopConf = initActionConf(); 081 082 // Write the action configuration out to sqoop-site.xml 083 OutputStream os = new FileOutputStream(SQOOP_SITE_CONF); 084 try { 085 sqoopConf.writeXml(os); 086 } 087 finally { 088 os.close(); 089 } 090 091 System.out.println(); 092 System.out.println("Sqoop Configuration Properties:"); 093 System.out.println("------------------------"); 094 for (Map.Entry<String, String> entry : sqoopConf) { 095 System.out.println(entry.getKey() + "=" + entry.getValue()); 096 } 097 System.out.flush(); 098 System.out.println("------------------------"); 099 System.out.println(); 100 return sqoopConf; 101 } 102 103 public static String setUpSqoopLog4J(Configuration sqoopConf) throws IOException { 104 //Logfile to capture job IDs 105 String hadoopJobId = System.getProperty("oozie.launcher.job.id"); 106 if (hadoopJobId == null) { 107 throw new RuntimeException("Launcher Hadoop Job ID system property not set"); 108 } 109 110 String logFile = new File("sqoop-oozie-" + hadoopJobId + ".log").getAbsolutePath(); 111 112 Properties hadoopProps = new Properties(); 113 114 // Preparing log4j configuration 115 URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties"); 116 if (log4jFile != null) { 117 // getting hadoop log4j configuration 118 hadoopProps.load(log4jFile.openStream()); 119 } 120 121 String logLevel = sqoopConf.get("oozie.sqoop.log.level", "INFO"); 122 123 hadoopProps.setProperty("log4j.logger.org.apache.sqoop", logLevel + ", A"); 124 hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender"); 125 hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout"); 126 hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n"); 127 128 hadoopProps.setProperty("log4j.appender.jobid", "org.apache.log4j.FileAppender"); 129 hadoopProps.setProperty("log4j.appender.jobid.file", logFile); 130 hadoopProps.setProperty("log4j.appender.jobid.layout", "org.apache.log4j.PatternLayout"); 131 hadoopProps.setProperty("log4j.appender.jobid.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n"); 132 hadoopProps.setProperty("log4j.logger.org.apache.hadoop.mapred", "INFO, jobid"); 133 hadoopProps.setProperty("log4j.logger.org.apache.hadoop.mapreduce.Job", "INFO, jobid"); 134 135 String localProps = new File(SQOOP_LOG4J_PROPS).getAbsolutePath(); 136 OutputStream os1 = new FileOutputStream(localProps); 137 try { 138 hadoopProps.store(os1, ""); 139 } 140 finally { 141 os1.close(); 142 } 143 144 PropertyConfigurator.configure(SQOOP_LOG4J_PROPS); 145 146 return logFile; 147 } 148 149 protected void run(String[] args) throws Exception { 150 System.out.println(); 151 System.out.println("Oozie Sqoop action configuration"); 152 System.out.println("================================================================="); 153 154 Configuration sqoopConf = setUpSqoopSite(); 155 String logFile = setUpSqoopLog4J(sqoopConf); 156 157 String[] sqoopArgs = MapReduceMain.getStrings(sqoopConf, SQOOP_ARGS); 158 if (sqoopArgs == null) { 159 throw new RuntimeException("Action Configuration does not have [" + SQOOP_ARGS + "] property"); 160 } 161 162 System.out.println("Sqoop command arguments :"); 163 for (String arg : sqoopArgs) { 164 System.out.println(" " + arg); 165 } 166 167 System.out.println("================================================================="); 168 System.out.println(); 169 System.out.println(">>> Invoking Sqoop command line now >>>"); 170 System.out.println(); 171 System.out.flush(); 172 173 try { 174 runSqoopJob(sqoopArgs); 175 } 176 catch (SecurityException ex) { 177 if (LauncherSecurityManager.getExitInvoked()) { 178 if (LauncherSecurityManager.getExitCode() != 0) { 179 throw ex; 180 } 181 } 182 } 183 184 System.out.println(); 185 System.out.println("<<< Invocation of Sqoop command completed <<<"); 186 System.out.println(); 187 188 // harvesting and recording Hadoop Job IDs 189 Properties jobIds = getHadoopJobIds(logFile, SQOOP_JOB_IDS_PATTERNS); 190 191 File file = new File(System.getProperty("oozie.action.output.properties")); 192 OutputStream os = new FileOutputStream(file); 193 try { 194 jobIds.store(os, ""); 195 } 196 finally { 197 os.close(); 198 } 199 System.out.println(" Hadoop Job IDs executed by Sqoop: " + jobIds.getProperty(HADOOP_JOBS)); 200 System.out.println(); 201 } 202 203 protected void runSqoopJob(String[] args) throws Exception { 204 // running as from the command line 205 Sqoop.main(args); 206 } 207 208 public static void setSqoopCommand(Configuration conf, String[] args) { 209 MapReduceMain.setStrings(conf, SQOOP_ARGS, args); 210 } 211 }