/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.jms.ConnectionContext;
import org.apache.oozie.jms.DefaultConnectionContext;
import org.apache.oozie.jms.JMSConnectionInfo;
import org.apache.oozie.jms.JMSExceptionListener;
import org.apache.oozie.jms.MessageHandler;
import org.apache.oozie.jms.MessageReceiver;
import org.apache.oozie.util.XLog;

import com.google.common.annotations.VisibleForTesting;

/**
 * This class will <ul>
 * <li> Create/Manage JMS connections using user configured JNDI properties. </li>
 * <li> Create/Manage session for specific connection/topic and reconnects on failures. </li>
 * <li> Provide a way to create a subscriber and publisher </li>
 * <li> Pure JMS compliant (implementation independent but primarily tested against Apache ActiveMQ). </li>
 * </ul>
 * <p>
 * Connections that cannot be established, and topics that cannot be registered, are kept in a retry
 * list and re-attempted through the {@link SchedulerService}. The delay starts at
 * {@link #CONF_RETRY_INITIAL_DELAY} seconds and is multiplied by {@link #CONF_RETRY_MULTIPLIER} on
 * every attempt, up to {@link #CONF_RETRY_MAX_ATTEMPTS} attempts.
 */
public class JMSAccessorService implements Service {
    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSAccessorService.";
    /** Configuration key naming the {@link ConnectionContext} implementation class to instantiate. */
    public static final String JMS_CONNECTION_CONTEXT_IMPL = CONF_PREFIX + "connectioncontext.impl";
    /** Configuration key for the JMS session acknowledgement mode passed to created sessions. */
    public static final String SESSION_OPTS = CONF_PREFIX + "jms.sessionOpts";
    /** Configuration key for the delay, in seconds, before the first connection retry. */
    public static final String CONF_RETRY_INITIAL_DELAY = CONF_PREFIX + "retry.initial.delay";
    /** Configuration key for the factor applied to the retry delay after each attempt. */
    public static final String CONF_RETRY_MULTIPLIER = CONF_PREFIX + "retry.multiplier";
    /** Configuration key for the maximum number of retry attempts per connection. */
    public static final String CONF_RETRY_MAX_ATTEMPTS = CONF_PREFIX + "retry.max.attempts";
    private static XLog LOG;

    private Configuration conf;
    private int sessionOpts;
    private int retryInitialDelay;
    private int retryMultiplier;
    private int retryMaxAttempts;
    /**
     * Shared producer connection. Volatile because it is read outside the lock in
     * {@link #createProducerConnectionContext(JMSConnectionInfo)}.
     */
    private volatile ConnectionContext jmsProducerConnContext;

    /**
     * Map of JMS connection info to established JMS Connection
     */
    private final ConcurrentMap<JMSConnectionInfo, ConnectionContext> connectionMap =
            new ConcurrentHashMap<JMSConnectionInfo, ConnectionContext>();
    /**
     * Map of JMS connection info to topic names to MessageReceiver
     */
    private final ConcurrentMap<JMSConnectionInfo, Map<String, MessageReceiver>> receiversMap =
            new ConcurrentHashMap<JMSConnectionInfo, Map<String, MessageReceiver>>();

    /**
     * Map of JMS connection info to connection retry information. Concurrent because it is touched both
     * by callers of this service and by the retry runnable executed by the scheduler.
     */
    private final ConcurrentMap<JMSConnectionInfo, ConnectionRetryInfo> retryConnectionsMap =
            new ConcurrentHashMap<JMSConnectionInfo, ConnectionRetryInfo>();

    /**
     * Read the session options and retry settings from the services configuration.
     *
     * @param services the services singleton providing the configuration
     * @throws ServiceException declared by the {@link Service} contract; not thrown by this implementation
     */
    @Override
    public void init(Services services) throws ServiceException {
        LOG = XLog.getLog(getClass());
        conf = services.getConf();
        sessionOpts = conf.getInt(SESSION_OPTS, Session.AUTO_ACKNOWLEDGE);
        retryInitialDelay = conf.getInt(CONF_RETRY_INITIAL_DELAY, 60); // initial delay in seconds
        retryMultiplier = conf.getInt(CONF_RETRY_MULTIPLIER, 2);
        retryMaxAttempts = conf.getInt(CONF_RETRY_MAX_ATTEMPTS, 10);
    }

    /**
     * Register for notifications on a JMS topic.
     * <p>
     * Nothing is done when the topic is already queued for retry or already being listened to. When the
     * connection is queued for retry, or the connection or the topic registration fails, the topic is
     * queued for retry instead.
     *
     * @param connInfo Information to connect to a JMS compliant messaging service.
     * @param topic Topic in which the JMS messages are published
     * @param msgHandler Handler which will process the messages received on the topic
     */
    public void registerForNotification(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
        if (isTopicInRetryList(connInfo, topic)) {
            return;
        }
        if (isConnectionInRetryList(connInfo)) {
            // The connection itself is down; the topic is registered once the connection is retried.
            queueTopicForRetry(connInfo, topic, msgHandler);
            return;
        }
        Map<String, MessageReceiver> topicsMap = getReceiversTopicsMap(connInfo);
        if (topicsMap.containsKey(topic)) {
            return;
        }
        synchronized (topicsMap) {
            // Re-check under the lock: another thread may have registered the topic in the meantime.
            if (topicsMap.containsKey(topic)) {
                return;
            }
            ConnectionContext connCtxt = createConnectionContext(connInfo);
            if (connCtxt == null) {
                queueTopicForRetry(connInfo, topic, msgHandler);
                return;
            }
            MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, msgHandler);
            if (receiver == null) {
                queueTopicForRetry(connInfo, topic, msgHandler);
            }
            else {
                LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
                topicsMap.put(topic, receiver);
            }
        }
    }

    /**
     * Unregister from listening to JMS messages on a topic.
     * <p>
     * A topic that is only queued for retry is removed from the retry list; otherwise the receiver's
     * session is closed. A failure to close the session is logged and not propagated.
     *
     * @param connInfo Information to connect to the JMS compliant messaging service.
     * @param topic Topic in which the JMS messages are published
     */
    public void unregisterFromNotification(JMSConnectionInfo connInfo, String topic) {
        LOG.info("Unregistering JMS listener. Clossing session for {0} and topic {1}", connInfo, topic);

        if (isTopicInRetryList(connInfo, topic)) {
            removeTopicFromRetryList(connInfo, topic);
            return;
        }
        Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
        if (topicsMap == null) {
            return;
        }
        MessageReceiver receiver;
        synchronized (topicsMap) {
            receiver = topicsMap.remove(topic);
            if (topicsMap.isEmpty()) {
                receiversMap.remove(connInfo);
            }
        }
        if (receiver == null) {
            LOG.warn("Received request to unregister from topic [{0}] on [{1}], but no matching session.",
                    topic, connInfo);
            return;
        }
        try {
            receiver.getSession().close();
        }
        catch (JMSException e) {
            LOG.warn("Unable to close session " + receiver.getSession(), e);
        }
    }

    /**
     * Return the topic-to-receiver map for a connection, creating and publishing an empty one if none
     * exists yet.
     *
     * @param connInfo JMS connection info
     * @return the map registered in {@code receiversMap} for the connection, never null
     */
    private Map<String, MessageReceiver> getReceiversTopicsMap(JMSConnectionInfo connInfo) {
        Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
        if (topicsMap == null) {
            topicsMap = new HashMap<String, MessageReceiver>();
            // Another thread may have won the race; always use the instance that ended up in the map.
            Map<String, MessageReceiver> exists = receiversMap.putIfAbsent(connInfo, topicsMap);
            if (exists != null) {
                topicsMap = exists;
            }
        }
        return topicsMap;
    }

    /**
     * Determine if currently listening to JMS messages on a topic.
     *
     * @param connInfo Information to connect to the JMS compliant messaging service.
     * @param topic Topic in which the JMS messages are published
     * @return true if listening to the topic, else false
     */
    @VisibleForTesting
    boolean isListeningToTopic(JMSConnectionInfo connInfo, String topic) {
        Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
        return (topicsMap != null && topicsMap.containsKey(topic));
    }

    /**
     * Determine if a connection is currently queued for retry.
     *
     * @param connInfo JMS connection info
     * @return true if the connection has an entry in the retry list, else false
     */
    @VisibleForTesting
    boolean isConnectionInRetryList(JMSConnectionInfo connInfo) {
        return retryConnectionsMap.containsKey(connInfo);
    }

    /**
     * Determine if a topic of a connection is currently queued for retry.
     *
     * @param connInfo JMS connection info
     * @param topic topic name
     * @return true if the connection is in the retry list and the topic is queued under it, else false
     */
    @VisibleForTesting
    boolean isTopicInRetryList(JMSConnectionInfo connInfo, String topic) {
        ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
        if (connRetryInfo == null) {
            return false;
        }
        return connRetryInfo.getTopicsToRetry().containsKey(topic);
    }

    // For unit testing
    /**
     * Return the number of retry attempts made so far for a connection. The connection must currently
     * be in the retry list.
     *
     * @param connInfo JMS connection info
     * @return number of retry attempts made so far
     */
    @VisibleForTesting
    int getNumConnectionAttempts(JMSConnectionInfo connInfo) {
        return retryConnectionsMap.get(connInfo).getNumAttempt();
    }

    /**
     * Queue a connection for retry unless it is already queued, scheduling the first retry after the
     * initial delay.
     *
     * @param connInfo JMS connection info
     * @return the retry information registered for the connection
     */
    private ConnectionRetryInfo queueConnectionForRetry(JMSConnectionInfo connInfo) {
        ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
        if (connRetryInfo == null) {
            ConnectionRetryInfo newRetryInfo = new ConnectionRetryInfo(0, retryInitialDelay);
            connRetryInfo = retryConnectionsMap.putIfAbsent(connInfo, newRetryInfo);
            if (connRetryInfo == null) {
                // Only the thread that actually inserted the entry schedules the retry, so a connection
                // never gets two retry chains running in parallel.
                LOG.info("Queueing connection {0} for retry", connInfo);
                connRetryInfo = newRetryInfo;
                scheduleRetry(connInfo, retryInitialDelay);
            }
        }
        return connRetryInfo;
    }

    /**
     * Queue a topic, and if needed its connection, for retry.
     *
     * @param connInfo JMS connection info
     * @param topic topic name
     * @param msgHandler handler to register for the topic once the retry succeeds
     * @return the retry information of the connection
     */
    private ConnectionRetryInfo queueTopicForRetry(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
        LOG.info("Queueing topic {0} for {1} for retry", topic, connInfo);
        ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo);
        connRetryInfo.getTopicsToRetry().put(topic, msgHandler);
        return connRetryInfo;
    }

    /**
     * Remove a topic from the retry list of a connection. Does nothing besides logging when the
     * connection is not in the retry list.
     *
     * @param connInfo JMS connection info
     * @param topic topic name
     */
    private void removeTopicFromRetryList(JMSConnectionInfo connInfo, String topic) {
        LOG.info("Removing topic {0} from {1} from retry list", topic, connInfo);
        ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
        if (connRetryInfo != null) {
            connRetryInfo.getTopicsToRetry().remove(topic);
        }
    }

    /**
     * Create a session and a consumer for a topic and attach a {@link MessageReceiver} as its listener.
     *
     * @param connInfo JMS connection info, used for logging only
     * @param connCtxt connection context used to create the session and the consumer
     * @param topic topic name
     * @param msgHandler handler wrapped by the created receiver
     * @return the receiver listening on the topic, or null if a {@link JMSException} occurred
     */
    private MessageReceiver registerForTopic(JMSConnectionInfo connInfo, ConnectionContext connCtxt, String topic,
            MessageHandler msgHandler) {
        try {
            Session session = connCtxt.createSession(sessionOpts);
            MessageConsumer consumer = connCtxt.createConsumer(session, topic);
            MessageReceiver receiver = new MessageReceiver(msgHandler, session, consumer);
            consumer.setMessageListener(receiver);
            return receiver;
        }
        catch (JMSException e) {
            LOG.warn("Error while registering to listen to topic {0} from {1}", topic, connInfo, e);
            return null;
        }
    }

    /**
     * Return the cached connection context for the given connection info, establishing a new
     * connection if none is cached.
     *
     * @param connInfo JMS connection info
     * @return the connection context, or null if the connection could not be established
     */
    public ConnectionContext createConnectionContext(JMSConnectionInfo connInfo) {
        ConnectionContext connCtxt = connectionMap.get(connInfo);
        if (connCtxt == null) {
            try {
                connCtxt = getConnectionContextImpl();
                connCtxt.createConnection(connInfo.getJNDIProperties());
                connCtxt.setExceptionListener(new JMSExceptionListener(connInfo, connCtxt, true));
                connectionMap.put(connInfo, connCtxt);
                LOG.info("Connection established to JMS Server for [{0}]", connInfo);
            }
            catch (Exception e) {
                LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e);
                return null;
            }
        }
        return connCtxt;
    }

    /**
     * Return the shared producer connection context, establishing the connection if it does not exist
     * or is no longer initialized.
     *
     * @param connInfo JMS connection info
     * @return the producer connection context, or null if the connection could not be established
     */
    public ConnectionContext createProducerConnectionContext(JMSConnectionInfo connInfo) {
        ConnectionContext current = jmsProducerConnContext;
        if (current != null && current.isConnectionInitialized()) {
            return current;
        }
        synchronized (this) {
            current = jmsProducerConnContext;
            if (current == null || !current.isConnectionInitialized()) {
                try {
                    // Build the context in a local and publish it only once it is fully set up, so that
                    // other threads never observe a half-initialized producer context.
                    ConnectionContext newCtxt = getConnectionContextImpl();
                    newCtxt.createConnection(connInfo.getJNDIProperties());
                    newCtxt.setExceptionListener(new JMSExceptionListener(connInfo, newCtxt, false));
                    jmsProducerConnContext = newCtxt;
                    current = newCtxt;
                    LOG.info("Connection established to JMS Server for [{0}]", connInfo);
                }
                catch (Exception e) {
                    LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e);
                    return null;
                }
            }
            return current;
        }
    }

    /**
     * Instantiate the configured {@link ConnectionContext} implementation, which is
     * {@link DefaultConnectionContext} unless {@link #JMS_CONNECTION_CONTEXT_IMPL} says otherwise.
     *
     * @return a new, not yet connected, connection context
     */
    private ConnectionContext getConnectionContextImpl() {
        Class<?> defaultClazz = conf.getClass(JMS_CONNECTION_CONTEXT_IMPL, DefaultConnectionContext.class);
        if (defaultClazz == DefaultConnectionContext.class) {
            return new DefaultConnectionContext();
        }
        return (ConnectionContext) ReflectionUtils.newInstance(defaultClazz, null);
    }

    /**
     * Return the receiver currently registered for a topic of a connection.
     *
     * @param connInfo JMS connection info
     * @param topic topic name
     * @return the receiver, or null if there is none
     */
    @VisibleForTesting
    MessageReceiver getMessageReceiver(JMSConnectionInfo connInfo, String topic) {
        Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
        if (topicsMap != null) {
            return topicsMap.get(topic);
        }
        return null;
    }

    /**
     * Forget all receivers and close every consumer connection as well as the producer connection.
     */
    @Override
    public void destroy() {
        LOG.info("Destroying JMSAccessor service ");
        receiversMap.clear();

        LOG.info("Closing JMS connections");
        for (ConnectionContext conn : connectionMap.values()) {
            conn.close();
        }
        if (jmsProducerConnContext != null) {
            jmsProducerConnContext.close();
        }
        connectionMap.clear();
    }

    @Override
    public Class<? extends Service> getInterface() {
        return JMSAccessorService.class;
    }

    /**
     * Reestablish connection for the given JMS connect information
     * <p>
     * The cached connection is dropped, the connection is queued for retry, and every topic currently
     * listened to on that connection is moved to the retry list together with its message handler.
     *
     * @param connInfo JMS connection info
     */
    public void reestablishConnection(JMSConnectionInfo connInfo) {
        // Queue the connection and topics for retry
        connectionMap.remove(connInfo);
        ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo);
        Map<String, MessageReceiver> listeningTopicsMap = receiversMap.remove(connInfo);
        if (listeningTopicsMap != null) {
            Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry();
            for (Entry<String, MessageReceiver> topicEntry : listeningTopicsMap.entrySet()) {
                MessageReceiver receiver = topicEntry.getValue();
                retryTopicsMap.put(topicEntry.getKey(), receiver.getMessageHandler());
            }
        }
    }

    /**
     * Schedule a retry of the given connection with the {@link SchedulerService}.
     *
     * @param connInfo JMS connection info
     * @param delay delay before the retry, in seconds
     */
    private void scheduleRetry(JMSConnectionInfo connInfo, long delay) {
        LOG.info("Scheduling retry of connection [{0}] in [{1}] seconds", connInfo, delay);
        JMSRetryRunnable runnable = new JMSRetryRunnable(connInfo);
        SchedulerService scheduler = Services.get().get(SchedulerService.class);
        scheduler.schedule(runnable, delay, SchedulerService.Unit.SEC);
    }

    /**
     * Retry a connection that is in the retry list and register the topics queued for it.
     * <p>
     * Another retry is scheduled when the connection still cannot be established or when at least one
     * queued topic could not be registered. Once the connection is up and no topic is left to register,
     * the connection is removed from the retry list.
     *
     * @param connInfo JMS connection info
     * @return false if no attempt was made because the connection is not in the retry list or has
     *         reached the maximum number of attempts, true otherwise
     */
    @VisibleForTesting
    boolean retryConnection(JMSConnectionInfo connInfo) {
        ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
        if (connRetryInfo == null) {
            LOG.warn("Not attempting connection [{0}]. It is not queued for retry", connInfo);
            return false;
        }
        if (connRetryInfo.getNumAttempt() >= retryMaxAttempts) {
            LOG.info("Not attempting connection [{0}] again. Reached max attempts [{1}]", connInfo, retryMaxAttempts);
            return false;
        }
        LOG.info("Attempting retry of connection [{0}]", connInfo);
        connRetryInfo.setNumAttempt(connRetryInfo.getNumAttempt() + 1);
        connRetryInfo.setNextDelay(connRetryInfo.getNextDelay() * retryMultiplier);
        ConnectionContext connCtxt = createConnectionContext(connInfo);
        boolean shouldRetry;
        if (connCtxt == null) {
            shouldRetry = true;
        }
        else {
            // NOTE(review): the per-connection retry topics map is a plain HashMap; a topic queued from
            // another thread while this loop runs is not guarded against - confirm whether that can happen.
            Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry();
            Map<String, MessageReceiver> listeningTopicsMap = getReceiversTopicsMap(connInfo);
            List<String> topicsToRemoveList = new ArrayList<String>();
            // For each topic in the retry list, try to register the MessageHandler for that topic
            for (Entry<String, MessageHandler> topicEntry : retryTopicsMap.entrySet()) {
                String topic = topicEntry.getKey();
                synchronized (listeningTopicsMap) {
                    if (listeningTopicsMap.containsKey(topic)) {
                        // Already listening: there is nothing left to retry for this topic.
                        topicsToRemoveList.add(topic);
                    }
                    else {
                        MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, topicEntry.getValue());
                        if (receiver == null) {
                            LOG.warn("Failed to register a listener for topic {0} on {1}", topic, connInfo);
                        }
                        else {
                            listeningTopicsMap.put(topic, receiver);
                            topicsToRemoveList.add(topic);
                            LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
                        }
                    }
                }
            }
            // Removal is deferred so the retry map is not modified while it is being iterated.
            for (String topic : topicsToRemoveList) {
                retryTopicsMap.remove(topic);
            }
            // Topics that failed to register stay queued and must be retried, not silently dropped.
            shouldRetry = !retryTopicsMap.isEmpty();
        }

        if (shouldRetry) {
            scheduleRetry(connInfo, connRetryInfo.getNextDelay());
        }
        else {
            retryConnectionsMap.remove(connInfo);
        }
        return true;
    }

    /**
     * Retry state of a connection: number of attempts made, delay before the next attempt, and the
     * topics (with their handlers) to register once the connection is available.
     */
    private static class ConnectionRetryInfo {
        private int numAttempt;
        private int nextDelay;
        private final Map<String, MessageHandler> retryTopicsMap;

        public ConnectionRetryInfo(int numAttempt, int nextDelay) {
            this.numAttempt = numAttempt;
            this.nextDelay = nextDelay;
            this.retryTopicsMap = new HashMap<String, MessageHandler>();
        }

        public int getNumAttempt() {
            return numAttempt;
        }

        public void setNumAttempt(int numAttempt) {
            this.numAttempt = numAttempt;
        }

        public int getNextDelay() {
            return nextDelay;
        }

        public void setNextDelay(int nextDelay) {
            this.nextDelay = nextDelay;
        }

        public Map<String, MessageHandler> getTopicsToRetry() {
            return retryTopicsMap;
        }

    }

    /**
     * Runnable handed to the scheduler to retry a connection of the enclosing service.
     */
    public class JMSRetryRunnable implements Runnable {

        private final JMSConnectionInfo connInfo;

        public JMSRetryRunnable(JMSConnectionInfo connInfo) {
            this.connInfo = connInfo;
        }

        public JMSConnectionInfo getJMSConnectionInfo() {
            return connInfo;
        }

        @Override
        public void run() {
            retryConnection(connInfo);
        }

    }

}