001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.net.URISyntaxException;
023    import java.util.ArrayList;
024    import java.util.List;
025    import java.util.Properties;
026    import java.util.StringTokenizer;
027    
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.fs.FileSystem;
030    import org.apache.hadoop.fs.Path;
031    import org.apache.hadoop.mapred.Counters;
032    import org.apache.hadoop.mapred.JobClient;
033    import org.apache.hadoop.mapred.JobConf;
034    import org.apache.hadoop.mapred.JobID;
035    import org.apache.hadoop.mapred.RunningJob;
036    import org.apache.oozie.action.ActionExecutorException;
037    import org.apache.oozie.client.WorkflowAction;
038    import org.apache.oozie.service.HadoopAccessorException;
039    import org.apache.oozie.util.XConfiguration;
040    import org.apache.oozie.util.XmlUtils;
041    import org.apache.oozie.util.XLog;
042    import org.jdom.Element;
043    import org.jdom.JDOMException;
044    import org.jdom.Namespace;
045    
046    public class SqoopActionExecutor extends JavaActionExecutor {
047    
048      public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
049    
050        public SqoopActionExecutor() {
051            super("sqoop");
052        }
053    
054        @Override
055        protected List<Class> getLauncherClasses() {
056            List<Class> classes = super.getLauncherClasses();
057            classes.add(LauncherMain.class);
058            classes.add(MapReduceMain.class);
059            classes.add(HiveMain.class);
060            classes.add(SqoopMain.class);
061            return classes;
062        }
063    
064        @Override
065        protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
066            return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SqoopMain.class.getName());
067        }
068    
069        @Override
070        @SuppressWarnings("unchecked")
071        Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
072                throws ActionExecutorException {
073            super.setupActionConf(actionConf, context, actionXml, appPath);
074            Namespace ns = actionXml.getNamespace();
075    
076            try {
077                Element e = actionXml.getChild("configuration", ns);
078                if (e != null) {
079                    String strConf = XmlUtils.prettyPrint(e).toString();
080                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
081                    checkForDisallowedProps(inlineConf, "inline configuration");
082                    XConfiguration.copy(inlineConf, actionConf);
083                }
084            } catch (IOException ex) {
085                throw convertException(ex);
086            }
087    
088            String[] args;
089            if (actionXml.getChild("command", ns) != null) {
090                String command = actionXml.getChild("command", ns).getTextTrim();
091                StringTokenizer st = new StringTokenizer(command, " ");
092                List<String> l = new ArrayList<String>();
093                while (st.hasMoreTokens()) {
094                    l.add(st.nextToken());
095                }
096                args = l.toArray(new String[l.size()]);
097            }
098            else {
099                List<Element> eArgs = (List<Element>) actionXml.getChildren("arg", ns);
100                args = new String[eArgs.size()];
101                for (int i = 0; i < eArgs.size(); i++) {
102                    args[i] = eArgs.get(i).getTextTrim();
103                }
104            }
105    
106            SqoopMain.setSqoopCommand(actionConf, args);
107            return actionConf;
108        }
109    
110        /**
111         * We will gather counters from all executed action Hadoop jobs (e.g. jobs
112         * that moved data, not the launcher itself) and merge them together. There
113         * will be only one job most of the time. The only exception is
114         * import-all-table option that will execute one job per one exported table.
115         *
116         * @param context Action context
117         * @param action Workflow action
118         * @throws ActionExecutorException
119         */
120        @Override
121        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
122            super.end(context, action);
123            JobClient jobClient = null;
124    
125            boolean exception = false;
126            try {
127                if (action.getStatus() == WorkflowAction.Status.OK) {
128                    Element actionXml = XmlUtils.parseXml(action.getConf());
129                    JobConf jobConf = createBaseHadoopConf(context, actionXml);
130                    jobClient = createJobClient(context, jobConf);
131    
132                    // Cumulative counters for all Sqoop mapreduce jobs
133                    Counters counters = null;
134    
135                    String externalIds = action.getExternalChildIDs();
136                    String []jobIds = externalIds.split(",");
137    
138                    for(String jobId : jobIds) {
139                        RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
140                        if (runningJob == null) {
141                          throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "SQOOP001",
142                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", action
143                            .getExternalId(), action.getId());
144                        }
145    
146                        Counters taskCounters = runningJob.getCounters();
147                        if(taskCounters != null) {
148                            if(counters == null) {
149                              counters = taskCounters;
150                            } else {
151                              counters.incrAllCounters(taskCounters);
152                            }
153                        } else {
154                          XLog.getLog(getClass()).warn("Could not find Hadoop Counters for job: [{0}]", jobId);
155                        }
156                    }
157    
158                    if (counters != null) {
159                        ActionStats stats = new MRStats(counters);
160                        String statsJsonString = stats.toJSON();
161                        context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, statsJsonString);
162    
163                        // If action stats write property is set to false by user or
164                        // size of stats is greater than the maximum allowed size,
165                        // do not store the action stats
166                        if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
167                                OOZIE_ACTION_EXTERNAL_STATS_WRITE, "true"))
168                                && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
169                            context.setExecutionStats(statsJsonString);
170                            log.debug(
171                              "Printing stats for sqoop action as a JSON string : [{0}]", statsJsonString);
172                        }
173                    } else {
174                        context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, "");
175                        XLog.getLog(getClass()).warn("Can't find any associated Hadoop job counters");
176                    }
177                }
178            }
179            catch (Exception ex) {
180                exception = true;
181                throw convertException(ex);
182            }
183            finally {
184                if (jobClient != null) {
185                    try {
186                        jobClient.close();
187                    }
188                    catch (Exception e) {
189                        if (exception) {
190                            log.error("JobClient error: ", e);
191                        }
192                        else {
193                            throw convertException(e);
194                        }
195                    }
196                }
197            }
198        }
199    
200        // Return the value of the specified configuration property
201        private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue)
202                throws ActionExecutorException {
203            try {
204                if (actionConf != null) {
205                    Namespace ns = actionConf.getNamespace();
206                    Element e = actionConf.getChild("configuration", ns);
207    
208                    if(e != null) {
209                      String strConf = XmlUtils.prettyPrint(e).toString();
210                      XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
211                      return inlineConf.get(key, defaultValue);
212                    }
213                }
214                return defaultValue;
215            }
216            catch (IOException ex) {
217                throw convertException(ex);
218            }
219        }
220    
221        /**
222         * Get the stats and external child IDs
223         *
224         * @param actionFs the FileSystem object
225         * @param runningJob the runningJob
226         * @param action the Workflow action
227         * @param context executor context
228         *
229         */
230        @Override
231        protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
232                throws HadoopAccessorException, JDOMException, IOException, URISyntaxException{
233            super.getActionData(actionFs, runningJob, action, context);
234    
235            // Load stored Hadoop jobs ids and promote them as external child ids
236            action.getData();
237            Properties props = new Properties();
238            props.load(new StringReader(action.getData()));
239            context.setExternalChildIDs((String)props.get(LauncherMain.HADOOP_JOBS));
240        }
241    
242        @Override
243        protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
244            return true;
245        }
246    
247    
248        /**
249         * Return the sharelib name for the action.
250         *
251         * @return returns <code>sqoop</code>.
252         * @param actionXml
253         */
254        @Override
255        protected String getDefaultShareLibName(Element actionXml) {
256            return "sqoop";
257        }
258    
259    }