001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.io.File; 021import java.io.FileInputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.net.URI; 025import java.net.URISyntaxException; 026import java.util.ArrayList; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FSDataInputStream; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.oozie.ErrorCode; 038import org.apache.oozie.dependency.hcat.HCatMessageHandler; 039import org.apache.oozie.jms.JMSConnectionInfo; 040import org.apache.oozie.util.HCatURI; 041import org.apache.oozie.util.MappingRule; 042import org.apache.oozie.util.XConfiguration; 043import org.apache.oozie.util.XLog; 044 045public class HCatAccessorService implements Service { 046 047 public static final String CONF_PREFIX = Service.CONF_PREFIX + "HCatAccessorService."; 048 public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + "jmsconnections"; 049 public static final String HCAT_CONFIGURATION = CONF_PREFIX + "hcat.configuration"; 050 051 private static XLog LOG; 052 private static String DELIMITER = "#"; 053 private Configuration conf; 054 private JMSAccessorService jmsService; 055 private List<MappingRule> mappingRules; 056 private JMSConnectionInfo defaultJMSConnInfo; 057 private Configuration hcatConf; 058 /** 059 * Map of publisher(host:port) to JMS connection info 060 */ 061 private Map<String, JMSConnectionInfo> publisherJMSConnInfoMap; 062 /** 063 * List of non publishers(host:port) 064 */ 065 private Set<String> nonJMSPublishers; 066 /** 067 * Mapping of table to the topic name for the table 068 */ 069 private Map<String, String> registeredTopicsMap; 070 071 @Override 072 public void init(Services services) throws ServiceException { 073 LOG = XLog.getLog(getClass()); 074 conf = services.getConf(); 075 this.jmsService = services.get(JMSAccessorService.class); 076 initializeMappingRules(); 077 this.nonJMSPublishers = new HashSet<String>(); 078 this.publisherJMSConnInfoMap = new HashMap<String, JMSConnectionInfo>(); 079 this.registeredTopicsMap = new HashMap<String, String>(); 080 try { 081 loadHCatConf(services); 082 } catch(IOException ioe) { 083 throw new ServiceException(ErrorCode.E0100, HCatAccessorService.class.getName(), "An exception occured while attempting" 084 + "to load the HCat Configuration", ioe); 085 } 086 } 087 088 private void loadHCatConf(Services services) throws IOException { 089 String path = conf.get(HCAT_CONFIGURATION); 090 if (path != null) { 091 if (path.startsWith("hdfs")) { 092 Path p = new Path(path); 093 HadoopAccessorService has = services.get(HadoopAccessorService.class); 094 try { 095 FileSystem fs = has.createFileSystem( 096 System.getProperty("user.name"), p.toUri(), has.createJobConf(p.toUri().getAuthority())); 097 if (fs.exists(p)) { 098 FSDataInputStream is = null; 099 try { 100 is = fs.open(p); 101 hcatConf = new XConfiguration(is); 102 } finally { 103 if (is != null) { 104 is.close(); 105 } 106 } 107 LOG.info("Loaded HCat Configuration: " + path); 108 } else { 109 LOG.warn("HCat Configuration could not be found at [" + path + "]"); 110 } 111 } catch (HadoopAccessorException hae) { 112 throw new IOException(hae); 113 } 114 } else { 115 File f = new File(path); 116 if (f.exists()) { 117 InputStream is = null; 118 try { 119 is = new FileInputStream(f); 120 hcatConf = new XConfiguration(is); 121 } finally { 122 if (is != null) { 123 is.close(); 124 } 125 } 126 LOG.info("Loaded HCat Configuration: " + path); 127 } else { 128 LOG.warn("HCat Configuration could not be found at [" + path + "]"); 129 } 130 } 131 } 132 else { 133 LOG.info("HCat Configuration not specified"); 134 } 135 } 136 137 public Configuration getHCatConf() { 138 return hcatConf; 139 } 140 141 private void initializeMappingRules() { 142 String[] connections = conf.getStrings(JMS_CONNECTIONS_PROPERTIES); 143 if (connections != null) { 144 mappingRules = new ArrayList<MappingRule>(connections.length); 145 for (String connection : connections) { 146 String[] values = connection.split("=", 2); 147 String key = values[0].trim(); 148 String value = values[1].trim(); 149 if (key.equals("default")) { 150 defaultJMSConnInfo = new JMSConnectionInfo(value); 151 } 152 else { 153 mappingRules.add(new MappingRule(key, value)); 154 } 155 } 156 } 157 else { 158 LOG.warn("No JMS connection defined"); 159 } 160 } 161 162 /** 163 * Determine whether a given source URI publishes JMS messages 164 * 165 * @param sourceURI URI of the publisher 166 * @return true if we have JMS connection information for the source URI, else false 167 */ 168 public boolean isKnownPublisher(URI sourceURI) { 169 if (nonJMSPublishers.contains(sourceURI.getAuthority())) { 170 return true; 171 } 172 else { 173 JMSConnectionInfo connInfo = publisherJMSConnInfoMap.get(sourceURI.getAuthority()); 174 return connInfo == null ? (getJMSConnectionInfo(sourceURI) != null) : true; 175 } 176 } 177 178 /** 179 * Given a publisher host:port return the connection details of JMS server that the publisher 180 * publishes to 181 * 182 * @param publisherURI URI of the publisher 183 * @return JMSConnectionInfo to connect to the JMS server that the publisher publishes to 184 */ 185 public JMSConnectionInfo getJMSConnectionInfo(URI publisherURI) { 186 String publisherAuthority = publisherURI.getAuthority(); 187 JMSConnectionInfo connInfo = null; 188 if (publisherJMSConnInfoMap.containsKey(publisherAuthority)) { 189 connInfo = publisherJMSConnInfoMap.get(publisherAuthority); 190 } 191 else { 192 String schemeWithAuthority = publisherURI.getScheme() + "://" + publisherAuthority; 193 for (MappingRule mr : mappingRules) { 194 String jndiPropertiesString = mr.applyRule(schemeWithAuthority); 195 if (jndiPropertiesString != null) { 196 connInfo = new JMSConnectionInfo(jndiPropertiesString); 197 publisherJMSConnInfoMap.put(publisherAuthority, connInfo); 198 LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority); 199 break; 200 } 201 } 202 if (connInfo == null && defaultJMSConnInfo != null) { 203 connInfo = defaultJMSConnInfo; 204 publisherJMSConnInfoMap.put(publisherAuthority, defaultJMSConnInfo); 205 LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority); 206 } 207 else { 208 nonJMSPublishers.add(publisherAuthority); 209 LOG.info("Adding hcat server [{0}] to the list of non JMS publishers", schemeWithAuthority); 210 } 211 212 } 213 return connInfo; 214 } 215 216 /** 217 * Check if we are already listening to the JMS topic for the table in the given hcatURI 218 * 219 * @param hcatURI hcatalog partition URI 220 * @return true if registered to a JMS topic for the table in the given hcatURI 221 */ 222 public boolean isRegisteredForNotification(HCatURI hcatURI) { 223 return registeredTopicsMap.containsKey(getKeyForRegisteredTopicsMap(hcatURI)); 224 } 225 226 /** 227 * Register for notifications on a JMS topic for the specified hcatalog table. 228 * 229 * @param hcatURI hcatalog partition URI 230 * @param topic JMS topic to register to 231 * @param msgHandler Handler which will process the messages received on the topic 232 */ 233 public void registerForNotification(HCatURI hcatURI, String topic, HCatMessageHandler msgHandler) { 234 JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI()); 235 jmsService.registerForNotification(connInfo, topic, msgHandler); 236 registeredTopicsMap.put( 237 getKeyForRegisteredTopicsMap(hcatURI), topic); 238 } 239 240 public void unregisterFromNotification(HCatURI hcatURI) { 241 String topic = registeredTopicsMap.remove(getKeyForRegisteredTopicsMap(hcatURI)); 242 if (topic != null) { 243 JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI()); 244 jmsService.unregisterFromNotification(connInfo, topic); 245 } 246 } 247 248 public void unregisterFromNotification(String server, String database, String table) { 249 String key = server + DELIMITER + database + DELIMITER + table; 250 String topic = registeredTopicsMap.remove(key); 251 if (topic != null) { 252 try { 253 JMSConnectionInfo connInfo = getJMSConnectionInfo(new URI("hcat://" + server)); 254 jmsService.unregisterFromNotification(connInfo, topic); 255 } 256 catch (URISyntaxException e) { 257 LOG.warn("Error unregistering from notification for topic [{0}]. Hcat table=[{1}]", topic, key, e); 258 } 259 } 260 } 261 262 private String getKeyForRegisteredTopicsMap(HCatURI hcatURI) { 263 return hcatURI.getURI().getAuthority() + DELIMITER + hcatURI.getDb() 264 + DELIMITER + hcatURI.getTable(); 265 } 266 267 @Override 268 public void destroy() { 269 publisherJMSConnInfoMap.clear(); 270 } 271 272 @Override 273 public Class<? extends Service> getInterface() { 274 return HCatAccessorService.class; 275 } 276 277}