/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.action.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.SparkConfigurationService;
import org.jdom.Element;
import org.jdom.Namespace;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

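/**
 * Action executor for the Oozie {@code spark} action. It maps the elements of the action XML onto
 * {@code oozie.spark.*} configuration properties that are consumed by the Spark main class
 * ({@link #SPARK_MAIN_CLASS_NAME}) run by the launcher job.
 *
 * <p>An illustrative (not normative) action definition, using only the elements parsed by
 * {@link #setupActionConf}; the namespace version shown is an example:
 * <pre>{@code
 * <spark xmlns="uri:oozie:spark-action:0.1">
 *     <master>yarn</master>
 *     <mode>cluster</mode>
 *     <name>MySparkJob</name>
 *     <class>org.example.MySparkApp</class>
 *     <jar>hdfs://namenode/apps/myapp/myapp.jar</jar>
 *     <spark-opts>--executor-memory 2G</spark-opts>
 * </spark>
 * }</pre>
 */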
public class SparkActionExecutor extends JavaActionExecutor {
    public static final String SPARK_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SparkMain";
    public static final String TASK_USER_PRECEDENCE = "mapreduce.task.classpath.user.precedence"; // hadoop-2
    public static final String TASK_USER_CLASSPATH_PRECEDENCE = "mapreduce.user.classpath.first";  // hadoop-1
    public static final String SPARK_MASTER = "oozie.spark.master";
    public static final String SPARK_MODE = "oozie.spark.mode";
    public static final String SPARK_OPTS = "oozie.spark.spark-opts";
    public static final String SPARK_JOB_NAME = "oozie.spark.name";
    public static final String SPARK_CLASS = "oozie.spark.class";
    public static final String SPARK_JAR = "oozie.spark.jar";
    public static final String MAPRED_CHILD_ENV = "mapred.child.env";
    private static final String CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR = "oozie.action.spark.setup.hadoop.conf.dir";

    public SparkActionExecutor() {
        super("spark");
    }

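    /**
     * Copies the Spark-specific elements of the action XML ({@code master}, {@code mode},
     * {@code name}, {@code class}, {@code jar}, {@code spark-opts}) into the action
     * configuration under the {@code oozie.spark.*} keys. When the master is YARN, any Spark
     * properties configured in {@link SparkConfigurationService} for the job's resource manager
     * are prepended to the spark-opts as {@code --conf key=value} pairs.
     */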
    @Override
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
        Namespace ns = actionXml.getNamespace();

        String master = actionXml.getChildTextTrim("master", ns);
        actionConf.set(SPARK_MASTER, master);

        String mode = actionXml.getChildTextTrim("mode", ns);
        if (mode != null) {
            actionConf.set(SPARK_MODE, mode);
        }

        String jobName = actionXml.getChildTextTrim("name", ns);
        actionConf.set(SPARK_JOB_NAME, jobName);

        String sparkClass = actionXml.getChildTextTrim("class", ns);
        if (sparkClass != null) {
            actionConf.set(SPARK_CLASS, sparkClass);
        }

        String jarLocation = actionXml.getChildTextTrim("jar", ns);
        actionConf.set(SPARK_JAR, jarLocation);

        StringBuilder sparkOptsSb = new StringBuilder();
        if (master.startsWith("yarn")) {
            // For YARN masters, prepend the Spark properties configured for this resource manager
            // as --conf key=value options, ahead of any user-supplied spark-opts.
            String resourceManager = actionConf.get(HADOOP_JOB_TRACKER);
            Map<String, String> sparkConfig = Services.get().get(SparkConfigurationService.class)
                    .getSparkConfig(resourceManager);
            for (Map.Entry<String, String> entry : sparkConfig.entrySet()) {
                sparkOptsSb.append("--conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
            }
        }
        String sparkOpts = actionXml.getChildTextTrim("spark-opts", ns);
        if (sparkOpts != null) {
            sparkOptsSb.append(sparkOpts);
        }
        if (sparkOptsSb.length() > 0) {
            actionConf.set(SPARK_OPTS, sparkOptsSb.toString().trim());
        }

        // Whether SparkMain should set up a Hadoop config directory (*-site.xml files) for the Spark job
        boolean setupHadoopConf = actionConf.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR,
                ConfigurationService.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR));
        actionConf.setBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR, setupHadoopConf);
        return actionConf;
    }

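    /**
     * Builds the launcher job configuration and, unless already overridden through
     * {@code oozie.launcher.*} properties, turns on user-classpath precedence (both the
     * Hadoop 1 and Hadoop 2 property names) so that the jars shipped with the action take
     * precedence over the cluster-provided ones.
     */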
    @Override
    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
                               Configuration actionConf) throws ActionExecutorException {

        JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
        if (launcherJobConf.get("oozie.launcher." + TASK_USER_PRECEDENCE) == null) {
            launcherJobConf.set(TASK_USER_PRECEDENCE, "true");
        }
        if (launcherJobConf.get("oozie.launcher." + TASK_USER_CLASSPATH_PRECEDENCE) == null) {
            launcherJobConf.set(TASK_USER_CLASSPATH_PRECEDENCE, "true");
        }
        return launcherJobConf;
    }

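    /**
     * Adds {@code SPARK_HOME=.} to the launcher's child environment unless a
     * {@code SPARK_HOME} entry is already present, because the pyspark client checks
     * for that variable.
     */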
    @Override
    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
            throws ActionExecutorException {
        super.setupLauncherConf(conf, actionXml, appPath, context);

        // Set the SPARK_HOME environment variable on the launcher job;
        // it is needed because the pyspark client checks for it.
        String sparkHome = "SPARK_HOME=.";
        String mapredChildEnv = conf.get("oozie.launcher." + MAPRED_CHILD_ENV);

        if (mapredChildEnv == null) {
            conf.set(MAPRED_CHILD_ENV, sparkHome);
            conf.set("oozie.launcher." + MAPRED_CHILD_ENV, sparkHome);
        } else if (!mapredChildEnv.contains("SPARK_HOME")) {
            conf.set(MAPRED_CHILD_ENV, mapredChildEnv + "," + sparkHome);
            conf.set("oozie.launcher." + MAPRED_CHILD_ENV, mapredChildEnv + "," + sparkHome);
        }
        return conf;
    }

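    /**
     * Returns the classes to be made available to the launcher job; for the Spark action
     * this is only the Spark main class, loaded reflectively by name.
     */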
    @Override
    public List<Class> getLauncherClasses() {
        List<Class> classes = new ArrayList<Class>();
        try {
            classes.add(Class.forName(SPARK_MAIN_CLASS_NAME));
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found", e);
        }
        return classes;
    }

    /**
     * Return the sharelib name for the action.
     *
     * @param actionXml the action XML element (not used for the Spark action)
     * @return <code>spark</code>
     */
    @Override
    protected String getDefaultShareLibName(Element actionXml) {
        return "spark";
    }

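    /**
     * Returns the main class the launcher should run, defaulting to
     * {@link #SPARK_MAIN_CLASS_NAME} unless overridden in the launcher configuration.
     */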
    @Override
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SPARK_MAIN_CLASS_NAME);
    }
}