001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.net.URISyntaxException;
023    import java.util.ArrayList;
024    import java.util.List;
025    import java.util.Properties;
026    import java.util.StringTokenizer;
027    
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.fs.FileSystem;
030    import org.apache.hadoop.fs.Path;
031    import org.apache.hadoop.mapred.Counters;
032    import org.apache.hadoop.mapred.JobClient;
033    import org.apache.hadoop.mapred.JobConf;
034    import org.apache.hadoop.mapred.JobID;
035    import org.apache.hadoop.mapred.RunningJob;
036    import org.apache.oozie.action.ActionExecutorException;
037    import org.apache.oozie.client.WorkflowAction;
038    import org.apache.oozie.service.HadoopAccessorException;
039    import org.apache.oozie.util.XConfiguration;
040    import org.apache.oozie.util.XmlUtils;
041    import org.apache.oozie.util.XLog;
042    import org.jdom.Element;
043    import org.jdom.JDOMException;
044    import org.jdom.Namespace;
045    
046    public class SqoopActionExecutor extends JavaActionExecutor {
047    
048      public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
049      private static final String SQOOP_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SqoopMain";
050      static final String SQOOP_ARGS = "oozie.sqoop.args";
051    
052        public SqoopActionExecutor() {
053            super("sqoop");
054        }
055    
056        @Override
057        protected List<Class> getLauncherClasses() {
058            List<Class> classes = super.getLauncherClasses();
059            classes.add(LauncherMain.class);
060            classes.add(MapReduceMain.class);
061            try {
062                classes.add(Class.forName(SQOOP_MAIN_CLASS_NAME));
063            }
064            catch (ClassNotFoundException e) {
065                throw new RuntimeException("Class not found", e);
066            }
067            return classes;
068        }
069    
070        @Override
071        protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
072            return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SQOOP_MAIN_CLASS_NAME);
073        }
074    
075        @Override
076        @SuppressWarnings("unchecked")
077        Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
078                throws ActionExecutorException {
079            super.setupActionConf(actionConf, context, actionXml, appPath);
080            Namespace ns = actionXml.getNamespace();
081    
082            try {
083                Element e = actionXml.getChild("configuration", ns);
084                if (e != null) {
085                    String strConf = XmlUtils.prettyPrint(e).toString();
086                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
087                    checkForDisallowedProps(inlineConf, "inline configuration");
088                    XConfiguration.copy(inlineConf, actionConf);
089                }
090            } catch (IOException ex) {
091                throw convertException(ex);
092            }
093    
094            String[] args;
095            if (actionXml.getChild("command", ns) != null) {
096                String command = actionXml.getChild("command", ns).getTextTrim();
097                StringTokenizer st = new StringTokenizer(command, " ");
098                List<String> l = new ArrayList<String>();
099                while (st.hasMoreTokens()) {
100                    l.add(st.nextToken());
101                }
102                args = l.toArray(new String[l.size()]);
103            }
104            else {
105                List<Element> eArgs = (List<Element>) actionXml.getChildren("arg", ns);
106                args = new String[eArgs.size()];
107                for (int i = 0; i < eArgs.size(); i++) {
108                    args[i] = eArgs.get(i).getTextTrim();
109                }
110            }
111    
112            setSqoopCommand(actionConf, args);
113            return actionConf;
114        }
115    
116        private void setSqoopCommand(Configuration conf, String[] args) {
117            MapReduceMain.setStrings(conf, SQOOP_ARGS, args);
118        }
119    
120        /**
121         * We will gather counters from all executed action Hadoop jobs (e.g. jobs
122         * that moved data, not the launcher itself) and merge them together. There
123         * will be only one job most of the time. The only exception is
124         * import-all-table option that will execute one job per one exported table.
125         *
126         * @param context Action context
127         * @param action Workflow action
128         * @throws ActionExecutorException
129         */
130        @Override
131        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
132            super.end(context, action);
133            JobClient jobClient = null;
134    
135            boolean exception = false;
136            try {
137                if (action.getStatus() == WorkflowAction.Status.OK) {
138                    Element actionXml = XmlUtils.parseXml(action.getConf());
139                    JobConf jobConf = createBaseHadoopConf(context, actionXml);
140                    jobClient = createJobClient(context, jobConf);
141    
142                    // Cumulative counters for all Sqoop mapreduce jobs
143                    Counters counters = null;
144    
145                    String externalIds = action.getExternalChildIDs();
146                    String []jobIds = externalIds.split(",");
147    
148                    for(String jobId : jobIds) {
149                        RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
150                        if (runningJob == null) {
151                          throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "SQOOP001",
152                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", action
153                            .getExternalId(), action.getId());
154                        }
155    
156                        Counters taskCounters = runningJob.getCounters();
157                        if(taskCounters != null) {
158                            if(counters == null) {
159                              counters = taskCounters;
160                            } else {
161                              counters.incrAllCounters(taskCounters);
162                            }
163                        } else {
164                          XLog.getLog(getClass()).warn("Could not find Hadoop Counters for job: [{0}]", jobId);
165                        }
166                    }
167    
168                    if (counters != null) {
169                        ActionStats stats = new MRStats(counters);
170                        String statsJsonString = stats.toJSON();
171                        context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, statsJsonString);
172    
173                        // If action stats write property is set to false by user or
174                        // size of stats is greater than the maximum allowed size,
175                        // do not store the action stats
176                        if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
177                                OOZIE_ACTION_EXTERNAL_STATS_WRITE, "true"))
178                                && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
179                            context.setExecutionStats(statsJsonString);
180                            log.debug(
181                              "Printing stats for sqoop action as a JSON string : [{0}]", statsJsonString);
182                        }
183                    } else {
184                        context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, "");
185                        XLog.getLog(getClass()).warn("Can't find any associated Hadoop job counters");
186                    }
187                }
188            }
189            catch (Exception ex) {
190                exception = true;
191                throw convertException(ex);
192            }
193            finally {
194                if (jobClient != null) {
195                    try {
196                        jobClient.close();
197                    }
198                    catch (Exception e) {
199                        if (exception) {
200                            log.error("JobClient error: ", e);
201                        }
202                        else {
203                            throw convertException(e);
204                        }
205                    }
206                }
207            }
208        }
209    
210        // Return the value of the specified configuration property
211        private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue)
212                throws ActionExecutorException {
213            try {
214                if (actionConf != null) {
215                    Namespace ns = actionConf.getNamespace();
216                    Element e = actionConf.getChild("configuration", ns);
217    
218                    if(e != null) {
219                      String strConf = XmlUtils.prettyPrint(e).toString();
220                      XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
221                      return inlineConf.get(key, defaultValue);
222                    }
223                }
224                return defaultValue;
225            }
226            catch (IOException ex) {
227                throw convertException(ex);
228            }
229        }
230    
231        /**
232         * Get the stats and external child IDs
233         *
234         * @param actionFs the FileSystem object
235         * @param runningJob the runningJob
236         * @param action the Workflow action
237         * @param context executor context
238         *
239         */
240        @Override
241        protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
242                throws HadoopAccessorException, JDOMException, IOException, URISyntaxException{
243            super.getActionData(actionFs, runningJob, action, context);
244    
245            // Load stored Hadoop jobs ids and promote them as external child ids
246            action.getData();
247            Properties props = new Properties();
248            props.load(new StringReader(action.getData()));
249            context.setExternalChildIDs((String)props.get(LauncherMain.HADOOP_JOBS));
250        }
251    
252        @Override
253        protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
254            return true;
255        }
256    
257    
258        /**
259         * Return the sharelib name for the action.
260         *
261         * @return returns <code>sqoop</code>.
262         * @param actionXml
263         */
264        @Override
265        protected String getDefaultShareLibName(Element actionXml) {
266            return "sqoop";
267        }
268    
269    }