001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.Map.Entry;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import javax.jms.JMSException;
029import javax.jms.MessageConsumer;
030import javax.jms.Session;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.util.ReflectionUtils;
033import org.apache.oozie.jms.ConnectionContext;
034import org.apache.oozie.jms.DefaultConnectionContext;
035import org.apache.oozie.jms.JMSConnectionInfo;
036import org.apache.oozie.jms.JMSExceptionListener;
037import org.apache.oozie.jms.MessageHandler;
038import org.apache.oozie.jms.MessageReceiver;
039import org.apache.oozie.util.XLog;
040
041import com.google.common.annotations.VisibleForTesting;
042
043/**
044 * This class will <ul>
045 * <li> Create/Manage JMS connections using user configured JNDI properties. </li>
046 * <li> Create/Manage session for specific connection/topic and reconnects on failures. </li>
047 * <li> Provide a way to create a subscriber and publisher </li>
048 * <li> Pure JMS compliant (implementation independent but primarily tested against Apache ActiveMQ). </li>
049 * </ul>
050 */
051public class JMSAccessorService implements Service {
052    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSAccessorService.";
053    public static final String JMS_CONNECTION_CONTEXT_IMPL = CONF_PREFIX + "connectioncontext.impl";
054    public static final String SESSION_OPTS = CONF_PREFIX + "jms.sessionOpts";
055    public static final String CONF_RETRY_INITIAL_DELAY = CONF_PREFIX + "retry.initial.delay";
056    public static final String CONF_RETRY_MULTIPLIER = CONF_PREFIX + "retry.multiplier";
057    public static final String CONF_RETRY_MAX_ATTEMPTS = CONF_PREFIX + "retry.max.attempts";
058    private static XLog LOG;
059
060    private Configuration conf;
061    private int sessionOpts;
062    private int retryInitialDelay;
063    private int retryMultiplier;
064    private int retryMaxAttempts;
065    private ConnectionContext jmsProducerConnContext;
066
067    /**
068     * Map of JMS connection info to established JMS Connection
069     */
070    private ConcurrentMap<JMSConnectionInfo, ConnectionContext> connectionMap =
071            new ConcurrentHashMap<JMSConnectionInfo, ConnectionContext>();
072    /**
073     * Map of JMS connection info to topic names to MessageReceiver
074     */
075    private ConcurrentMap<JMSConnectionInfo, Map<String, MessageReceiver>> receiversMap =
076            new ConcurrentHashMap<JMSConnectionInfo, Map<String, MessageReceiver>>();
077
078    /**
079     * Map of JMS connection info to connection retry information
080     */
081    private Map<JMSConnectionInfo, ConnectionRetryInfo> retryConnectionsMap = new HashMap<JMSConnectionInfo, ConnectionRetryInfo>();
082
083    @Override
084    public void init(Services services) throws ServiceException {
085        LOG = XLog.getLog(getClass());
086        conf = services.getConf();
087        sessionOpts = conf.getInt(SESSION_OPTS, Session.AUTO_ACKNOWLEDGE);
088        retryInitialDelay = conf.getInt(CONF_RETRY_INITIAL_DELAY, 60); // initial delay in seconds
089        retryMultiplier = conf.getInt(CONF_RETRY_MULTIPLIER, 2);
090        retryMaxAttempts = conf.getInt(CONF_RETRY_MAX_ATTEMPTS, 10);
091    }
092
093    /**
094     * Register for notifications on a JMS topic.
095     *
096     * @param connInfo Information to connect to a JMS compliant messaging service.
097     * @param topic Topic in which the JMS messages are published
098     * @param msgHandler Handler which will process the messages received on the topic
099     */
100    public void registerForNotification(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
101        if (!isTopicInRetryList(connInfo, topic)) {
102            if (isConnectionInRetryList(connInfo)) {
103                queueTopicForRetry(connInfo, topic, msgHandler);
104            }
105            else {
106                Map<String, MessageReceiver> topicsMap = getReceiversTopicsMap(connInfo);
107                if (!topicsMap.containsKey(topic)) {
108                    synchronized (topicsMap) {
109                        if (!topicsMap.containsKey(topic)) {
110                            ConnectionContext connCtxt = createConnectionContext(connInfo);
111                            if (connCtxt == null) {
112                                queueTopicForRetry(connInfo, topic, msgHandler);
113                                return;
114                            }
115                            MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, msgHandler);
116                            if (receiver == null) {
117                                queueTopicForRetry(connInfo, topic, msgHandler);
118                            }
119                            else {
120                                LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
121                                topicsMap.put(topic, receiver);
122                            }
123                        }
124                    }
125                }
126            }
127        }
128    }
129
130    /**
131     * Unregister from listening to JMS messages on a topic.
132     *
133     * @param connInfo Information to connect to the JMS compliant messaging service.
134     * @param topic Topic in which the JMS messages are published
135     */
136    public void unregisterFromNotification(JMSConnectionInfo connInfo, String topic) {
137        LOG.info("Unregistering JMS listener. Clossing session for {0} and topic {1}", connInfo, topic);
138
139        if (isTopicInRetryList(connInfo, topic)) {
140            removeTopicFromRetryList(connInfo, topic);
141        }
142        else {
143            Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
144            if (topicsMap != null) {
145                MessageReceiver receiver = null;
146                synchronized (topicsMap) {
147                    receiver = topicsMap.remove(topic);
148                    if (topicsMap.isEmpty()) {
149                        receiversMap.remove(connInfo);
150                    }
151                }
152                if (receiver != null) {
153                    try {
154                        receiver.getSession().close();
155                    }
156                    catch (JMSException e) {
157                        LOG.warn("Unable to close session " + receiver.getSession(), e);
158                    }
159                }
160                else {
161                    LOG.warn("Received request to unregister from topic [{0}] on [{1}], but no matching session.",
162                            topic, connInfo);
163                }
164            }
165        }
166    }
167
168    private Map<String, MessageReceiver> getReceiversTopicsMap(JMSConnectionInfo connInfo) {
169        Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
170        if (topicsMap == null) {
171            topicsMap = new HashMap<String, MessageReceiver>();
172            Map<String, MessageReceiver> exists = receiversMap.putIfAbsent(connInfo, topicsMap);
173            if (exists != null) {
174                topicsMap = exists;
175            }
176        }
177        return topicsMap;
178    }
179
180    /**
181     * Determine if currently listening to JMS messages on a topic.
182     *
183     * @param connInfo Information to connect to the JMS compliant messaging service.
184     * @param topic Topic in which the JMS messages are published
185     * @return true if listening to the topic, else false
186     */
187    @VisibleForTesting
188    boolean isListeningToTopic(JMSConnectionInfo connInfo, String topic) {
189        Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
190        return (topicsMap != null && topicsMap.containsKey(topic));
191    }
192
193    @VisibleForTesting
194    boolean isConnectionInRetryList(JMSConnectionInfo connInfo) {
195        return retryConnectionsMap.containsKey(connInfo);
196    }
197
198    @VisibleForTesting
199    boolean isTopicInRetryList(JMSConnectionInfo connInfo, String topic) {
200        ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
201        if (connRetryInfo == null) {
202            return false;
203        }
204        else {
205            Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
206            return topicsMap.containsKey(topic);
207        }
208    }
209
210    // For unit testing
211    @VisibleForTesting
212    int getNumConnectionAttempts(JMSConnectionInfo connInfo) {
213        return retryConnectionsMap.get(connInfo).getNumAttempt();
214    }
215
216    private ConnectionRetryInfo queueConnectionForRetry(JMSConnectionInfo connInfo) {
217        ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
218        if (connRetryInfo == null) {
219            LOG.info("Queueing connection {0} for retry", connInfo);
220            connRetryInfo = new ConnectionRetryInfo(0, retryInitialDelay);
221            retryConnectionsMap.put(connInfo, connRetryInfo);
222            scheduleRetry(connInfo, retryInitialDelay);
223        }
224        return connRetryInfo;
225    }
226
227    private ConnectionRetryInfo queueTopicForRetry(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
228        LOG.info("Queueing topic {0} for {1} for retry", topic, connInfo);
229        ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo);
230        Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
231        topicsMap.put(topic, msgHandler);
232        return connRetryInfo;
233    }
234
235    private void removeTopicFromRetryList(JMSConnectionInfo connInfo, String topic) {
236        LOG.info("Removing topic {0} from {1} from retry list", topic, connInfo);
237        ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
238        if (connRetryInfo != null) {
239            Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
240            topicsMap.remove(topic);
241        }
242    }
243
244    private MessageReceiver registerForTopic(JMSConnectionInfo connInfo, ConnectionContext connCtxt, String topic,
245            MessageHandler msgHandler) {
246        try {
247            Session session = connCtxt.createSession(sessionOpts);
248            MessageConsumer consumer = connCtxt.createConsumer(session, topic);
249            MessageReceiver receiver = new MessageReceiver(msgHandler, session, consumer);
250            consumer.setMessageListener(receiver);
251            return receiver;
252        }
253        catch (JMSException e) {
254            LOG.warn("Error while registering to listen to topic {0} from {1}", topic, connInfo, e);
255            return null;
256        }
257    }
258
259    public ConnectionContext createConnectionContext(JMSConnectionInfo connInfo) {
260        ConnectionContext connCtxt = connectionMap.get(connInfo);
261        if (connCtxt == null) {
262            try {
263                connCtxt = getConnectionContextImpl();
264                connCtxt.createConnection(connInfo.getJNDIProperties());
265                connCtxt.setExceptionListener(new JMSExceptionListener(connInfo, connCtxt, true));
266                connectionMap.put(connInfo, connCtxt);
267                LOG.info("Connection established to JMS Server for [{0}]", connInfo);
268            }
269            catch (Exception e) {
270                LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e);
271                return null;
272            }
273        }
274        return connCtxt;
275    }
276
277    public ConnectionContext createProducerConnectionContext(JMSConnectionInfo connInfo) {
278        if (jmsProducerConnContext != null && jmsProducerConnContext.isConnectionInitialized()) {
279            return jmsProducerConnContext;
280        }
281        else {
282            synchronized (this) {
283                if (jmsProducerConnContext == null || !jmsProducerConnContext.isConnectionInitialized()) {
284                    try {
285                        jmsProducerConnContext = getConnectionContextImpl();
286                        jmsProducerConnContext.createConnection(connInfo.getJNDIProperties());
287                        jmsProducerConnContext.setExceptionListener(new JMSExceptionListener(connInfo,
288                                jmsProducerConnContext, false));
289                        LOG.info("Connection established to JMS Server for [{0}]", connInfo);
290                    }
291                    catch (Exception e) {
292                        LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e);
293                        return null;
294                    }
295                }
296            }
297        }
298        return jmsProducerConnContext;
299    }
300
301    private ConnectionContext getConnectionContextImpl() {
302        Class<?> defaultClazz = ConfigurationService.getClass(conf, JMS_CONNECTION_CONTEXT_IMPL);
303        ConnectionContext connCtx = null;
304        if (defaultClazz == DefaultConnectionContext.class) {
305            connCtx = new DefaultConnectionContext();
306        }
307        else {
308            connCtx = (ConnectionContext) ReflectionUtils.newInstance(defaultClazz, null);
309        }
310        return connCtx;
311    }
312
313    @VisibleForTesting
314    MessageReceiver getMessageReceiver(JMSConnectionInfo connInfo, String topic) {
315        Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
316        if (topicsMap != null) {
317            return topicsMap.get(topic);
318        }
319        return null;
320    }
321
322    @Override
323    public void destroy() {
324        LOG.info("Destroying JMSAccessor service ");
325        receiversMap.clear();
326
327        LOG.info("Closing JMS connections");
328        for (ConnectionContext conn : connectionMap.values()) {
329            conn.close();
330        }
331        if (jmsProducerConnContext != null) {
332            jmsProducerConnContext.close();
333        }
334        connectionMap.clear();
335    }
336
337    @Override
338    public Class<? extends Service> getInterface() {
339        return JMSAccessorService.class;
340    }
341
342    /**
343     * Reestablish connection for the given JMS connect information
344     * @param connInfo JMS connection info
345     */
346    public void reestablishConnection(JMSConnectionInfo connInfo) {
347        // Queue the connection and topics for retry
348        connectionMap.remove(connInfo);
349        ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo);
350        Map<String, MessageReceiver> listeningTopicsMap = receiversMap.remove(connInfo);
351        if (listeningTopicsMap != null) {
352            Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry();
353            for (Entry<String, MessageReceiver> topicEntry : listeningTopicsMap.entrySet()) {
354                MessageReceiver receiver = topicEntry.getValue();
355                retryTopicsMap.put(topicEntry.getKey(), receiver.getMessageHandler());
356            }
357        }
358    }
359
360    private void scheduleRetry(JMSConnectionInfo connInfo, long delay) {
361        LOG.info("Scheduling retry of connection [{0}] in [{1}] seconds", connInfo, delay);
362        JMSRetryRunnable runnable = new JMSRetryRunnable(connInfo);
363        SchedulerService scheduler = Services.get().get(SchedulerService.class);
364        scheduler.schedule(runnable, delay, SchedulerService.Unit.SEC);
365    }
366
367    @VisibleForTesting
368    boolean retryConnection(JMSConnectionInfo connInfo) {
369        ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
370        if (connRetryInfo.getNumAttempt() >= retryMaxAttempts) {
371            LOG.info("Not attempting connection [{0}] again. Reached max attempts [{1}]", connInfo, retryMaxAttempts);
372            return false;
373        }
374        LOG.info("Attempting retry of connection [{0}]", connInfo);
375        connRetryInfo.setNumAttempt(connRetryInfo.getNumAttempt() + 1);
376        connRetryInfo.setNextDelay(connRetryInfo.getNextDelay() * retryMultiplier);
377        ConnectionContext connCtxt = createConnectionContext(connInfo);
378        boolean shouldRetry = false;
379        if (connCtxt == null) {
380            shouldRetry = true;
381        }
382        else {
383            Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry();
384            Map<String, MessageReceiver> listeningTopicsMap = getReceiversTopicsMap(connInfo);
385            List<String> topicsToRemoveList = new ArrayList<String>();
386            // For each topic in the retry list, try to register the MessageHandler for that topic
387            for (Entry<String, MessageHandler> topicEntry : retryTopicsMap.entrySet()) {
388                String topic = topicEntry.getKey();
389                if (listeningTopicsMap.containsKey(topic)) {
390                    continue;
391                }
392                synchronized (listeningTopicsMap) {
393                    if (!listeningTopicsMap.containsKey(topic)) {
394                        MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, topicEntry.getValue());
395                        if (receiver == null) {
396                            LOG.warn("Failed to register a listener for topic {0} on {1}", topic, connInfo);
397                        }
398                        else {
399                            listeningTopicsMap.put(topic, receiver);
400                            topicsToRemoveList.add(topic);
401                            LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
402                        }
403                    }
404                }
405            }
406            for (String topic : topicsToRemoveList) {
407                retryTopicsMap.remove(topic);
408            }
409            if (retryTopicsMap.isEmpty()) {
410                shouldRetry = false;
411            }
412        }
413
414        if (shouldRetry) {
415            scheduleRetry(connInfo, connRetryInfo.getNextDelay());
416        }
417        else {
418            retryConnectionsMap.remove(connInfo);
419        }
420        return true;
421    }
422
423    private static class ConnectionRetryInfo {
424        private int numAttempt;
425        private int nextDelay;
426        private Map<String, MessageHandler> retryTopicsMap;
427
428        public ConnectionRetryInfo(int numAttempt, int nextDelay) {
429            this.numAttempt = numAttempt;
430            this.nextDelay = nextDelay;
431            this.retryTopicsMap = new HashMap<String, MessageHandler>();
432        }
433
434        public int getNumAttempt() {
435            return numAttempt;
436        }
437
438        public void setNumAttempt(int numAttempt) {
439            this.numAttempt = numAttempt;
440        }
441
442        public int getNextDelay() {
443            return nextDelay;
444        }
445
446        public void setNextDelay(int nextDelay) {
447            this.nextDelay = nextDelay;
448        }
449
450        public Map<String, MessageHandler> getTopicsToRetry() {
451            return retryTopicsMap;
452        }
453
454    }
455
456    public class JMSRetryRunnable implements Runnable {
457
458        private JMSConnectionInfo connInfo;
459
460        public JMSRetryRunnable(JMSConnectionInfo connInfo) {
461            this.connInfo = connInfo;
462        }
463
464        public JMSConnectionInfo getJMSConnectionInfo() {
465            return connInfo;
466        }
467
468        @Override
469        public void run() {
470            retryConnection(connInfo);
471        }
472
473    }
474
475}