001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.net.URISyntaxException; 023 import java.util.ArrayList; 024 import java.util.List; 025 import java.util.Properties; 026 import java.util.StringTokenizer; 027 028 import org.apache.hadoop.conf.Configuration; 029 import org.apache.hadoop.fs.FileSystem; 030 import org.apache.hadoop.fs.Path; 031 import org.apache.hadoop.mapred.Counters; 032 import org.apache.hadoop.mapred.JobClient; 033 import org.apache.hadoop.mapred.JobConf; 034 import org.apache.hadoop.mapred.JobID; 035 import org.apache.hadoop.mapred.RunningJob; 036 import org.apache.oozie.action.ActionExecutorException; 037 import org.apache.oozie.client.WorkflowAction; 038 import org.apache.oozie.service.HadoopAccessorException; 039 import org.apache.oozie.util.XConfiguration; 040 import org.apache.oozie.util.XmlUtils; 041 import org.apache.oozie.util.XLog; 042 import org.jdom.Element; 043 import org.jdom.JDOMException; 044 import org.jdom.Namespace; 045 046 public class SqoopActionExecutor extends JavaActionExecutor { 047 048 public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write"; 049 050 public SqoopActionExecutor() { 051 super("sqoop"); 052 } 053 054 @Override 055 protected List<Class> getLauncherClasses() { 056 List<Class> classes = super.getLauncherClasses(); 057 classes.add(LauncherMain.class); 058 classes.add(MapReduceMain.class); 059 classes.add(HiveMain.class); 060 classes.add(SqoopMain.class); 061 return classes; 062 } 063 064 @Override 065 protected String getLauncherMain(Configuration launcherConf, Element actionXml) { 066 return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SqoopMain.class.getName()); 067 } 068 069 @Override 070 @SuppressWarnings("unchecked") 071 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) 072 throws ActionExecutorException { 073 super.setupActionConf(actionConf, context, actionXml, appPath); 074 Namespace ns = actionXml.getNamespace(); 075 076 try { 077 Element e = actionXml.getChild("configuration", ns); 078 if (e != null) { 079 String strConf = XmlUtils.prettyPrint(e).toString(); 080 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); 081 checkForDisallowedProps(inlineConf, "inline configuration"); 082 XConfiguration.copy(inlineConf, actionConf); 083 } 084 } catch (IOException ex) { 085 throw convertException(ex); 086 } 087 088 String[] args; 089 if (actionXml.getChild("command", ns) != null) { 090 String command = actionXml.getChild("command", ns).getTextTrim(); 091 StringTokenizer st = new StringTokenizer(command, " "); 092 List<String> l = new ArrayList<String>(); 093 while (st.hasMoreTokens()) { 094 l.add(st.nextToken()); 095 } 096 args = l.toArray(new String[l.size()]); 097 } 098 else { 099 List<Element> eArgs = (List<Element>) actionXml.getChildren("arg", ns); 100 args = new String[eArgs.size()]; 101 for (int i = 0; i < eArgs.size(); i++) { 102 args[i] = eArgs.get(i).getTextTrim(); 103 } 104 } 105 106 SqoopMain.setSqoopCommand(actionConf, args); 107 return actionConf; 108 } 109 110 /** 111 * We will gather counters from all executed action Hadoop jobs (e.g. jobs 112 * that moved data, not the launcher itself) and merge them together. There 113 * will be only one job most of the time. The only exception is 114 * import-all-table option that will execute one job per one exported table. 115 * 116 * @param context Action context 117 * @param action Workflow action 118 * @throws ActionExecutorException 119 */ 120 @Override 121 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 122 super.end(context, action); 123 JobClient jobClient = null; 124 125 boolean exception = false; 126 try { 127 if (action.getStatus() == WorkflowAction.Status.OK) { 128 Element actionXml = XmlUtils.parseXml(action.getConf()); 129 JobConf jobConf = createBaseHadoopConf(context, actionXml); 130 jobClient = createJobClient(context, jobConf); 131 132 // Cumulative counters for all Sqoop mapreduce jobs 133 Counters counters = null; 134 135 String externalIds = action.getExternalChildIDs(); 136 String []jobIds = externalIds.split(","); 137 138 for(String jobId : jobIds) { 139 RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); 140 if (runningJob == null) { 141 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "SQOOP001", 142 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", action 143 .getExternalId(), action.getId()); 144 } 145 146 Counters taskCounters = runningJob.getCounters(); 147 if(taskCounters != null) { 148 if(counters == null) { 149 counters = taskCounters; 150 } else { 151 counters.incrAllCounters(taskCounters); 152 } 153 } else { 154 XLog.getLog(getClass()).warn("Could not find Hadoop Counters for job: [{0}]", jobId); 155 } 156 } 157 158 if (counters != null) { 159 ActionStats stats = new MRStats(counters); 160 String statsJsonString = stats.toJSON(); 161 context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, statsJsonString); 162 163 // If action stats write property is set to false by user or 164 // size of stats is greater than the maximum allowed size, 165 // do not store the action stats 166 if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml, 167 OOZIE_ACTION_EXTERNAL_STATS_WRITE, "true")) 168 && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) { 169 context.setExecutionStats(statsJsonString); 170 log.debug( 171 "Printing stats for sqoop action as a JSON string : [{0}]", statsJsonString); 172 } 173 } else { 174 context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, ""); 175 XLog.getLog(getClass()).warn("Can't find any associated Hadoop job counters"); 176 } 177 } 178 } 179 catch (Exception ex) { 180 exception = true; 181 throw convertException(ex); 182 } 183 finally { 184 if (jobClient != null) { 185 try { 186 jobClient.close(); 187 } 188 catch (Exception e) { 189 if (exception) { 190 log.error("JobClient error: ", e); 191 } 192 else { 193 throw convertException(e); 194 } 195 } 196 } 197 } 198 } 199 200 // Return the value of the specified configuration property 201 private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue) 202 throws ActionExecutorException { 203 try { 204 if (actionConf != null) { 205 Namespace ns = actionConf.getNamespace(); 206 Element e = actionConf.getChild("configuration", ns); 207 208 if(e != null) { 209 String strConf = XmlUtils.prettyPrint(e).toString(); 210 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); 211 return inlineConf.get(key, defaultValue); 212 } 213 } 214 return defaultValue; 215 } 216 catch (IOException ex) { 217 throw convertException(ex); 218 } 219 } 220 221 /** 222 * Get the stats and external child IDs 223 * 224 * @param actionFs the FileSystem object 225 * @param runningJob the runningJob 226 * @param action the Workflow action 227 * @param context executor context 228 * 229 */ 230 @Override 231 protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) 232 throws HadoopAccessorException, JDOMException, IOException, URISyntaxException{ 233 super.getActionData(actionFs, runningJob, action, context); 234 235 // Load stored Hadoop jobs ids and promote them as external child ids 236 action.getData(); 237 Properties props = new Properties(); 238 props.load(new StringReader(action.getData())); 239 context.setExternalChildIDs((String)props.get(LauncherMain.HADOOP_JOBS)); 240 } 241 242 @Override 243 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException { 244 return true; 245 } 246 247 248 /** 249 * Return the sharelib name for the action. 250 * 251 * @return returns <code>sqoop</code>. 252 * @param actionXml 253 */ 254 @Override 255 protected String getDefaultShareLibName(Element actionXml) { 256 return "sqoop"; 257 } 258 259 }