/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.XOozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XLog;
import org.jdom.Element;
import org.jdom.Namespace;
import org.jdom.JDOMException;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.util.List;

/**
 * Action executor registered under the action type <code>pig</code>.
 * <p>
 * It copies the script name, parameters and arguments from the action XML into the action
 * configuration, and after the job has run it reads the stats and external child ID files from
 * the action directory into the executor context.
 */
public class PigActionExecutor extends ScriptLanguageActionExecutor {

    private static final String PIG_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.PigMain";
    private static final String OOZIE_PIG_STATS = "org.apache.oozie.action.hadoop.OoziePigStats";
    static final String PIG_SCRIPT = "oozie.pig.script";
    static final String PIG_PARAMS = "oozie.pig.params";
    static final String PIG_ARGS = "oozie.pig.args";

    /**
     * Creates the executor with the action type <code>pig</code>.
     */
    public PigActionExecutor() {
        super("pig");
    }

    /**
     * Return the launcher classes of the superclass plus the Pig main class and the Pig stats class.
     *
     * @return the list obtained from the superclass with the two Pig classes appended.
     * @throws RuntimeException if either Pig class cannot be loaded by name.
     */
    @Override
    protected List<Class> getLauncherClasses() {
        List<Class> classes = super.getLauncherClasses();
        try {
            // Loaded by name rather than referenced directly.
            classes.add(Class.forName(PIG_MAIN_CLASS_NAME));
            classes.add(Class.forName(OOZIE_PIG_STATS));
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found", e);
        }
        return classes;
    }

    /**
     * Return the main class the launcher should run.
     *
     * @param launcherConf the launcher configuration.
     * @param actionXml the action XML (not used here).
     * @return the value of {@link LauncherMapper#CONF_OOZIE_ACTION_MAIN_CLASS} in
     *         <code>launcherConf</code>, or the Pig main class name when that property is not set.
     */
    @Override
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PIG_MAIN_CLASS_NAME);
    }

    /**
     * Empty override: nothing is injected into the launcher configuration.
     *
     * @param context executor context (not used).
     * @param launcherConf the launcher configuration (left untouched).
     */
    @Override
    void injectActionCallback(Context context, Configuration launcherConf) {
    }

    /**
     * Set up the action configuration from the <code>script</code>, <code>param</code> and
     * <code>argument</code> children of the action XML.
     * <p>
     * Only the last path component of the script is stored. Parameters are always passed as an
     * array (possibly empty); arguments are passed as <code>null</code> when the action XML has no
     * <code>argument</code> element.
     *
     * @param actionConf the action configuration to populate.
     * @param context executor context.
     * @param actionXml the action XML; a <code>script</code> child is dereferenced without a null
     *        check (NOTE(review): presumably guaranteed by schema validation - confirm).
     * @param appPath the application path.
     * @return the same <code>actionConf</code> instance that was passed in.
     * @throws ActionExecutorException declared for the superclass call.
     */
    @Override
    @SuppressWarnings("unchecked")
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        super.setupActionConf(actionConf, context, actionXml, appPath);
        Namespace ns = actionXml.getNamespace();

        String script = actionXml.getChild("script", ns).getTextTrim();
        String pigName = new Path(script).getName();

        List<Element> params = (List<Element>) actionXml.getChildren("param", ns);
        String[] strParams = new String[params.size()];
        for (int i = 0; i < params.size(); i++) {
            strParams[i] = params.get(i).getTextTrim();
        }

        // Arguments stay null (not an empty array) when none are declared.
        String[] strArgs = null;
        List<Element> eArgs = (List<Element>) actionXml.getChildren("argument", ns);
        if (eArgs != null && eArgs.size() > 0) {
            strArgs = new String[eArgs.size()];
            for (int i = 0; i < eArgs.size(); i++) {
                strArgs[i] = eArgs.get(i).getTextTrim();
            }
        }
        setPigScript(actionConf, pigName, strParams, strArgs);
        return actionConf;
    }

    /**
     * Store the Pig script name, parameters and arguments in a configuration.
     *
     * @param conf the configuration to write to.
     * @param script the script name, stored under {@link #PIG_SCRIPT}.
     * @param params the parameters, stored under {@link #PIG_PARAMS}.
     * @param args the arguments, stored under {@link #PIG_ARGS}; <code>setupActionConf</code> passes
     *        <code>null</code> here when there are no arguments, and the handling of that value is
     *        left to <code>MapReduceMain.setStrings</code>.
     */
    public static void setPigScript(Configuration conf, String script, String[] params, String[] args) {
        conf.set(PIG_SCRIPT, script);
        MapReduceMain.setStrings(conf, PIG_PARAMS, params);
        MapReduceMain.setStrings(conf, PIG_ARGS, args);
    }

    /**
     * Report whether output is captured for the action.
     *
     * @param action the workflow action (not used).
     * @return always <code>false</code>.
     * @throws JDOMException never thrown by this implementation.
     */
    @Override
    protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
        return false;
    }

    /**
     * Get the stats and external child IDs for a pig job
     *
     * @param actionFs the FileSystem object
     * @param runningJob the runningJob
     * @param action the Workflow action
     * @param context executor context; receives the execution stats and the external child IDs,
     *        either of which is <code>null</code> when the corresponding file does not exist.
     * @throws HadoopAccessorException declared for the superclass call.
     * @throws JDOMException declared for the superclass call.
     * @throws IOException if checking for or reading a data file fails.
     * @throws URISyntaxException declared for the superclass call.
     */
    @Override
    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action,
            Context context) throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
        super.getActionData(actionFs, runningJob, action, context);
        String stats = getStats(context, actionFs);
        context.setExecutionStats(stats);
        String externalChildIDs = getExternalChildIDs(context, actionFs);
        context.setExternalChildIDs(externalChildIDs);
    }

    /**
     * Read the stats data file of the action, if there is one.
     *
     * @param context executor context, used for the action directory.
     * @param actionFs the FileSystem object.
     * @return the file content, or <code>null</code> when the file does not exist.
     */
    private String getStats(Context context, FileSystem actionFs) throws IOException, HadoopAccessorException,
            URISyntaxException {
        Path actionOutput = LauncherMapperHelper.getActionStatsDataPath(context.getActionDir());
        String stats = null;
        if (actionFs.exists(actionOutput)) {
            stats = getDataFromPath(actionOutput, actionFs);
        }
        return stats;
    }

    /**
     * Set the external child IDs on the context from the external child IDs data file.
     *
     * @param context executor context; receives the IDs, or <code>null</code> when the file does
     *        not exist.
     * @param fs the FileSystem object.
     * @throws HadoopAccessorException declared by the helper that reads the IDs.
     * @throws IOException if checking for or reading the data file fails.
     * @throws URISyntaxException declared by the helper that reads the IDs.
     */
    @Override
    protected void setActionCompletionData(Context context, FileSystem fs) throws HadoopAccessorException,
            IOException, URISyntaxException {
        String data = getExternalChildIDs(context, fs);
        context.setExternalChildIDs(data);
    }

    /**
     * Read the external child IDs data file of the action, if there is one, and log its content.
     *
     * @param context executor context, used for the action directory.
     * @param actionFs the FileSystem object.
     * @return the file content, or <code>null</code> when the file does not exist.
     */
    private String getExternalChildIDs(Context context, FileSystem actionFs) throws IOException,
            HadoopAccessorException, URISyntaxException {
        Path actionOutput = LauncherMapperHelper.getExternalChildIDsDataPath(context.getActionDir());
        String externalIDs = null;
        if (actionFs.exists(actionOutput)) {
            externalIDs = getDataFromPath(actionOutput, actionFs);
            XLog.getLog(getClass()).info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
        }
        return externalIDs;
    }

    /**
     * Read a whole file into a string.
     * <p>
     * The opened stream is closed on every path, including a failure while the readers are being
     * set up.
     *
     * @param actionOutput the file to read.
     * @param actionFs the FileSystem that holds the file.
     * @return the file content as returned by <code>IOUtils.getReaderAsString</code>.
     * @throws IOException if the file cannot be opened, read or closed.
     */
    private static String getDataFromPath(Path actionOutput, FileSystem actionFs) throws IOException {
        InputStream is = actionFs.open(actionOutput);
        try {
            // NOTE(review): decodes with the platform default charset; confirm that the side
            // writing these files uses the same encoding before pinning one here.
            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
            // -1 is presumably "no length limit" - verify against IOUtils.getReaderAsString.
            return IOUtils.getReaderAsString(reader, -1);
        }
        finally {
            // Closing the underlying stream releases the resource whether or not the readers
            // wrapping it were created.
            is.close();
        }
    }

    /**
     * Return the sharelib postfix for the action.
     *
     * @param actionXml the action XML (not used).
     * @return returns <code>pig</code>.
     */
    @Override
    protected String getDefaultShareLibName(Element actionXml) {
        return "pig";
    }

    /**
     * Return the script name constant for Pig.
     *
     * @return {@link XOozieClient#PIG_SCRIPT}.
     */
    protected String getScriptName() {
        return XOozieClient.PIG_SCRIPT;
    }

}