001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.hadoop.fs.FileSystem; 022 import org.apache.hadoop.fs.FSDataOutputStream; 023 import org.apache.hadoop.fs.Path; 024 import org.apache.hadoop.mapred.RunningJob; 025 import org.apache.oozie.action.ActionExecutorException; 026 import org.apache.oozie.client.XOozieClient; 027 import org.apache.oozie.client.WorkflowAction; 028 import org.apache.oozie.service.HadoopAccessorException; 029 import org.apache.oozie.util.ClassUtils; 030 import org.apache.oozie.util.IOUtils; 031 import org.apache.oozie.util.XLog; 032 import org.apache.oozie.util.XmlUtils; 033 import org.jdom.Element; 034 import org.jdom.Namespace; 035 import org.jdom.JDOMException; 036 import org.mortbay.log.Log; 037 038 import java.io.BufferedReader; 039 import java.io.IOException; 040 import java.io.InputStream; 041 import java.io.InputStreamReader; 042 import java.net.URISyntaxException; 043 import java.util.List; 044 045 public class PigActionExecutor extends JavaActionExecutor { 046 047 public PigActionExecutor() { 048 super("pig"); 049 } 050 051 @Override 052 protected List<Class> getLauncherClasses() { 053 List<Class> classes = super.getLauncherClasses(); 054 classes.add(LauncherMain.class); 055 classes.add(MapReduceMain.class); 056 classes.add(PigMain.class); 057 classes.add(OoziePigStats.class); 058 return classes; 059 } 060 061 062 @Override 063 protected String getLauncherMain(Configuration launcherConf, Element actionXml) { 064 return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PigMain.class.getName()); 065 } 066 067 @Override 068 void injectActionCallback(Context context, Configuration launcherConf) { 069 } 070 071 @Override 072 protected Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) 073 throws ActionExecutorException { 074 super.setupLauncherConf(conf, actionXml, appPath, context); 075 Namespace ns = actionXml.getNamespace(); 076 String script = actionXml.getChild("script", ns).getTextTrim(); 077 String pigName = new Path(script).getName(); 078 String pigScriptContent = context.getProtoActionConf().get(XOozieClient.PIG_SCRIPT); 079 080 Path pigScriptFile = null; 081 if (pigScriptContent != null) { // Create pig script on hdfs if this is 082 // an http submission pig job; 083 FSDataOutputStream dos = null; 084 try { 085 Path actionPath = context.getActionDir(); 086 pigScriptFile = new Path(actionPath, script); 087 FileSystem fs = context.getAppFileSystem(); 088 dos = fs.create(pigScriptFile); 089 dos.writeBytes(pigScriptContent); 090 091 addToCache(conf, actionPath, script + "#" + pigName, false); 092 } 093 catch (Exception ex) { 094 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog 095 .format("Not able to write pig script file {0} on hdfs", pigScriptFile), ex); 096 } 097 finally { 098 try { 099 if (dos != null) { 100 dos.close(); 101 } 102 } 103 catch (IOException ex) { 104 XLog.getLog(getClass()).error("Error: " + ex.getMessage()); 105 } 106 } 107 } 108 else { 109 addToCache(conf, appPath, script + "#" + pigName, false); 110 } 111 112 return conf; 113 } 114 115 @Override 116 @SuppressWarnings("unchecked") 117 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) 118 throws ActionExecutorException { 119 super.setupActionConf(actionConf, context, actionXml, appPath); 120 Namespace ns = actionXml.getNamespace(); 121 122 String script = actionXml.getChild("script", ns).getTextTrim(); 123 String pigName = new Path(script).getName(); 124 125 List<Element> params = (List<Element>) actionXml.getChildren("param", ns); 126 String[] strParams = new String[params.size()]; 127 for (int i = 0; i < params.size(); i++) { 128 strParams[i] = params.get(i).getTextTrim(); 129 } 130 String[] strArgs = null; 131 List<Element> eArgs = actionXml.getChildren("argument", ns); 132 if (eArgs != null && eArgs.size() > 0) { 133 strArgs = new String[eArgs.size()]; 134 for (int i = 0; i < eArgs.size(); i++) { 135 strArgs[i] = eArgs.get(i).getTextTrim(); 136 } 137 } 138 PigMain.setPigScript(actionConf, pigName, strParams, strArgs); 139 return actionConf; 140 } 141 142 @Override 143 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException { 144 return false; 145 } 146 147 /** 148 * Get the stats and external child IDs for a pig job 149 * 150 * @param actionFs the FileSystem object 151 * @param runningJob the runningJob 152 * @param action the Workflow action 153 * @param context executor context 154 * 155 */ 156 @Override 157 protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) throws HadoopAccessorException, JDOMException, IOException, URISyntaxException{ 158 super.getActionData(actionFs, runningJob, action, context); 159 String stats = getStats(context, actionFs); 160 context.setExecutionStats(stats); 161 String externalChildIDs = getExternalChildIDs(context, actionFs); 162 context.setExternalChildIDs(externalChildIDs); 163 } 164 165 private String getStats(Context context, FileSystem actionFs) throws IOException, HadoopAccessorException, 166 URISyntaxException { 167 Path actionOutput = LauncherMapper.getActionStatsDataPath(context.getActionDir()); 168 String stats = null; 169 if (actionFs.exists(actionOutput)) { 170 stats = getDataFromPath(actionOutput, actionFs); 171 172 } 173 return stats; 174 } 175 176 private String getExternalChildIDs(Context context, FileSystem actionFs) throws IOException, 177 HadoopAccessorException, URISyntaxException { 178 Path actionOutput = LauncherMapper.getExternalChildIDsDataPath(context.getActionDir()); 179 String externalIDs = null; 180 if (actionFs.exists(actionOutput)) { 181 externalIDs = getDataFromPath(actionOutput, actionFs); 182 } 183 return externalIDs; 184 } 185 186 private static String getDataFromPath(Path actionOutput, FileSystem actionFs) throws IOException{ 187 BufferedReader reader = null; 188 String data = null; 189 try { 190 InputStream is = actionFs.open(actionOutput); 191 reader = new BufferedReader(new InputStreamReader(is)); 192 data = IOUtils.getReaderAsString(reader, -1); 193 194 } 195 finally { 196 if (reader != null) { 197 reader.close(); 198 } 199 } 200 return data; 201 } 202 203 /** 204 * Return the sharelib postfix for the action. 205 * 206 * @param context executor context. 207 * @param actionXml the action XML. 208 * @return the action sharelib post fix, this implementation returns <code>pig</code>. 209 */ 210 @Override 211 protected String getShareLibPostFix(Context context, Element actionXml) { 212 return "pig"; 213 } 214 215 }