This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.hadoop.fs.FileSystem;
022 import org.apache.hadoop.fs.Path;
023 import org.apache.hadoop.mapred.RunningJob;
024 import org.apache.oozie.action.ActionExecutorException;
025 import org.apache.oozie.client.XOozieClient;
026 import org.apache.oozie.client.WorkflowAction;
027 import org.apache.oozie.service.HadoopAccessorException;
028 import org.apache.oozie.util.IOUtils;
029 import org.apache.oozie.util.XLog;
030 import org.jdom.Element;
031 import org.jdom.Namespace;
032 import org.jdom.JDOMException;
033
034 import java.io.BufferedReader;
035 import java.io.IOException;
036 import java.io.InputStream;
037 import java.io.InputStreamReader;
038 import java.net.URISyntaxException;
039 import java.util.List;
040
041 public class PigActionExecutor extends ScriptLanguageActionExecutor {
042
043 private static final String PIG_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.PigMain";
044 private static final String OOZIE_PIG_STATS = "org.apache.oozie.action.hadoop.OoziePigStats";
045 static final String PIG_SCRIPT = "oozie.pig.script";
046 static final String PIG_PARAMS = "oozie.pig.params";
047 static final String PIG_ARGS = "oozie.pig.args";
048
049 public PigActionExecutor() {
050 super("pig");
051 }
052
053 @Override
054 protected List<Class> getLauncherClasses() {
055 List<Class> classes = super.getLauncherClasses();
056 try {
057 classes.add(Class.forName(PIG_MAIN_CLASS_NAME));
058 classes.add(Class.forName(OOZIE_PIG_STATS));
059 }
060 catch (ClassNotFoundException e) {
061 throw new RuntimeException("Class not found", e);
062 }
063 return classes;
064 }
065
066
067 @Override
068 protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
069 return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PIG_MAIN_CLASS_NAME);
070 }
071
072 @Override
073 void injectActionCallback(Context context, Configuration launcherConf) {
074 }
075
076 @Override
077 @SuppressWarnings("unchecked")
078 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
079 throws ActionExecutorException {
080 super.setupActionConf(actionConf, context, actionXml, appPath);
081 Namespace ns = actionXml.getNamespace();
082
083 String script = actionXml.getChild("script", ns).getTextTrim();
084 String pigName = new Path(script).getName();
085
086 List<Element> params = (List<Element>) actionXml.getChildren("param", ns);
087 String[] strParams = new String[params.size()];
088 for (int i = 0; i < params.size(); i++) {
089 strParams[i] = params.get(i).getTextTrim();
090 }
091 String[] strArgs = null;
092 List<Element> eArgs = actionXml.getChildren("argument", ns);
093 if (eArgs != null && eArgs.size() > 0) {
094 strArgs = new String[eArgs.size()];
095 for (int i = 0; i < eArgs.size(); i++) {
096 strArgs[i] = eArgs.get(i).getTextTrim();
097 }
098 }
099 setPigScript(actionConf, pigName, strParams, strArgs);
100 return actionConf;
101 }
102
103 public static void setPigScript(Configuration conf, String script, String[] params, String[] args) {
104 conf.set(PIG_SCRIPT, script);
105 MapReduceMain.setStrings(conf, PIG_PARAMS, params);
106 MapReduceMain.setStrings(conf, PIG_ARGS, args);
107 }
108
109
110 @Override
111 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
112 return false;
113 }
114
115 /**
116 * Get the stats and external child IDs for a pig job
117 *
118 * @param actionFs the FileSystem object
119 * @param runningJob the runningJob
120 * @param action the Workflow action
121 * @param context executor context
122 *
123 */
124 @Override
125 protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) throws HadoopAccessorException, JDOMException, IOException, URISyntaxException{
126 super.getActionData(actionFs, runningJob, action, context);
127 String stats = getStats(context, actionFs);
128 context.setExecutionStats(stats);
129 String externalChildIDs = getExternalChildIDs(context, actionFs);
130 context.setExternalChildIDs(externalChildIDs);
131 }
132
133 private String getStats(Context context, FileSystem actionFs) throws IOException, HadoopAccessorException,
134 URISyntaxException {
135 Path actionOutput = LauncherMapperHelper.getActionStatsDataPath(context.getActionDir());
136 String stats = null;
137 if (actionFs.exists(actionOutput)) {
138 stats = getDataFromPath(actionOutput, actionFs);
139
140 }
141 return stats;
142 }
143
144 @Override
145 protected void setActionCompletionData(Context context, FileSystem fs) throws HadoopAccessorException, IOException,
146 URISyntaxException {
147 String data = getExternalChildIDs(context, fs);
148 context.setExternalChildIDs(data);
149 }
150
151 private String getExternalChildIDs(Context context, FileSystem actionFs) throws IOException,
152 HadoopAccessorException, URISyntaxException {
153 Path actionOutput = LauncherMapperHelper.getExternalChildIDsDataPath(context.getActionDir());
154 String externalIDs = null;
155 if (actionFs.exists(actionOutput)) {
156 externalIDs = getDataFromPath(actionOutput, actionFs);
157 XLog.getLog(getClass()).info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
158 }
159 return externalIDs;
160 }
161
162 private static String getDataFromPath(Path actionOutput, FileSystem actionFs) throws IOException{
163 BufferedReader reader = null;
164 String data = null;
165 try {
166 InputStream is = actionFs.open(actionOutput);
167 reader = new BufferedReader(new InputStreamReader(is));
168 data = IOUtils.getReaderAsString(reader, -1);
169
170 }
171 finally {
172 if (reader != null) {
173 reader.close();
174 }
175 }
176 return data;
177 }
178
179 /**
180 * Return the sharelib postfix for the action.
181 *
182 * @return returns <code>pig</code>.
183 * @param actionXml
184 */
185 @Override
186 protected String getDefaultShareLibName(Element actionXml) {
187 return "pig";
188 }
189
190 protected String getScriptName() {
191 return XOozieClient.PIG_SCRIPT;
192 }
193
194 }