This project has been retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.net.URISyntaxException;
023 import java.util.ArrayList;
024 import java.util.List;
025 import java.util.Properties;
026 import java.util.StringTokenizer;
027
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.fs.FileSystem;
030 import org.apache.hadoop.fs.Path;
031 import org.apache.hadoop.mapred.Counters;
032 import org.apache.hadoop.mapred.JobClient;
033 import org.apache.hadoop.mapred.JobConf;
034 import org.apache.hadoop.mapred.JobID;
035 import org.apache.hadoop.mapred.RunningJob;
036 import org.apache.oozie.action.ActionExecutorException;
037 import org.apache.oozie.client.WorkflowAction;
038 import org.apache.oozie.service.HadoopAccessorException;
039 import org.apache.oozie.util.XConfiguration;
040 import org.apache.oozie.util.XmlUtils;
041 import org.apache.oozie.util.XLog;
042 import org.jdom.Element;
043 import org.jdom.JDOMException;
044 import org.jdom.Namespace;
045
046 public class SqoopActionExecutor extends JavaActionExecutor {
047
048 public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
049 private static final String SQOOP_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SqoopMain";
050 static final String SQOOP_ARGS = "oozie.sqoop.args";
051
052 public SqoopActionExecutor() {
053 super("sqoop");
054 }
055
056 @Override
057 protected List<Class> getLauncherClasses() {
058 List<Class> classes = super.getLauncherClasses();
059 classes.add(LauncherMain.class);
060 classes.add(MapReduceMain.class);
061 try {
062 classes.add(Class.forName(SQOOP_MAIN_CLASS_NAME));
063 }
064 catch (ClassNotFoundException e) {
065 throw new RuntimeException("Class not found", e);
066 }
067 return classes;
068 }
069
070 @Override
071 protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
072 return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SQOOP_MAIN_CLASS_NAME);
073 }
074
075 @Override
076 @SuppressWarnings("unchecked")
077 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
078 throws ActionExecutorException {
079 super.setupActionConf(actionConf, context, actionXml, appPath);
080 Namespace ns = actionXml.getNamespace();
081
082 try {
083 Element e = actionXml.getChild("configuration", ns);
084 if (e != null) {
085 String strConf = XmlUtils.prettyPrint(e).toString();
086 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
087 checkForDisallowedProps(inlineConf, "inline configuration");
088 XConfiguration.copy(inlineConf, actionConf);
089 }
090 } catch (IOException ex) {
091 throw convertException(ex);
092 }
093
094 String[] args;
095 if (actionXml.getChild("command", ns) != null) {
096 String command = actionXml.getChild("command", ns).getTextTrim();
097 StringTokenizer st = new StringTokenizer(command, " ");
098 List<String> l = new ArrayList<String>();
099 while (st.hasMoreTokens()) {
100 l.add(st.nextToken());
101 }
102 args = l.toArray(new String[l.size()]);
103 }
104 else {
105 List<Element> eArgs = (List<Element>) actionXml.getChildren("arg", ns);
106 args = new String[eArgs.size()];
107 for (int i = 0; i < eArgs.size(); i++) {
108 args[i] = eArgs.get(i).getTextTrim();
109 }
110 }
111
112 setSqoopCommand(actionConf, args);
113 return actionConf;
114 }
115
116 private void setSqoopCommand(Configuration conf, String[] args) {
117 MapReduceMain.setStrings(conf, SQOOP_ARGS, args);
118 }
119
120 /**
121 * We will gather counters from all executed action Hadoop jobs (e.g. jobs
122 * that moved data, not the launcher itself) and merge them together. There
123 * will be only one job most of the time. The only exception is
124 * import-all-table option that will execute one job per one exported table.
125 *
126 * @param context Action context
127 * @param action Workflow action
128 * @throws ActionExecutorException
129 */
130 @Override
131 public void end(Context context, WorkflowAction action) throws ActionExecutorException {
132 super.end(context, action);
133 JobClient jobClient = null;
134
135 boolean exception = false;
136 try {
137 if (action.getStatus() == WorkflowAction.Status.OK) {
138 Element actionXml = XmlUtils.parseXml(action.getConf());
139 JobConf jobConf = createBaseHadoopConf(context, actionXml);
140 jobClient = createJobClient(context, jobConf);
141
142 // Cumulative counters for all Sqoop mapreduce jobs
143 Counters counters = null;
144
145 String externalIds = action.getExternalChildIDs();
146 String []jobIds = externalIds.split(",");
147
148 for(String jobId : jobIds) {
149 RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
150 if (runningJob == null) {
151 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "SQOOP001",
152 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", action
153 .getExternalId(), action.getId());
154 }
155
156 Counters taskCounters = runningJob.getCounters();
157 if(taskCounters != null) {
158 if(counters == null) {
159 counters = taskCounters;
160 } else {
161 counters.incrAllCounters(taskCounters);
162 }
163 } else {
164 XLog.getLog(getClass()).warn("Could not find Hadoop Counters for job: [{0}]", jobId);
165 }
166 }
167
168 if (counters != null) {
169 ActionStats stats = new MRStats(counters);
170 String statsJsonString = stats.toJSON();
171 context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, statsJsonString);
172
173 // If action stats write property is set to false by user or
174 // size of stats is greater than the maximum allowed size,
175 // do not store the action stats
176 if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
177 OOZIE_ACTION_EXTERNAL_STATS_WRITE, "true"))
178 && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
179 context.setExecutionStats(statsJsonString);
180 log.debug(
181 "Printing stats for sqoop action as a JSON string : [{0}]", statsJsonString);
182 }
183 } else {
184 context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, "");
185 XLog.getLog(getClass()).warn("Can't find any associated Hadoop job counters");
186 }
187 }
188 }
189 catch (Exception ex) {
190 exception = true;
191 throw convertException(ex);
192 }
193 finally {
194 if (jobClient != null) {
195 try {
196 jobClient.close();
197 }
198 catch (Exception e) {
199 if (exception) {
200 log.error("JobClient error: ", e);
201 }
202 else {
203 throw convertException(e);
204 }
205 }
206 }
207 }
208 }
209
210 // Return the value of the specified configuration property
211 private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue)
212 throws ActionExecutorException {
213 try {
214 if (actionConf != null) {
215 Namespace ns = actionConf.getNamespace();
216 Element e = actionConf.getChild("configuration", ns);
217
218 if(e != null) {
219 String strConf = XmlUtils.prettyPrint(e).toString();
220 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
221 return inlineConf.get(key, defaultValue);
222 }
223 }
224 return defaultValue;
225 }
226 catch (IOException ex) {
227 throw convertException(ex);
228 }
229 }
230
231 /**
232 * Get the stats and external child IDs
233 *
234 * @param actionFs the FileSystem object
235 * @param runningJob the runningJob
236 * @param action the Workflow action
237 * @param context executor context
238 *
239 */
240 @Override
241 protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
242 throws HadoopAccessorException, JDOMException, IOException, URISyntaxException{
243 super.getActionData(actionFs, runningJob, action, context);
244
245 // Load stored Hadoop jobs ids and promote them as external child ids
246 action.getData();
247 Properties props = new Properties();
248 props.load(new StringReader(action.getData()));
249 context.setExternalChildIDs((String)props.get(LauncherMain.HADOOP_JOBS));
250 }
251
252 @Override
253 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
254 return true;
255 }
256
257
258 /**
259 * Return the sharelib name for the action.
260 *
261 * @return returns <code>sqoop</code>.
262 * @param actionXml
263 */
264 @Override
265 protected String getDefaultShareLibName(Element actionXml) {
266 return "sqoop";
267 }
268
269 }