001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.oozie.ErrorCode;
023import org.apache.oozie.util.IOUtils;
024import org.apache.oozie.util.XConfiguration;
025import org.apache.oozie.util.XLog;
026
027import java.io.File;
028import java.io.FileReader;
029import java.io.IOException;
030import java.io.InputStream;
031import java.util.HashMap;
032import java.util.Map;
033import java.util.Properties;
034
035public class SparkConfigurationService implements Service {
036
037    private static XLog LOG = XLog.getLog(SparkConfigurationService.class);
038
039    public static final String CONF_PREFIX = Service.CONF_PREFIX + "SparkConfigurationService.";
040    public static final String SPARK_CONFIGURATIONS = CONF_PREFIX + "spark.configurations";
041    public static final String SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR
042            = CONF_PREFIX + "spark.configurations.ignore.spark.yarn.jar";
043
044    private Map<String, Map<String, String>> sparkConfigs;
045    private static final String SPARK_CONFIG_FILE = "spark-defaults.conf";
046    private static final String SPARK_YARN_JAR_PROP = "spark.yarn.jar";
047
048    @Override
049    public void init(Services services) throws ServiceException {
050        loadSparkConfigs();
051    }
052
053    @Override
054    public void destroy() {
055        sparkConfigs.clear();
056    }
057
058    @Override
059    public Class<? extends Service> getInterface() {
060        return SparkConfigurationService.class;
061    }
062
063    private void loadSparkConfigs() throws ServiceException {
064        sparkConfigs = new HashMap<String, Map<String, String>>();
065        File configDir = new File(ConfigurationService.getConfigurationDirectory());
066        String[] confDefs = ConfigurationService.getStrings(SPARK_CONFIGURATIONS);
067        if (confDefs != null) {
068            boolean ignoreSparkYarnJar = ConfigurationService.getBoolean(SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR);
069            for (String confDef : confDefs) {
070                if (confDef.trim().length() > 0) {
071                    String[] parts = confDef.split("=");
072                    if (parts.length == 2) {
073                        String hostPort = parts[0];
074                        String confDir = parts[1];
075                        File dir = new File(confDir);
076                        if (!dir.isAbsolute()) {
077                            dir = new File(configDir, confDir);
078                        }
079                        if (dir.exists()) {
080                            File file = new File(dir, SPARK_CONFIG_FILE);
081                            if (file.exists()) {
082                                Properties props = new Properties();
083                                FileReader fr = null;
084                                try {
085                                    fr = new FileReader(file);
086                                    props.load(fr);
087                                    fr.close();
088                                    if (ignoreSparkYarnJar) {
089                                        // Ignore spark.yarn.jar because it may interfere with the Spark Sharelib jars
090                                        props.remove(SPARK_YARN_JAR_PROP);
091                                    }
092                                    sparkConfigs.put(hostPort, propsToMap(props));
093                                    LOG.info("Loaded Spark Configuration: {0}={1}", hostPort, file.getAbsolutePath());
094                                } catch (IOException ioe) {
095                                    LOG.warn("Spark Configuration could not be loaded for {0}: {1}",
096                                            hostPort, ioe.getMessage(), ioe);
097                                } finally {
098                                    IOUtils.closeSafely(fr);
099                                }
100                            } else {
101                                LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist",
102                                        hostPort, file.getAbsolutePath());
103                            }
104                        } else {
105                            LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist",
106                                    hostPort, dir.getAbsolutePath());
107                        }
108                    } else {
109                        LOG.warn("Spark Configuration could not be loaded: invalid value found: {0}", confDef);
110                    }
111                }
112            }
113        } else {
114            LOG.info("Spark Configuration(s) not specified");
115        }
116    }
117
118    private Map<String, String> propsToMap(Properties props) {
119        Map<String, String> map = new HashMap<String, String>(props.size());
120        for (String key : props.stringPropertyNames()) {
121            map.put(key, props.getProperty(key));
122        }
123        return map;
124    }
125
126    public Map<String, String> getSparkConfig(String resourceManagerHostPort) {
127        resourceManagerHostPort = (resourceManagerHostPort != null) ? resourceManagerHostPort.toLowerCase() : null;
128        Map<String, String> config = sparkConfigs.get(resourceManagerHostPort);
129        if (config == null) {
130            config = sparkConfigs.get("*");
131            if (config == null) {
132                config = new HashMap<String, String>();
133            }
134        }
135        return config;
136    }
137}
138