001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.action.hadoop; 020 021import org.apache.hadoop.conf.Configuration; 022import org.apache.hadoop.fs.Path; 023import org.apache.hadoop.mapred.Counters; 024import org.apache.hadoop.mapred.JobClient; 025import org.apache.hadoop.mapred.JobConf; 026import org.apache.hadoop.mapred.JobID; 027import org.apache.hadoop.mapred.RunningJob; 028import org.apache.oozie.action.ActionExecutorException; 029import org.apache.oozie.client.WorkflowAction; 030import org.apache.oozie.util.XConfiguration; 031import org.apache.oozie.util.XLog; 032import org.apache.oozie.util.XmlUtils; 033import org.jdom.Element; 034import org.jdom.Namespace; 035 036import java.io.IOException; 037import java.io.StringReader; 038import java.util.ArrayList; 039import java.util.List; 040import java.util.StringTokenizer; 041 042public class SqoopActionExecutor extends JavaActionExecutor { 043 044 public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write"; 045 private static final String SQOOP_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SqoopMain"; 046 static final String SQOOP_ARGS = "oozie.sqoop.args"; 047 048 public SqoopActionExecutor() { 049 super("sqoop"); 050 } 051 052 @Override 053 public List<Class> getLauncherClasses() { 054 List<Class> classes = new ArrayList<Class>(); 055 try { 056 classes.add(Class.forName(SQOOP_MAIN_CLASS_NAME)); 057 } 058 catch (ClassNotFoundException e) { 059 throw new RuntimeException("Class not found", e); 060 } 061 return classes; 062 } 063 064 @Override 065 protected String getLauncherMain(Configuration launcherConf, Element actionXml) { 066 return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SQOOP_MAIN_CLASS_NAME); 067 } 068 069 @Override 070 @SuppressWarnings("unchecked") 071 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) 072 throws ActionExecutorException { 073 super.setupActionConf(actionConf, context, actionXml, appPath); 074 Namespace ns = actionXml.getNamespace(); 075 076 try { 077 Element e = actionXml.getChild("configuration", ns); 078 if (e != null) { 079 String strConf = XmlUtils.prettyPrint(e).toString(); 080 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); 081 checkForDisallowedProps(inlineConf, "inline configuration"); 082 XConfiguration.copy(inlineConf, actionConf); 083 } 084 } catch (IOException ex) { 085 throw convertException(ex); 086 } 087 088 String[] args; 089 if (actionXml.getChild("command", ns) != null) { 090 String command = actionXml.getChild("command", ns).getTextTrim(); 091 StringTokenizer st = new StringTokenizer(command, " "); 092 List<String> l = new ArrayList<String>(); 093 while (st.hasMoreTokens()) { 094 l.add(st.nextToken()); 095 } 096 args = l.toArray(new String[l.size()]); 097 } 098 else { 099 List<Element> eArgs = (List<Element>) actionXml.getChildren("arg", ns); 100 args = new String[eArgs.size()]; 101 for (int i = 0; i < eArgs.size(); i++) { 102 args[i] = eArgs.get(i).getTextTrim(); 103 } 104 } 105 106 setSqoopCommand(actionConf, args); 107 return actionConf; 108 } 109 110 private void setSqoopCommand(Configuration conf, String[] args) { 111 MapReduceMain.setStrings(conf, SQOOP_ARGS, args); 112 } 113 114 /** 115 * We will gather counters from all executed action Hadoop jobs (e.g. jobs 116 * that moved data, not the launcher itself) and merge them together. There 117 * will be only one job most of the time. The only exception is 118 * import-all-table option that will execute one job per one exported table. 119 * 120 * @param context Action context 121 * @param action Workflow action 122 * @throws ActionExecutorException 123 */ 124 @Override 125 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 126 super.end(context, action); 127 JobClient jobClient = null; 128 129 boolean exception = false; 130 try { 131 if (action.getStatus() == WorkflowAction.Status.OK) { 132 Element actionXml = XmlUtils.parseXml(action.getConf()); 133 JobConf jobConf = createBaseHadoopConf(context, actionXml); 134 jobClient = createJobClient(context, jobConf); 135 136 // Cumulative counters for all Sqoop mapreduce jobs 137 Counters counters = null; 138 139 // Sqoop do not have to create mapreduce job each time 140 String externalIds = action.getExternalChildIDs(); 141 if (externalIds != null && !externalIds.trim().isEmpty()) { 142 String []jobIds = externalIds.split(","); 143 144 for(String jobId : jobIds) { 145 RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); 146 if (runningJob == null) { 147 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "SQOOP001", 148 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", action 149 .getExternalId(), action.getId()); 150 } 151 152 Counters taskCounters = runningJob.getCounters(); 153 if(taskCounters != null) { 154 if(counters == null) { 155 counters = taskCounters; 156 } else { 157 counters.incrAllCounters(taskCounters); 158 } 159 } else { 160 XLog.getLog(getClass()).warn("Could not find Hadoop Counters for job: [{0}]", jobId); 161 } 162 } 163 } 164 165 if (counters != null) { 166 ActionStats stats = new MRStats(counters); 167 String statsJsonString = stats.toJSON(); 168 context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, statsJsonString); 169 170 // If action stats write property is set to false by user or 171 // size of stats is greater than the maximum allowed size, 172 // do not store the action stats 173 if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml, 174 OOZIE_ACTION_EXTERNAL_STATS_WRITE, "true")) 175 && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) { 176 context.setExecutionStats(statsJsonString); 177 LOG.debug( 178 "Printing stats for sqoop action as a JSON string : [{0}]", statsJsonString); 179 } 180 } else { 181 context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, ""); 182 XLog.getLog(getClass()).warn("Can't find any associated Hadoop job counters"); 183 } 184 } 185 } 186 catch (Exception ex) { 187 exception = true; 188 throw convertException(ex); 189 } 190 finally { 191 if (jobClient != null) { 192 try { 193 jobClient.close(); 194 } 195 catch (Exception e) { 196 if (exception) { 197 LOG.error("JobClient error: ", e); 198 } 199 else { 200 throw convertException(e); 201 } 202 } 203 } 204 } 205 } 206 207 // Return the value of the specified configuration property 208 private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue) 209 throws ActionExecutorException { 210 try { 211 if (actionConf != null) { 212 Namespace ns = actionConf.getNamespace(); 213 Element e = actionConf.getChild("configuration", ns); 214 215 if(e != null) { 216 String strConf = XmlUtils.prettyPrint(e).toString(); 217 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); 218 return inlineConf.get(key, defaultValue); 219 } 220 } 221 return defaultValue; 222 } 223 catch (IOException ex) { 224 throw convertException(ex); 225 } 226 } 227 228 /** 229 * Return the sharelib name for the action. 230 * 231 * @return returns <code>sqoop</code>. 232 * @param actionXml 233 */ 234 @Override 235 protected String getDefaultShareLibName(Element actionXml) { 236 return "sqoop"; 237 } 238 239}