/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import java.io.IOException;
import java.io.StringReader;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.XLog;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;

/**
 * Action executor registered under the action type <code>sqoop</code>.
 * <p>
 * It passes the Sqoop command line to the launcher through the action
 * configuration and, when the action ends successfully, publishes the merged
 * Hadoop counters of the child jobs recorded as external child IDs.
 */
public class SqoopActionExecutor extends JavaActionExecutor {

    /** Inline configuration property that enables or disables storing the action stats. */
    public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
    private static final String SQOOP_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SqoopMain";
    /** Action configuration key under which the Sqoop arguments are stored. */
    static final String SQOOP_ARGS = "oozie.sqoop.args";

    public SqoopActionExecutor() {
        super("sqoop");
    }

    /**
     * Return the launcher classes of the parent executor plus
     * {@link LauncherMain}, {@link MapReduceMain} and the Sqoop main class.
     *
     * @return the list of launcher classes
     * @throws RuntimeException if the Sqoop main class cannot be loaded
     */
    @Override
    protected List<Class> getLauncherClasses() {
        List<Class> classes = super.getLauncherClasses();
        classes.add(LauncherMain.class);
        classes.add(MapReduceMain.class);
        try {
            classes.add(Class.forName(SQOOP_MAIN_CLASS_NAME));
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found", e);
        }
        return classes;
    }

    /**
     * Return the launcher main class name.
     *
     * @param launcherConf launcher configuration
     * @param actionXml action XML (not consulted here)
     * @return the value of {@link LauncherMapper#CONF_OOZIE_ACTION_MAIN_CLASS} in
     *         <code>launcherConf</code>, or the Sqoop main class name when it is not set
     */
    @Override
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SQOOP_MAIN_CLASS_NAME);
    }

    /**
     * Populate the action configuration: apply the parent setup, copy the
     * inline <code>configuration</code> element (after checking it for
     * disallowed properties) and store the Sqoop arguments under
     * {@link #SQOOP_ARGS}.
     * <p>
     * The arguments come from the <code>command</code> element, split on
     * single spaces, when that element is present; otherwise from the trimmed
     * text of every <code>arg</code> element, in document order.
     *
     * @param actionConf action configuration to populate
     * @param context executor context
     * @param actionXml action XML
     * @param appPath application path
     * @return <code>actionConf</code>
     * @throws ActionExecutorException if the inline configuration cannot be read
     */
    @Override
    @SuppressWarnings("unchecked")
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        super.setupActionConf(actionConf, context, actionXml, appPath);
        Namespace ns = actionXml.getNamespace();

        try {
            Element e = actionXml.getChild("configuration", ns);
            if (e != null) {
                String strConf = XmlUtils.prettyPrint(e).toString();
                XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
                checkForDisallowedProps(inlineConf, "inline configuration");
                XConfiguration.copy(inlineConf, actionConf);
            }
        }
        catch (IOException ex) {
            throw convertException(ex);
        }

        String[] args;
        Element commandElement = actionXml.getChild("command", ns);
        if (commandElement != null) {
            String command = commandElement.getTextTrim();
            StringTokenizer st = new StringTokenizer(command, " ");
            List<String> l = new ArrayList<String>();
            while (st.hasMoreTokens()) {
                l.add(st.nextToken());
            }
            args = l.toArray(new String[l.size()]);
        }
        else {
            List<Element> eArgs = (List<Element>) actionXml.getChildren("arg", ns);
            args = new String[eArgs.size()];
            for (int i = 0; i < eArgs.size(); i++) {
                args[i] = eArgs.get(i).getTextTrim();
            }
        }

        setSqoopCommand(actionConf, args);
        return actionConf;
    }

    // Store the Sqoop arguments in the configuration under SQOOP_ARGS
    private void setSqoopCommand(Configuration conf, String[] args) {
        MapReduceMain.setStrings(conf, SQOOP_ARGS, args);
    }

    /**
     * We will gather counters from all executed action Hadoop jobs (e.g. jobs
     * that moved data, not the launcher itself) and merge them together. There
     * will be only one job most of the time. The only exception is the
     * import-all-tables option that will execute one job per imported table.
     * <p>
     * Counters are only gathered when the action status is OK. When the action
     * has no external child IDs, no job is looked up and the
     * "no counters" path is taken instead of failing the action.
     *
     * @param context Action context
     * @param action Workflow action
     * @throws ActionExecutorException if a recorded child job is unknown to
     *         Hadoop or any other error occurs while gathering the counters
     */
    @Override
    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
        super.end(context, action);
        JobClient jobClient = null;

        boolean exception = false;
        try {
            if (action.getStatus() == WorkflowAction.Status.OK) {
                Element actionXml = XmlUtils.parseXml(action.getConf());
                JobConf jobConf = createBaseHadoopConf(context, actionXml);
                jobClient = createJobClient(context, jobConf);

                // Cumulative counters for all Sqoop mapreduce jobs
                Counters counters = mergeChildJobCounters(jobClient, action);
                publishCounters(context, actionXml, counters);
            }
        }
        catch (Exception ex) {
            exception = true;
            throw convertException(ex);
        }
        finally {
            if (jobClient != null) {
                try {
                    jobClient.close();
                }
                catch (Exception e) {
                    // Do not let a close failure mask the exception already being thrown
                    if (exception) {
                        log.error("JobClient error: ", e);
                    }
                    else {
                        throw convertException(e);
                    }
                }
            }
        }
    }

    // Merge the counters of every job listed in the action's external child IDs;
    // returns null when there are no child IDs or no job exposed any counters
    private Counters mergeChildJobCounters(JobClient jobClient, WorkflowAction action)
            throws IOException, ActionExecutorException {
        String externalIds = action.getExternalChildIDs();
        // Guard against absent child IDs: splitting null would throw a
        // NullPointerException and an empty id is rejected by JobID.forName
        if (externalIds == null || externalIds.trim().length() == 0) {
            return null;
        }

        Counters counters = null;
        for (String jobId : externalIds.split(",")) {
            RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
            if (runningJob == null) {
                // Report the child job that could not be found
                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "SQOOP001",
                        "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!",
                        jobId, action.getId());
            }

            Counters taskCounters = runningJob.getCounters();
            if (taskCounters == null) {
                XLog.getLog(getClass()).warn("Could not find Hadoop Counters for job: [{0}]", jobId);
            }
            else if (counters == null) {
                counters = taskCounters;
            }
            else {
                counters.incrAllCounters(taskCounters);
            }
        }
        return counters;
    }

    // Expose the merged counters as a workflow variable and, when allowed, as execution stats
    private void publishCounters(Context context, Element actionXml, Counters counters)
            throws ActionExecutorException {
        if (counters == null) {
            context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, "");
            XLog.getLog(getClass()).warn("Can't find any associated Hadoop job counters");
            return;
        }

        ActionStats stats = new MRStats(counters);
        String statsJsonString = stats.toJSON();
        context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, statsJsonString);

        // If action stats write property is set to false by user or
        // size of stats is greater than the maximum allowed size,
        // do not store the action stats
        if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
                OOZIE_ACTION_EXTERNAL_STATS_WRITE, "true"))
                && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
            context.setExecutionStats(statsJsonString);
            log.debug(
                    "Printing stats for sqoop action as a JSON string : [{0}]", statsJsonString);
        }
    }

    // Return the value of the specified property from the action's inline
    // configuration element, or defaultValue when the element or property is absent
    private String evaluateConfigurationProperty(Element actionXml, String key, String defaultValue)
            throws ActionExecutorException {
        try {
            if (actionXml != null) {
                Namespace ns = actionXml.getNamespace();
                Element e = actionXml.getChild("configuration", ns);

                if (e != null) {
                    String strConf = XmlUtils.prettyPrint(e).toString();
                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
                    return inlineConf.get(key, defaultValue);
                }
            }
            return defaultValue;
        }
        catch (IOException ex) {
            throw convertException(ex);
        }
    }

    /**
     * Get the stats and external child IDs.
     * <p>
     * After the parent implementation has run, the action data is parsed as
     * Java properties and the value stored under
     * {@link LauncherMain#HADOOP_JOBS} is set as the external child IDs. When
     * the action has no data, the external child IDs are left untouched.
     *
     * @param actionFs the FileSystem object
     * @param runningJob the runningJob
     * @param action the Workflow action
     * @param context executor context
     *
     */
    @Override
    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
            throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
        super.getActionData(actionFs, runningJob, action, context);

        String data = action.getData();
        if (data == null) {
            // Nothing was captured; a StringReader over null would throw a NullPointerException
            return;
        }

        // Load stored Hadoop jobs ids and promote them as external child ids
        Properties props = new Properties();
        props.load(new StringReader(data));
        context.setExternalChildIDs(props.getProperty(LauncherMain.HADOOP_JOBS));
    }

    /**
     * Always capture the launcher output; {@link #getActionData} reads the
     * child job IDs from the action data.
     *
     * @param action the Workflow action (not consulted here)
     * @return <code>true</code>
     */
    @Override
    protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
        return true;
    }


    /**
     * Return the sharelib name for the action.
     *
     * @param actionXml the action XML (not consulted here)
     * @return returns <code>sqoop</code>.
     */
    @Override
    protected String getDefaultShareLibName(Element actionXml) {
        return "sqoop";
    }

}