001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.Map.Entry; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import javax.jms.JMSException; 029import javax.jms.MessageConsumer; 030import javax.jms.Session; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.util.ReflectionUtils; 033import org.apache.oozie.jms.ConnectionContext; 034import org.apache.oozie.jms.DefaultConnectionContext; 035import org.apache.oozie.jms.JMSConnectionInfo; 036import org.apache.oozie.jms.JMSExceptionListener; 037import org.apache.oozie.jms.MessageHandler; 038import org.apache.oozie.jms.MessageReceiver; 039import org.apache.oozie.util.XLog; 040 041import com.google.common.annotations.VisibleForTesting; 042 043/** 044 * This class will <ul> 045 * <li> Create/Manage JMS connections using user configured JNDI properties. </li> 046 * <li> Create/Manage session for specific connection/topic and reconnects on failures. </li> 047 * <li> Provide a way to create a subscriber and publisher </li> 048 * <li> Pure JMS compliant (implementation independent but primarily tested against Apache ActiveMQ). </li> 049 * </ul> 050 */ 051public class JMSAccessorService implements Service { 052 public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSAccessorService."; 053 public static final String JMS_CONNECTION_CONTEXT_IMPL = CONF_PREFIX + "connectioncontext.impl"; 054 public static final String SESSION_OPTS = CONF_PREFIX + "jms.sessionOpts"; 055 public static final String CONF_RETRY_INITIAL_DELAY = CONF_PREFIX + "retry.initial.delay"; 056 public static final String CONF_RETRY_MULTIPLIER = CONF_PREFIX + "retry.multiplier"; 057 public static final String CONF_RETRY_MAX_ATTEMPTS = CONF_PREFIX + "retry.max.attempts"; 058 private static XLog LOG; 059 060 private Configuration conf; 061 private int sessionOpts; 062 private int retryInitialDelay; 063 private int retryMultiplier; 064 private int retryMaxAttempts; 065 private ConnectionContext jmsProducerConnContext; 066 067 /** 068 * Map of JMS connection info to established JMS Connection 069 */ 070 private ConcurrentMap<JMSConnectionInfo, ConnectionContext> connectionMap = 071 new ConcurrentHashMap<JMSConnectionInfo, ConnectionContext>(); 072 /** 073 * Map of JMS connection info to topic names to MessageReceiver 074 */ 075 private ConcurrentMap<JMSConnectionInfo, Map<String, MessageReceiver>> receiversMap = 076 new ConcurrentHashMap<JMSConnectionInfo, Map<String, MessageReceiver>>(); 077 078 /** 079 * Map of JMS connection info to connection retry information 080 */ 081 private Map<JMSConnectionInfo, ConnectionRetryInfo> retryConnectionsMap = new HashMap<JMSConnectionInfo, ConnectionRetryInfo>(); 082 083 @Override 084 public void init(Services services) throws ServiceException { 085 LOG = XLog.getLog(getClass()); 086 conf = services.getConf(); 087 sessionOpts = conf.getInt(SESSION_OPTS, Session.AUTO_ACKNOWLEDGE); 088 retryInitialDelay = conf.getInt(CONF_RETRY_INITIAL_DELAY, 60); // initial delay in seconds 089 retryMultiplier = conf.getInt(CONF_RETRY_MULTIPLIER, 2); 090 retryMaxAttempts = conf.getInt(CONF_RETRY_MAX_ATTEMPTS, 10); 091 } 092 093 /** 094 * Register for notifications on a JMS topic. 095 * 096 * @param connInfo Information to connect to a JMS compliant messaging service. 097 * @param topic Topic in which the JMS messages are published 098 * @param msgHandler Handler which will process the messages received on the topic 099 */ 100 public void registerForNotification(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) { 101 if (!isTopicInRetryList(connInfo, topic)) { 102 if (isConnectionInRetryList(connInfo)) { 103 queueTopicForRetry(connInfo, topic, msgHandler); 104 } 105 else { 106 Map<String, MessageReceiver> topicsMap = getReceiversTopicsMap(connInfo); 107 if (!topicsMap.containsKey(topic)) { 108 synchronized (topicsMap) { 109 if (!topicsMap.containsKey(topic)) { 110 ConnectionContext connCtxt = createConnectionContext(connInfo); 111 if (connCtxt == null) { 112 queueTopicForRetry(connInfo, topic, msgHandler); 113 return; 114 } 115 MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, msgHandler); 116 if (receiver == null) { 117 queueTopicForRetry(connInfo, topic, msgHandler); 118 } 119 else { 120 LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo); 121 topicsMap.put(topic, receiver); 122 } 123 } 124 } 125 } 126 } 127 } 128 } 129 130 /** 131 * Unregister from listening to JMS messages on a topic. 132 * 133 * @param connInfo Information to connect to the JMS compliant messaging service. 134 * @param topic Topic in which the JMS messages are published 135 */ 136 public void unregisterFromNotification(JMSConnectionInfo connInfo, String topic) { 137 LOG.info("Unregistering JMS listener. Closing session for {0} and topic {1}", connInfo, topic); 138 139 if (isTopicInRetryList(connInfo, topic)) { 140 removeTopicFromRetryList(connInfo, topic); 141 } 142 else { 143 Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo); 144 if (topicsMap != null) { 145 MessageReceiver receiver = null; 146 synchronized (topicsMap) { 147 receiver = topicsMap.remove(topic); 148 if (topicsMap.isEmpty()) { 149 receiversMap.remove(connInfo); 150 } 151 } 152 if (receiver != null) { 153 try { 154 receiver.getSession().close(); 155 } 156 catch (JMSException e) { 157 LOG.warn("Unable to close session " + receiver.getSession(), e); 158 } 159 } 160 else { 161 LOG.warn("Received request to unregister from topic [{0}] on [{1}], but no matching session.", 162 topic, connInfo); 163 } 164 } 165 } 166 } 167 168 private Map<String, MessageReceiver> getReceiversTopicsMap(JMSConnectionInfo connInfo) { 169 Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo); 170 if (topicsMap == null) { 171 topicsMap = new HashMap<String, MessageReceiver>(); 172 Map<String, MessageReceiver> exists = receiversMap.putIfAbsent(connInfo, topicsMap); 173 if (exists != null) { 174 topicsMap = exists; 175 } 176 } 177 return topicsMap; 178 } 179 180 /** 181 * Determine if currently listening to JMS messages on a topic. 182 * 183 * @param connInfo Information to connect to the JMS compliant messaging service. 184 * @param topic Topic in which the JMS messages are published 185 * @return true if listening to the topic, else false 186 */ 187 @VisibleForTesting 188 boolean isListeningToTopic(JMSConnectionInfo connInfo, String topic) { 189 Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo); 190 return (topicsMap != null && topicsMap.containsKey(topic)); 191 } 192 193 @VisibleForTesting 194 boolean isConnectionInRetryList(JMSConnectionInfo connInfo) { 195 return retryConnectionsMap.containsKey(connInfo); 196 } 197 198 @VisibleForTesting 199 boolean isTopicInRetryList(JMSConnectionInfo connInfo, String topic) { 200 ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo); 201 if (connRetryInfo == null) { 202 return false; 203 } 204 else { 205 Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry(); 206 return topicsMap.containsKey(topic); 207 } 208 } 209 210 // For unit testing 211 @VisibleForTesting 212 int getNumConnectionAttempts(JMSConnectionInfo connInfo) { 213 return retryConnectionsMap.get(connInfo).getNumAttempt(); 214 } 215 216 private ConnectionRetryInfo queueConnectionForRetry(JMSConnectionInfo connInfo) { 217 ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo); 218 if (connRetryInfo == null) { 219 LOG.info("Queueing connection {0} for retry", connInfo); 220 connRetryInfo = new ConnectionRetryInfo(0, retryInitialDelay); 221 retryConnectionsMap.put(connInfo, connRetryInfo); 222 scheduleRetry(connInfo, retryInitialDelay); 223 } 224 return connRetryInfo; 225 } 226 227 private ConnectionRetryInfo queueTopicForRetry(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) { 228 LOG.info("Queueing topic {0} for {1} for retry", topic, connInfo); 229 ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo); 230 Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry(); 231 topicsMap.put(topic, msgHandler); 232 return connRetryInfo; 233 } 234 235 private void removeTopicFromRetryList(JMSConnectionInfo connInfo, String topic) { 236 LOG.info("Removing topic {0} from {1} from retry list", topic, connInfo); 237 ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo); 238 if (connRetryInfo != null) { 239 Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry(); 240 topicsMap.remove(topic); 241 } 242 } 243 244 private MessageReceiver registerForTopic(JMSConnectionInfo connInfo, ConnectionContext connCtxt, String topic, 245 MessageHandler msgHandler) { 246 try { 247 Session session = connCtxt.createSession(sessionOpts); 248 MessageConsumer consumer = connCtxt.createConsumer(session, topic); 249 MessageReceiver receiver = new MessageReceiver(msgHandler, session, consumer); 250 consumer.setMessageListener(receiver); 251 return receiver; 252 } 253 catch (JMSException e) { 254 LOG.warn("Error while registering to listen to topic {0} from {1}", topic, connInfo, e); 255 return null; 256 } 257 } 258 259 public ConnectionContext createConnectionContext(JMSConnectionInfo connInfo) { 260 ConnectionContext connCtxt = connectionMap.get(connInfo); 261 if (connCtxt == null) { 262 try { 263 connCtxt = getConnectionContextImpl(); 264 connCtxt.createConnection(connInfo.getJNDIProperties()); 265 connCtxt.setExceptionListener(new JMSExceptionListener(connInfo, connCtxt, true)); 266 connectionMap.put(connInfo, connCtxt); 267 LOG.info("Connection established to JMS Server for [{0}]", connInfo); 268 } 269 catch (Exception e) { 270 LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e); 271 return null; 272 } 273 } 274 return connCtxt; 275 } 276 277 public ConnectionContext createProducerConnectionContext(JMSConnectionInfo connInfo) { 278 if (jmsProducerConnContext != null && jmsProducerConnContext.isConnectionInitialized()) { 279 return jmsProducerConnContext; 280 } 281 else { 282 synchronized (this) { 283 if (jmsProducerConnContext == null || !jmsProducerConnContext.isConnectionInitialized()) { 284 try { 285 jmsProducerConnContext = getConnectionContextImpl(); 286 jmsProducerConnContext.createConnection(connInfo.getJNDIProperties()); 287 jmsProducerConnContext.setExceptionListener(new JMSExceptionListener(connInfo, 288 jmsProducerConnContext, false)); 289 LOG.info("Connection established to JMS Server for [{0}]", connInfo); 290 } 291 catch (Exception e) { 292 LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e); 293 return null; 294 } 295 } 296 } 297 } 298 return jmsProducerConnContext; 299 } 300 301 private ConnectionContext getConnectionContextImpl() { 302 Class<?> defaultClazz = ConfigurationService.getClass(conf, JMS_CONNECTION_CONTEXT_IMPL); 303 ConnectionContext connCtx = null; 304 if (defaultClazz == DefaultConnectionContext.class) { 305 connCtx = new DefaultConnectionContext(); 306 } 307 else { 308 connCtx = (ConnectionContext) ReflectionUtils.newInstance(defaultClazz, null); 309 } 310 return connCtx; 311 } 312 313 @VisibleForTesting 314 MessageReceiver getMessageReceiver(JMSConnectionInfo connInfo, String topic) { 315 Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo); 316 if (topicsMap != null) { 317 return topicsMap.get(topic); 318 } 319 return null; 320 } 321 322 @Override 323 public void destroy() { 324 LOG.info("Destroying JMSAccessor service "); 325 receiversMap.clear(); 326 327 LOG.info("Closing JMS connections"); 328 for (ConnectionContext conn : connectionMap.values()) { 329 conn.close(); 330 } 331 if (jmsProducerConnContext != null) { 332 jmsProducerConnContext.close(); 333 } 334 connectionMap.clear(); 335 } 336 337 @Override 338 public Class<? extends Service> getInterface() { 339 return JMSAccessorService.class; 340 } 341 342 /** 343 * Reestablish connection for the given JMS connect information 344 * @param connInfo JMS connection info 345 */ 346 public void reestablishConnection(JMSConnectionInfo connInfo) { 347 // Queue the connection and topics for retry 348 connectionMap.remove(connInfo); 349 ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo); 350 Map<String, MessageReceiver> listeningTopicsMap = receiversMap.remove(connInfo); 351 if (listeningTopicsMap != null) { 352 Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry(); 353 for (Entry<String, MessageReceiver> topicEntry : listeningTopicsMap.entrySet()) { 354 MessageReceiver receiver = topicEntry.getValue(); 355 retryTopicsMap.put(topicEntry.getKey(), receiver.getMessageHandler()); 356 } 357 } 358 } 359 360 private void scheduleRetry(JMSConnectionInfo connInfo, long delay) { 361 LOG.info("Scheduling retry of connection [{0}] in [{1}] seconds", connInfo, delay); 362 JMSRetryRunnable runnable = new JMSRetryRunnable(connInfo); 363 SchedulerService scheduler = Services.get().get(SchedulerService.class); 364 scheduler.schedule(runnable, delay, SchedulerService.Unit.SEC); 365 } 366 367 @VisibleForTesting 368 boolean retryConnection(JMSConnectionInfo connInfo) { 369 ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo); 370 if (connRetryInfo.getNumAttempt() >= retryMaxAttempts) { 371 LOG.info("Not attempting connection [{0}] again. Reached max attempts [{1}]", connInfo, retryMaxAttempts); 372 return false; 373 } 374 LOG.info("Attempting retry of connection [{0}]", connInfo); 375 connRetryInfo.setNumAttempt(connRetryInfo.getNumAttempt() + 1); 376 connRetryInfo.setNextDelay(connRetryInfo.getNextDelay() * retryMultiplier); 377 ConnectionContext connCtxt = createConnectionContext(connInfo); 378 boolean shouldRetry = false; 379 if (connCtxt == null) { 380 shouldRetry = true; 381 } 382 else { 383 Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry(); 384 Map<String, MessageReceiver> listeningTopicsMap = getReceiversTopicsMap(connInfo); 385 List<String> topicsToRemoveList = new ArrayList<String>(); 386 // For each topic in the retry list, try to register the MessageHandler for that topic 387 for (Entry<String, MessageHandler> topicEntry : retryTopicsMap.entrySet()) { 388 String topic = topicEntry.getKey(); 389 if (listeningTopicsMap.containsKey(topic)) { 390 continue; 391 } 392 synchronized (listeningTopicsMap) { 393 if (!listeningTopicsMap.containsKey(topic)) { 394 MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, topicEntry.getValue()); 395 if (receiver == null) { 396 LOG.warn("Failed to register a listener for topic {0} on {1}", topic, connInfo); 397 } 398 else { 399 listeningTopicsMap.put(topic, receiver); 400 topicsToRemoveList.add(topic); 401 LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo); 402 } 403 } 404 } 405 } 406 for (String topic : topicsToRemoveList) { 407 retryTopicsMap.remove(topic); 408 } 409 if (retryTopicsMap.isEmpty()) { 410 shouldRetry = false; 411 } 412 } 413 414 if (shouldRetry) { 415 scheduleRetry(connInfo, connRetryInfo.getNextDelay()); 416 } 417 else { 418 retryConnectionsMap.remove(connInfo); 419 } 420 return true; 421 } 422 423 private static class ConnectionRetryInfo { 424 private int numAttempt; 425 private int nextDelay; 426 private Map<String, MessageHandler> retryTopicsMap; 427 428 public ConnectionRetryInfo(int numAttempt, int nextDelay) { 429 this.numAttempt = numAttempt; 430 this.nextDelay = nextDelay; 431 this.retryTopicsMap = new HashMap<String, MessageHandler>(); 432 } 433 434 public int getNumAttempt() { 435 return numAttempt; 436 } 437 438 public void setNumAttempt(int numAttempt) { 439 this.numAttempt = numAttempt; 440 } 441 442 public int getNextDelay() { 443 return nextDelay; 444 } 445 446 public void setNextDelay(int nextDelay) { 447 this.nextDelay = nextDelay; 448 } 449 450 public Map<String, MessageHandler> getTopicsToRetry() { 451 return retryTopicsMap; 452 } 453 454 } 455 456 public class JMSRetryRunnable implements Runnable { 457 458 private JMSConnectionInfo connInfo; 459 460 public JMSRetryRunnable(JMSConnectionInfo connInfo) { 461 this.connInfo = connInfo; 462 } 463 464 public JMSConnectionInfo getJMSConnectionInfo() { 465 return connInfo; 466 } 467 468 @Override 469 public void run() { 470 retryConnection(connInfo); 471 } 472 473 } 474 475}