/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.XOozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XLog;
import org.jdom.Element;
import org.jdom.Namespace;
import org.jdom.JDOMException;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.util.List;

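/**
 * {@link JavaActionExecutor} implementation for the Pig action: it ships the
 * Pig launcher classes with the launcher job, places the Pig script on HDFS
 * when the script is submitted inline, and passes the script, parameters and
 * arguments to {@link PigMain}.
 */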
public class PigActionExecutor extends JavaActionExecutor {

    public PigActionExecutor() {
        super("pig");
    }

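    /**
     * Adds the Pig launcher classes ({@link LauncherMain}, {@link MapReduceMain},
     * {@link PigMain} and {@link OoziePigStats}) to the classes shipped with the
     * launcher job.
     */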
    @Override
    protected List<Class> getLauncherClasses() {
        List<Class> classes = super.getLauncherClasses();
        classes.add(LauncherMain.class);
        classes.add(MapReduceMain.class);
        classes.add(PigMain.class);
        classes.add(OoziePigStats.class);
        return classes;
    }

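    /**
     * Returns the main class the launcher should run, defaulting to
     * {@link PigMain} unless overridden in the launcher configuration.
     */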
    @Override
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PigMain.class.getName());
    }

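    /**
     * Pig actions do not add an action callback to the launcher configuration;
     * this override is a no-op.
     */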
    @Override
    void injectActionCallback(Context context, Configuration launcherConf) {
    }

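    /**
     * Prepares the launcher configuration. If the Pig script was submitted
     * inline (via {@link XOozieClient#PIG_SCRIPT}), its content is written to
     * the action directory on HDFS; otherwise the script is taken from the
     * workflow application path. In both cases the script is added to the
     * distributed cache under its own name.
     */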
    @Override
    protected Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
            throws ActionExecutorException {
        super.setupLauncherConf(conf, actionXml, appPath, context);
        Namespace ns = actionXml.getNamespace();
        String script = actionXml.getChild("script", ns).getTextTrim();
        String pigName = new Path(script).getName();
        String pigScriptContent = context.getProtoActionConf().get(XOozieClient.PIG_SCRIPT);

        Path pigScriptFile = null;
        // Create the Pig script on HDFS if this is an HTTP submission Pig job.
        if (pigScriptContent != null) {
            FSDataOutputStream dos = null;
            try {
                Path actionPath = context.getActionDir();
                pigScriptFile = new Path(actionPath, script);
                FileSystem fs = context.getAppFileSystem();
                dos = fs.create(pigScriptFile);
                dos.writeBytes(pigScriptContent);

                addToCache(conf, actionPath, script + "#" + pigName, false);
            }
            catch (Exception ex) {
                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION",
                        XLog.format("Unable to write Pig script file {0} to HDFS", pigScriptFile), ex);
            }
            finally {
                try {
                    if (dos != null) {
                        dos.close();
                    }
                }
                catch (IOException ex) {
                    XLog.getLog(getClass()).error("Error: " + ex.getMessage());
                }
            }
        }
        else {
            addToCache(conf, appPath, script + "#" + pigName, false);
        }

        return conf;
    }

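    /**
     * Populates the action configuration with the Pig script name and the
     * {@code <param>} and {@code <argument>} elements from the action XML,
     * delegating to {@code PigMain.setPigScript}.
     */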
    @Override
    @SuppressWarnings("unchecked")
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        super.setupActionConf(actionConf, context, actionXml, appPath);
        Namespace ns = actionXml.getNamespace();

        String script = actionXml.getChild("script", ns).getTextTrim();
        String pigName = new Path(script).getName();

        List<Element> params = (List<Element>) actionXml.getChildren("param", ns);
        String[] strParams = new String[params.size()];
        for (int i = 0; i < params.size(); i++) {
            strParams[i] = params.get(i).getTextTrim();
        }
        String[] strArgs = null;
        List<Element> eArgs = (List<Element>) actionXml.getChildren("argument", ns);
        if (eArgs != null && eArgs.size() > 0) {
            strArgs = new String[eArgs.size()];
            for (int i = 0; i < eArgs.size(); i++) {
                strArgs[i] = eArgs.get(i).getTextTrim();
            }
        }
        PigMain.setPigScript(actionConf, pigName, strParams, strArgs);
        return actionConf;
    }

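    /**
     * Pig actions do not capture action output, so this always returns
     * <code>false</code>.
     */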
    @Override
    protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
        return false;
    }

    /**
     * Get the stats and external child IDs for a Pig job.
     *
     * @param actionFs the FileSystem object
     * @param runningJob the running job
     * @param action the workflow action
     * @param context executor context
     */
    @Override
    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
            throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
        super.getActionData(actionFs, runningJob, action, context);
        String stats = getStats(context, actionFs);
        context.setExecutionStats(stats);
        String externalChildIDs = getExternalChildIDs(context, actionFs);
        context.setExternalChildIDs(externalChildIDs);
    }

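    /**
     * Reads the Pig job statistics written by the launcher from the action
     * stats file in the action directory, if it exists.
     */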
    private String getStats(Context context, FileSystem actionFs) throws IOException, HadoopAccessorException,
            URISyntaxException {
        Path actionOutput = LauncherMapper.getActionStatsDataPath(context.getActionDir());
        String stats = null;
        if (actionFs.exists(actionOutput)) {
            stats = getDataFromPath(actionOutput, actionFs);
        }
        return stats;
    }

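    /**
     * Reads the external child job IDs written by the launcher from the action
     * directory, if the file exists.
     */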
    private String getExternalChildIDs(Context context, FileSystem actionFs) throws IOException,
            HadoopAccessorException, URISyntaxException {
        Path actionOutput = LauncherMapper.getExternalChildIDsDataPath(context.getActionDir());
        String externalIDs = null;
        if (actionFs.exists(actionOutput)) {
            externalIDs = getDataFromPath(actionOutput, actionFs);
        }
        return externalIDs;
    }

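    /**
     * Reads the full contents of the given file on the action file system as a
     * single string.
     */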
    private static String getDataFromPath(Path actionOutput, FileSystem actionFs) throws IOException {
        BufferedReader reader = null;
        String data = null;
        try {
            InputStream is = actionFs.open(actionOutput);
            reader = new BufferedReader(new InputStreamReader(is));
            data = IOUtils.getReaderAsString(reader, -1);
        }
        finally {
            if (reader != null) {
                reader.close();
            }
        }
        return data;
    }

    /**
     * Return the sharelib postfix for the action.
     *
     * @param context executor context.
     * @param actionXml the action XML.
     * @return the action sharelib postfix; this implementation returns <code>pig</code>.
     */
    @Override
    protected String getShareLibPostFix(Context context, Element actionXml) {
        return "pig";
    }

}