001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import org.apache.oozie.util.XLog;
022
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.FileReader;
026import java.io.IOException;
027import java.io.InputStreamReader;
028import java.nio.charset.StandardCharsets;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.Map;
032import java.util.Properties;
033import java.util.Set;
034
035public class SparkConfigurationService implements Service {
036
037    private static XLog LOG = XLog.getLog(SparkConfigurationService.class);
038
039    public static final String CONF_PREFIX = Service.CONF_PREFIX + "SparkConfigurationService.";
040    public static final String SPARK_CONFIGURATIONS = CONF_PREFIX + "spark.configurations";
041    public static final String SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR
042            = SPARK_CONFIGURATIONS + ".ignore.spark.yarn.jar";
043    public static final String SPARK_CONFIGURATIONS_BLACKLIST = SPARK_CONFIGURATIONS + ".blacklist";
044
045    private static final String SPARK_CONFIG_FILE = "spark-defaults.conf";
046    private static final String SPARK_YARN_JAR_PROP = "spark.yarn.jar";
047    private static final String HOST_WILDCARD = "*";
048    private Map<String, Properties> sparkConfigs;
049    private Set<String> blacklist;
050
051    @Override
052    public void init(Services services) throws ServiceException {
053        loadBlacklist();
054        loadSparkConfigs();
055    }
056
057    @Override
058    public void destroy() {
059        sparkConfigs.clear();
060        blacklist.clear();
061    }
062
063    @Override
064    public Class<? extends Service> getInterface() {
065        return SparkConfigurationService.class;
066    }
067
068    private void loadBlacklist() {
069        blacklist = new HashSet<>();
070        for(String s : ConfigurationService.getStrings(SPARK_CONFIGURATIONS_BLACKLIST)) {
071            blacklist.add(s.trim());
072        }
073        // spark.yarn.jar is added if the old property to ignore it is set.
074        if(ConfigurationService.getBoolean(SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR)){
075            LOG.warn("Deprecated property found in configuration: " + SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR +
076                    "Use "+SPARK_CONFIGURATIONS_BLACKLIST+" instead.");
077            blacklist.add(SPARK_YARN_JAR_PROP);
078        }
079    }
080
081    private void loadSparkConfigs() throws ServiceException {
082        sparkConfigs = new HashMap<>();
083        String[] confDefs = ConfigurationService.getStrings(SPARK_CONFIGURATIONS);
084        for (String confDef : confDefs) {
085            readEntry(confDef.trim());
086        }
087    }
088
089    private void readEntry(String confDef) throws ServiceException {
090        String[] parts = confDef.split("=");
091        if (parts.length == 2) {
092            String hostPort = parts[0];
093            String confDir = parts[1];
094            File dir = getAbsoluteDir(confDir);
095            if (dir.exists()) {
096                Properties sparkDefaults = readSparkConfigFile(hostPort, dir);
097                filterBlackList(sparkDefaults);
098                if(!sparkDefaults.isEmpty()) {
099                    sparkConfigs.put(hostPort, sparkDefaults);
100                }
101            } else {
102                LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist",
103                        hostPort, dir.getAbsolutePath());
104            }
105        } else {
106            LOG.warn("Spark Configuration could not be loaded: invalid value found: {0}", confDef);
107        }
108    }
109
110    private File getAbsoluteDir(String confDir) throws ServiceException {
111        File dir = new File(confDir);
112        if (!dir.isAbsolute()) {
113            File configDir = new File(ConfigurationService.getConfigurationDirectory());
114            dir = new File(configDir, confDir);
115        }
116        return dir;
117    }
118
119    private void filterBlackList(Properties sparkDefaults) {
120        for(String property : blacklist){
121            sparkDefaults.remove(property);
122        }
123    }
124
125    private Properties readSparkConfigFile(String hostPort, File dir) {
126        File file = new File(dir, SPARK_CONFIG_FILE);
127        Properties props = new Properties();
128        if (file.exists()) {
129            try (FileInputStream stream = new FileInputStream(file);
130                 InputStreamReader reader = new InputStreamReader(stream, StandardCharsets.UTF_8.name())) {
131                props.load(reader);
132                LOG.info("Loaded Spark Configuration: {0}={1}", hostPort, file.getAbsolutePath());
133            } catch (IOException ioe) {
134                LOG.warn("Spark Configuration could not be loaded for {0}: {1}",
135                        hostPort, ioe.getMessage(), ioe);
136            }
137        } else {
138            LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist",
139                    hostPort, file.getAbsolutePath());
140        }
141        return props;
142    }
143
144    public Properties getSparkConfig(String resourceManagerHostPort) {
145        resourceManagerHostPort = (resourceManagerHostPort != null) ? resourceManagerHostPort.toLowerCase() : null;
146        Properties config = sparkConfigs.get(resourceManagerHostPort);
147        if (config == null) {
148            config = sparkConfigs.get(HOST_WILDCARD);
149            if (config == null) {
150                config = new Properties();
151            }
152        }
153        return config;
154    }
155}
156