/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.action.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.SparkConfigurationService;
import org.jdom.Element;
import org.jdom.Namespace;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Action executor for the <code>spark</code> workflow action.
 * <p>
 * Copies the children of the action XML (<code>master</code>, <code>mode</code>, <code>name</code>,
 * <code>class</code>, <code>jar</code>, <code>spark-opts</code>) into <code>oozie.spark.*</code> properties of
 * the action configuration and adjusts the launcher configuration (user classpath precedence and the
 * <code>SPARK_HOME</code> child environment variable).
 */
public class SparkActionExecutor extends JavaActionExecutor {
    /** Default launcher main class for this action. */
    public static final String SPARK_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SparkMain";
    public static final String TASK_USER_PRECEDENCE = "mapreduce.task.classpath.user.precedence"; // hadoop-2
    public static final String TASK_USER_CLASSPATH_PRECEDENCE = "mapreduce.user.classpath.first";  // hadoop-1
    /** Action configuration key receiving the text of the <code>master</code> element. */
    public static final String SPARK_MASTER = "oozie.spark.master";
    /** Action configuration key receiving the text of the <code>mode</code> element, when present. */
    public static final String SPARK_MODE = "oozie.spark.mode";
    /** Action configuration key receiving the assembled Spark options string. */
    public static final String SPARK_OPTS = "oozie.spark.spark-opts";
    /** Action configuration key receiving the text of the <code>name</code> element. */
    public static final String SPARK_JOB_NAME = "oozie.spark.name";
    /** Action configuration key receiving the text of the <code>class</code> element, when present. */
    public static final String SPARK_CLASS = "oozie.spark.class";
    /** Action configuration key receiving the text of the <code>jar</code> element. */
    public static final String SPARK_JAR = "oozie.spark.jar";
    /** Property holding the child task environment; also used with the <code>oozie.launcher.</code> prefix. */
    public static final String MAPRED_CHILD_ENV = "mapred.child.env";
    private static final String CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR = "oozie.action.spark.setup.hadoop.conf.dir";

    /**
     * Creates the executor, registering it under the action type <code>spark</code>.
     */
    public SparkActionExecutor() {
        super("spark");
    }

    /**
     * Populates the action configuration from the Spark action XML.
     * <p>
     * <code>mode</code>, <code>class</code> and <code>spark-opts</code> are only written when the corresponding
     * element is present. When the master starts with <code>yarn</code>, every entry returned by
     * {@link SparkConfigurationService} for the configured resource manager is prepended to the Spark options
     * as <code>--conf key=value</code>.
     *
     * @param actionConf the action configuration to fill in.
     * @param context the executor context.
     * @param actionXml the action XML element.
     * @param appPath the workflow application path.
     * @return the configuration returned by the parent implementation, with the Spark properties added.
     * @throws ActionExecutorException propagated from the parent implementation.
     */
    @Override
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
        Namespace ns = actionXml.getNamespace();

        // NOTE(review): master, name and jar are used without a null check; presumably the workflow
        // schema makes these elements mandatory -- confirm against the spark-action XSD.
        String master = actionXml.getChildTextTrim("master", ns);
        actionConf.set(SPARK_MASTER, master);

        String mode = actionXml.getChildTextTrim("mode", ns);
        if (mode != null) {
            actionConf.set(SPARK_MODE, mode);
        }

        String jobName = actionXml.getChildTextTrim("name", ns);
        actionConf.set(SPARK_JOB_NAME, jobName);

        String sparkClass = actionXml.getChildTextTrim("class", ns);
        if (sparkClass != null) {
            actionConf.set(SPARK_CLASS, sparkClass);
        }

        String jarLocation = actionXml.getChildTextTrim("jar", ns);
        actionConf.set(SPARK_JAR, jarLocation);

        // Service-provided defaults go first, the user's <spark-opts> are appended after them.
        StringBuilder sparkOptsSb = new StringBuilder();
        if (master.startsWith("yarn")) {
            String resourceManager = actionConf.get(HADOOP_JOB_TRACKER);
            Map<String, String> sparkConfig =
                    Services.get().get(SparkConfigurationService.class).getSparkConfig(resourceManager);
            for (Map.Entry<String, String> entry : sparkConfig.entrySet()) {
                sparkOptsSb.append("--conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
            }
        }
        String sparkOpts = actionXml.getChildTextTrim("spark-opts", ns);
        if (sparkOpts != null) {
            sparkOptsSb.append(sparkOpts);
        }
        if (sparkOptsSb.length() > 0) {
            // trim() drops the trailing separator left when only service-provided entries were appended.
            actionConf.set(SPARK_OPTS, sparkOptsSb.toString().trim());
        }

        // Setting if SparkMain should setup hadoop config *-site.xml
        boolean setupHadoopConf = actionConf.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR,
                ConfigurationService.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR));
        actionConf.setBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR, setupHadoopConf);
        return actionConf;
    }

    /**
     * Creates the launcher job configuration and turns on user classpath precedence.
     * <p>
     * Each of the two precedence properties is set to <code>true</code> unless its
     * <code>oozie.launcher.</code>-prefixed counterpart is already present in the launcher configuration.
     *
     * @param actionFs the file system of the action.
     * @param context the executor context.
     * @param action the workflow action.
     * @param actionXml the action XML element.
     * @param actionConf the action configuration.
     * @return the launcher job configuration.
     * @throws ActionExecutorException propagated from the parent implementation.
     */
    @Override
    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
            Configuration actionConf) throws ActionExecutorException {

        JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
        if (launcherJobConf.get("oozie.launcher." + TASK_USER_PRECEDENCE) == null) {
            launcherJobConf.set(TASK_USER_PRECEDENCE, "true");
        }
        if (launcherJobConf.get("oozie.launcher." + TASK_USER_CLASSPATH_PRECEDENCE) == null) {
            launcherJobConf.set(TASK_USER_CLASSPATH_PRECEDENCE, "true");
        }
        return launcherJobConf;
    }

    /**
     * Sets up the launcher configuration and makes sure <code>SPARK_HOME</code> is part of the child
     * environment.
     * <p>
     * When <code>oozie.launcher.mapred.child.env</code> is unset, both it and <code>mapred.child.env</code>
     * are set to <code>SPARK_HOME=.</code>; when it is set but does not mention <code>SPARK_HOME</code>, the
     * entry is appended to its value (comma separated) and written to both properties. Otherwise the
     * properties are left untouched.
     *
     * @param conf the launcher configuration to modify.
     * @param actionXml the action XML element.
     * @param appPath the workflow application path.
     * @param context the executor context.
     * @return the given <code>conf</code>.
     * @throws ActionExecutorException propagated from the parent implementation.
     */
    @Override
    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
            throws ActionExecutorException {
        super.setupLauncherConf(conf, actionXml, appPath, context);

        // Set SPARK_HOME environment variable on launcher job
        // It is needed since pyspark client checks for it.
        String sparkHome = "SPARK_HOME=.";
        String mapredChildEnv = conf.get("oozie.launcher." + MAPRED_CHILD_ENV);

        if (mapredChildEnv == null) {
            conf.set(MAPRED_CHILD_ENV, sparkHome);
            conf.set("oozie.launcher." + MAPRED_CHILD_ENV, sparkHome);
        } else if (!mapredChildEnv.contains("SPARK_HOME")) {
            conf.set(MAPRED_CHILD_ENV, mapredChildEnv + "," + sparkHome);
            conf.set("oozie.launcher." + MAPRED_CHILD_ENV, mapredChildEnv + "," + sparkHome);
        }
        return conf;
    }

    /**
     * Returns the launcher classes of this action.
     *
     * @return a list holding the class named by {@link #SPARK_MAIN_CLASS_NAME}.
     * @throws RuntimeException if that class cannot be loaded.
     */
    @Override
    public List<Class> getLauncherClasses() {
        List<Class> classes = new ArrayList<Class>();
        try {
            classes.add(Class.forName(SPARK_MAIN_CLASS_NAME));
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found", e);
        }
        return classes;
    }


    /**
     * Return the sharelib name for the action.
     *
     * @param actionXml the action XML element (not consulted).
     * @return returns <code>spark</code>.
     */
    @Override
    protected String getDefaultShareLibName(Element actionXml) {
        return "spark";
    }

    /**
     * Returns the main class the launcher should run.
     *
     * @param launcherConf the launcher configuration.
     * @param actionXml the action XML element (not consulted).
     * @return the value of <code>LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS</code> in the launcher
     *         configuration, or {@link #SPARK_MAIN_CLASS_NAME} when that property is not set.
     */
    @Override
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SPARK_MAIN_CLASS_NAME);
    }
}