/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.action.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.SparkConfigurationService;
import org.jdom.Element;
import org.jdom.Namespace;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
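
/**
 * Action executor for the Oozie Spark action. It extends JavaActionExecutor and runs
 * org.apache.oozie.action.hadoop.SparkMain in the launcher job to submit the Spark
 * application described by the action XML (master, mode, name, class, jar, spark-opts).
 */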
public class SparkActionExecutor extends JavaActionExecutor {
    public static final String SPARK_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SparkMain";
    public static final String TASK_USER_PRECEDENCE = "mapreduce.task.classpath.user.precedence"; // hadoop-2
    public static final String TASK_USER_CLASSPATH_PRECEDENCE = "mapreduce.user.classpath.first";  // hadoop-1
    public static final String SPARK_MASTER = "oozie.spark.master";
    public static final String SPARK_MODE = "oozie.spark.mode";
    public static final String SPARK_OPTS = "oozie.spark.spark-opts";
    public static final String SPARK_JOB_NAME = "oozie.spark.name";
    public static final String SPARK_CLASS = "oozie.spark.class";
    public static final String SPARK_JAR = "oozie.spark.jar";
    public static final String MAPRED_CHILD_ENV = "mapred.child.env";
    private static final String CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR = "oozie.action.spark.setup.hadoop.conf.dir";

    public SparkActionExecutor() {
        super("spark");
    }
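
    /**
     * Copies the Spark-specific elements of the action XML (master, mode, name, class,
     * jar and spark-opts) into the action configuration so they are available to the
     * Spark main class at launch time. For yarn masters, the server-side Spark
     * configuration of the target resource manager is merged into the spark-opts value.
     */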
    @Override
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
        Namespace ns = actionXml.getNamespace();

        String master = actionXml.getChildTextTrim("master", ns);
        actionConf.set(SPARK_MASTER, master);

        String mode = actionXml.getChildTextTrim("mode", ns);
        if (mode != null) {
            actionConf.set(SPARK_MODE, mode);
        }

        String jobName = actionXml.getChildTextTrim("name", ns);
        actionConf.set(SPARK_JOB_NAME, jobName);

        String sparkClass = actionXml.getChildTextTrim("class", ns);
        if (sparkClass != null) {
            actionConf.set(SPARK_CLASS, sparkClass);
        }

        String jarLocation = actionXml.getChildTextTrim("jar", ns);
        actionConf.set(SPARK_JAR, jarLocation);

        StringBuilder sparkOptsSb = new StringBuilder();
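        // For yarn masters, merge the server-side Spark configuration associated with the
        // resource manager (from SparkConfigurationService) into spark-opts as
        // --conf key=value pairs, ahead of any spark-opts supplied in the action XML.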
        if (master.startsWith("yarn")) {
            String resourceManager = actionConf.get(HADOOP_JOB_TRACKER);
            Properties sparkConfig =
                    Services.get().get(SparkConfigurationService.class).getSparkConfig(resourceManager);
            for (String property : sparkConfig.stringPropertyNames()) {
                sparkOptsSb.append("--conf ")
                        .append(property).append("=").append(sparkConfig.getProperty(property)).append(" ");
            }
        }
        String sparkOpts = actionXml.getChildTextTrim("spark-opts", ns);
        if (sparkOpts != null) {
            sparkOptsSb.append(sparkOpts);
        }
        if (sparkOptsSb.length() > 0) {
            actionConf.set(SPARK_OPTS, sparkOptsSb.toString().trim());
        }

        // Whether SparkMain should set up the Hadoop configuration files (*-site.xml)
        boolean setupHadoopConf = actionConf.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR,
                ConfigurationService.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR));
        actionConf.setBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR, setupHadoopConf);
        return actionConf;
    }
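
    /**
     * Creates the launcher job configuration and, unless explicitly overridden through the
     * corresponding oozie.launcher.* properties, enables user classpath precedence so the
     * jars shipped with the action (such as the Spark sharelib) are loaded before the
     * cluster-provided classes.
     */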
    @Override
    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
                               Configuration actionConf) throws ActionExecutorException {

        JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
        if (launcherJobConf.get("oozie.launcher." + TASK_USER_PRECEDENCE) == null) {
            launcherJobConf.set(TASK_USER_PRECEDENCE, "true");
        }
        if (launcherJobConf.get("oozie.launcher." + TASK_USER_CLASSPATH_PRECEDENCE) == null) {
            launcherJobConf.set(TASK_USER_CLASSPATH_PRECEDENCE, "true");
        }
        return launcherJobConf;
    }
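
    /**
     * Sets up the launcher configuration and ensures that SPARK_HOME is defined in
     * mapred.child.env, because the pyspark client checks for that variable.
     */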
    @Override
    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
            throws ActionExecutorException {
        super.setupLauncherConf(conf, actionXml, appPath, context);

        // Set the SPARK_HOME environment variable on the launcher job; it is needed
        // because the pyspark client checks for it.
        String sparkHome = "SPARK_HOME=.";
        String mapredChildEnv = conf.get("oozie.launcher." + MAPRED_CHILD_ENV);

        if (mapredChildEnv == null) {
            conf.set(MAPRED_CHILD_ENV, sparkHome);
            conf.set("oozie.launcher." + MAPRED_CHILD_ENV, sparkHome);
        } else if (!mapredChildEnv.contains("SPARK_HOME")) {
            conf.set(MAPRED_CHILD_ENV, mapredChildEnv + "," + sparkHome);
            conf.set("oozie.launcher." + MAPRED_CHILD_ENV, mapredChildEnv + "," + sparkHome);
        }
        return conf;
    }
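
    /**
     * Returns the classes the launcher job needs, which for the Spark action is the
     * SparkMain class.
     */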
    @Override
    public List<Class> getLauncherClasses() {
        List<Class> classes = new ArrayList<Class>();
        try {
            classes.add(Class.forName(SPARK_MAIN_CLASS_NAME));
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found: " + SPARK_MAIN_CLASS_NAME, e);
        }
        return classes;
    }

    /**
     * Return the sharelib name for the action.
     *
     * @param actionXml the action XML element (not used)
     * @return <code>spark</code>
     */
    @Override
    protected String getDefaultShareLibName(Element actionXml) {
        return "spark";
    }
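
    /**
     * Returns the launcher main class, defaulting to SparkMain unless
     * LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS is set in the launcher configuration.
     */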
    @Override
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SPARK_MAIN_CLASS_NAME);
    }
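
    /**
     * Names the sharelib files to load into the action configuration; hive-site.xml is
     * listed so that Spark jobs using Hive can pick up the Hive settings when a
     * hive-site.xml is present in the sharelib.
     */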
    @Override
    public String[] getShareLibFilesForActionConf() {
        return new String[] { "hive-site.xml" };
    }

}