This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.hadoop.fs.FileSystem;
022 import org.apache.hadoop.fs.FSDataOutputStream;
023 import org.apache.hadoop.fs.Path;
024 import org.apache.hadoop.mapred.RunningJob;
025 import org.apache.oozie.action.ActionExecutorException;
026 import org.apache.oozie.client.XOozieClient;
027 import org.apache.oozie.client.WorkflowAction;
028 import org.apache.oozie.service.HadoopAccessorException;
029 import org.apache.oozie.util.IOUtils;
030 import org.apache.oozie.util.XLog;
031 import org.jdom.Element;
032 import org.jdom.Namespace;
033 import org.jdom.JDOMException;
034
035 import java.io.BufferedReader;
036 import java.io.IOException;
037 import java.io.InputStream;
038 import java.io.InputStreamReader;
039 import java.net.URISyntaxException;
040 import java.util.List;
041
042 public class PigActionExecutor extends JavaActionExecutor {
043
044 public PigActionExecutor() {
045 super("pig");
046 }
047
048 @Override
049 protected List<Class> getLauncherClasses() {
050 List<Class> classes = super.getLauncherClasses();
051 classes.add(LauncherMain.class);
052 classes.add(MapReduceMain.class);
053 classes.add(PigMain.class);
054 classes.add(OoziePigStats.class);
055 return classes;
056 }
057
058
059 @Override
060 protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
061 return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PigMain.class.getName());
062 }
063
064 @Override
065 void injectActionCallback(Context context, Configuration launcherConf) {
066 }
067
068 @Override
069 protected Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
070 throws ActionExecutorException {
071 super.setupLauncherConf(conf, actionXml, appPath, context);
072 Namespace ns = actionXml.getNamespace();
073 String script = actionXml.getChild("script", ns).getTextTrim();
074 String pigName = new Path(script).getName();
075 String pigScriptContent = context.getProtoActionConf().get(XOozieClient.PIG_SCRIPT);
076
077 Path pigScriptFile = null;
078 if (pigScriptContent != null) { // Create pig script on hdfs if this is
079 // an http submission pig job;
080 FSDataOutputStream dos = null;
081 try {
082 Path actionPath = context.getActionDir();
083 pigScriptFile = new Path(actionPath, script);
084 FileSystem fs = context.getAppFileSystem();
085 dos = fs.create(pigScriptFile);
086 dos.writeBytes(pigScriptContent);
087
088 addToCache(conf, actionPath, script + "#" + pigName, false);
089 }
090 catch (Exception ex) {
091 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog
092 .format("Not able to write pig script file {0} on hdfs", pigScriptFile), ex);
093 }
094 finally {
095 try {
096 if (dos != null) {
097 dos.close();
098 }
099 }
100 catch (IOException ex) {
101 XLog.getLog(getClass()).error("Error: " + ex.getMessage());
102 }
103 }
104 }
105 else {
106 addToCache(conf, appPath, script + "#" + pigName, false);
107 }
108
109 return conf;
110 }
111
112 @Override
113 @SuppressWarnings("unchecked")
114 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
115 throws ActionExecutorException {
116 super.setupActionConf(actionConf, context, actionXml, appPath);
117 Namespace ns = actionXml.getNamespace();
118
119 String script = actionXml.getChild("script", ns).getTextTrim();
120 String pigName = new Path(script).getName();
121
122 List<Element> params = (List<Element>) actionXml.getChildren("param", ns);
123 String[] strParams = new String[params.size()];
124 for (int i = 0; i < params.size(); i++) {
125 strParams[i] = params.get(i).getTextTrim();
126 }
127 String[] strArgs = null;
128 List<Element> eArgs = actionXml.getChildren("argument", ns);
129 if (eArgs != null && eArgs.size() > 0) {
130 strArgs = new String[eArgs.size()];
131 for (int i = 0; i < eArgs.size(); i++) {
132 strArgs[i] = eArgs.get(i).getTextTrim();
133 }
134 }
135 PigMain.setPigScript(actionConf, pigName, strParams, strArgs);
136 return actionConf;
137 }
138
139 @Override
140 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
141 return false;
142 }
143
144 /**
145 * Get the stats and external child IDs for a pig job
146 *
147 * @param actionFs the FileSystem object
148 * @param runningJob the runningJob
149 * @param action the Workflow action
150 * @param context executor context
151 *
152 */
153 @Override
154 protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) throws HadoopAccessorException, JDOMException, IOException, URISyntaxException{
155 super.getActionData(actionFs, runningJob, action, context);
156 String stats = getStats(context, actionFs);
157 context.setExecutionStats(stats);
158 String externalChildIDs = getExternalChildIDs(context, actionFs);
159 context.setExternalChildIDs(externalChildIDs);
160 }
161
162 private String getStats(Context context, FileSystem actionFs) throws IOException, HadoopAccessorException,
163 URISyntaxException {
164 Path actionOutput = LauncherMapper.getActionStatsDataPath(context.getActionDir());
165 String stats = null;
166 if (actionFs.exists(actionOutput)) {
167 stats = getDataFromPath(actionOutput, actionFs);
168
169 }
170 return stats;
171 }
172
173 private String getExternalChildIDs(Context context, FileSystem actionFs) throws IOException,
174 HadoopAccessorException, URISyntaxException {
175 Path actionOutput = LauncherMapper.getExternalChildIDsDataPath(context.getActionDir());
176 String externalIDs = null;
177 if (actionFs.exists(actionOutput)) {
178 externalIDs = getDataFromPath(actionOutput, actionFs);
179 }
180 return externalIDs;
181 }
182
183 private static String getDataFromPath(Path actionOutput, FileSystem actionFs) throws IOException{
184 BufferedReader reader = null;
185 String data = null;
186 try {
187 InputStream is = actionFs.open(actionOutput);
188 reader = new BufferedReader(new InputStreamReader(is));
189 data = IOUtils.getReaderAsString(reader, -1);
190
191 }
192 finally {
193 if (reader != null) {
194 reader.close();
195 }
196 }
197 return data;
198 }
199
200 /**
201 * Return the sharelib postfix for the action.
202 *
203 * @return returns <code>pig</code>.
204 * @param actionXml
205 */
206 @Override
207 protected String getDefaultShareLibName(Element actionXml) {
208 return "pig";
209 }
210
211 }