This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.hadoop.fs.FileSystem;
022 import org.apache.hadoop.fs.FSDataOutputStream;
023 import org.apache.hadoop.fs.Path;
024 import org.apache.oozie.action.ActionExecutorException;
025 import org.apache.oozie.client.XOozieClient;
026 import org.apache.oozie.client.WorkflowAction;
027 import org.apache.oozie.util.XLog;
028 import org.apache.oozie.util.XmlUtils;
029 import org.jdom.Element;
030 import org.jdom.Namespace;
031 import org.jdom.JDOMException;
032 import org.mortbay.log.Log;
033
034 import java.io.IOException;
035 import java.util.List;
036
037 public class PigActionExecutor extends JavaActionExecutor {
038
039 public PigActionExecutor() {
040 super("pig");
041 }
042
043 protected List<Class> getLauncherClasses() {
044 List<Class> classes = super.getLauncherClasses();
045 classes.add(LauncherMain.class);
046 classes.add(MapReduceMain.class);
047 classes.add(PigMain.class);
048 return classes;
049 }
050
051 protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
052 return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PigMain.class.getName());
053 }
054
055 void injectActionCallback(Context context, Configuration launcherConf) {
056 }
057
058 @Override
059 protected Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
060 throws ActionExecutorException {
061 super.setupLauncherConf(conf, actionXml, appPath, context);
062 Namespace ns = actionXml.getNamespace();
063 String script = actionXml.getChild("script", ns).getTextTrim();
064 String pigName = new Path(script).getName();
065 String pigScriptContent = context.getProtoActionConf().get(XOozieClient.PIG_SCRIPT);
066
067 Path pigScriptFile = null;
068 if (pigScriptContent != null) { // Create pig script on hdfs if this is
069 // an http submission pig job;
070 FSDataOutputStream dos = null;
071 try {
072 Path actionPath = context.getActionDir();
073 pigScriptFile = new Path(actionPath, script);
074 FileSystem fs = context.getAppFileSystem();
075 dos = fs.create(pigScriptFile);
076 dos.writeBytes(pigScriptContent);
077
078 addToCache(conf, actionPath, script + "#" + pigName, false);
079 }
080 catch (Exception ex) {
081 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog
082 .format("Not able to write pig script file {0} on hdfs", pigScriptFile), ex);
083 }
084 finally {
085 try {
086 if (dos != null) {
087 dos.close();
088 }
089 }
090 catch (IOException ex) {
091 XLog.getLog(getClass()).error("Error: " + ex.getMessage());
092 }
093 }
094 }
095 else {
096 addToCache(conf, appPath, script + "#" + pigName, false);
097 }
098
099 return conf;
100 }
101
102 @SuppressWarnings("unchecked")
103 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
104 throws ActionExecutorException {
105 super.setupActionConf(actionConf, context, actionXml, appPath);
106 Namespace ns = actionXml.getNamespace();
107
108 String script = actionXml.getChild("script", ns).getTextTrim();
109 String pigName = new Path(script).getName();
110
111 List<Element> params = (List<Element>) actionXml.getChildren("param", ns);
112 String[] strParams = new String[params.size()];
113 for (int i = 0; i < params.size(); i++) {
114 strParams[i] = params.get(i).getTextTrim();
115 }
116 String[] strArgs = null;
117 List<Element> eArgs = actionXml.getChildren("argument", ns);
118 if (eArgs != null && eArgs.size() > 0) {
119 strArgs = new String[eArgs.size()];
120 for (int i = 0; i < eArgs.size(); i++) {
121 strArgs[i] = eArgs.get(i).getTextTrim();
122 }
123 }
124 PigMain.setPigScript(actionConf, pigName, strParams, strArgs);
125 return actionConf;
126 }
127
128 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
129 return true;
130 }
131
132 }