This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.List;
023 import java.util.Map;
024 import java.util.Map.Entry;
025 import java.util.concurrent.ConcurrentHashMap;
026 import java.util.concurrent.ConcurrentMap;
027 import javax.jms.JMSException;
028 import javax.jms.MessageConsumer;
029 import javax.jms.Session;
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.hadoop.util.ReflectionUtils;
032 import org.apache.oozie.jms.ConnectionContext;
033 import org.apache.oozie.jms.DefaultConnectionContext;
034 import org.apache.oozie.jms.JMSConnectionInfo;
035 import org.apache.oozie.jms.JMSExceptionListener;
036 import org.apache.oozie.jms.MessageHandler;
037 import org.apache.oozie.jms.MessageReceiver;
038 import org.apache.oozie.util.XLog;
039
040 import com.google.common.annotations.VisibleForTesting;
041
042 /**
043 * This class will <ul>
044 * <li> Create/Manage JMS connections using user configured JNDI properties. </li>
045 * <li> Create/Manage session for specific connection/topic and reconnects on failures. </li>
046 * <li> Provide a way to create a subscriber and publisher </li>
047 * <li> Pure JMS compliant (implementation independent but primarily tested against Apache ActiveMQ). </li>
048 * </ul>
049 */
050 public class JMSAccessorService implements Service {
051 public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSAccessorService.";
052 public static final String JMS_CONNECTION_CONTEXT_IMPL = CONF_PREFIX + "connectioncontext.impl";
053 public static final String SESSION_OPTS = CONF_PREFIX + "jms.sessionOpts";
054 public static final String CONF_RETRY_INITIAL_DELAY = CONF_PREFIX + "retry.initial.delay";
055 public static final String CONF_RETRY_MULTIPLIER = CONF_PREFIX + "retry.multiplier";
056 public static final String CONF_RETRY_MAX_ATTEMPTS = CONF_PREFIX + "retry.max.attempts";
057 private static XLog LOG;
058
059 private Configuration conf;
060 private int sessionOpts;
061 private int retryInitialDelay;
062 private int retryMultiplier;
063 private int retryMaxAttempts;
064 private ConnectionContext jmsProducerConnContext;
065
066 /**
067 * Map of JMS connection info to established JMS Connection
068 */
069 private ConcurrentMap<JMSConnectionInfo, ConnectionContext> connectionMap =
070 new ConcurrentHashMap<JMSConnectionInfo, ConnectionContext>();
071 /**
072 * Map of JMS connection info to topic names to MessageReceiver
073 */
074 private ConcurrentMap<JMSConnectionInfo, Map<String, MessageReceiver>> receiversMap =
075 new ConcurrentHashMap<JMSConnectionInfo, Map<String, MessageReceiver>>();
076
077 /**
078 * Map of JMS connection info to connection retry information
079 */
080 private Map<JMSConnectionInfo, ConnectionRetryInfo> retryConnectionsMap = new HashMap<JMSConnectionInfo, ConnectionRetryInfo>();
081
082 @Override
083 public void init(Services services) throws ServiceException {
084 LOG = XLog.getLog(getClass());
085 conf = services.getConf();
086 sessionOpts = conf.getInt(SESSION_OPTS, Session.AUTO_ACKNOWLEDGE);
087 retryInitialDelay = conf.getInt(CONF_RETRY_INITIAL_DELAY, 60); // initial delay in seconds
088 retryMultiplier = conf.getInt(CONF_RETRY_MULTIPLIER, 2);
089 retryMaxAttempts = conf.getInt(CONF_RETRY_MAX_ATTEMPTS, 10);
090 }
091
092 /**
093 * Register for notifications on a JMS topic.
094 *
095 * @param connInfo Information to connect to a JMS compliant messaging service.
096 * @param topic Topic in which the JMS messages are published
097 * @param msgHandler Handler which will process the messages received on the topic
098 */
099 public void registerForNotification(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
100 if (!isTopicInRetryList(connInfo, topic)) {
101 if (isConnectionInRetryList(connInfo)) {
102 queueTopicForRetry(connInfo, topic, msgHandler);
103 }
104 else {
105 Map<String, MessageReceiver> topicsMap = getReceiversTopicsMap(connInfo);
106 if (!topicsMap.containsKey(topic)) {
107 synchronized (topicsMap) {
108 if (!topicsMap.containsKey(topic)) {
109 ConnectionContext connCtxt = createConnectionContext(connInfo);
110 if (connCtxt == null) {
111 queueTopicForRetry(connInfo, topic, msgHandler);
112 return;
113 }
114 MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, msgHandler);
115 if (receiver == null) {
116 queueTopicForRetry(connInfo, topic, msgHandler);
117 }
118 else {
119 LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
120 topicsMap.put(topic, receiver);
121 }
122 }
123 }
124 }
125 }
126 }
127 }
128
129 /**
130 * Unregister from listening to JMS messages on a topic.
131 *
132 * @param connInfo Information to connect to the JMS compliant messaging service.
133 * @param topic Topic in which the JMS messages are published
134 */
135 public void unregisterFromNotification(JMSConnectionInfo connInfo, String topic) {
136 LOG.info("Unregistering JMS listener. Clossing session for {0} and topic {1}", connInfo, topic);
137
138 if (isTopicInRetryList(connInfo, topic)) {
139 removeTopicFromRetryList(connInfo, topic);
140 }
141 else {
142 Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
143 if (topicsMap != null) {
144 MessageReceiver receiver = null;
145 synchronized (topicsMap) {
146 receiver = topicsMap.remove(topic);
147 if (topicsMap.isEmpty()) {
148 receiversMap.remove(connInfo);
149 }
150 }
151 if (receiver != null) {
152 try {
153 receiver.getSession().close();
154 }
155 catch (JMSException e) {
156 LOG.warn("Unable to close session " + receiver.getSession(), e);
157 }
158 }
159 else {
160 LOG.warn("Received request to unregister from topic [{0}] on [{1}], but no matching session.",
161 topic, connInfo);
162 }
163 }
164 }
165 }
166
167 private Map<String, MessageReceiver> getReceiversTopicsMap(JMSConnectionInfo connInfo) {
168 Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
169 if (topicsMap == null) {
170 topicsMap = new HashMap<String, MessageReceiver>();
171 Map<String, MessageReceiver> exists = receiversMap.putIfAbsent(connInfo, topicsMap);
172 if (exists != null) {
173 topicsMap = exists;
174 }
175 }
176 return topicsMap;
177 }
178
179 /**
180 * Determine if currently listening to JMS messages on a topic.
181 *
182 * @param connInfo Information to connect to the JMS compliant messaging service.
183 * @param topic Topic in which the JMS messages are published
184 * @return true if listening to the topic, else false
185 */
186 @VisibleForTesting
187 boolean isListeningToTopic(JMSConnectionInfo connInfo, String topic) {
188 Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
189 return (topicsMap != null && topicsMap.containsKey(topic));
190 }
191
192 @VisibleForTesting
193 boolean isConnectionInRetryList(JMSConnectionInfo connInfo) {
194 return retryConnectionsMap.containsKey(connInfo);
195 }
196
197 @VisibleForTesting
198 boolean isTopicInRetryList(JMSConnectionInfo connInfo, String topic) {
199 ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
200 if (connRetryInfo == null) {
201 return false;
202 }
203 else {
204 Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
205 return topicsMap.containsKey(topic);
206 }
207 }
208
209 // For unit testing
210 @VisibleForTesting
211 int getNumConnectionAttempts(JMSConnectionInfo connInfo) {
212 return retryConnectionsMap.get(connInfo).getNumAttempt();
213 }
214
215 private ConnectionRetryInfo queueConnectionForRetry(JMSConnectionInfo connInfo) {
216 ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
217 if (connRetryInfo == null) {
218 LOG.info("Queueing connection {0} for retry", connInfo);
219 connRetryInfo = new ConnectionRetryInfo(0, retryInitialDelay);
220 retryConnectionsMap.put(connInfo, connRetryInfo);
221 scheduleRetry(connInfo, retryInitialDelay);
222 }
223 return connRetryInfo;
224 }
225
226 private ConnectionRetryInfo queueTopicForRetry(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
227 LOG.info("Queueing topic {0} for {1} for retry", topic, connInfo);
228 ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo);
229 Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
230 topicsMap.put(topic, msgHandler);
231 return connRetryInfo;
232 }
233
234 private void removeTopicFromRetryList(JMSConnectionInfo connInfo, String topic) {
235 LOG.info("Removing topic {0} from {1} from retry list", topic, connInfo);
236 ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
237 if (connRetryInfo != null) {
238 Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
239 topicsMap.remove(topic);
240 }
241 }
242
243 private MessageReceiver registerForTopic(JMSConnectionInfo connInfo, ConnectionContext connCtxt, String topic,
244 MessageHandler msgHandler) {
245 try {
246 Session session = connCtxt.createSession(sessionOpts);
247 MessageConsumer consumer = connCtxt.createConsumer(session, topic);
248 MessageReceiver receiver = new MessageReceiver(msgHandler, session, consumer);
249 consumer.setMessageListener(receiver);
250 return receiver;
251 }
252 catch (JMSException e) {
253 LOG.warn("Error while registering to listen to topic {0} from {1}", topic, connInfo, e);
254 return null;
255 }
256 }
257
258 public ConnectionContext createConnectionContext(JMSConnectionInfo connInfo) {
259 ConnectionContext connCtxt = connectionMap.get(connInfo);
260 if (connCtxt == null) {
261 try {
262 connCtxt = getConnectionContextImpl();
263 connCtxt.createConnection(connInfo.getJNDIProperties());
264 connCtxt.setExceptionListener(new JMSExceptionListener(connInfo, connCtxt, true));
265 connectionMap.put(connInfo, connCtxt);
266 LOG.info("Connection established to JMS Server for [{0}]", connInfo);
267 }
268 catch (Exception e) {
269 LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e);
270 return null;
271 }
272 }
273 return connCtxt;
274 }
275
276 public ConnectionContext createProducerConnectionContext(JMSConnectionInfo connInfo) {
277 if (jmsProducerConnContext != null && jmsProducerConnContext.isConnectionInitialized()) {
278 return jmsProducerConnContext;
279 }
280 else {
281 synchronized (this) {
282 if (jmsProducerConnContext == null || !jmsProducerConnContext.isConnectionInitialized()) {
283 try {
284 jmsProducerConnContext = getConnectionContextImpl();
285 jmsProducerConnContext.createConnection(connInfo.getJNDIProperties());
286 jmsProducerConnContext.setExceptionListener(new JMSExceptionListener(connInfo,
287 jmsProducerConnContext, false));
288 LOG.info("Connection established to JMS Server for [{0}]", connInfo);
289 }
290 catch (Exception e) {
291 LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e);
292 return null;
293 }
294 }
295 }
296 }
297 return jmsProducerConnContext;
298 }
299
300 private ConnectionContext getConnectionContextImpl() {
301 Class<?> defaultClazz = conf.getClass(JMS_CONNECTION_CONTEXT_IMPL, DefaultConnectionContext.class);
302 ConnectionContext connCtx = null;
303 if (defaultClazz == DefaultConnectionContext.class) {
304 connCtx = new DefaultConnectionContext();
305 }
306 else {
307 connCtx = (ConnectionContext) ReflectionUtils.newInstance(defaultClazz, null);
308 }
309 return connCtx;
310 }
311
312 @VisibleForTesting
313 MessageReceiver getMessageReceiver(JMSConnectionInfo connInfo, String topic) {
314 Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
315 if (topicsMap != null) {
316 return topicsMap.get(topic);
317 }
318 return null;
319 }
320
321 @Override
322 public void destroy() {
323 LOG.info("Destroying JMSAccessor service ");
324 receiversMap.clear();
325
326 LOG.info("Closing JMS connections");
327 for (ConnectionContext conn : connectionMap.values()) {
328 conn.close();
329 }
330 if (jmsProducerConnContext != null) {
331 jmsProducerConnContext.close();
332 }
333 connectionMap.clear();
334 }
335
336 @Override
337 public Class<? extends Service> getInterface() {
338 return JMSAccessorService.class;
339 }
340
341 /**
342 * Reestablish connection for the given JMS connect information
343 * @param connInfo JMS connection info
344 */
345 public void reestablishConnection(JMSConnectionInfo connInfo) {
346 // Queue the connection and topics for retry
347 connectionMap.remove(connInfo);
348 ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo);
349 Map<String, MessageReceiver> listeningTopicsMap = receiversMap.remove(connInfo);
350 if (listeningTopicsMap != null) {
351 Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry();
352 for (Entry<String, MessageReceiver> topicEntry : listeningTopicsMap.entrySet()) {
353 MessageReceiver receiver = topicEntry.getValue();
354 retryTopicsMap.put(topicEntry.getKey(), receiver.getMessageHandler());
355 }
356 }
357 }
358
359 private void scheduleRetry(JMSConnectionInfo connInfo, long delay) {
360 LOG.info("Scheduling retry of connection [{0}] in [{1}] seconds", connInfo, delay);
361 JMSRetryRunnable runnable = new JMSRetryRunnable(connInfo);
362 SchedulerService scheduler = Services.get().get(SchedulerService.class);
363 scheduler.schedule(runnable, delay, SchedulerService.Unit.SEC);
364 }
365
366 @VisibleForTesting
367 boolean retryConnection(JMSConnectionInfo connInfo) {
368 ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
369 if (connRetryInfo.getNumAttempt() >= retryMaxAttempts) {
370 LOG.info("Not attempting connection [{0}] again. Reached max attempts [{1}]", connInfo, retryMaxAttempts);
371 return false;
372 }
373 LOG.info("Attempting retry of connection [{0}]", connInfo);
374 connRetryInfo.setNumAttempt(connRetryInfo.getNumAttempt() + 1);
375 connRetryInfo.setNextDelay(connRetryInfo.getNextDelay() * retryMultiplier);
376 ConnectionContext connCtxt = createConnectionContext(connInfo);
377 boolean shouldRetry = false;
378 if (connCtxt == null) {
379 shouldRetry = true;
380 }
381 else {
382 Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry();
383 Map<String, MessageReceiver> listeningTopicsMap = getReceiversTopicsMap(connInfo);
384 List<String> topicsToRemoveList = new ArrayList<String>();
385 // For each topic in the retry list, try to register the MessageHandler for that topic
386 for (Entry<String, MessageHandler> topicEntry : retryTopicsMap.entrySet()) {
387 String topic = topicEntry.getKey();
388 if (listeningTopicsMap.containsKey(topic)) {
389 continue;
390 }
391 synchronized (listeningTopicsMap) {
392 if (!listeningTopicsMap.containsKey(topic)) {
393 MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, topicEntry.getValue());
394 if (receiver == null) {
395 LOG.warn("Failed to register a listener for topic {0} on {1}", topic, connInfo);
396 }
397 else {
398 listeningTopicsMap.put(topic, receiver);
399 topicsToRemoveList.add(topic);
400 LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
401 }
402 }
403 }
404 }
405 for (String topic : topicsToRemoveList) {
406 retryTopicsMap.remove(topic);
407 }
408 if (retryTopicsMap.isEmpty()) {
409 shouldRetry = false;
410 }
411 }
412
413 if (shouldRetry) {
414 scheduleRetry(connInfo, connRetryInfo.getNextDelay());
415 }
416 else {
417 retryConnectionsMap.remove(connInfo);
418 }
419 return true;
420 }
421
422 private static class ConnectionRetryInfo {
423 private int numAttempt;
424 private int nextDelay;
425 private Map<String, MessageHandler> retryTopicsMap;
426
427 public ConnectionRetryInfo(int numAttempt, int nextDelay) {
428 this.numAttempt = numAttempt;
429 this.nextDelay = nextDelay;
430 this.retryTopicsMap = new HashMap<String, MessageHandler>();
431 }
432
433 public int getNumAttempt() {
434 return numAttempt;
435 }
436
437 public void setNumAttempt(int numAttempt) {
438 this.numAttempt = numAttempt;
439 }
440
441 public int getNextDelay() {
442 return nextDelay;
443 }
444
445 public void setNextDelay(int nextDelay) {
446 this.nextDelay = nextDelay;
447 }
448
449 public Map<String, MessageHandler> getTopicsToRetry() {
450 return retryTopicsMap;
451 }
452
453 }
454
455 public class JMSRetryRunnable implements Runnable {
456
457 private JMSConnectionInfo connInfo;
458
459 public JMSRetryRunnable(JMSConnectionInfo connInfo) {
460 this.connInfo = connInfo;
461 }
462
463 public JMSConnectionInfo getJMSConnectionInfo() {
464 return connInfo;
465 }
466
467 @Override
468 public void run() {
469 retryConnection(connInfo);
470 }
471
472 }
473
474 }