This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.net.URI;
021 import java.net.URISyntaxException;
022 import java.util.ArrayList;
023 import java.util.HashMap;
024 import java.util.HashSet;
025 import java.util.List;
026 import java.util.Map;
027 import java.util.Set;
028
029 import org.apache.hadoop.conf.Configuration;
030 import org.apache.oozie.dependency.hcat.HCatMessageHandler;
031 import org.apache.oozie.jms.JMSConnectionInfo;
032 import org.apache.oozie.util.HCatURI;
033 import org.apache.oozie.util.MappingRule;
034 import org.apache.oozie.util.XLog;
035
036 public class HCatAccessorService implements Service {
037
038 public static final String CONF_PREFIX = Service.CONF_PREFIX + "HCatAccessorService.";
039 public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + "jmsconnections";
040
041 private static XLog LOG;
042 private static String DELIMITER = "#";
043 private Configuration conf;
044 private JMSAccessorService jmsService;
045 private List<MappingRule> mappingRules;
046 private JMSConnectionInfo defaultJMSConnInfo;
047 /**
048 * Map of publisher(host:port) to JMS connection info
049 */
050 private Map<String, JMSConnectionInfo> publisherJMSConnInfoMap;
051 /**
052 * List of non publishers(host:port)
053 */
054 private Set<String> nonJMSPublishers;
055 /**
056 * Mapping of table to the topic name for the table
057 */
058 private Map<String, String> registeredTopicsMap;
059
060 @Override
061 public void init(Services services) throws ServiceException {
062 LOG = XLog.getLog(getClass());
063 conf = services.getConf();
064 this.jmsService = services.get(JMSAccessorService.class);
065 initializeMappingRules();
066 this.nonJMSPublishers = new HashSet<String>();
067 this.publisherJMSConnInfoMap = new HashMap<String, JMSConnectionInfo>();
068 this.registeredTopicsMap = new HashMap<String, String>();
069 }
070
071 private void initializeMappingRules() {
072 String[] connections = conf.getStrings(JMS_CONNECTIONS_PROPERTIES);
073 if (connections != null) {
074 mappingRules = new ArrayList<MappingRule>(connections.length);
075 for (String connection : connections) {
076 String[] values = connection.split("=", 2);
077 String key = values[0].trim();
078 String value = values[1].trim();
079 if (key.equals("default")) {
080 defaultJMSConnInfo = new JMSConnectionInfo(value);
081 }
082 else {
083 mappingRules.add(new MappingRule(key, value));
084 }
085 }
086 }
087 else {
088 LOG.warn("No JMS connection defined");
089 }
090 }
091
092 /**
093 * Determine whether a given source URI publishes JMS messages
094 *
095 * @param sourceURI URI of the publisher
096 * @return true if we have JMS connection information for the source URI, else false
097 */
098 public boolean isKnownPublisher(URI sourceURI) {
099 if (nonJMSPublishers.contains(sourceURI.getAuthority())) {
100 return true;
101 }
102 else {
103 JMSConnectionInfo connInfo = publisherJMSConnInfoMap.get(sourceURI.getAuthority());
104 return connInfo == null ? (getJMSConnectionInfo(sourceURI) != null) : true;
105 }
106 }
107
108 /**
109 * Given a publisher host:port return the connection details of JMS server that the publisher
110 * publishes to
111 *
112 * @param publisherURI URI of the publisher
113 * @return JMSConnectionInfo to connect to the JMS server that the publisher publishes to
114 */
115 public JMSConnectionInfo getJMSConnectionInfo(URI publisherURI) {
116 String publisherAuthority = publisherURI.getAuthority();
117 JMSConnectionInfo connInfo = null;
118 if (publisherJMSConnInfoMap.containsKey(publisherAuthority)) {
119 connInfo = publisherJMSConnInfoMap.get(publisherAuthority);
120 }
121 else {
122 String schemeWithAuthority = publisherURI.getScheme() + "://" + publisherAuthority;
123 for (MappingRule mr : mappingRules) {
124 String jndiPropertiesString = mr.applyRule(schemeWithAuthority);
125 if (jndiPropertiesString != null) {
126 connInfo = new JMSConnectionInfo(jndiPropertiesString);
127 publisherJMSConnInfoMap.put(publisherAuthority, connInfo);
128 LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority);
129 break;
130 }
131 }
132 if (connInfo == null && defaultJMSConnInfo != null) {
133 connInfo = defaultJMSConnInfo;
134 publisherJMSConnInfoMap.put(publisherAuthority, defaultJMSConnInfo);
135 LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority);
136 }
137 else {
138 nonJMSPublishers.add(publisherAuthority);
139 LOG.info("Adding hcat server [{0}] to the list of non JMS publishers", schemeWithAuthority);
140 }
141
142 }
143 return connInfo;
144 }
145
146 /**
147 * Check if we are already listening to the JMS topic for the table in the given hcatURI
148 *
149 * @param hcatURI hcatalog partition URI
150 * @return true if registered to a JMS topic for the table in the given hcatURI
151 */
152 public boolean isRegisteredForNotification(HCatURI hcatURI) {
153 return registeredTopicsMap.containsKey(getKeyForRegisteredTopicsMap(hcatURI));
154 }
155
156 /**
157 * Register for notifications on a JMS topic for the specified hcatalog table.
158 *
159 * @param hcatURI hcatalog partition URI
160 * @param topic JMS topic to register to
161 * @param msgHandler Handler which will process the messages received on the topic
162 */
163 public void registerForNotification(HCatURI hcatURI, String topic, HCatMessageHandler msgHandler) {
164 JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI());
165 jmsService.registerForNotification(connInfo, topic, msgHandler);
166 registeredTopicsMap.put(
167 getKeyForRegisteredTopicsMap(hcatURI), topic);
168 }
169
170 public void unregisterFromNotification(HCatURI hcatURI) {
171 String topic = registeredTopicsMap.remove(getKeyForRegisteredTopicsMap(hcatURI));
172 if (topic != null) {
173 JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI());
174 jmsService.unregisterFromNotification(connInfo, topic);
175 }
176 }
177
178 public void unregisterFromNotification(String server, String database, String table) {
179 String key = server + DELIMITER + database + DELIMITER + table;
180 String topic = registeredTopicsMap.remove(key);
181 if (topic != null) {
182 try {
183 JMSConnectionInfo connInfo = getJMSConnectionInfo(new URI("hcat://" + server));
184 jmsService.unregisterFromNotification(connInfo, topic);
185 }
186 catch (URISyntaxException e) {
187 LOG.warn("Error unregistering from notification for topic [{0}]. Hcat table=[{1}]", topic, key, e);
188 }
189 }
190 }
191
192 private String getKeyForRegisteredTopicsMap(HCatURI hcatURI) {
193 return hcatURI.getURI().getAuthority() + DELIMITER + hcatURI.getDb()
194 + DELIMITER + hcatURI.getTable();
195 }
196
197 @Override
198 public void destroy() {
199 publisherJMSConnInfoMap.clear();
200 }
201
202 @Override
203 public Class<? extends Service> getInterface() {
204 return HCatAccessorService.class;
205 }
206
207 }