/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import static org.apache.oozie.action.hadoop.LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.XOozieClient;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.util.IOUtils;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;

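/**
 * Action executor for the Hive action. It configures the launcher job to run
 * {@code HiveMain}, passes the Hive script, parameters and arguments to it through the
 * action configuration, and promotes the Hadoop job IDs started by Hive as the action's
 * external child IDs.
 */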
public class HiveActionExecutor extends ScriptLanguageActionExecutor {

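    // Configuration property names used to pass the Hive script name, its parameters
    // and its command-line arguments from the executor to the launcher main class.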
    private static final String HIVE_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.HiveMain";
    static final String HIVE_SCRIPT = "oozie.hive.script";
    static final String HIVE_PARAMS = "oozie.hive.params";
    static final String HIVE_ARGS = "oozie.hive.args";

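    /**
     * Creates the executor and registers it for the <code>hive</code> action type.
     */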
    public HiveActionExecutor() {
        super("hive");
    }

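    /**
     * Returns the classes the launcher job needs on its classpath, adding
     * {@code HiveMain} to the classes provided by the superclass.
     */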
    @Override
    protected List<Class> getLauncherClasses() {
        List<Class> classes = super.getLauncherClasses();
        try {
            classes.add(Class.forName(HIVE_MAIN_CLASS_NAME));
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found: " + HIVE_MAIN_CLASS_NAME, e);
        }
        return classes;
    }

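    /**
     * Returns the main class the launcher should run, {@code HiveMain} unless it is
     * overridden in the launcher configuration.
     */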
    @Override
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        return launcherConf.get(CONF_OOZIE_ACTION_MAIN_CLASS, HIVE_MAIN_CLASS_NAME);
    }

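    /**
     * Builds the action configuration: resolves the Hive script from the action XML,
     * adds it to the distributed cache when no inline script content is present in the
     * proto action configuration, and records the script name, <code>param</code>
     * values and <code>argument</code> values for the launcher.
     */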
    @Override
    @SuppressWarnings("unchecked")
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml,
                                  Path appPath) throws ActionExecutorException {
        Configuration conf = super.setupActionConf(actionConf, context, actionXml, appPath);

        Namespace ns = actionXml.getNamespace();
        String script = actionXml.getChild("script", ns).getTextTrim();
        String scriptName = new Path(script).getName();
        String hiveScriptContent = context.getProtoActionConf().get(XOozieClient.HIVE_SCRIPT);

        // Add the script file to the distributed cache only when the script was not
        // supplied inline through the proto action configuration.
        if (hiveScriptContent == null) {
            addToCache(conf, appPath, script + "#" + scriptName, false);
        }

        List<Element> params = (List<Element>) actionXml.getChildren("param", ns);
        String[] strParams = new String[params.size()];
        for (int i = 0; i < params.size(); i++) {
            strParams[i] = params.get(i).getTextTrim();
        }
        String[] strArgs = null;
        List<Element> eArgs = (List<Element>) actionXml.getChildren("argument", ns);
        if (eArgs != null && eArgs.size() > 0) {
            strArgs = new String[eArgs.size()];
            for (int i = 0; i < eArgs.size(); i++) {
                strArgs[i] = eArgs.get(i).getTextTrim();
            }
        }

        setHiveScript(conf, scriptName, strParams, strArgs);
        return conf;
    }

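    /**
     * Stores the Hive script name, parameters and arguments in the given configuration
     * under {@link #HIVE_SCRIPT}, {@link #HIVE_PARAMS} and {@link #HIVE_ARGS}.
     *
     * @param conf configuration to update
     * @param script name of the Hive script
     * @param params values of the action's <code>param</code> elements, may be empty
     * @param args values of the action's <code>argument</code> elements, may be <code>null</code>
     */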
    public static void setHiveScript(Configuration conf, String script, String[] params, String[] args) {
        conf.set(HIVE_SCRIPT, script);
        MapReduceMain.setStrings(conf, HIVE_PARAMS, params);
        MapReduceMain.setStrings(conf, HIVE_ARGS, args);
    }

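    /**
     * Always <code>true</code>: output is captured so that the Hadoop job IDs started
     * by Hive can be read back from the action data.
     */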
    @Override
    protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
        return true;
    }

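    /**
     * Collects the action data and, on success, promotes the Hadoop job IDs recorded by
     * the launcher as the action's external child IDs.
     */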
    @Override
    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
            throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
        super.getActionData(actionFs, runningJob, action, context);

        // Load the stored Hadoop job ids and promote them as external child ids on job success
        Properties props = new Properties();
        props.load(new StringReader(action.getData()));
        context.setExternalChildIDs((String) props.get(LauncherMain.HADOOP_JOBS));
    }

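    /**
     * Sets the completion data and, on failure, promotes the Hadoop job IDs recorded by
     * the launcher as the action's external child IDs.
     */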
    @Override
    protected void setActionCompletionData(Context context, FileSystem actionFs) throws IOException,
            HadoopAccessorException, URISyntaxException {
        super.setActionCompletionData(context, actionFs);

        // Load the stored Hadoop job ids and promote them as external child ids on job failure
        Path externalChildIDs = LauncherMapperHelper.getExternalChildIDsDataPath(context.getActionDir());
        if (actionFs.exists(externalChildIDs)) {
            InputStream is = actionFs.open(externalChildIDs);
            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
            try {
                context.setExternalChildIDs(IOUtils.getReaderAsString(reader, -1));
            }
            finally {
                reader.close();
            }
        }
    }

    /**
     * Returns the sharelib name for the action.
     *
     * @param actionXml the action XML (not used, the Hive action always uses the
     *        <code>hive</code> sharelib)
     * @return <code>hive</code>
     */
    @Override
    protected String getDefaultShareLibName(Element actionXml) {
        return "hive";
    }

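    /**
     * Returns the name of the configuration property that carries the Hive script
     * content in the proto action configuration, {@link XOozieClient#HIVE_SCRIPT}.
     */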
    protected String getScriptName() {
        return XOozieClient.HIVE_SCRIPT;
    }

}