001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.PropertyConfigurator;
import org.apache.sqoop.Sqoop;
033    
034    public class SqoopMain extends LauncherMain {
035    
036        private static final String SQOOP_ARGS = "oozie.sqoop.args";
037    
038        public static final String SQOOP_SITE_CONF = "sqoop-site.xml";
039    
040        private static final Pattern[] SQOOP_JOB_IDS_PATTERNS = {
041          Pattern.compile("Job complete: (job_\\S*)"), Pattern.compile("Job (job_\\S*) completed successfully")
042        };
043    
044        private static final String SQOOP_LOG4J_PROPS = "sqoop-log4j.properties";
045    
046        public static void main(String[] args) throws Exception {
047            run(SqoopMain.class, args);
048        }
049    
050        private static Configuration initActionConf() {
051            // loading action conf prepared by Oozie
052            Configuration sqoopConf = new Configuration(false);
053    
054            String actionXml = System.getProperty("oozie.action.conf.xml");
055    
056            if (actionXml == null) {
057                throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
058            }
059            if (!new File(actionXml).exists()) {
060                throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
061            }
062    
063            sqoopConf.addResource(new Path("file:///", actionXml));
064    
065            String delegationToken = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
066            if (delegationToken != null) {
067                sqoopConf.set("mapreduce.job.credentials.binary", delegationToken);
068                System.out.println("------------------------");
069                System.out.println("Setting env property for mapreduce.job.credentials.binary to: " + delegationToken);
070                System.out.println("------------------------");
071                System.setProperty("mapreduce.job.credentials.binary", delegationToken);
072            } else {
073                System.out.println("Non-Kerberos execution");
074            }
075    
076            return sqoopConf;
077        }
078    
079        public static Configuration setUpSqoopSite() throws Exception {
080            Configuration sqoopConf = initActionConf();
081    
082            // Write the action configuration out to sqoop-site.xml
083            OutputStream os = new FileOutputStream(SQOOP_SITE_CONF);
084            try {
085                sqoopConf.writeXml(os);
086            }
087            finally {
088                os.close();
089            }
090          
091            System.out.println();
092            System.out.println("Sqoop Configuration Properties:");
093            System.out.println("------------------------");
094            for (Map.Entry<String, String> entry : sqoopConf) {
095                System.out.println(entry.getKey() + "=" + entry.getValue());
096            }
097            System.out.flush();
098            System.out.println("------------------------");
099            System.out.println();
100            return sqoopConf;
101        }
102    
103        public static String setUpSqoopLog4J(Configuration sqoopConf) throws IOException {
104            //Logfile to capture job IDs
105            String hadoopJobId = System.getProperty("oozie.launcher.job.id");
106            if (hadoopJobId == null) {
107                throw new RuntimeException("Launcher Hadoop Job ID system property not set");
108            }
109    
110            String logFile = new File("sqoop-oozie-" + hadoopJobId + ".log").getAbsolutePath();
111    
112            Properties hadoopProps = new Properties();
113    
114            // Preparing log4j configuration
115            URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties");
116            if (log4jFile != null) {
117                // getting hadoop log4j configuration
118                hadoopProps.load(log4jFile.openStream());
119            }
120    
121            String logLevel = sqoopConf.get("oozie.sqoop.log.level", "INFO");
122    
123            hadoopProps.setProperty("log4j.logger.org.apache.sqoop", logLevel + ", A");
124            hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender");
125            hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout");
126            hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
127    
128            hadoopProps.setProperty("log4j.appender.jobid", "org.apache.log4j.FileAppender");
129            hadoopProps.setProperty("log4j.appender.jobid.file", logFile);
130            hadoopProps.setProperty("log4j.appender.jobid.layout", "org.apache.log4j.PatternLayout");
131            hadoopProps.setProperty("log4j.appender.jobid.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
132            hadoopProps.setProperty("log4j.logger.org.apache.hadoop.mapred", "INFO, jobid");
133            hadoopProps.setProperty("log4j.logger.org.apache.hadoop.mapreduce.Job", "INFO, jobid");
134    
135            String localProps = new File(SQOOP_LOG4J_PROPS).getAbsolutePath();
136            OutputStream os1 = new FileOutputStream(localProps);
137            try {
138                hadoopProps.store(os1, "");
139            }
140            finally {
141                os1.close();
142            }
143          
144            PropertyConfigurator.configure(SQOOP_LOG4J_PROPS);
145    
146            return logFile;
147        }
148    
149        protected void run(String[] args) throws Exception {
150            System.out.println();
151            System.out.println("Oozie Sqoop action configuration");
152            System.out.println("=================================================================");
153    
154            Configuration sqoopConf = setUpSqoopSite();
155            String logFile = setUpSqoopLog4J(sqoopConf);
156    
157            String[] sqoopArgs = MapReduceMain.getStrings(sqoopConf, SQOOP_ARGS);
158            if (sqoopArgs == null) {
159                throw new RuntimeException("Action Configuration does not have [" + SQOOP_ARGS + "] property");
160            }
161    
162            System.out.println("Sqoop command arguments :");
163            for (String arg : sqoopArgs) {
164                System.out.println("             " + arg);
165            }
166    
167            System.out.println("=================================================================");
168            System.out.println();
169            System.out.println(">>> Invoking Sqoop command line now >>>");
170            System.out.println();
171            System.out.flush();
172    
173            try {
174                runSqoopJob(sqoopArgs);
175            }
176            catch (SecurityException ex) {
177                if (LauncherSecurityManager.getExitInvoked()) {
178                    if (LauncherSecurityManager.getExitCode() != 0) {
179                        throw ex;
180                    }
181                }
182            }
183    
184            System.out.println();
185            System.out.println("<<< Invocation of Sqoop command completed <<<");
186            System.out.println();
187    
188            // harvesting and recording Hadoop Job IDs
189            Properties jobIds = getHadoopJobIds(logFile, SQOOP_JOB_IDS_PATTERNS);
190    
191            File file = new File(System.getProperty("oozie.action.output.properties"));
192            OutputStream os = new FileOutputStream(file);
193            try {
194                jobIds.store(os, "");
195            }
196            finally {
197                os.close();
198            }
199            System.out.println(" Hadoop Job IDs executed by Sqoop: " + jobIds.getProperty(HADOOP_JOBS));
200            System.out.println();
201        }
202    
203        protected void runSqoopJob(String[] args) throws Exception {
204            // running as from the command line
205            Sqoop.main(args);
206        }
207    
208        public static void setSqoopCommand(Configuration conf, String[] args) {
209            MapReduceMain.setStrings(conf, SQOOP_ARGS, args);
210        }
211    }