001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.hadoop.fs.FileSystem; 022 import org.apache.hadoop.fs.FSDataOutputStream; 023 import org.apache.hadoop.fs.Path; 024 import org.apache.hadoop.mapred.RunningJob; 025 import org.apache.oozie.action.ActionExecutorException; 026 import org.apache.oozie.client.XOozieClient; 027 import org.apache.oozie.client.WorkflowAction; 028 import org.apache.oozie.service.HadoopAccessorException; 029 import org.apache.oozie.util.IOUtils; 030 import org.apache.oozie.util.XLog; 031 import org.jdom.Element; 032 import org.jdom.Namespace; 033 import org.jdom.JDOMException; 034 035 import java.io.BufferedReader; 036 import java.io.IOException; 037 import java.io.InputStream; 038 import java.io.InputStreamReader; 039 import java.net.URISyntaxException; 040 import java.util.List; 041 042 public class PigActionExecutor extends JavaActionExecutor { 043 044 public PigActionExecutor() { 045 super("pig"); 046 } 047 048 @Override 049 protected List<Class> getLauncherClasses() { 050 List<Class> classes = super.getLauncherClasses(); 051 classes.add(LauncherMain.class); 052 classes.add(MapReduceMain.class); 053 classes.add(PigMain.class); 054 classes.add(OoziePigStats.class); 055 return classes; 056 } 057 058 059 @Override 060 protected String getLauncherMain(Configuration launcherConf, Element actionXml) { 061 return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PigMain.class.getName()); 062 } 063 064 @Override 065 void injectActionCallback(Context context, Configuration launcherConf) { 066 } 067 068 @Override 069 protected Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) 070 throws ActionExecutorException { 071 super.setupLauncherConf(conf, actionXml, appPath, context); 072 Namespace ns = actionXml.getNamespace(); 073 String script = actionXml.getChild("script", ns).getTextTrim(); 074 String pigName = new Path(script).getName(); 075 String pigScriptContent = context.getProtoActionConf().get(XOozieClient.PIG_SCRIPT); 076 077 Path pigScriptFile = null; 078 if (pigScriptContent != null) { // Create pig script on hdfs if this is 079 // an http submission pig job; 080 FSDataOutputStream dos = null; 081 try { 082 Path actionPath = context.getActionDir(); 083 pigScriptFile = new Path(actionPath, script); 084 FileSystem fs = context.getAppFileSystem(); 085 dos = fs.create(pigScriptFile); 086 dos.writeBytes(pigScriptContent); 087 088 addToCache(conf, actionPath, script + "#" + pigName, false); 089 } 090 catch (Exception ex) { 091 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog 092 .format("Not able to write pig script file {0} on hdfs", pigScriptFile), ex); 093 } 094 finally { 095 try { 096 if (dos != null) { 097 dos.close(); 098 } 099 } 100 catch (IOException ex) { 101 XLog.getLog(getClass()).error("Error: " + ex.getMessage()); 102 } 103 } 104 } 105 else { 106 addToCache(conf, appPath, script + "#" + pigName, false); 107 } 108 109 return conf; 110 } 111 112 @Override 113 @SuppressWarnings("unchecked") 114 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) 115 throws ActionExecutorException { 116 super.setupActionConf(actionConf, context, actionXml, appPath); 117 Namespace ns = actionXml.getNamespace(); 118 119 String script = actionXml.getChild("script", ns).getTextTrim(); 120 String pigName = new Path(script).getName(); 121 122 List<Element> params = (List<Element>) actionXml.getChildren("param", ns); 123 String[] strParams = new String[params.size()]; 124 for (int i = 0; i < params.size(); i++) { 125 strParams[i] = params.get(i).getTextTrim(); 126 } 127 String[] strArgs = null; 128 List<Element> eArgs = actionXml.getChildren("argument", ns); 129 if (eArgs != null && eArgs.size() > 0) { 130 strArgs = new String[eArgs.size()]; 131 for (int i = 0; i < eArgs.size(); i++) { 132 strArgs[i] = eArgs.get(i).getTextTrim(); 133 } 134 } 135 PigMain.setPigScript(actionConf, pigName, strParams, strArgs); 136 return actionConf; 137 } 138 139 @Override 140 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException { 141 return false; 142 } 143 144 /** 145 * Get the stats and external child IDs for a pig job 146 * 147 * @param actionFs the FileSystem object 148 * @param runningJob the runningJob 149 * @param action the Workflow action 150 * @param context executor context 151 * 152 */ 153 @Override 154 protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) throws HadoopAccessorException, JDOMException, IOException, URISyntaxException{ 155 super.getActionData(actionFs, runningJob, action, context); 156 String stats = getStats(context, actionFs); 157 context.setExecutionStats(stats); 158 String externalChildIDs = getExternalChildIDs(context, actionFs); 159 context.setExternalChildIDs(externalChildIDs); 160 } 161 162 private String getStats(Context context, FileSystem actionFs) throws IOException, HadoopAccessorException, 163 URISyntaxException { 164 Path actionOutput = LauncherMapper.getActionStatsDataPath(context.getActionDir()); 165 String stats = null; 166 if (actionFs.exists(actionOutput)) { 167 stats = getDataFromPath(actionOutput, actionFs); 168 169 } 170 return stats; 171 } 172 173 private String getExternalChildIDs(Context context, FileSystem actionFs) throws IOException, 174 HadoopAccessorException, URISyntaxException { 175 Path actionOutput = LauncherMapper.getExternalChildIDsDataPath(context.getActionDir()); 176 String externalIDs = null; 177 if (actionFs.exists(actionOutput)) { 178 externalIDs = getDataFromPath(actionOutput, actionFs); 179 } 180 return externalIDs; 181 } 182 183 private static String getDataFromPath(Path actionOutput, FileSystem actionFs) throws IOException{ 184 BufferedReader reader = null; 185 String data = null; 186 try { 187 InputStream is = actionFs.open(actionOutput); 188 reader = new BufferedReader(new InputStreamReader(is)); 189 data = IOUtils.getReaderAsString(reader, -1); 190 191 } 192 finally { 193 if (reader != null) { 194 reader.close(); 195 } 196 } 197 return data; 198 } 199 200 /** 201 * Return the sharelib postfix for the action. 202 * 203 * @return returns <code>pig</code>. 204 * @param actionXml 205 */ 206 @Override 207 protected String getDefaultShareLibName(Element actionXml) { 208 return "pig"; 209 } 210 211 }