001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.hadoop.fs.FileSystem;
022    import org.apache.hadoop.fs.FSDataOutputStream;
023    import org.apache.hadoop.fs.Path;
024    import org.apache.hadoop.mapred.RunningJob;
025    import org.apache.oozie.action.ActionExecutorException;
026    import org.apache.oozie.client.XOozieClient;
027    import org.apache.oozie.client.WorkflowAction;
028    import org.apache.oozie.service.HadoopAccessorException;
029    import org.apache.oozie.util.IOUtils;
030    import org.apache.oozie.util.XLog;
031    import org.jdom.Element;
032    import org.jdom.Namespace;
033    import org.jdom.JDOMException;
034    
035    import java.io.BufferedReader;
036    import java.io.IOException;
037    import java.io.InputStream;
038    import java.io.InputStreamReader;
039    import java.net.URISyntaxException;
040    import java.util.List;
041    
042    public class PigActionExecutor extends JavaActionExecutor {
043    
044        public PigActionExecutor() {
045            super("pig");
046        }
047    
048        @Override
049        protected List<Class> getLauncherClasses() {
050            List<Class> classes = super.getLauncherClasses();
051            classes.add(LauncherMain.class);
052            classes.add(MapReduceMain.class);
053            classes.add(PigMain.class);
054            classes.add(OoziePigStats.class);
055            return classes;
056        }
057    
058    
059        @Override
060        protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
061            return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PigMain.class.getName());
062        }
063    
064        @Override
065        void injectActionCallback(Context context, Configuration launcherConf) {
066        }
067    
068        @Override
069        protected Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
070                throws ActionExecutorException {
071            super.setupLauncherConf(conf, actionXml, appPath, context);
072            Namespace ns = actionXml.getNamespace();
073            String script = actionXml.getChild("script", ns).getTextTrim();
074            String pigName = new Path(script).getName();
075            String pigScriptContent = context.getProtoActionConf().get(XOozieClient.PIG_SCRIPT);
076    
077            Path pigScriptFile = null;
078            if (pigScriptContent != null) { // Create pig script on hdfs if this is
079                // an http submission pig job;
080                FSDataOutputStream dos = null;
081                try {
082                    Path actionPath = context.getActionDir();
083                    pigScriptFile = new Path(actionPath, script);
084                    FileSystem fs = context.getAppFileSystem();
085                    dos = fs.create(pigScriptFile);
086                    dos.writeBytes(pigScriptContent);
087    
088                    addToCache(conf, actionPath, script + "#" + pigName, false);
089                }
090                catch (Exception ex) {
091                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog
092                            .format("Not able to write pig script file {0} on hdfs", pigScriptFile), ex);
093                }
094                finally {
095                    try {
096                        if (dos != null) {
097                            dos.close();
098                        }
099                    }
100                    catch (IOException ex) {
101                        XLog.getLog(getClass()).error("Error: " + ex.getMessage());
102                    }
103                }
104            }
105            else {
106                addToCache(conf, appPath, script + "#" + pigName, false);
107            }
108    
109            return conf;
110        }
111    
112        @Override
113        @SuppressWarnings("unchecked")
114        Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
115                throws ActionExecutorException {
116            super.setupActionConf(actionConf, context, actionXml, appPath);
117            Namespace ns = actionXml.getNamespace();
118    
119            String script = actionXml.getChild("script", ns).getTextTrim();
120            String pigName = new Path(script).getName();
121    
122            List<Element> params = (List<Element>) actionXml.getChildren("param", ns);
123            String[] strParams = new String[params.size()];
124            for (int i = 0; i < params.size(); i++) {
125                strParams[i] = params.get(i).getTextTrim();
126            }
127            String[] strArgs = null;
128            List<Element> eArgs = actionXml.getChildren("argument", ns);
129            if (eArgs != null && eArgs.size() > 0) {
130                strArgs = new String[eArgs.size()];
131                for (int i = 0; i < eArgs.size(); i++) {
132                    strArgs[i] = eArgs.get(i).getTextTrim();
133                }
134            }
135            PigMain.setPigScript(actionConf, pigName, strParams, strArgs);
136            return actionConf;
137        }
138    
139        @Override
140        protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
141            return false;
142        }
143    
144        /**
145         * Get the stats and external child IDs for a pig job
146         *
147         * @param actionFs the FileSystem object
148         * @param runningJob the runningJob
149         * @param action the Workflow action
150         * @param context executor context
151         *
152         */
153        @Override
154        protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) throws HadoopAccessorException, JDOMException, IOException, URISyntaxException{
155            super.getActionData(actionFs, runningJob, action, context);
156            String stats = getStats(context, actionFs);
157            context.setExecutionStats(stats);
158            String externalChildIDs = getExternalChildIDs(context, actionFs);
159            context.setExternalChildIDs(externalChildIDs);
160        }
161    
162        private String getStats(Context context, FileSystem actionFs) throws IOException, HadoopAccessorException,
163                URISyntaxException {
164            Path actionOutput = LauncherMapper.getActionStatsDataPath(context.getActionDir());
165            String stats = null;
166            if (actionFs.exists(actionOutput)) {
167                stats = getDataFromPath(actionOutput, actionFs);
168    
169            }
170            return stats;
171        }
172    
173        private String getExternalChildIDs(Context context, FileSystem actionFs) throws IOException,
174                HadoopAccessorException, URISyntaxException {
175            Path actionOutput = LauncherMapper.getExternalChildIDsDataPath(context.getActionDir());
176            String externalIDs = null;
177            if (actionFs.exists(actionOutput)) {
178                externalIDs = getDataFromPath(actionOutput, actionFs);
179            }
180            return externalIDs;
181        }
182    
183        private static String getDataFromPath(Path actionOutput, FileSystem actionFs) throws IOException{
184            BufferedReader reader = null;
185            String data = null;
186            try {
187                InputStream is = actionFs.open(actionOutput);
188                reader = new BufferedReader(new InputStreamReader(is));
189                data = IOUtils.getReaderAsString(reader, -1);
190    
191            }
192            finally {
193                if (reader != null) {
194                    reader.close();
195                }
196            }
197            return data;
198        }
199    
200        /**
201         * Return the sharelib postfix for the action.
202         *
203         * @return returns <code>pig</code>.
204         * @param actionXml
205         */
206        @Override
207        protected String getDefaultShareLibName(Element actionXml) {
208            return "pig";
209        }
210    
211    }