001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.hadoop.fs.FileSystem;
022    import org.apache.hadoop.fs.FSDataOutputStream;
023    import org.apache.hadoop.fs.Path;
024    import org.apache.oozie.action.ActionExecutorException;
025    import org.apache.oozie.client.XOozieClient;
026    import org.apache.oozie.client.WorkflowAction;
027    import org.apache.oozie.util.XLog;
028    import org.apache.oozie.util.XmlUtils;
029    import org.jdom.Element;
030    import org.jdom.Namespace;
031    import org.jdom.JDOMException;
032    import org.mortbay.log.Log;
033    
034    import java.io.IOException;
035    import java.util.List;
036    
037    public class PigActionExecutor extends JavaActionExecutor {
038    
039        public PigActionExecutor() {
040            super("pig");
041        }
042    
043        protected List<Class> getLauncherClasses() {
044            List<Class> classes = super.getLauncherClasses();
045            classes.add(LauncherMain.class);
046            classes.add(MapReduceMain.class);
047            classes.add(PigMain.class);
048            return classes;
049        }
050    
051        protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
052            return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PigMain.class.getName());
053        }
054    
055        void injectActionCallback(Context context, Configuration launcherConf) {
056        }
057    
058        @Override
059        protected Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
060                throws ActionExecutorException {
061            super.setupLauncherConf(conf, actionXml, appPath, context);
062            Namespace ns = actionXml.getNamespace();
063            String script = actionXml.getChild("script", ns).getTextTrim();
064            String pigName = new Path(script).getName();
065            String pigScriptContent = context.getProtoActionConf().get(XOozieClient.PIG_SCRIPT);
066    
067            Path pigScriptFile = null;
068            if (pigScriptContent != null) { // Create pig script on hdfs if this is
069                // an http submission pig job;
070                FSDataOutputStream dos = null;
071                try {
072                    Path actionPath = context.getActionDir();
073                    pigScriptFile = new Path(actionPath, script);
074                    FileSystem fs = context.getAppFileSystem();
075                    dos = fs.create(pigScriptFile);
076                    dos.writeBytes(pigScriptContent);
077    
078                    addToCache(conf, actionPath, script + "#" + pigName, false);
079                }
080                catch (Exception ex) {
081                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog
082                            .format("Not able to write pig script file {0} on hdfs", pigScriptFile), ex);
083                }
084                finally {
085                    try {
086                        if (dos != null) {
087                            dos.close();
088                        }
089                    }
090                    catch (IOException ex) {
091                        XLog.getLog(getClass()).error("Error: " + ex.getMessage());
092                    }
093                }
094            }
095            else {
096                addToCache(conf, appPath, script + "#" + pigName, false);
097            }
098    
099            return conf;
100        }
101    
102        @SuppressWarnings("unchecked")
103        Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
104                throws ActionExecutorException {
105            super.setupActionConf(actionConf, context, actionXml, appPath);
106            Namespace ns = actionXml.getNamespace();
107    
108            String script = actionXml.getChild("script", ns).getTextTrim();
109            String pigName = new Path(script).getName();
110    
111            List<Element> params = (List<Element>) actionXml.getChildren("param", ns);
112            String[] strParams = new String[params.size()];
113            for (int i = 0; i < params.size(); i++) {
114                strParams[i] = params.get(i).getTextTrim();
115            }
116            String[] strArgs = null;
117            List<Element> eArgs = actionXml.getChildren("argument", ns);
118            if (eArgs != null && eArgs.size() > 0) {
119                strArgs = new String[eArgs.size()];
120                for (int i = 0; i < eArgs.size(); i++) {
121                    strArgs[i] = eArgs.get(i).getTextTrim();
122                }
123            }
124            PigMain.setPigScript(actionConf, pigName, strParams, strArgs);
125            return actionConf;
126        }
127    
128        protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
129            return true;
130        }
131    
132    }