001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.Map.Entry;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import javax.jms.JMSException;
028import javax.jms.MessageConsumer;
029import javax.jms.Session;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.util.ReflectionUtils;
032import org.apache.oozie.jms.ConnectionContext;
033import org.apache.oozie.jms.DefaultConnectionContext;
034import org.apache.oozie.jms.JMSConnectionInfo;
035import org.apache.oozie.jms.JMSExceptionListener;
036import org.apache.oozie.jms.MessageHandler;
037import org.apache.oozie.jms.MessageReceiver;
038import org.apache.oozie.util.XLog;
039
040import com.google.common.annotations.VisibleForTesting;
041
042/**
043 * This class will <ul>
044 * <li> Create/Manage JMS connections using user configured JNDI properties. </li>
045 * <li> Create/Manage session for specific connection/topic and reconnects on failures. </li>
046 * <li> Provide a way to create a subscriber and publisher </li>
047 * <li> Pure JMS compliant (implementation independent but primarily tested against Apache ActiveMQ). </li>
048 * </ul>
049 */
050public class JMSAccessorService implements Service {
051    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSAccessorService.";
052    public static final String JMS_CONNECTION_CONTEXT_IMPL = CONF_PREFIX + "connectioncontext.impl";
053    public static final String SESSION_OPTS = CONF_PREFIX + "jms.sessionOpts";
054    public static final String CONF_RETRY_INITIAL_DELAY = CONF_PREFIX + "retry.initial.delay";
055    public static final String CONF_RETRY_MULTIPLIER = CONF_PREFIX + "retry.multiplier";
056    public static final String CONF_RETRY_MAX_ATTEMPTS = CONF_PREFIX + "retry.max.attempts";
057    private static XLog LOG;
058
059    private Configuration conf;
060    private int sessionOpts;
061    private int retryInitialDelay;
062    private int retryMultiplier;
063    private int retryMaxAttempts;
064    private ConnectionContext jmsProducerConnContext;
065
066    /**
067     * Map of JMS connection info to established JMS Connection
068     */
069    private ConcurrentMap<JMSConnectionInfo, ConnectionContext> connectionMap =
070            new ConcurrentHashMap<JMSConnectionInfo, ConnectionContext>();
071    /**
072     * Map of JMS connection info to topic names to MessageReceiver
073     */
074    private ConcurrentMap<JMSConnectionInfo, Map<String, MessageReceiver>> receiversMap =
075            new ConcurrentHashMap<JMSConnectionInfo, Map<String, MessageReceiver>>();
076
077    /**
078     * Map of JMS connection info to connection retry information
079     */
080    private Map<JMSConnectionInfo, ConnectionRetryInfo> retryConnectionsMap = new HashMap<JMSConnectionInfo, ConnectionRetryInfo>();
081
082    @Override
083    public void init(Services services) throws ServiceException {
084        LOG = XLog.getLog(getClass());
085        conf = services.getConf();
086        sessionOpts = conf.getInt(SESSION_OPTS, Session.AUTO_ACKNOWLEDGE);
087        retryInitialDelay = conf.getInt(CONF_RETRY_INITIAL_DELAY, 60); // initial delay in seconds
088        retryMultiplier = conf.getInt(CONF_RETRY_MULTIPLIER, 2);
089        retryMaxAttempts = conf.getInt(CONF_RETRY_MAX_ATTEMPTS, 10);
090    }
091
092    /**
093     * Register for notifications on a JMS topic.
094     *
095     * @param connInfo Information to connect to a JMS compliant messaging service.
096     * @param topic Topic in which the JMS messages are published
097     * @param msgHandler Handler which will process the messages received on the topic
098     */
099    public void registerForNotification(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
100        if (!isTopicInRetryList(connInfo, topic)) {
101            if (isConnectionInRetryList(connInfo)) {
102                queueTopicForRetry(connInfo, topic, msgHandler);
103            }
104            else {
105                Map<String, MessageReceiver> topicsMap = getReceiversTopicsMap(connInfo);
106                if (!topicsMap.containsKey(topic)) {
107                    synchronized (topicsMap) {
108                        if (!topicsMap.containsKey(topic)) {
109                            ConnectionContext connCtxt = createConnectionContext(connInfo);
110                            if (connCtxt == null) {
111                                queueTopicForRetry(connInfo, topic, msgHandler);
112                                return;
113                            }
114                            MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, msgHandler);
115                            if (receiver == null) {
116                                queueTopicForRetry(connInfo, topic, msgHandler);
117                            }
118                            else {
119                                LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
120                                topicsMap.put(topic, receiver);
121                            }
122                        }
123                    }
124                }
125            }
126        }
127    }
128
129    /**
130     * Unregister from listening to JMS messages on a topic.
131     *
132     * @param connInfo Information to connect to the JMS compliant messaging service.
133     * @param topic Topic in which the JMS messages are published
134     */
135    public void unregisterFromNotification(JMSConnectionInfo connInfo, String topic) {
136        LOG.info("Unregistering JMS listener. Clossing session for {0} and topic {1}", connInfo, topic);
137
138        if (isTopicInRetryList(connInfo, topic)) {
139            removeTopicFromRetryList(connInfo, topic);
140        }
141        else {
142            Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
143            if (topicsMap != null) {
144                MessageReceiver receiver = null;
145                synchronized (topicsMap) {
146                    receiver = topicsMap.remove(topic);
147                    if (topicsMap.isEmpty()) {
148                        receiversMap.remove(connInfo);
149                    }
150                }
151                if (receiver != null) {
152                    try {
153                        receiver.getSession().close();
154                    }
155                    catch (JMSException e) {
156                        LOG.warn("Unable to close session " + receiver.getSession(), e);
157                    }
158                }
159                else {
160                    LOG.warn("Received request to unregister from topic [{0}] on [{1}], but no matching session.",
161                            topic, connInfo);
162                }
163            }
164        }
165    }
166
167    private Map<String, MessageReceiver> getReceiversTopicsMap(JMSConnectionInfo connInfo) {
168        Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
169        if (topicsMap == null) {
170            topicsMap = new HashMap<String, MessageReceiver>();
171            Map<String, MessageReceiver> exists = receiversMap.putIfAbsent(connInfo, topicsMap);
172            if (exists != null) {
173                topicsMap = exists;
174            }
175        }
176        return topicsMap;
177    }
178
179    /**
180     * Determine if currently listening to JMS messages on a topic.
181     *
182     * @param connInfo Information to connect to the JMS compliant messaging service.
183     * @param topic Topic in which the JMS messages are published
184     * @return true if listening to the topic, else false
185     */
186    @VisibleForTesting
187    boolean isListeningToTopic(JMSConnectionInfo connInfo, String topic) {
188        Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
189        return (topicsMap != null && topicsMap.containsKey(topic));
190    }
191
192    @VisibleForTesting
193    boolean isConnectionInRetryList(JMSConnectionInfo connInfo) {
194        return retryConnectionsMap.containsKey(connInfo);
195    }
196
197    @VisibleForTesting
198    boolean isTopicInRetryList(JMSConnectionInfo connInfo, String topic) {
199        ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
200        if (connRetryInfo == null) {
201            return false;
202        }
203        else {
204            Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
205            return topicsMap.containsKey(topic);
206        }
207    }
208
209    // For unit testing
210    @VisibleForTesting
211    int getNumConnectionAttempts(JMSConnectionInfo connInfo) {
212        return retryConnectionsMap.get(connInfo).getNumAttempt();
213    }
214
215    private ConnectionRetryInfo queueConnectionForRetry(JMSConnectionInfo connInfo) {
216        ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
217        if (connRetryInfo == null) {
218            LOG.info("Queueing connection {0} for retry", connInfo);
219            connRetryInfo = new ConnectionRetryInfo(0, retryInitialDelay);
220            retryConnectionsMap.put(connInfo, connRetryInfo);
221            scheduleRetry(connInfo, retryInitialDelay);
222        }
223        return connRetryInfo;
224    }
225
226    private ConnectionRetryInfo queueTopicForRetry(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
227        LOG.info("Queueing topic {0} for {1} for retry", topic, connInfo);
228        ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo);
229        Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
230        topicsMap.put(topic, msgHandler);
231        return connRetryInfo;
232    }
233
234    private void removeTopicFromRetryList(JMSConnectionInfo connInfo, String topic) {
235        LOG.info("Removing topic {0} from {1} from retry list", topic, connInfo);
236        ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
237        if (connRetryInfo != null) {
238            Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
239            topicsMap.remove(topic);
240        }
241    }
242
243    private MessageReceiver registerForTopic(JMSConnectionInfo connInfo, ConnectionContext connCtxt, String topic,
244            MessageHandler msgHandler) {
245        try {
246            Session session = connCtxt.createSession(sessionOpts);
247            MessageConsumer consumer = connCtxt.createConsumer(session, topic);
248            MessageReceiver receiver = new MessageReceiver(msgHandler, session, consumer);
249            consumer.setMessageListener(receiver);
250            return receiver;
251        }
252        catch (JMSException e) {
253            LOG.warn("Error while registering to listen to topic {0} from {1}", topic, connInfo, e);
254            return null;
255        }
256    }
257
258    public ConnectionContext createConnectionContext(JMSConnectionInfo connInfo) {
259        ConnectionContext connCtxt = connectionMap.get(connInfo);
260        if (connCtxt == null) {
261            try {
262                connCtxt = getConnectionContextImpl();
263                connCtxt.createConnection(connInfo.getJNDIProperties());
264                connCtxt.setExceptionListener(new JMSExceptionListener(connInfo, connCtxt, true));
265                connectionMap.put(connInfo, connCtxt);
266                LOG.info("Connection established to JMS Server for [{0}]", connInfo);
267            }
268            catch (Exception e) {
269                LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e);
270                return null;
271            }
272        }
273        return connCtxt;
274    }
275
276    public ConnectionContext createProducerConnectionContext(JMSConnectionInfo connInfo) {
277        if (jmsProducerConnContext != null && jmsProducerConnContext.isConnectionInitialized()) {
278            return jmsProducerConnContext;
279        }
280        else {
281            synchronized (this) {
282                if (jmsProducerConnContext == null || !jmsProducerConnContext.isConnectionInitialized()) {
283                    try {
284                        jmsProducerConnContext = getConnectionContextImpl();
285                        jmsProducerConnContext.createConnection(connInfo.getJNDIProperties());
286                        jmsProducerConnContext.setExceptionListener(new JMSExceptionListener(connInfo,
287                                jmsProducerConnContext, false));
288                        LOG.info("Connection established to JMS Server for [{0}]", connInfo);
289                    }
290                    catch (Exception e) {
291                        LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e);
292                        return null;
293                    }
294                }
295            }
296        }
297        return jmsProducerConnContext;
298    }
299
300    private ConnectionContext getConnectionContextImpl() {
301        Class<?> defaultClazz = conf.getClass(JMS_CONNECTION_CONTEXT_IMPL, DefaultConnectionContext.class);
302        ConnectionContext connCtx = null;
303        if (defaultClazz == DefaultConnectionContext.class) {
304            connCtx = new DefaultConnectionContext();
305        }
306        else {
307            connCtx = (ConnectionContext) ReflectionUtils.newInstance(defaultClazz, null);
308        }
309        return connCtx;
310    }
311
312    @VisibleForTesting
313    MessageReceiver getMessageReceiver(JMSConnectionInfo connInfo, String topic) {
314        Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
315        if (topicsMap != null) {
316            return topicsMap.get(topic);
317        }
318        return null;
319    }
320
321    @Override
322    public void destroy() {
323        LOG.info("Destroying JMSAccessor service ");
324        receiversMap.clear();
325
326        LOG.info("Closing JMS connections");
327        for (ConnectionContext conn : connectionMap.values()) {
328            conn.close();
329        }
330        if (jmsProducerConnContext != null) {
331            jmsProducerConnContext.close();
332        }
333        connectionMap.clear();
334    }
335
336    @Override
337    public Class<? extends Service> getInterface() {
338        return JMSAccessorService.class;
339    }
340
341    /**
342     * Reestablish connection for the given JMS connect information
343     * @param connInfo JMS connection info
344     */
345    public void reestablishConnection(JMSConnectionInfo connInfo) {
346        // Queue the connection and topics for retry
347        connectionMap.remove(connInfo);
348        ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo);
349        Map<String, MessageReceiver> listeningTopicsMap = receiversMap.remove(connInfo);
350        if (listeningTopicsMap != null) {
351            Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry();
352            for (Entry<String, MessageReceiver> topicEntry : listeningTopicsMap.entrySet()) {
353                MessageReceiver receiver = topicEntry.getValue();
354                retryTopicsMap.put(topicEntry.getKey(), receiver.getMessageHandler());
355            }
356        }
357    }
358
359    private void scheduleRetry(JMSConnectionInfo connInfo, long delay) {
360        LOG.info("Scheduling retry of connection [{0}] in [{1}] seconds", connInfo, delay);
361        JMSRetryRunnable runnable = new JMSRetryRunnable(connInfo);
362        SchedulerService scheduler = Services.get().get(SchedulerService.class);
363        scheduler.schedule(runnable, delay, SchedulerService.Unit.SEC);
364    }
365
366    @VisibleForTesting
367    boolean retryConnection(JMSConnectionInfo connInfo) {
368        ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
369        if (connRetryInfo.getNumAttempt() >= retryMaxAttempts) {
370            LOG.info("Not attempting connection [{0}] again. Reached max attempts [{1}]", connInfo, retryMaxAttempts);
371            return false;
372        }
373        LOG.info("Attempting retry of connection [{0}]", connInfo);
374        connRetryInfo.setNumAttempt(connRetryInfo.getNumAttempt() + 1);
375        connRetryInfo.setNextDelay(connRetryInfo.getNextDelay() * retryMultiplier);
376        ConnectionContext connCtxt = createConnectionContext(connInfo);
377        boolean shouldRetry = false;
378        if (connCtxt == null) {
379            shouldRetry = true;
380        }
381        else {
382            Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry();
383            Map<String, MessageReceiver> listeningTopicsMap = getReceiversTopicsMap(connInfo);
384            List<String> topicsToRemoveList = new ArrayList<String>();
385            // For each topic in the retry list, try to register the MessageHandler for that topic
386            for (Entry<String, MessageHandler> topicEntry : retryTopicsMap.entrySet()) {
387                String topic = topicEntry.getKey();
388                if (listeningTopicsMap.containsKey(topic)) {
389                    continue;
390                }
391                synchronized (listeningTopicsMap) {
392                    if (!listeningTopicsMap.containsKey(topic)) {
393                        MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, topicEntry.getValue());
394                        if (receiver == null) {
395                            LOG.warn("Failed to register a listener for topic {0} on {1}", topic, connInfo);
396                        }
397                        else {
398                            listeningTopicsMap.put(topic, receiver);
399                            topicsToRemoveList.add(topic);
400                            LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
401                        }
402                    }
403                }
404            }
405            for (String topic : topicsToRemoveList) {
406                retryTopicsMap.remove(topic);
407            }
408            if (retryTopicsMap.isEmpty()) {
409                shouldRetry = false;
410            }
411        }
412
413        if (shouldRetry) {
414            scheduleRetry(connInfo, connRetryInfo.getNextDelay());
415        }
416        else {
417            retryConnectionsMap.remove(connInfo);
418        }
419        return true;
420    }
421
422    private static class ConnectionRetryInfo {
423        private int numAttempt;
424        private int nextDelay;
425        private Map<String, MessageHandler> retryTopicsMap;
426
427        public ConnectionRetryInfo(int numAttempt, int nextDelay) {
428            this.numAttempt = numAttempt;
429            this.nextDelay = nextDelay;
430            this.retryTopicsMap = new HashMap<String, MessageHandler>();
431        }
432
433        public int getNumAttempt() {
434            return numAttempt;
435        }
436
437        public void setNumAttempt(int numAttempt) {
438            this.numAttempt = numAttempt;
439        }
440
441        public int getNextDelay() {
442            return nextDelay;
443        }
444
445        public void setNextDelay(int nextDelay) {
446            this.nextDelay = nextDelay;
447        }
448
449        public Map<String, MessageHandler> getTopicsToRetry() {
450            return retryTopicsMap;
451        }
452
453    }
454
455    public class JMSRetryRunnable implements Runnable {
456
457        private JMSConnectionInfo connInfo;
458
459        public JMSRetryRunnable(JMSConnectionInfo connInfo) {
460            this.connInfo = connInfo;
461        }
462
463        public JMSConnectionInfo getJMSConnectionInfo() {
464            return connInfo;
465        }
466
467        @Override
468        public void run() {
469            retryConnection(connInfo);
470        }
471
472    }
473
474}