001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import java.net.URI; 021 import java.net.URISyntaxException; 022 import java.util.ArrayList; 023 import java.util.HashMap; 024 import java.util.HashSet; 025 import java.util.List; 026 import java.util.Map; 027 import java.util.Set; 028 029 import org.apache.hadoop.conf.Configuration; 030 import org.apache.oozie.dependency.hcat.HCatMessageHandler; 031 import org.apache.oozie.jms.JMSConnectionInfo; 032 import org.apache.oozie.util.HCatURI; 033 import org.apache.oozie.util.MappingRule; 034 import org.apache.oozie.util.XLog; 035 036 public class HCatAccessorService implements Service { 037 038 public static final String CONF_PREFIX = Service.CONF_PREFIX + "HCatAccessorService."; 039 public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + "jmsconnections"; 040 041 private static XLog LOG; 042 private static String DELIMITER = "#"; 043 private Configuration conf; 044 private JMSAccessorService jmsService; 045 private List<MappingRule> mappingRules; 046 private JMSConnectionInfo defaultJMSConnInfo; 047 /** 048 * Map of publisher(host:port) to JMS connection info 049 */ 050 private Map<String, JMSConnectionInfo> publisherJMSConnInfoMap; 051 /** 052 * List of non publishers(host:port) 053 */ 054 private Set<String> nonJMSPublishers; 055 /** 056 * Mapping of table to the topic name for the table 057 */ 058 private Map<String, String> registeredTopicsMap; 059 060 @Override 061 public void init(Services services) throws ServiceException { 062 LOG = XLog.getLog(getClass()); 063 conf = services.getConf(); 064 this.jmsService = services.get(JMSAccessorService.class); 065 initializeMappingRules(); 066 this.nonJMSPublishers = new HashSet<String>(); 067 this.publisherJMSConnInfoMap = new HashMap<String, JMSConnectionInfo>(); 068 this.registeredTopicsMap = new HashMap<String, String>(); 069 } 070 071 private void initializeMappingRules() { 072 String[] connections = conf.getStrings(JMS_CONNECTIONS_PROPERTIES); 073 if (connections != null) { 074 mappingRules = new ArrayList<MappingRule>(connections.length); 075 for (String connection : connections) { 076 String[] values = connection.split("=", 2); 077 String key = values[0].trim(); 078 String value = values[1].trim(); 079 if (key.equals("default")) { 080 defaultJMSConnInfo = new JMSConnectionInfo(value); 081 } 082 else { 083 mappingRules.add(new MappingRule(key, value)); 084 } 085 } 086 } 087 else { 088 LOG.warn("No JMS connection defined"); 089 } 090 } 091 092 /** 093 * Determine whether a given source URI publishes JMS messages 094 * 095 * @param sourceURI URI of the publisher 096 * @return true if we have JMS connection information for the source URI, else false 097 */ 098 public boolean isKnownPublisher(URI sourceURI) { 099 if (nonJMSPublishers.contains(sourceURI.getAuthority())) { 100 return true; 101 } 102 else { 103 JMSConnectionInfo connInfo = publisherJMSConnInfoMap.get(sourceURI.getAuthority()); 104 return connInfo == null ? (getJMSConnectionInfo(sourceURI) != null) : true; 105 } 106 } 107 108 /** 109 * Given a publisher host:port return the connection details of JMS server that the publisher 110 * publishes to 111 * 112 * @param publisherURI URI of the publisher 113 * @return JMSConnectionInfo to connect to the JMS server that the publisher publishes to 114 */ 115 public JMSConnectionInfo getJMSConnectionInfo(URI publisherURI) { 116 String publisherAuthority = publisherURI.getAuthority(); 117 JMSConnectionInfo connInfo = null; 118 if (publisherJMSConnInfoMap.containsKey(publisherAuthority)) { 119 connInfo = publisherJMSConnInfoMap.get(publisherAuthority); 120 } 121 else { 122 String schemeWithAuthority = publisherURI.getScheme() + "://" + publisherAuthority; 123 for (MappingRule mr : mappingRules) { 124 String jndiPropertiesString = mr.applyRule(schemeWithAuthority); 125 if (jndiPropertiesString != null) { 126 connInfo = new JMSConnectionInfo(jndiPropertiesString); 127 publisherJMSConnInfoMap.put(publisherAuthority, connInfo); 128 LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority); 129 break; 130 } 131 } 132 if (connInfo == null && defaultJMSConnInfo != null) { 133 connInfo = defaultJMSConnInfo; 134 publisherJMSConnInfoMap.put(publisherAuthority, defaultJMSConnInfo); 135 LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority); 136 } 137 else { 138 nonJMSPublishers.add(publisherAuthority); 139 LOG.info("Adding hcat server [{0}] to the list of non JMS publishers", schemeWithAuthority); 140 } 141 142 } 143 return connInfo; 144 } 145 146 /** 147 * Check if we are already listening to the JMS topic for the table in the given hcatURI 148 * 149 * @param hcatURI hcatalog partition URI 150 * @return true if registered to a JMS topic for the table in the given hcatURI 151 */ 152 public boolean isRegisteredForNotification(HCatURI hcatURI) { 153 return registeredTopicsMap.containsKey(getKeyForRegisteredTopicsMap(hcatURI)); 154 } 155 156 /** 157 * Register for notifications on a JMS topic for the specified hcatalog table. 158 * 159 * @param hcatURI hcatalog partition URI 160 * @param topic JMS topic to register to 161 * @param msgHandler Handler which will process the messages received on the topic 162 */ 163 public void registerForNotification(HCatURI hcatURI, String topic, HCatMessageHandler msgHandler) { 164 JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI()); 165 jmsService.registerForNotification(connInfo, topic, msgHandler); 166 registeredTopicsMap.put( 167 getKeyForRegisteredTopicsMap(hcatURI), topic); 168 } 169 170 public void unregisterFromNotification(HCatURI hcatURI) { 171 String topic = registeredTopicsMap.remove(getKeyForRegisteredTopicsMap(hcatURI)); 172 if (topic != null) { 173 JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI()); 174 jmsService.unregisterFromNotification(connInfo, topic); 175 } 176 } 177 178 public void unregisterFromNotification(String server, String database, String table) { 179 String key = server + DELIMITER + database + DELIMITER + table; 180 String topic = registeredTopicsMap.remove(key); 181 if (topic != null) { 182 try { 183 JMSConnectionInfo connInfo = getJMSConnectionInfo(new URI("hcat://" + server)); 184 jmsService.unregisterFromNotification(connInfo, topic); 185 } 186 catch (URISyntaxException e) { 187 LOG.warn("Error unregistering from notification for topic [{0}]. Hcat table=[{1}]", topic, key, e); 188 } 189 } 190 } 191 192 private String getKeyForRegisteredTopicsMap(HCatURI hcatURI) { 193 return hcatURI.getURI().getAuthority() + DELIMITER + hcatURI.getDb() 194 + DELIMITER + hcatURI.getTable(); 195 } 196 197 @Override 198 public void destroy() { 199 publisherJMSConnInfoMap.clear(); 200 } 201 202 @Override 203 public Class<? extends Service> getInterface() { 204 return HCatAccessorService.class; 205 } 206 207 }