001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.action.hadoop;
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.fs.Path;
023import org.apache.hadoop.mapred.Counters;
024import org.apache.hadoop.mapred.JobClient;
025import org.apache.hadoop.mapred.JobConf;
026import org.apache.hadoop.mapred.JobID;
027import org.apache.hadoop.mapred.RunningJob;
028import org.apache.oozie.action.ActionExecutorException;
029import org.apache.oozie.client.WorkflowAction;
030import org.apache.oozie.util.XConfiguration;
031import org.apache.oozie.util.XLog;
032import org.apache.oozie.util.XmlUtils;
033import org.jdom.Element;
034import org.jdom.Namespace;
035
036import java.io.IOException;
037import java.io.StringReader;
038import java.util.ArrayList;
039import java.util.List;
040import java.util.StringTokenizer;
041
042public class SqoopActionExecutor extends JavaActionExecutor {
043
044  public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
045  private static final String SQOOP_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SqoopMain";
046  static final String SQOOP_ARGS = "oozie.sqoop.args";
047
048    public SqoopActionExecutor() {
049        super("sqoop");
050    }
051
052    @Override
053    public List<Class> getLauncherClasses() {
054        List<Class> classes = new ArrayList<Class>();
055        try {
056            classes.add(Class.forName(SQOOP_MAIN_CLASS_NAME));
057        }
058        catch (ClassNotFoundException e) {
059            throw new RuntimeException("Class not found", e);
060        }
061        return classes;
062    }
063
064    @Override
065    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
066        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SQOOP_MAIN_CLASS_NAME);
067    }
068
069    @Override
070    @SuppressWarnings("unchecked")
071    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
072            throws ActionExecutorException {
073        super.setupActionConf(actionConf, context, actionXml, appPath);
074        Namespace ns = actionXml.getNamespace();
075
076        try {
077            Element e = actionXml.getChild("configuration", ns);
078            if (e != null) {
079                String strConf = XmlUtils.prettyPrint(e).toString();
080                XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
081                checkForDisallowedProps(inlineConf, "inline configuration");
082                XConfiguration.copy(inlineConf, actionConf);
083            }
084        } catch (IOException ex) {
085            throw convertException(ex);
086        }
087
088        String[] args;
089        if (actionXml.getChild("command", ns) != null) {
090            String command = actionXml.getChild("command", ns).getTextTrim();
091            StringTokenizer st = new StringTokenizer(command, " ");
092            List<String> l = new ArrayList<String>();
093            while (st.hasMoreTokens()) {
094                l.add(st.nextToken());
095            }
096            args = l.toArray(new String[l.size()]);
097        }
098        else {
099            List<Element> eArgs = (List<Element>) actionXml.getChildren("arg", ns);
100            args = new String[eArgs.size()];
101            for (int i = 0; i < eArgs.size(); i++) {
102                args[i] = eArgs.get(i).getTextTrim();
103            }
104        }
105
106        setSqoopCommand(actionConf, args);
107        return actionConf;
108    }
109
110    private void setSqoopCommand(Configuration conf, String[] args) {
111        MapReduceMain.setStrings(conf, SQOOP_ARGS, args);
112    }
113
114    /**
115     * We will gather counters from all executed action Hadoop jobs (e.g. jobs
116     * that moved data, not the launcher itself) and merge them together. There
117     * will be only one job most of the time. The only exception is
118     * import-all-table option that will execute one job per one exported table.
119     *
120     * @param context Action context
121     * @param action Workflow action
122     * @throws ActionExecutorException
123     */
124    @Override
125    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
126        super.end(context, action);
127        JobClient jobClient = null;
128
129        boolean exception = false;
130        try {
131            if (action.getStatus() == WorkflowAction.Status.OK) {
132                Element actionXml = XmlUtils.parseXml(action.getConf());
133                JobConf jobConf = createBaseHadoopConf(context, actionXml);
134                jobClient = createJobClient(context, jobConf);
135
136                // Cumulative counters for all Sqoop mapreduce jobs
137                Counters counters = null;
138
139                // Sqoop do not have to create mapreduce job each time
140                String externalIds = action.getExternalChildIDs();
141                if (externalIds != null && !externalIds.trim().isEmpty()) {
142                    String []jobIds = externalIds.split(",");
143
144                    for(String jobId : jobIds) {
145                        RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
146                        if (runningJob == null) {
147                          throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "SQOOP001",
148                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", action
149                            .getExternalId(), action.getId());
150                        }
151
152                        Counters taskCounters = runningJob.getCounters();
153                        if(taskCounters != null) {
154                            if(counters == null) {
155                              counters = taskCounters;
156                            } else {
157                              counters.incrAllCounters(taskCounters);
158                            }
159                        } else {
160                          XLog.getLog(getClass()).warn("Could not find Hadoop Counters for job: [{0}]", jobId);
161                        }
162                    }
163                }
164
165                if (counters != null) {
166                    ActionStats stats = new MRStats(counters);
167                    String statsJsonString = stats.toJSON();
168                    context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, statsJsonString);
169
170                    // If action stats write property is set to false by user or
171                    // size of stats is greater than the maximum allowed size,
172                    // do not store the action stats
173                    if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
174                            OOZIE_ACTION_EXTERNAL_STATS_WRITE, "true"))
175                            && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
176                        context.setExecutionStats(statsJsonString);
177                        LOG.debug(
178                          "Printing stats for sqoop action as a JSON string : [{0}]", statsJsonString);
179                    }
180                } else {
181                    context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, "");
182                    XLog.getLog(getClass()).warn("Can't find any associated Hadoop job counters");
183                }
184            }
185        }
186        catch (Exception ex) {
187            exception = true;
188            throw convertException(ex);
189        }
190        finally {
191            if (jobClient != null) {
192                try {
193                    jobClient.close();
194                }
195                catch (Exception e) {
196                    if (exception) {
197                        LOG.error("JobClient error: ", e);
198                    }
199                    else {
200                        throw convertException(e);
201                    }
202                }
203            }
204        }
205    }
206
207    // Return the value of the specified configuration property
208    private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue)
209            throws ActionExecutorException {
210        try {
211            if (actionConf != null) {
212                Namespace ns = actionConf.getNamespace();
213                Element e = actionConf.getChild("configuration", ns);
214
215                if(e != null) {
216                  String strConf = XmlUtils.prettyPrint(e).toString();
217                  XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
218                  return inlineConf.get(key, defaultValue);
219                }
220            }
221            return defaultValue;
222        }
223        catch (IOException ex) {
224            throw convertException(ex);
225        }
226    }
227
228    /**
229     * Return the sharelib name for the action.
230     *
231     * @return returns <code>sqoop</code>.
232     * @param actionXml
233     */
234    @Override
235    protected String getDefaultShareLibName(Element actionXml) {
236        return "sqoop";
237    }
238
239}