Source code
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.service;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
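
/**
 * Service that reads the {@code spark-defaults.conf} file from each configuration directory
 * listed in {@code oozie.service.SparkConfigurationService.spark.configurations} and makes the
 * parsed properties available per ResourceManager host:port, with {@code *} usable as a
 * wildcard entry.
 */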
public class SparkConfigurationService implements Service {

    private static XLog LOG = XLog.getLog(SparkConfigurationService.class);

    public static final String SPARK_CONFIGURATION = "oozie.service.SparkConfigurationService.spark.configurations";

    private Map<String, Map<String, String>> sparkConfigs;
    private static final String SPARK_CONFIG_FILE = "spark-defaults.conf";

    @Override
    public void init(Services services) throws ServiceException {
        loadSparkConfigs();
    }

    @Override
    public void destroy() {
        sparkConfigs.clear();
    }

    @Override
    public Class<? extends Service> getInterface() {
        return SparkConfigurationService.class;
    }

    private void loadSparkConfigs() throws ServiceException {
        sparkConfigs = new HashMap<String, Map<String, String>>();
        File configDir = new File(ConfigurationService.getConfigurationDirectory());
        // Each entry is expected as <resource-manager-host:port>=<config-directory>; relative
        // directories are resolved against the Oozie configuration directory.
        String[] confDefs = ConfigurationService.getStrings(SPARK_CONFIGURATION);
        if (confDefs != null) {
            for (String confDef : confDefs) {
                if (confDef.trim().length() > 0) {
                    String[] parts = confDef.split("=");
                    if (parts.length == 2) {
                        String hostPort = parts[0];
                        String confDir = parts[1];
                        File dir = new File(confDir);
                        if (!dir.isAbsolute()) {
                            dir = new File(configDir, confDir);
                        }
                        if (dir.exists()) {
                            File file = new File(dir, SPARK_CONFIG_FILE);
                            if (file.exists()) {
                                Properties props = new Properties();
                                FileReader fr = null;
                                try {
                                    fr = new FileReader(file);
                                    props.load(fr);
                                    fr.close();
                                    // Store the key lowercased so it matches the lowercased
                                    // lookup performed in getSparkConfig().
                                    sparkConfigs.put(hostPort.toLowerCase(), propsToMap(props));
                                    LOG.info("Loaded Spark Configuration: {0}={1}", hostPort, file.getAbsolutePath());
                                } catch (IOException ioe) {
                                    LOG.warn("Spark Configuration could not be loaded for {0}: {1}",
                                            hostPort, ioe.getMessage(), ioe);
                                } finally {
                                    IOUtils.closeSafely(fr);
                                }
                            } else {
                                LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist",
                                        hostPort, file.getAbsolutePath());
                            }
                        } else {
                            LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist",
                                    hostPort, dir.getAbsolutePath());
                        }
                    } else {
                        LOG.warn("Spark Configuration could not be loaded: invalid value found: {0}", confDef);
                    }
                }
            }
        } else {
            LOG.info("Spark Configuration(s) not specified");
        }
    }

    private Map<String, String> propsToMap(Properties props) {
        Map<String, String> map = new HashMap<String, String>(props.size());
        for (String key : props.stringPropertyNames()) {
            map.put(key, props.getProperty(key));
        }
        return map;
    }

    public Map<String, String> getSparkConfig(String resourceManagerHostPort) {
        resourceManagerHostPort = (resourceManagerHostPort != null) ? resourceManagerHostPort.toLowerCase() : null;
        Map<String, String> config = sparkConfigs.get(resourceManagerHostPort);
        if (config == null) {
            // No exact match: fall back to the "*" wildcard entry, then to an empty map.
            config = sparkConfigs.get("*");
            if (config == null) {
                config = new HashMap<String, String>();
            }
        }
        return config;
    }
}
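For illustration only, a minimal sketch of how this service might be looked up and queried from other Oozie server-side code. It assumes the Services singleton has already been initialized (normally done by the Oozie server at startup); the class name SparkConfigExample, the host rm-host:8032, and the comma-separated configuration entry shown in the comments are hypothetical examples, not values defined by this file.

import java.util.Map;

import org.apache.oozie.service.Services;
import org.apache.oozie.service.SparkConfigurationService;

public class SparkConfigExample {
    public static void main(String[] args) {
        // Hypothetical setup: oozie-site.xml would define
        //   oozie.service.SparkConfigurationService.spark.configurations
        // with entries of the form <host:port>=<config-directory>, e.g.
        //   rm-host:8032=/etc/spark/conf,*=spark-conf
        // so that /etc/spark/conf/spark-defaults.conf is parsed for rm-host:8032
        // and the "*" entry serves as the fallback for any other ResourceManager.
        SparkConfigurationService scs = Services.get().get(SparkConfigurationService.class);

        // Returns the parsed spark-defaults.conf properties for the given ResourceManager,
        // or the wildcard/empty map per getSparkConfig() above.
        Map<String, String> sparkConf = scs.getSparkConfig("rm-host:8032");
        for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
            System.out.println(entry.getKey() + "=" + entry.getValue());
        }
    }
}

Note that splitting the property value on commas is an assumption about ConfigurationService.getStrings; only the per-entry host:port=directory format is established by loadSparkConfigs() itself.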