001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.hadoop.fs.FileSystem;
022    import org.apache.hadoop.fs.FSDataOutputStream;
023    import org.apache.hadoop.fs.Path;
024    import org.apache.hadoop.mapred.RunningJob;
025    import org.apache.oozie.action.ActionExecutorException;
026    import org.apache.oozie.client.XOozieClient;
027    import org.apache.oozie.client.WorkflowAction;
028    import org.apache.oozie.service.HadoopAccessorException;
029    import org.apache.oozie.util.ClassUtils;
030    import org.apache.oozie.util.IOUtils;
031    import org.apache.oozie.util.XLog;
032    import org.apache.oozie.util.XmlUtils;
033    import org.jdom.Element;
034    import org.jdom.Namespace;
035    import org.jdom.JDOMException;
036    import org.mortbay.log.Log;
037    
038    import java.io.BufferedReader;
039    import java.io.IOException;
040    import java.io.InputStream;
041    import java.io.InputStreamReader;
042    import java.net.URISyntaxException;
043    import java.util.List;
044    
045    public class PigActionExecutor extends JavaActionExecutor {
046    
047        public PigActionExecutor() {
048            super("pig");
049        }
050    
051        @Override
052        protected List<Class> getLauncherClasses() {
053            List<Class> classes = super.getLauncherClasses();
054            classes.add(LauncherMain.class);
055            classes.add(MapReduceMain.class);
056            classes.add(PigMain.class);
057            classes.add(OoziePigStats.class);
058            return classes;
059        }
060    
061    
062        @Override
063        protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
064            return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PigMain.class.getName());
065        }
066    
067        @Override
068        void injectActionCallback(Context context, Configuration launcherConf) {
069        }
070    
071        @Override
072        protected Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
073                throws ActionExecutorException {
074            super.setupLauncherConf(conf, actionXml, appPath, context);
075            Namespace ns = actionXml.getNamespace();
076            String script = actionXml.getChild("script", ns).getTextTrim();
077            String pigName = new Path(script).getName();
078            String pigScriptContent = context.getProtoActionConf().get(XOozieClient.PIG_SCRIPT);
079    
080            Path pigScriptFile = null;
081            if (pigScriptContent != null) { // Create pig script on hdfs if this is
082                // an http submission pig job;
083                FSDataOutputStream dos = null;
084                try {
085                    Path actionPath = context.getActionDir();
086                    pigScriptFile = new Path(actionPath, script);
087                    FileSystem fs = context.getAppFileSystem();
088                    dos = fs.create(pigScriptFile);
089                    dos.writeBytes(pigScriptContent);
090    
091                    addToCache(conf, actionPath, script + "#" + pigName, false);
092                }
093                catch (Exception ex) {
094                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog
095                            .format("Not able to write pig script file {0} on hdfs", pigScriptFile), ex);
096                }
097                finally {
098                    try {
099                        if (dos != null) {
100                            dos.close();
101                        }
102                    }
103                    catch (IOException ex) {
104                        XLog.getLog(getClass()).error("Error: " + ex.getMessage());
105                    }
106                }
107            }
108            else {
109                addToCache(conf, appPath, script + "#" + pigName, false);
110            }
111    
112            return conf;
113        }
114    
115        @Override
116        @SuppressWarnings("unchecked")
117        Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
118                throws ActionExecutorException {
119            super.setupActionConf(actionConf, context, actionXml, appPath);
120            Namespace ns = actionXml.getNamespace();
121    
122            String script = actionXml.getChild("script", ns).getTextTrim();
123            String pigName = new Path(script).getName();
124    
125            List<Element> params = (List<Element>) actionXml.getChildren("param", ns);
126            String[] strParams = new String[params.size()];
127            for (int i = 0; i < params.size(); i++) {
128                strParams[i] = params.get(i).getTextTrim();
129            }
130            String[] strArgs = null;
131            List<Element> eArgs = actionXml.getChildren("argument", ns);
132            if (eArgs != null && eArgs.size() > 0) {
133                strArgs = new String[eArgs.size()];
134                for (int i = 0; i < eArgs.size(); i++) {
135                    strArgs[i] = eArgs.get(i).getTextTrim();
136                }
137            }
138            PigMain.setPigScript(actionConf, pigName, strParams, strArgs);
139            return actionConf;
140        }
141    
142        @Override
143        protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
144            return false;
145        }
146    
147        /**
148         * Get the stats and external child IDs for a pig job
149         *
150         * @param actionFs the FileSystem object
151         * @param runningJob the runningJob
152         * @param action the Workflow action
153         * @param context executor context
154         *
155         */
156        @Override
157        protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) throws HadoopAccessorException, JDOMException, IOException, URISyntaxException{
158            super.getActionData(actionFs, runningJob, action, context);
159            String stats = getStats(context, actionFs);
160            context.setExecutionStats(stats);
161            String externalChildIDs = getExternalChildIDs(context, actionFs);
162            context.setExternalChildIDs(externalChildIDs);
163        }
164    
165        private String getStats(Context context, FileSystem actionFs) throws IOException, HadoopAccessorException,
166                URISyntaxException {
167            Path actionOutput = LauncherMapper.getActionStatsDataPath(context.getActionDir());
168            String stats = null;
169            if (actionFs.exists(actionOutput)) {
170                stats = getDataFromPath(actionOutput, actionFs);
171    
172            }
173            return stats;
174        }
175    
176        private String getExternalChildIDs(Context context, FileSystem actionFs) throws IOException,
177                HadoopAccessorException, URISyntaxException {
178            Path actionOutput = LauncherMapper.getExternalChildIDsDataPath(context.getActionDir());
179            String externalIDs = null;
180            if (actionFs.exists(actionOutput)) {
181                externalIDs = getDataFromPath(actionOutput, actionFs);
182            }
183            return externalIDs;
184        }
185    
186        private static String getDataFromPath(Path actionOutput, FileSystem actionFs) throws IOException{
187            BufferedReader reader = null;
188            String data = null;
189            try {
190                InputStream is = actionFs.open(actionOutput);
191                reader = new BufferedReader(new InputStreamReader(is));
192                data = IOUtils.getReaderAsString(reader, -1);
193    
194            }
195            finally {
196                if (reader != null) {
197                    reader.close();
198                }
199            }
200            return data;
201        }
202    
203        /**
204         * Return the sharelib postfix for the action.
205         *
206         * @param context executor context.
207         * @param actionXml the action XML.
208         * @return the action sharelib post fix, this implementation returns <code>pig</code>.
209         */
210        @Override
211        protected String getShareLibPostFix(Context context, Element actionXml) {
212            return "pig";
213        }
214    
215    }