001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.io.File;
022import java.io.FileInputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.util.ArrayList;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.List;
031import java.util.Map;
032import java.util.Set;
033
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FSDataInputStream;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.oozie.ErrorCode;
039import org.apache.oozie.dependency.hcat.HCatMessageHandler;
040import org.apache.oozie.jms.JMSConnectionInfo;
041import org.apache.oozie.util.HCatURI;
042import org.apache.oozie.util.MappingRule;
043import org.apache.oozie.util.XConfiguration;
044import org.apache.oozie.util.XLog;
045
046public class HCatAccessorService implements Service {
047
048    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HCatAccessorService.";
049    public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + "jmsconnections";
050    public static final String HCAT_CONFIGURATION = CONF_PREFIX + "hcat.configuration";
051
052    private static XLog LOG;
053    private static String DELIMITER = "#";
054    private Configuration conf;
055    private JMSAccessorService jmsService;
056    private List<MappingRule> mappingRules;
057    private JMSConnectionInfo defaultJMSConnInfo;
058    private Configuration hcatConf;
059    /**
060     * Map of publisher(host:port) to JMS connection info
061     */
062    private Map<String, JMSConnectionInfo> publisherJMSConnInfoMap;
063    /**
064     * List of non publishers(host:port)
065     */
066    private Set<String> nonJMSPublishers;
067    /**
068     * Mapping of table to the topic name for the table
069     */
070    private Map<String, String> registeredTopicsMap;
071
072    @Override
073    public void init(Services services) throws ServiceException {
074        LOG = XLog.getLog(getClass());
075        conf = services.getConf();
076        this.jmsService = services.get(JMSAccessorService.class);
077        initializeMappingRules();
078        this.nonJMSPublishers = new HashSet<String>();
079        this.publisherJMSConnInfoMap = new HashMap<String, JMSConnectionInfo>();
080        this.registeredTopicsMap = new HashMap<String, String>();
081        try {
082            loadHCatConf(services);
083        } catch(IOException ioe) {
084            throw new ServiceException(ErrorCode.E0100, HCatAccessorService.class.getName(), "An exception occured while attempting"
085                    + "to load the HCat Configuration", ioe);
086        }
087    }
088
089    private void loadHCatConf(Services services) throws IOException {
090        String path = conf.get(HCAT_CONFIGURATION);
091        if (path != null) {
092            if (path.startsWith("hdfs")) {
093                Path p = new Path(path);
094                HadoopAccessorService has = services.get(HadoopAccessorService.class);
095                try {
096                    FileSystem fs = has.createFileSystem(
097                            System.getProperty("user.name"), p.toUri(), has.createJobConf(p.toUri().getAuthority()));
098                    if (fs.exists(p)) {
099                        FSDataInputStream is = null;
100                        try {
101                            is = fs.open(p);
102                            hcatConf = new XConfiguration(is);
103                        } finally {
104                            if (is != null) {
105                                is.close();
106                            }
107                        }
108                        LOG.info("Loaded HCat Configuration: " + path);
109                    } else {
110                        LOG.warn("HCat Configuration could not be found at [" + path + "]");
111                    }
112                } catch (HadoopAccessorException hae) {
113                    throw new IOException(hae);
114                }
115            } else {
116                File f = new File(path);
117                if (f.exists()) {
118                    InputStream is = null;
119                    try {
120                        is = new FileInputStream(f);
121                        hcatConf = new XConfiguration(is);
122                    } finally {
123                        if (is != null) {
124                            is.close();
125                        }
126                    }
127                    LOG.info("Loaded HCat Configuration: " + path);
128                } else {
129                    LOG.warn("HCat Configuration could not be found at [" + path + "]");
130                }
131            }
132        }
133        else {
134            LOG.info("HCat Configuration not specified");
135        }
136    }
137
138    public Configuration getHCatConf() {
139        return hcatConf;
140    }
141
142    private void initializeMappingRules() {
143        String[] connections = ConfigurationService.getStrings(conf, JMS_CONNECTIONS_PROPERTIES);
144        if (connections != null) {
145            mappingRules = new ArrayList<MappingRule>(connections.length);
146            for (String connection : connections) {
147                String[] values = connection.split("=", 2);
148                String key = values[0].trim();
149                String value = values[1].trim();
150                if (key.equals("default")) {
151                    defaultJMSConnInfo = new JMSConnectionInfo(value);
152                }
153                else {
154                    mappingRules.add(new MappingRule(key, value));
155                }
156            }
157        }
158        else {
159            LOG.warn("No JMS connection defined");
160        }
161    }
162
163    /**
164     * Determine whether a given source URI publishes JMS messages
165     *
166     * @param sourceURI URI of the publisher
167     * @return true if we have JMS connection information for the source URI, else false
168     */
169    public boolean isKnownPublisher(URI sourceURI) {
170        if (nonJMSPublishers.contains(sourceURI.getAuthority())) {
171            return true;
172        }
173        else {
174            JMSConnectionInfo connInfo = publisherJMSConnInfoMap.get(sourceURI.getAuthority());
175            return connInfo == null ? (getJMSConnectionInfo(sourceURI) != null) : true;
176        }
177    }
178
179    /**
180     * Given a publisher host:port return the connection details of JMS server that the publisher
181     * publishes to
182     *
183     * @param publisherURI URI of the publisher
184     * @return JMSConnectionInfo to connect to the JMS server that the publisher publishes to
185     */
186    public JMSConnectionInfo getJMSConnectionInfo(URI publisherURI) {
187        String publisherAuthority = publisherURI.getAuthority();
188        JMSConnectionInfo connInfo = null;
189        if (publisherJMSConnInfoMap.containsKey(publisherAuthority)) {
190            connInfo = publisherJMSConnInfoMap.get(publisherAuthority);
191        }
192        else {
193            String schemeWithAuthority = publisherURI.getScheme() + "://" + publisherAuthority;
194            for (MappingRule mr : mappingRules) {
195                String jndiPropertiesString = mr.applyRule(schemeWithAuthority);
196                if (jndiPropertiesString != null) {
197                    connInfo = new JMSConnectionInfo(jndiPropertiesString);
198                    publisherJMSConnInfoMap.put(publisherAuthority, connInfo);
199                    LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority);
200                    break;
201                }
202            }
203            if (connInfo == null && defaultJMSConnInfo != null) {
204                connInfo = defaultJMSConnInfo;
205                publisherJMSConnInfoMap.put(publisherAuthority, defaultJMSConnInfo);
206                LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority);
207            }
208            else {
209                nonJMSPublishers.add(publisherAuthority);
210                LOG.info("Adding hcat server [{0}] to the list of non JMS publishers", schemeWithAuthority);
211            }
212
213        }
214        return connInfo;
215    }
216
217    /**
218     * Check if we are already listening to the JMS topic for the table in the given hcatURI
219     *
220     * @param hcatURI hcatalog partition URI
221     * @return true if registered to a JMS topic for the table in the given hcatURI
222     */
223    public boolean isRegisteredForNotification(HCatURI hcatURI) {
224        return registeredTopicsMap.containsKey(getKeyForRegisteredTopicsMap(hcatURI));
225    }
226
227    /**
228     * Register for notifications on a JMS topic for the specified hcatalog table.
229     *
230     * @param hcatURI hcatalog partition URI
231     * @param topic JMS topic to register to
232     * @param msgHandler Handler which will process the messages received on the topic
233     */
234    public void registerForNotification(HCatURI hcatURI, String topic, HCatMessageHandler msgHandler) {
235        JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI());
236        jmsService.registerForNotification(connInfo, topic, msgHandler);
237        registeredTopicsMap.put(
238                getKeyForRegisteredTopicsMap(hcatURI), topic);
239    }
240
241    public void unregisterFromNotification(HCatURI hcatURI) {
242        String topic = registeredTopicsMap.remove(getKeyForRegisteredTopicsMap(hcatURI));
243        if (topic != null) {
244            JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI());
245            jmsService.unregisterFromNotification(connInfo, topic);
246        }
247    }
248
249    public void unregisterFromNotification(String server, String database, String table) {
250        String key = server + DELIMITER + database + DELIMITER + table;
251        String topic = registeredTopicsMap.remove(key);
252        if (topic != null) {
253            try {
254                JMSConnectionInfo connInfo = getJMSConnectionInfo(new URI("hcat://" + server));
255                jmsService.unregisterFromNotification(connInfo, topic);
256            }
257            catch (URISyntaxException e) {
258                LOG.warn("Error unregistering from notification for topic [{0}]. Hcat table=[{1}]", topic, key, e);
259            }
260        }
261    }
262
263    private String getKeyForRegisteredTopicsMap(HCatURI hcatURI) {
264        return hcatURI.getURI().getAuthority() + DELIMITER + hcatURI.getDb()
265                + DELIMITER + hcatURI.getTable();
266    }
267
268    @Override
269    public void destroy() {
270        publisherJMSConnInfoMap.clear();
271    }
272
273    @Override
274    public Class<? extends Service> getInterface() {
275        return HCatAccessorService.class;
276    }
277
278}