001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.io.File; 022import java.io.FileInputStream; 023import java.io.IOException; 024import java.io.InputStream; 025import java.net.URI; 026import java.net.URISyntaxException; 027import java.util.ArrayList; 028import java.util.HashMap; 029import java.util.HashSet; 030import java.util.List; 031import java.util.Map; 032import java.util.Set; 033 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FSDataInputStream; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.Path; 038import org.apache.oozie.ErrorCode; 039import org.apache.oozie.dependency.hcat.HCatMessageHandler; 040import org.apache.oozie.jms.JMSConnectionInfo; 041import org.apache.oozie.util.HCatURI; 042import org.apache.oozie.util.MappingRule; 043import org.apache.oozie.util.XConfiguration; 044import org.apache.oozie.util.XLog; 045 046public class HCatAccessorService implements Service { 047 048 public static final String CONF_PREFIX = Service.CONF_PREFIX + "HCatAccessorService."; 049 public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + "jmsconnections"; 050 public static final String HCAT_CONFIGURATION = CONF_PREFIX + "hcat.configuration"; 051 052 private static XLog LOG; 053 private static String DELIMITER = "#"; 054 private Configuration conf; 055 private JMSAccessorService jmsService; 056 private List<MappingRule> mappingRules; 057 private JMSConnectionInfo defaultJMSConnInfo; 058 private Configuration hcatConf; 059 /** 060 * Map of publisher(host:port) to JMS connection info 061 */ 062 private Map<String, JMSConnectionInfo> publisherJMSConnInfoMap; 063 /** 064 * List of non publishers(host:port) 065 */ 066 private Set<String> nonJMSPublishers; 067 /** 068 * Mapping of table to the topic name for the table 069 */ 070 private Map<String, String> registeredTopicsMap; 071 072 @Override 073 public void init(Services services) throws ServiceException { 074 LOG = XLog.getLog(getClass()); 075 conf = services.getConf(); 076 this.jmsService = services.get(JMSAccessorService.class); 077 initializeMappingRules(); 078 this.nonJMSPublishers = new HashSet<String>(); 079 this.publisherJMSConnInfoMap = new HashMap<String, JMSConnectionInfo>(); 080 this.registeredTopicsMap = new HashMap<String, String>(); 081 try { 082 loadHCatConf(services); 083 } catch(IOException ioe) { 084 throw new ServiceException(ErrorCode.E0100, HCatAccessorService.class.getName(), "An exception occurred" 085 + " while attempting to load the HCat Configuration", ioe); 086 } 087 } 088 089 private void loadHCatConf(Services services) throws IOException { 090 String path = conf.get(HCAT_CONFIGURATION); 091 if (path != null) { 092 if (path.startsWith("hdfs")) { 093 Path p = new Path(path); 094 HadoopAccessorService has = services.get(HadoopAccessorService.class); 095 try { 096 FileSystem fs = has.createFileSystem( 097 System.getProperty("user.name"), p.toUri(), has.createConfiguration(p.toUri().getAuthority())); 098 if (fs.exists(p)) { 099 FSDataInputStream is = null; 100 try { 101 is = fs.open(p); 102 hcatConf = new XConfiguration(is); 103 } finally { 104 if (is != null) { 105 is.close(); 106 } 107 } 108 LOG.info("Loaded HCat Configuration: " + path); 109 } else { 110 LOG.warn("HCat Configuration could not be found at [" + path + "]"); 111 } 112 } catch (HadoopAccessorException hae) { 113 throw new IOException(hae); 114 } 115 } else { 116 File f = new File(path); 117 if (f.exists()) { 118 InputStream is = null; 119 try { 120 is = new FileInputStream(f); 121 hcatConf = new XConfiguration(is); 122 } finally { 123 if (is != null) { 124 is.close(); 125 } 126 } 127 LOG.info("Loaded HCat Configuration: " + path); 128 } else { 129 LOG.warn("HCat Configuration could not be found at [" + path + "]"); 130 } 131 } 132 } 133 else { 134 LOG.info("HCat Configuration not specified"); 135 } 136 } 137 138 public Configuration getHCatConf() { 139 return hcatConf; 140 } 141 142 private void initializeMappingRules() { 143 String[] connections = ConfigurationService.getStrings(conf, JMS_CONNECTIONS_PROPERTIES); 144 if (connections != null) { 145 mappingRules = new ArrayList<MappingRule>(connections.length); 146 for (String connection : connections) { 147 String[] values = connection.split("=", 2); 148 String key = values[0].trim(); 149 String value = values[1].trim(); 150 if (key.equals("default")) { 151 defaultJMSConnInfo = new JMSConnectionInfo(value); 152 } 153 else { 154 mappingRules.add(new MappingRule(key, value)); 155 } 156 } 157 } 158 else { 159 LOG.warn("No JMS connection defined"); 160 } 161 } 162 163 /** 164 * Determine whether a given source URI publishes JMS messages 165 * 166 * @param sourceURI URI of the publisher 167 * @return true if we have JMS connection information for the source URI, else false 168 */ 169 public boolean isKnownPublisher(URI sourceURI) { 170 if (nonJMSPublishers.contains(sourceURI.getAuthority())) { 171 return true; 172 } 173 else { 174 JMSConnectionInfo connInfo = publisherJMSConnInfoMap.get(sourceURI.getAuthority()); 175 return connInfo == null ? (getJMSConnectionInfo(sourceURI) != null) : true; 176 } 177 } 178 179 /** 180 * Given a publisher host:port return the connection details of JMS server that the publisher 181 * publishes to 182 * 183 * @param publisherURI URI of the publisher 184 * @return JMSConnectionInfo to connect to the JMS server that the publisher publishes to 185 */ 186 public JMSConnectionInfo getJMSConnectionInfo(URI publisherURI) { 187 String publisherAuthority = publisherURI.getAuthority(); 188 JMSConnectionInfo connInfo = null; 189 if (publisherJMSConnInfoMap.containsKey(publisherAuthority)) { 190 connInfo = publisherJMSConnInfoMap.get(publisherAuthority); 191 } 192 else { 193 String schemeWithAuthority = publisherURI.getScheme() + "://" + publisherAuthority; 194 for (MappingRule mr : mappingRules) { 195 String jndiPropertiesString = mr.applyRule(schemeWithAuthority); 196 if (jndiPropertiesString != null) { 197 connInfo = new JMSConnectionInfo(jndiPropertiesString); 198 publisherJMSConnInfoMap.put(publisherAuthority, connInfo); 199 LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority); 200 break; 201 } 202 } 203 if (connInfo == null && defaultJMSConnInfo != null) { 204 connInfo = defaultJMSConnInfo; 205 publisherJMSConnInfoMap.put(publisherAuthority, defaultJMSConnInfo); 206 LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority); 207 } 208 else { 209 nonJMSPublishers.add(publisherAuthority); 210 LOG.info("Adding hcat server [{0}] to the list of non JMS publishers", schemeWithAuthority); 211 } 212 213 } 214 return connInfo; 215 } 216 217 /** 218 * Check if we are already listening to the JMS topic for the table in the given hcatURI 219 * 220 * @param hcatURI hcatalog partition URI 221 * @return true if registered to a JMS topic for the table in the given hcatURI 222 */ 223 public boolean isRegisteredForNotification(HCatURI hcatURI) { 224 return registeredTopicsMap.containsKey(getKeyForRegisteredTopicsMap(hcatURI)); 225 } 226 227 /** 228 * Register for notifications on a JMS topic for the specified hcatalog table. 229 * 230 * @param hcatURI hcatalog partition URI 231 * @param topic JMS topic to register to 232 * @param msgHandler Handler which will process the messages received on the topic 233 */ 234 public void registerForNotification(HCatURI hcatURI, String topic, HCatMessageHandler msgHandler) { 235 JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI()); 236 jmsService.registerForNotification(connInfo, topic, msgHandler); 237 registeredTopicsMap.put( 238 getKeyForRegisteredTopicsMap(hcatURI), topic); 239 } 240 241 public void unregisterFromNotification(HCatURI hcatURI) { 242 String topic = registeredTopicsMap.remove(getKeyForRegisteredTopicsMap(hcatURI)); 243 if (topic != null) { 244 JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI()); 245 jmsService.unregisterFromNotification(connInfo, topic); 246 } 247 } 248 249 public void unregisterFromNotification(String server, String database, String table) { 250 String key = server + DELIMITER + database + DELIMITER + table; 251 String topic = registeredTopicsMap.remove(key); 252 if (topic != null) { 253 try { 254 JMSConnectionInfo connInfo = getJMSConnectionInfo(new URI("hcat://" + server)); 255 jmsService.unregisterFromNotification(connInfo, topic); 256 } 257 catch (URISyntaxException e) { 258 LOG.warn("Error unregistering from notification for topic [{0}]. Hcat table=[{1}]", topic, key, e); 259 } 260 } 261 } 262 263 private String getKeyForRegisteredTopicsMap(HCatURI hcatURI) { 264 return hcatURI.getURI().getAuthority() + DELIMITER + hcatURI.getDb() 265 + DELIMITER + hcatURI.getTable(); 266 } 267 268 @Override 269 public void destroy() { 270 publisherJMSConnInfoMap.clear(); 271 } 272 273 @Override 274 public Class<? extends Service> getInterface() { 275 return HCatAccessorService.class; 276 } 277 278}