001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.action.hadoop;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.StringTokenizer;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.mapred.Counters;
030import org.apache.hadoop.mapred.JobClient;
031import org.apache.hadoop.mapred.JobID;
032import org.apache.hadoop.mapred.RunningJob;
033import org.apache.oozie.action.ActionExecutorException;
034import org.apache.oozie.client.WorkflowAction;
035import org.apache.oozie.util.XConfiguration;
036import org.apache.oozie.util.XLog;
037import org.apache.oozie.util.XmlUtils;
038import org.jdom.Element;
039import org.jdom.Namespace;
040
041public class SqoopActionExecutor extends JavaActionExecutor {
042
043  public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
044  private static final String SQOOP_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SqoopMain";
045  static final String SQOOP_ARGS = "oozie.sqoop.args";
046  private static final String SQOOP = "sqoop";
047
048    public SqoopActionExecutor() {
049        super(SQOOP);
050    }
051
052    @Override
053    public List<Class<?>> getLauncherClasses() {
054        List<Class<?>> classes = new ArrayList<Class<?>>();
055        try {
056            classes.add(Class.forName(SQOOP_MAIN_CLASS_NAME));
057        }
058        catch (ClassNotFoundException e) {
059            throw new RuntimeException("Class not found", e);
060        }
061        return classes;
062    }
063
064    @Override
065    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
066        return launcherConf.get(LauncherAMUtils.CONF_OOZIE_ACTION_MAIN_CLASS, SQOOP_MAIN_CLASS_NAME);
067    }
068
069    @Override
070    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
071            throws ActionExecutorException {
072        super.setupActionConf(actionConf, context, actionXml, appPath);
073        Namespace ns = actionXml.getNamespace();
074
075        try {
076            Element e = actionXml.getChild("configuration", ns);
077            if (e != null) {
078                String strConf = XmlUtils.prettyPrint(e).toString();
079                XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
080                XConfiguration.copy(inlineConf, actionConf);
081                checkForDisallowedProps(inlineConf, "inline configuration");
082            }
083        } catch (IOException ex) {
084            throw convertException(ex);
085        }
086
087        final List<String> argList = new ArrayList<>();
088        // Build a list of arguments from either a tokenized <command> string or a list of <arg>
089        if (actionXml.getChild("command", ns) != null) {
090            String command = actionXml.getChild("command", ns).getTextTrim();
091            StringTokenizer st = new StringTokenizer(command, " ");
092            while (st.hasMoreTokens()) {
093                argList.add(st.nextToken());
094            }
095        }
096        else {
097            @SuppressWarnings("unchecked")
098            List<Element> eArgs = (List<Element>) actionXml.getChildren("arg", ns);
099            for (Element elem : eArgs) {
100                argList.add(elem.getTextTrim());
101            }
102        }
103        // If the command is given accidentally as "sqoop import --option"
104        // instead of "import --option" we can make a user's life easier
105        // by removing away the unnecessary "sqoop" token.
106        // However, we do not do this if the command looks like
107        // "sqoop --option", as that's entirely invalid.
108        if (argList.size() > 1 &&
109                argList.get(0).equalsIgnoreCase(SQOOP) &&
110                !argList.get(1).startsWith("-")) {
111            XLog.getLog(getClass()).info(
112                    "Found a redundant 'sqoop' prefixing the command. Removing it.");
113            argList.remove(0);
114        }
115
116        setSqoopCommand(actionConf, argList.toArray(new String[argList.size()]));
117        return actionConf;
118    }
119
120    private void setSqoopCommand(Configuration conf, String[] args) {
121        ActionUtils.setStrings(conf, SQOOP_ARGS, args);
122    }
123
124    /**
125     * We will gather counters from all executed action Hadoop jobs (e.g. jobs
126     * that moved data, not the launcher itself) and merge them together. There
127     * will be only one job most of the time. The only exception is
128     * import-all-table option that will execute one job per one exported table.
129     *
130     * @param context Action context
131     * @param action Workflow action
132     * @throws ActionExecutorException
133     */
134    @Override
135    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
136        super.end(context, action);
137        JobClient jobClient = null;
138
139        boolean exception = false;
140        try {
141            if (action.getStatus() == WorkflowAction.Status.OK) {
142                Element actionXml = XmlUtils.parseXml(action.getConf());
143                Configuration jobConf = createBaseHadoopConf(context, actionXml);
144                jobClient = createJobClient(context, jobConf);
145
146                // Cumulative counters for all Sqoop mapreduce jobs
147                Counters counters = null;
148
149                // Sqoop do not have to create mapreduce job each time
150                String externalIds = action.getExternalChildIDs();
151                if (externalIds != null && !externalIds.trim().isEmpty()) {
152                    String []jobIds = externalIds.split(",");
153
154                    for(String jobId : jobIds) {
155                        RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
156                        if (runningJob == null) {
157                          throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "SQOOP001",
158                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", action
159                            .getExternalId(), action.getId());
160                        }
161
162                        Counters taskCounters = runningJob.getCounters();
163                        if(taskCounters != null) {
164                            if(counters == null) {
165                              counters = taskCounters;
166                            } else {
167                              counters.incrAllCounters(taskCounters);
168                            }
169                        } else {
170                          XLog.getLog(getClass()).warn("Could not find Hadoop Counters for job: [{0}]", jobId);
171                        }
172                    }
173                }
174
175                if (counters != null) {
176                    ActionStats stats = new MRStats(counters);
177                    String statsJsonString = stats.toJSON();
178                    context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, statsJsonString);
179
180                    // If action stats write property is set to false by user or
181                    // size of stats is greater than the maximum allowed size,
182                    // do not store the action stats
183                    if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
184                            OOZIE_ACTION_EXTERNAL_STATS_WRITE, "true"))
185                            && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
186                        context.setExecutionStats(statsJsonString);
187                        LOG.debug(
188                          "Printing stats for sqoop action as a JSON string : [{0}]", statsJsonString);
189                    }
190                } else {
191                    context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, "");
192                    XLog.getLog(getClass()).warn("Can't find any associated Hadoop job counters");
193                }
194            }
195        }
196        catch (Exception ex) {
197            exception = true;
198            throw convertException(ex);
199        }
200        finally {
201            if (jobClient != null) {
202                try {
203                    jobClient.close();
204                }
205                catch (Exception e) {
206                    if (exception) {
207                        LOG.error("JobClient error: ", e);
208                    }
209                    else {
210                        throw convertException(e);
211                    }
212                }
213            }
214        }
215    }
216
217    // Return the value of the specified configuration property
218    private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue)
219            throws ActionExecutorException {
220        try {
221            if (actionConf != null) {
222                Namespace ns = actionConf.getNamespace();
223                Element e = actionConf.getChild("configuration", ns);
224
225                if(e != null) {
226                  String strConf = XmlUtils.prettyPrint(e).toString();
227                  XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
228                  return inlineConf.get(key, defaultValue);
229                }
230            }
231            return defaultValue;
232        }
233        catch (IOException ex) {
234            throw convertException(ex);
235        }
236    }
237
238    /**
239     * Return the sharelib name for the action.
240     *
241     * @return returns <code>sqoop</code>.
242     * @param actionXml
243     */
244    @Override
245    protected String getDefaultShareLibName(Element actionXml) {
246        return "sqoop";
247    }
248
249}