This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.net.URISyntaxException;
023 import java.util.ArrayList;
024 import java.util.List;
025 import java.util.Properties;
026 import java.util.StringTokenizer;
027
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.fs.FileSystem;
030 import org.apache.hadoop.fs.Path;
031 import org.apache.hadoop.mapred.Counters;
032 import org.apache.hadoop.mapred.JobClient;
033 import org.apache.hadoop.mapred.JobConf;
034 import org.apache.hadoop.mapred.JobID;
035 import org.apache.hadoop.mapred.RunningJob;
036 import org.apache.oozie.action.ActionExecutorException;
037 import org.apache.oozie.client.WorkflowAction;
038 import org.apache.oozie.service.HadoopAccessorException;
039 import org.apache.oozie.util.XConfiguration;
040 import org.apache.oozie.util.XmlUtils;
041 import org.apache.oozie.util.XLog;
042 import org.jdom.Element;
043 import org.jdom.JDOMException;
044 import org.jdom.Namespace;
045
046 public class SqoopActionExecutor extends JavaActionExecutor {
047
048 public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
049
050 public SqoopActionExecutor() {
051 super("sqoop");
052 }
053
054 @Override
055 protected List<Class> getLauncherClasses() {
056 List<Class> classes = super.getLauncherClasses();
057 classes.add(LauncherMain.class);
058 classes.add(MapReduceMain.class);
059 classes.add(HiveMain.class);
060 classes.add(SqoopMain.class);
061 return classes;
062 }
063
064 @Override
065 protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
066 return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SqoopMain.class.getName());
067 }
068
069 @Override
070 @SuppressWarnings("unchecked")
071 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
072 throws ActionExecutorException {
073 super.setupActionConf(actionConf, context, actionXml, appPath);
074 Namespace ns = actionXml.getNamespace();
075
076 try {
077 Element e = actionXml.getChild("configuration", ns);
078 if (e != null) {
079 String strConf = XmlUtils.prettyPrint(e).toString();
080 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
081 checkForDisallowedProps(inlineConf, "inline configuration");
082 XConfiguration.copy(inlineConf, actionConf);
083 }
084 } catch (IOException ex) {
085 throw convertException(ex);
086 }
087
088 String[] args;
089 if (actionXml.getChild("command", ns) != null) {
090 String command = actionXml.getChild("command", ns).getTextTrim();
091 StringTokenizer st = new StringTokenizer(command, " ");
092 List<String> l = new ArrayList<String>();
093 while (st.hasMoreTokens()) {
094 l.add(st.nextToken());
095 }
096 args = l.toArray(new String[l.size()]);
097 }
098 else {
099 List<Element> eArgs = (List<Element>) actionXml.getChildren("arg", ns);
100 args = new String[eArgs.size()];
101 for (int i = 0; i < eArgs.size(); i++) {
102 args[i] = eArgs.get(i).getTextTrim();
103 }
104 }
105
106 SqoopMain.setSqoopCommand(actionConf, args);
107 return actionConf;
108 }
109
110 /**
111 * We will gather counters from all executed action Hadoop jobs (e.g. jobs
112 * that moved data, not the launcher itself) and merge them together. There
113 * will be only one job most of the time. The only exception is
114 * import-all-table option that will execute one job per one exported table.
115 *
116 * @param context Action context
117 * @param action Workflow action
118 * @throws ActionExecutorException
119 */
120 @Override
121 public void end(Context context, WorkflowAction action) throws ActionExecutorException {
122 super.end(context, action);
123 JobClient jobClient = null;
124
125 boolean exception = false;
126 try {
127 if (action.getStatus() == WorkflowAction.Status.OK) {
128 Element actionXml = XmlUtils.parseXml(action.getConf());
129 JobConf jobConf = createBaseHadoopConf(context, actionXml);
130 jobClient = createJobClient(context, jobConf);
131
132 // Cumulative counters for all Sqoop mapreduce jobs
133 Counters counters = null;
134
135 String externalIds = action.getExternalChildIDs();
136 String []jobIds = externalIds.split(",");
137
138 for(String jobId : jobIds) {
139 RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
140 if (runningJob == null) {
141 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "SQOOP001",
142 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", action
143 .getExternalId(), action.getId());
144 }
145
146 Counters taskCounters = runningJob.getCounters();
147 if(taskCounters != null) {
148 if(counters == null) {
149 counters = taskCounters;
150 } else {
151 counters.incrAllCounters(taskCounters);
152 }
153 } else {
154 XLog.getLog(getClass()).warn("Could not find Hadoop Counters for job: [{0}]", jobId);
155 }
156 }
157
158 if (counters != null) {
159 ActionStats stats = new MRStats(counters);
160 String statsJsonString = stats.toJSON();
161 context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, statsJsonString);
162
163 // If action stats write property is set to false by user or
164 // size of stats is greater than the maximum allowed size,
165 // do not store the action stats
166 if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
167 OOZIE_ACTION_EXTERNAL_STATS_WRITE, "true"))
168 && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
169 context.setExecutionStats(statsJsonString);
170 log.debug(
171 "Printing stats for sqoop action as a JSON string : [{0}]", statsJsonString);
172 }
173 } else {
174 context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, "");
175 XLog.getLog(getClass()).warn("Can't find any associated Hadoop job counters");
176 }
177 }
178 }
179 catch (Exception ex) {
180 exception = true;
181 throw convertException(ex);
182 }
183 finally {
184 if (jobClient != null) {
185 try {
186 jobClient.close();
187 }
188 catch (Exception e) {
189 if (exception) {
190 log.error("JobClient error: ", e);
191 }
192 else {
193 throw convertException(e);
194 }
195 }
196 }
197 }
198 }
199
200 // Return the value of the specified configuration property
201 private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue)
202 throws ActionExecutorException {
203 try {
204 if (actionConf != null) {
205 Namespace ns = actionConf.getNamespace();
206 Element e = actionConf.getChild("configuration", ns);
207
208 if(e != null) {
209 String strConf = XmlUtils.prettyPrint(e).toString();
210 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
211 return inlineConf.get(key, defaultValue);
212 }
213 }
214 return defaultValue;
215 }
216 catch (IOException ex) {
217 throw convertException(ex);
218 }
219 }
220
221 /**
222 * Get the stats and external child IDs
223 *
224 * @param actionFs the FileSystem object
225 * @param runningJob the runningJob
226 * @param action the Workflow action
227 * @param context executor context
228 *
229 */
230 @Override
231 protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
232 throws HadoopAccessorException, JDOMException, IOException, URISyntaxException{
233 super.getActionData(actionFs, runningJob, action, context);
234
235 // Load stored Hadoop jobs ids and promote them as external child ids
236 action.getData();
237 Properties props = new Properties();
238 props.load(new StringReader(action.getData()));
239 context.setExternalChildIDs((String)props.get(LauncherMain.HADOOP_JOBS));
240 }
241
242 @Override
243 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
244 return true;
245 }
246
247
248 /**
249 * Return the sharelib name for the action.
250 *
251 * @return returns <code>sqoop</code>.
252 * @param actionXml
253 */
254 @Override
255 protected String getDefaultShareLibName(Element actionXml) {
256 return "sqoop";
257 }
258
259 }