001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    import java.util.ArrayList;
023    import java.util.HashMap;
024    import java.util.HashSet;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Set;
028    
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.oozie.dependency.hcat.HCatMessageHandler;
031    import org.apache.oozie.jms.JMSConnectionInfo;
032    import org.apache.oozie.util.HCatURI;
033    import org.apache.oozie.util.MappingRule;
034    import org.apache.oozie.util.XLog;
035    
036    public class HCatAccessorService implements Service {
037    
038        public static final String CONF_PREFIX = Service.CONF_PREFIX + "HCatAccessorService.";
039        public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + "jmsconnections";
040    
041        private static XLog LOG;
042        private static String DELIMITER = "#";
043        private Configuration conf;
044        private JMSAccessorService jmsService;
045        private List<MappingRule> mappingRules;
046        private JMSConnectionInfo defaultJMSConnInfo;
047        /**
048         * Map of publisher(host:port) to JMS connection info
049         */
050        private Map<String, JMSConnectionInfo> publisherJMSConnInfoMap;
051        /**
052         * List of non publishers(host:port)
053         */
054        private Set<String> nonJMSPublishers;
055        /**
056         * Mapping of table to the topic name for the table
057         */
058        private Map<String, String> registeredTopicsMap;
059    
060        @Override
061        public void init(Services services) throws ServiceException {
062            LOG = XLog.getLog(getClass());
063            conf = services.getConf();
064            this.jmsService = services.get(JMSAccessorService.class);
065            initializeMappingRules();
066            this.nonJMSPublishers = new HashSet<String>();
067            this.publisherJMSConnInfoMap = new HashMap<String, JMSConnectionInfo>();
068            this.registeredTopicsMap = new HashMap<String, String>();
069        }
070    
071        private void initializeMappingRules() {
072            String[] connections = conf.getStrings(JMS_CONNECTIONS_PROPERTIES);
073            if (connections != null) {
074                mappingRules = new ArrayList<MappingRule>(connections.length);
075                for (String connection : connections) {
076                    String[] values = connection.split("=", 2);
077                    String key = values[0].trim();
078                    String value = values[1].trim();
079                    if (key.equals("default")) {
080                        defaultJMSConnInfo = new JMSConnectionInfo(value);
081                    }
082                    else {
083                        mappingRules.add(new MappingRule(key, value));
084                    }
085                }
086            }
087            else {
088                LOG.warn("No JMS connection defined");
089            }
090        }
091    
092        /**
093         * Determine whether a given source URI publishes JMS messages
094         *
095         * @param sourceURI URI of the publisher
096         * @return true if we have JMS connection information for the source URI, else false
097         */
098        public boolean isKnownPublisher(URI sourceURI) {
099            if (nonJMSPublishers.contains(sourceURI.getAuthority())) {
100                return true;
101            }
102            else {
103                JMSConnectionInfo connInfo = publisherJMSConnInfoMap.get(sourceURI.getAuthority());
104                return connInfo == null ? (getJMSConnectionInfo(sourceURI) != null) : true;
105            }
106        }
107    
108        /**
109         * Given a publisher host:port return the connection details of JMS server that the publisher
110         * publishes to
111         *
112         * @param publisherURI URI of the publisher
113         * @return JMSConnectionInfo to connect to the JMS server that the publisher publishes to
114         */
115        public JMSConnectionInfo getJMSConnectionInfo(URI publisherURI) {
116            String publisherAuthority = publisherURI.getAuthority();
117            JMSConnectionInfo connInfo = null;
118            if (publisherJMSConnInfoMap.containsKey(publisherAuthority)) {
119                connInfo = publisherJMSConnInfoMap.get(publisherAuthority);
120            }
121            else {
122                String schemeWithAuthority = publisherURI.getScheme() + "://" + publisherAuthority;
123                for (MappingRule mr : mappingRules) {
124                    String jndiPropertiesString = mr.applyRule(schemeWithAuthority);
125                    if (jndiPropertiesString != null) {
126                        connInfo = new JMSConnectionInfo(jndiPropertiesString);
127                        publisherJMSConnInfoMap.put(publisherAuthority, connInfo);
128                        LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority);
129                        break;
130                    }
131                }
132                if (connInfo == null && defaultJMSConnInfo != null) {
133                    connInfo = defaultJMSConnInfo;
134                    publisherJMSConnInfoMap.put(publisherAuthority, defaultJMSConnInfo);
135                    LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority);
136                }
137                else {
138                    nonJMSPublishers.add(publisherAuthority);
139                    LOG.info("Adding hcat server [{0}] to the list of non JMS publishers", schemeWithAuthority);
140                }
141    
142            }
143            return connInfo;
144        }
145    
146        /**
147         * Check if we are already listening to the JMS topic for the table in the given hcatURI
148         *
149         * @param hcatURI hcatalog partition URI
150         * @return true if registered to a JMS topic for the table in the given hcatURI
151         */
152        public boolean isRegisteredForNotification(HCatURI hcatURI) {
153            return registeredTopicsMap.containsKey(getKeyForRegisteredTopicsMap(hcatURI));
154        }
155    
156        /**
157         * Register for notifications on a JMS topic for the specified hcatalog table.
158         *
159         * @param hcatURI hcatalog partition URI
160         * @param topic JMS topic to register to
161         * @param msgHandler Handler which will process the messages received on the topic
162         */
163        public void registerForNotification(HCatURI hcatURI, String topic, HCatMessageHandler msgHandler) {
164            JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI());
165            jmsService.registerForNotification(connInfo, topic, msgHandler);
166            registeredTopicsMap.put(
167                    getKeyForRegisteredTopicsMap(hcatURI), topic);
168        }
169    
170        public void unregisterFromNotification(HCatURI hcatURI) {
171            String topic = registeredTopicsMap.remove(getKeyForRegisteredTopicsMap(hcatURI));
172            if (topic != null) {
173                JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI());
174                jmsService.unregisterFromNotification(connInfo, topic);
175            }
176        }
177    
178        public void unregisterFromNotification(String server, String database, String table) {
179            String key = server + DELIMITER + database + DELIMITER + table;
180            String topic = registeredTopicsMap.remove(key);
181            if (topic != null) {
182                try {
183                    JMSConnectionInfo connInfo = getJMSConnectionInfo(new URI("hcat://" + server));
184                    jmsService.unregisterFromNotification(connInfo, topic);
185                }
186                catch (URISyntaxException e) {
187                    LOG.warn("Error unregistering from notification for topic [{0}]. Hcat table=[{1}]", topic, key, e);
188                }
189            }
190        }
191    
192        private String getKeyForRegisteredTopicsMap(HCatURI hcatURI) {
193            return hcatURI.getURI().getAuthority() + DELIMITER + hcatURI.getDb()
194                    + DELIMITER + hcatURI.getTable();
195        }
196    
197        @Override
198        public void destroy() {
199            publisherJMSConnInfoMap.clear();
200        }
201    
202        @Override
203        public Class<? extends Service> getInterface() {
204            return HCatAccessorService.class;
205        }
206    
207    }