/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.action.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.SparkConfigurationService;
import org.jdom.Element;
import org.jdom.Namespace;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Action executor that passes the type name <code>spark</code> to its superclass and translates the
 * children of the action XML (<code>master</code>, <code>mode</code>, <code>name</code>, <code>class</code>,
 * <code>jar</code>, <code>spark-opts</code>) into <code>oozie.spark.*</code> configuration properties.
 */
public class SparkActionExecutor extends JavaActionExecutor {
    public static final String SPARK_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SparkMain";
    public static final String TASK_USER_PRECEDENCE = "mapreduce.task.classpath.user.precedence"; // hadoop-2
    public static final String TASK_USER_CLASSPATH_PRECEDENCE = "mapreduce.user.classpath.first"; // hadoop-1
    public static final String SPARK_MASTER = "oozie.spark.master";
    public static final String SPARK_MODE = "oozie.spark.mode";
    public static final String SPARK_OPTS = "oozie.spark.spark-opts";
    public static final String SPARK_JOB_NAME = "oozie.spark.name";
    public static final String SPARK_CLASS = "oozie.spark.class";
    public static final String SPARK_JAR = "oozie.spark.jar";
    public static final String MAPRED_CHILD_ENV = "mapred.child.env";
    private static final String CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR = "oozie.action.spark.setup.hadoop.conf.dir";

    public SparkActionExecutor() {
        super("spark");
    }

    /**
     * Extends the parent's action configuration with the Spark settings read from the action XML.
     * <p>
     * <code>master</code>, <code>name</code> and <code>jar</code> are always written to the configuration;
     * <code>mode</code> and <code>class</code> only when the element is present. Spark options are stored
     * under {@link #SPARK_OPTS} only if the assembled option string is non-empty.
     *
     * @param actionConf configuration handed to the parent implementation; the parent's result is what gets populated
     * @param context executor context, forwarded to the parent
     * @param actionXml the action element whose children are read
     * @param appPath application path, forwarded to the parent
     * @return the configuration returned by the parent, with the Spark properties added
     * @throws ActionExecutorException propagated from the parent implementation
     */
    @Override
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        final Configuration conf = super.setupActionConf(actionConf, context, actionXml, appPath);
        final Namespace namespace = actionXml.getNamespace();

        final String sparkMaster = actionXml.getChildTextTrim("master", namespace);
        conf.set(SPARK_MASTER, sparkMaster);

        setSparkPropertyIfPresent(conf, SPARK_MODE, actionXml.getChildTextTrim("mode", namespace));

        conf.set(SPARK_JOB_NAME, actionXml.getChildTextTrim("name", namespace));

        setSparkPropertyIfPresent(conf, SPARK_CLASS, actionXml.getChildTextTrim("class", namespace));

        conf.set(SPARK_JAR, actionXml.getChildTextTrim("jar", namespace));

        final String allOpts = collectSparkOpts(conf, sparkMaster, actionXml.getChildTextTrim("spark-opts", namespace));
        if (allOpts.length() > 0) {
            conf.set(SPARK_OPTS, allOpts.trim());
        }

        // Record whether SparkMain should set up the hadoop config *-site.xml files: an action-level value
        // wins, otherwise the value from ConfigurationService is used. The result is always written back.
        final boolean serviceDefault = ConfigurationService.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR);
        final boolean effective = conf.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR, serviceDefault);
        conf.setBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR, effective);
        return conf;
    }

    /**
     * Writes <code>value</code> under <code>key</code> unless the value is <code>null</code>.
     */
    private static void setSparkPropertyIfPresent(Configuration conf, String key, String value) {
        if (value == null) {
            return;
        }
        conf.set(key, value);
    }

    /**
     * Assembles the untrimmed Spark option string: for a master starting with <code>yarn</code>, one
     * <code>--conf key=value </code> entry per property obtained from {@link SparkConfigurationService},
     * followed by the options given in the action XML (if any).
     */
    private static String collectSparkOpts(Configuration conf, String sparkMaster, String xmlOpts) {
        final StringBuilder opts = new StringBuilder();
        if (sparkMaster.startsWith("yarn")) {
            final String resourceManager = conf.get(HADOOP_JOB_TRACKER);
            final SparkConfigurationService sparkService = Services.get().get(SparkConfigurationService.class);
            final Properties defaults = sparkService.getSparkConfig(resourceManager);
            for (String key : defaults.stringPropertyNames()) {
                opts.append("--conf " + key + "=" + defaults.getProperty(key) + " ");
            }
        }
        if (xmlOpts != null) {
            opts.append(xmlOpts);
        }
        return opts.toString();
    }

    /**
     * Builds the launcher job configuration via the parent and then, for each of the two user-classpath
     * precedence keys, sets the key to <code>"true"</code> when no <code>oozie.launcher.</code>-prefixed
     * value for it is present.
     *
     * @param actionFs forwarded to the parent
     * @param context forwarded to the parent
     * @param action forwarded to the parent
     * @param actionXml forwarded to the parent
     * @param actionConf forwarded to the parent
     * @return the launcher configuration produced by the parent, possibly with the precedence keys added
     * @throws ActionExecutorException propagated from the parent implementation
     */
    @Override
    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
            Configuration actionConf) throws ActionExecutorException {

        final JobConf launcherConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
        final String[] precedenceKeys = { TASK_USER_PRECEDENCE, TASK_USER_CLASSPATH_PRECEDENCE };
        for (String key : precedenceKeys) {
            if (launcherConf.get("oozie.launcher." + key) == null) {
                launcherConf.set(key, "true");
            }
        }
        return launcherConf;
    }

    /**
     * Runs the parent's launcher setup and then makes sure <code>SPARK_HOME</code> appears in the child
     * environment setting, both under {@link #MAPRED_CHILD_ENV} and under its
     * <code>oozie.launcher.</code>-prefixed form. The decision is based solely on the prefixed value.
     *
     * @param conf launcher configuration to update
     * @param actionXml forwarded to the parent
     * @param appPath forwarded to the parent
     * @param context forwarded to the parent
     * @return the same <code>conf</code> instance that was passed in
     * @throws ActionExecutorException propagated from the parent implementation
     */
    @Override
    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
            throws ActionExecutorException {
        super.setupLauncherConf(conf, actionXml, appPath, context);

        // The launcher job needs SPARK_HOME in its environment because the pyspark client checks for it.
        final String sparkHomeEntry = "SPARK_HOME=.";
        final String launcherEnvKey = "oozie.launcher." + MAPRED_CHILD_ENV;
        final String currentEnv = conf.get(launcherEnvKey);

        if (currentEnv != null && currentEnv.contains("SPARK_HOME")) {
            // Already mentions SPARK_HOME: leave both properties untouched.
            return conf;
        }

        final String newEnv = (currentEnv == null) ? sparkHomeEntry : currentEnv + "," + sparkHomeEntry;
        conf.set(MAPRED_CHILD_ENV, newEnv);
        conf.set(launcherEnvKey, newEnv);
        return conf;
    }

    /**
     * Returns a new list holding the class named by {@link #SPARK_MAIN_CLASS_NAME}.
     *
     * @return a list with exactly one element
     * @throws RuntimeException wrapping the {@link ClassNotFoundException} if that class cannot be loaded
     */
    @Override
    public List<Class> getLauncherClasses() {
        final Class<?> sparkMain;
        try {
            sparkMain = Class.forName(SPARK_MAIN_CLASS_NAME);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found", e);
        }
        final List<Class> launcherClasses = new ArrayList<Class>();
        launcherClasses.add(sparkMain);
        return launcherClasses;
    }

    /**
     * Return the sharelib name for the action.
     *
     * @param actionXml the action element (not consulted)
     * @return always <code>spark</code>.
     */
    @Override
    protected String getDefaultShareLibName(Element actionXml) {
        final String shareLibName = "spark";
        return shareLibName;
    }

    /**
     * Returns the launcher main class: the value configured under
     * <code>LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS</code>, falling back to {@link #SPARK_MAIN_CLASS_NAME}.
     *
     * @param launcherConf configuration to look the main class up in
     * @param actionXml the action element (not consulted)
     * @return the configured main class name, or the Spark default
     */
    @Override
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        final String mainClassKey = LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS;
        return launcherConf.get(mainClassKey, SPARK_MAIN_CLASS_NAME);
    }

    /**
     * Returns the sharelib file names associated with the action configuration.
     *
     * @return a new single-element array containing <code>hive-site.xml</code>
     */
    @Override
    public String[] getShareLibFilesForActionConf() {
        final String[] files = new String[1];
        files[0] = "hive-site.xml";
        return files;
    }

}