/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.action.hadoop;

import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;

/**
 * Action executor registered under the action type <code>sqoop</code>.
 * <p>
 * It translates the action XML (<code>&lt;command&gt;</code> or <code>&lt;arg&gt;</code> elements) into the
 * argument list stored under {@link #SQOOP_ARGS}, and, once the action has ended with status OK, merges the
 * Hadoop counters of the action's child jobs and publishes them as action statistics.
 */
public class SqoopActionExecutor extends JavaActionExecutor {

    /** Inline configuration property that controls whether the execution stats are stored (default "true"). */
    public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
    private static final String SQOOP_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SqoopMain";
    /** Configuration key under which the Sqoop argument list is stored. */
    static final String SQOOP_ARGS = "oozie.sqoop.args";
    private static final String SQOOP = "sqoop";

    public SqoopActionExecutor() {
        super(SQOOP);
    }

    /**
     * Return the classes needed by the launcher, i.e. the Sqoop main class resolved by name.
     *
     * @return a list containing the class named by {@code SQOOP_MAIN_CLASS_NAME}
     * @throws RuntimeException if that class cannot be loaded
     */
    @Override
    public List<Class<?>> getLauncherClasses() {
        List<Class<?>> classes = new ArrayList<>();
        try {
            classes.add(Class.forName(SQOOP_MAIN_CLASS_NAME));
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found", e);
        }
        return classes;
    }

    /**
     * Return the main class the launcher should run.
     *
     * @param launcherConf launcher configuration, consulted for {@code LauncherAMUtils.CONF_OOZIE_ACTION_MAIN_CLASS}
     * @param actionXml action XML (not used here)
     * @return the configured main class, or the Sqoop main class name when none is configured
     */
    @Override
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        return launcherConf.get(LauncherAMUtils.CONF_OOZIE_ACTION_MAIN_CLASS, SQOOP_MAIN_CLASS_NAME);
    }

    /**
     * Set up the action configuration: after the parent set-up, copy the inline <code>configuration</code>
     * element (if any) into <code>actionConf</code>, then build the Sqoop argument list and store it under
     * {@link #SQOOP_ARGS}.
     * <p>
     * The arguments come from the <code>command</code> element, split on single space characters, when that
     * element is present; otherwise from the trimmed text of every <code>arg</code> element.
     *
     * @param actionConf action configuration to populate
     * @param context action context
     * @param actionXml action XML
     * @param appPath application path
     * @return the same <code>actionConf</code> instance
     * @throws ActionExecutorException if the parent set-up fails, the inline configuration cannot be read, or
     *         the disallowed-properties check rejects it
     */
    @Override
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        super.setupActionConf(actionConf, context, actionXml, appPath);
        Namespace ns = actionXml.getNamespace();

        try {
            Element e = actionXml.getChild("configuration", ns);
            if (e != null) {
                String strConf = XmlUtils.prettyPrint(e).toString();
                XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
                XConfiguration.copy(inlineConf, actionConf);
                checkForDisallowedProps(inlineConf, "inline configuration");
            }
        }
        catch (IOException ex) {
            throw convertException(ex);
        }

        final List<String> argList = new ArrayList<>();
        // Build a list of arguments from either a tokenized <command> string or a list of <arg>
        Element commandElement = actionXml.getChild("command", ns);
        if (commandElement != null) {
            String command = commandElement.getTextTrim();
            StringTokenizer st = new StringTokenizer(command, " ");
            while (st.hasMoreTokens()) {
                argList.add(st.nextToken());
            }
        }
        else {
            @SuppressWarnings("unchecked")
            List<Element> eArgs = (List<Element>) actionXml.getChildren("arg", ns);
            for (Element elem : eArgs) {
                argList.add(elem.getTextTrim());
            }
        }
        // If the command is given accidentally as "sqoop import --option"
        // instead of "import --option" we can make a user's life easier
        // by removing the unnecessary "sqoop" token.
        // However, we do not do this if the command looks like
        // "sqoop --option", as that's entirely invalid.
        if (argList.size() > 1
                && argList.get(0).equalsIgnoreCase(SQOOP)
                && !argList.get(1).startsWith("-")) {
            XLog.getLog(getClass()).info(
                    "Found a redundant 'sqoop' prefixing the command. Removing it.");
            argList.remove(0);
        }

        setSqoopCommand(actionConf, argList.toArray(new String[argList.size()]));
        return actionConf;
    }

    // Store the Sqoop arguments in the configuration under SQOOP_ARGS.
    private void setSqoopCommand(Configuration conf, String[] args) {
        ActionUtils.setStrings(conf, SQOOP_ARGS, args);
    }

    /**
     * We will gather counters from all executed action Hadoop jobs (e.g. jobs
     * that moved data, not the launcher itself) and merge them together. There
     * will be only one job most of the time. The only exception is the
     * import-all-tables option that will execute one job per imported table.
     * <p>
     * The merged counters are published as JSON in the
     * {@link MapReduceActionExecutor#HADOOP_COUNTERS} variable (an empty string when no counters were found).
     * They are additionally stored as execution stats when {@link #OOZIE_ACTION_EXTERNAL_STATS_WRITE} is not
     * set to false in the inline configuration and the JSON does not exceed the maximum external stats size.
     * Nothing is gathered unless the action status is OK.
     *
     * @param context Action context
     * @param action Workflow action
     * @throws ActionExecutorException if a child job is unknown to Hadoop, or if gathering the counters or
     *         closing the job client fails
     */
    @Override
    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
        super.end(context, action);
        JobClient jobClient = null;

        boolean exception = false;
        try {
            if (action.getStatus() == WorkflowAction.Status.OK) {
                Element actionXml = XmlUtils.parseXml(action.getConf());
                Configuration jobConf = createBaseHadoopConf(context, actionXml);
                jobClient = createJobClient(context, jobConf);

                // Cumulative counters for all Sqoop mapreduce jobs
                Counters counters = mergeChildJobCounters(jobClient, action);

                if (counters != null) {
                    ActionStats stats = new MRStats(counters);
                    String statsJsonString = stats.toJSON();
                    context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, statsJsonString);

                    // If action stats write property is set to false by user or
                    // size of stats is greater than the maximum allowed size,
                    // do not store the action stats.
                    // The size is measured in UTF-8 bytes so that the check does not
                    // depend on the platform default charset.
                    if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
                            OOZIE_ACTION_EXTERNAL_STATS_WRITE, "true"))
                            && (statsJsonString.getBytes(StandardCharsets.UTF_8).length
                                    <= getMaxExternalStatsSize())) {
                        context.setExecutionStats(statsJsonString);
                        LOG.debug(
                                "Printing stats for sqoop action as a JSON string : [{0}]", statsJsonString);
                    }
                }
                else {
                    context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, "");
                    XLog.getLog(getClass()).warn("Can't find any associated Hadoop job counters");
                }
            }
        }
        catch (Exception ex) {
            exception = true;
            throw convertException(ex);
        }
        finally {
            if (jobClient != null) {
                try {
                    jobClient.close();
                }
                catch (Exception e) {
                    if (exception) {
                        // An earlier failure is already propagating; only log the close failure
                        // so that it does not replace the original exception.
                        LOG.error("JobClient error: ", e);
                    }
                    else {
                        throw convertException(e);
                    }
                }
            }
        }
    }

    // Merge the counters of every child job listed in the action's external child IDs;
    // returns null when there are no child jobs or none of them exposes counters.
    private Counters mergeChildJobCounters(JobClient jobClient, WorkflowAction action)
            throws IOException, ActionExecutorException {
        // Sqoop does not necessarily create a mapreduce job each time
        String externalIds = action.getExternalChildIDs();
        if (externalIds == null || externalIds.trim().isEmpty()) {
            return null;
        }

        Counters counters = null;
        for (String jobId : externalIds.split(",")) {
            RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
            if (runningJob == null) {
                // Report the child job that could not be found, not the action's own external ID.
                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "SQOOP001",
                        "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!",
                        jobId, action.getId());
            }

            Counters taskCounters = runningJob.getCounters();
            if (taskCounters == null) {
                XLog.getLog(getClass()).warn("Could not find Hadoop Counters for job: [{0}]", jobId);
            }
            else if (counters == null) {
                counters = taskCounters;
            }
            else {
                counters.incrAllCounters(taskCounters);
            }
        }
        return counters;
    }

    // Return the value of the specified property from the action's inline <configuration> element,
    // or the default value when the action XML or that element is missing.
    private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue)
            throws ActionExecutorException {
        try {
            if (actionConf != null) {
                Namespace ns = actionConf.getNamespace();
                Element e = actionConf.getChild("configuration", ns);

                if (e != null) {
                    String strConf = XmlUtils.prettyPrint(e).toString();
                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
                    return inlineConf.get(key, defaultValue);
                }
            }
            return defaultValue;
        }
        catch (IOException ex) {
            throw convertException(ex);
        }
    }

    /**
     * Return the sharelib name for the action.
     *
     * @param actionXml action XML (not used here)
     * @return returns <code>sqoop</code>.
     */
    @Override
    protected String getDefaultShareLibName(Element actionXml) {
        return SQOOP;
    }

}