/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.service;

import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * Service that loads {@code spark-defaults.conf} files from configured directories and
 * hands them out keyed by host/port.
 * <p>
 * Each entry of {@link #SPARK_CONFIGURATIONS} has the form {@code <hostPort>=<confDir>}.
 * The key {@code *} acts as a fallback used when no entry matches the requested host/port.
 * Properties named in {@link #SPARK_CONFIGURATIONS_BLACKLIST} are removed from every
 * loaded configuration.
 */
public class SparkConfigurationService implements Service {

    private static final XLog LOG = XLog.getLog(SparkConfigurationService.class);

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "SparkConfigurationService.";
    public static final String SPARK_CONFIGURATIONS = CONF_PREFIX + "spark.configurations";
    public static final String SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR
            = SPARK_CONFIGURATIONS + ".ignore.spark.yarn.jar";
    public static final String SPARK_CONFIGURATIONS_BLACKLIST = SPARK_CONFIGURATIONS + ".blacklist";

    private static final String SPARK_CONFIG_FILE = "spark-defaults.conf";
    private static final String SPARK_YARN_JAR_PROP = "spark.yarn.jar";
    private static final String HOST_WILDCARD = "*";

    /** Loaded Spark properties, keyed by lower-cased host/port (or {@link #HOST_WILDCARD}). */
    private Map<String, Properties> sparkConfigs;
    /** Property names that are stripped from every loaded configuration. */
    private Set<String> blacklist;

    /**
     * Loads the blacklist first (it is applied while reading the configurations),
     * then the Spark configurations themselves.
     *
     * @param services the services instance (not used by this service)
     * @throws ServiceException if a configuration directory cannot be resolved
     */
    @Override
    public void init(Services services) throws ServiceException {
        loadBlacklist();
        loadSparkConfigs();
    }

    /**
     * Clears the loaded state. Safe to call even if {@link #init(Services)} never ran or
     * failed before both collections were created.
     */
    @Override
    public void destroy() {
        if (sparkConfigs != null) {
            sparkConfigs.clear();
        }
        if (blacklist != null) {
            blacklist.clear();
        }
    }

    @Override
    public Class<? extends Service> getInterface() {
        return SparkConfigurationService.class;
    }

    /**
     * Builds the set of blacklisted property names from {@link #SPARK_CONFIGURATIONS_BLACKLIST},
     * additionally honouring the deprecated {@link #SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR} flag.
     */
    private void loadBlacklist() {
        blacklist = new HashSet<>();
        for (String s : ConfigurationService.getStrings(SPARK_CONFIGURATIONS_BLACKLIST)) {
            blacklist.add(s.trim());
        }
        // spark.yarn.jar is added if the old property to ignore it is set.
        if (ConfigurationService.getBoolean(SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR)) {
            LOG.warn("Deprecated property found in configuration: " + SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR
                    + ". Use " + SPARK_CONFIGURATIONS_BLACKLIST + " instead.");
            blacklist.add(SPARK_YARN_JAR_PROP);
        }
    }

    /**
     * Reads every {@code hostPort=confDir} entry listed under {@link #SPARK_CONFIGURATIONS}.
     *
     * @throws ServiceException if a configuration directory cannot be resolved
     */
    private void loadSparkConfigs() throws ServiceException {
        sparkConfigs = new HashMap<>();
        String[] confDefs = ConfigurationService.getStrings(SPARK_CONFIGURATIONS);
        for (String confDef : confDefs) {
            readEntry(confDef.trim());
        }
    }

    /**
     * Parses a single {@code hostPort=confDir} definition and, if the directory exists and
     * yields at least one non-blacklisted property, registers it.
     *
     * @param confDef one trimmed entry of {@link #SPARK_CONFIGURATIONS}
     * @throws ServiceException if the configuration directory cannot be resolved
     */
    private void readEntry(String confDef) throws ServiceException {
        String[] parts = confDef.split("=");
        if (parts.length != 2) {
            LOG.warn("Spark Configuration could not be loaded: invalid value found: {0}", confDef);
            return;
        }
        // getSparkConfig() lower-cases the lookup key, so the stored key must be normalised
        // the same way; otherwise a mixed-case host in the configuration could never match.
        String hostPort = parts[0].trim().toLowerCase(Locale.ROOT);
        String confDir = parts[1].trim();
        File dir = getAbsoluteDir(confDir);
        if (!dir.exists()) {
            LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist",
                    hostPort, dir.getAbsolutePath());
            return;
        }
        Properties sparkDefaults = readSparkConfigFile(hostPort, dir);
        filterBlackList(sparkDefaults);
        if (!sparkDefaults.isEmpty()) {
            sparkConfigs.put(hostPort, sparkDefaults);
        }
    }

    /**
     * Resolves {@code confDir} against the configuration directory reported by
     * {@code ConfigurationService} when it is not already absolute.
     *
     * @param confDir absolute or relative directory path
     * @return the directory as a {@link File}; existence is not checked here
     * @throws ServiceException declared for the configuration directory lookup
     */
    private File getAbsoluteDir(String confDir) throws ServiceException {
        File dir = new File(confDir);
        if (!dir.isAbsolute()) {
            File configDir = new File(ConfigurationService.getConfigurationDirectory());
            dir = new File(configDir, confDir);
        }
        return dir;
    }

    /**
     * Removes every blacklisted property from the given properties, in place.
     *
     * @param sparkDefaults properties to filter
     */
    private void filterBlackList(Properties sparkDefaults) {
        for (String property : blacklist) {
            sparkDefaults.remove(property);
        }
    }

    /**
     * Reads {@code spark-defaults.conf} from {@code dir} as UTF-8.
     * A missing or unreadable file is logged as a warning and is not treated as fatal.
     *
     * @param hostPort host/port the file belongs to; used for log messages only
     * @param dir directory expected to contain the file
     * @return the properties that were read; empty if the file is missing, and empty or
     *         partially filled if reading failed
     */
    private Properties readSparkConfigFile(String hostPort, File dir) {
        File file = new File(dir, SPARK_CONFIG_FILE);
        Properties props = new Properties();
        if (!file.exists()) {
            LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist",
                    hostPort, file.getAbsolutePath());
            return props;
        }
        try (FileInputStream stream = new FileInputStream(file);
             InputStreamReader reader = new InputStreamReader(stream, StandardCharsets.UTF_8)) {
            props.load(reader);
            LOG.info("Loaded Spark Configuration: {0}={1}", hostPort, file.getAbsolutePath());
        } catch (IOException ioe) {
            LOG.warn("Spark Configuration could not be loaded for {0}: {1}",
                    hostPort, ioe.getMessage(), ioe);
        }
        return props;
    }

    /**
     * Returns the Spark configuration registered for the given host/port.
     * The lookup is case-insensitive. If no entry matches, the wildcard ({@code *}) entry
     * is returned; if that is absent too, a new empty {@link Properties} is returned.
     *
     * @param resourceManagerHostPort host/port to look up; may be {@code null}
     * @return the matching properties, never {@code null}; for a match this is the instance
     *         held by the service, not a copy
     */
    public Properties getSparkConfig(String resourceManagerHostPort) {
        // Locale.ROOT keeps the normalisation independent of the JVM default locale.
        String key = (resourceManagerHostPort != null)
                ? resourceManagerHostPort.toLowerCase(Locale.ROOT)
                : null;
        Properties config = sparkConfigs.get(key);
        if (config == null) {
            config = sparkConfigs.get(HOST_WILDCARD);
        }
        if (config == null) {
            config = new Properties();
        }
        return config;
    }
}