001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.Map.Entry;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.ConcurrentMap;
027    import javax.jms.JMSException;
028    import javax.jms.MessageConsumer;
029    import javax.jms.Session;
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.util.ReflectionUtils;
032    import org.apache.oozie.jms.ConnectionContext;
033    import org.apache.oozie.jms.DefaultConnectionContext;
034    import org.apache.oozie.jms.JMSConnectionInfo;
035    import org.apache.oozie.jms.JMSExceptionListener;
036    import org.apache.oozie.jms.MessageHandler;
037    import org.apache.oozie.jms.MessageReceiver;
038    import org.apache.oozie.util.XLog;
039    
040    import com.google.common.annotations.VisibleForTesting;
041    
042    /**
043     * This class will <ul>
044     * <li> Create/Manage JMS connections using user configured JNDI properties. </li>
045     * <li> Create/Manage session for specific connection/topic and reconnects on failures. </li>
046     * <li> Provide a way to create a subscriber and publisher </li>
047     * <li> Pure JMS compliant (implementation independent but primarily tested against Apache ActiveMQ). </li>
048     * </ul>
049     */
050    public class JMSAccessorService implements Service {
051        public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSAccessorService.";
052        public static final String JMS_CONNECTION_CONTEXT_IMPL = CONF_PREFIX + "connectioncontext.impl";
053        public static final String SESSION_OPTS = CONF_PREFIX + "jms.sessionOpts";
054        public static final String CONF_RETRY_INITIAL_DELAY = CONF_PREFIX + "retry.initial.delay";
055        public static final String CONF_RETRY_MULTIPLIER = CONF_PREFIX + "retry.multiplier";
056        public static final String CONF_RETRY_MAX_ATTEMPTS = CONF_PREFIX + "retry.max.attempts";
057        private static XLog LOG;
058    
059        private Configuration conf;
060        private int sessionOpts;
061        private int retryInitialDelay;
062        private int retryMultiplier;
063        private int retryMaxAttempts;
064        private ConnectionContext jmsProducerConnContext;
065    
066        /**
067         * Map of JMS connection info to established JMS Connection
068         */
069        private ConcurrentMap<JMSConnectionInfo, ConnectionContext> connectionMap =
070                new ConcurrentHashMap<JMSConnectionInfo, ConnectionContext>();
071        /**
072         * Map of JMS connection info to topic names to MessageReceiver
073         */
074        private ConcurrentMap<JMSConnectionInfo, Map<String, MessageReceiver>> receiversMap =
075                new ConcurrentHashMap<JMSConnectionInfo, Map<String, MessageReceiver>>();
076    
077        /**
078         * Map of JMS connection info to connection retry information
079         */
080        private Map<JMSConnectionInfo, ConnectionRetryInfo> retryConnectionsMap = new HashMap<JMSConnectionInfo, ConnectionRetryInfo>();
081    
082        @Override
083        public void init(Services services) throws ServiceException {
084            LOG = XLog.getLog(getClass());
085            conf = services.getConf();
086            sessionOpts = conf.getInt(SESSION_OPTS, Session.AUTO_ACKNOWLEDGE);
087            retryInitialDelay = conf.getInt(CONF_RETRY_INITIAL_DELAY, 60); // initial delay in seconds
088            retryMultiplier = conf.getInt(CONF_RETRY_MULTIPLIER, 2);
089            retryMaxAttempts = conf.getInt(CONF_RETRY_MAX_ATTEMPTS, 10);
090        }
091    
092        /**
093         * Register for notifications on a JMS topic.
094         *
095         * @param connInfo Information to connect to a JMS compliant messaging service.
096         * @param topic Topic in which the JMS messages are published
097         * @param msgHandler Handler which will process the messages received on the topic
098         */
099        public void registerForNotification(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
100            if (!isTopicInRetryList(connInfo, topic)) {
101                if (isConnectionInRetryList(connInfo)) {
102                    queueTopicForRetry(connInfo, topic, msgHandler);
103                }
104                else {
105                    Map<String, MessageReceiver> topicsMap = getReceiversTopicsMap(connInfo);
106                    if (!topicsMap.containsKey(topic)) {
107                        synchronized (topicsMap) {
108                            if (!topicsMap.containsKey(topic)) {
109                                ConnectionContext connCtxt = createConnectionContext(connInfo);
110                                if (connCtxt == null) {
111                                    queueTopicForRetry(connInfo, topic, msgHandler);
112                                    return;
113                                }
114                                MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, msgHandler);
115                                if (receiver == null) {
116                                    queueTopicForRetry(connInfo, topic, msgHandler);
117                                }
118                                else {
119                                    LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
120                                    topicsMap.put(topic, receiver);
121                                }
122                            }
123                        }
124                    }
125                }
126            }
127        }
128    
129        /**
130         * Unregister from listening to JMS messages on a topic.
131         *
132         * @param connInfo Information to connect to the JMS compliant messaging service.
133         * @param topic Topic in which the JMS messages are published
134         */
135        public void unregisterFromNotification(JMSConnectionInfo connInfo, String topic) {
136            LOG.info("Unregistering JMS listener. Clossing session for {0} and topic {1}", connInfo, topic);
137    
138            if (isTopicInRetryList(connInfo, topic)) {
139                removeTopicFromRetryList(connInfo, topic);
140            }
141            else {
142                Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
143                if (topicsMap != null) {
144                    MessageReceiver receiver = null;
145                    synchronized (topicsMap) {
146                        receiver = topicsMap.remove(topic);
147                        if (topicsMap.isEmpty()) {
148                            receiversMap.remove(connInfo);
149                        }
150                    }
151                    if (receiver != null) {
152                        try {
153                            receiver.getSession().close();
154                        }
155                        catch (JMSException e) {
156                            LOG.warn("Unable to close session " + receiver.getSession(), e);
157                        }
158                    }
159                    else {
160                        LOG.warn("Received request to unregister from topic [{0}] on [{1}], but no matching session.",
161                                topic, connInfo);
162                    }
163                }
164            }
165        }
166    
167        private Map<String, MessageReceiver> getReceiversTopicsMap(JMSConnectionInfo connInfo) {
168            Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
169            if (topicsMap == null) {
170                topicsMap = new HashMap<String, MessageReceiver>();
171                Map<String, MessageReceiver> exists = receiversMap.putIfAbsent(connInfo, topicsMap);
172                if (exists != null) {
173                    topicsMap = exists;
174                }
175            }
176            return topicsMap;
177        }
178    
179        /**
180         * Determine if currently listening to JMS messages on a topic.
181         *
182         * @param connInfo Information to connect to the JMS compliant messaging service.
183         * @param topic Topic in which the JMS messages are published
184         * @return true if listening to the topic, else false
185         */
186        @VisibleForTesting
187        boolean isListeningToTopic(JMSConnectionInfo connInfo, String topic) {
188            Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
189            return (topicsMap != null && topicsMap.containsKey(topic));
190        }
191    
192        @VisibleForTesting
193        boolean isConnectionInRetryList(JMSConnectionInfo connInfo) {
194            return retryConnectionsMap.containsKey(connInfo);
195        }
196    
197        @VisibleForTesting
198        boolean isTopicInRetryList(JMSConnectionInfo connInfo, String topic) {
199            ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
200            if (connRetryInfo == null) {
201                return false;
202            }
203            else {
204                Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
205                return topicsMap.containsKey(topic);
206            }
207        }
208    
209        // For unit testing
210        @VisibleForTesting
211        int getNumConnectionAttempts(JMSConnectionInfo connInfo) {
212            return retryConnectionsMap.get(connInfo).getNumAttempt();
213        }
214    
215        private ConnectionRetryInfo queueConnectionForRetry(JMSConnectionInfo connInfo) {
216            ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
217            if (connRetryInfo == null) {
218                LOG.info("Queueing connection {0} for retry", connInfo);
219                connRetryInfo = new ConnectionRetryInfo(0, retryInitialDelay);
220                retryConnectionsMap.put(connInfo, connRetryInfo);
221                scheduleRetry(connInfo, retryInitialDelay);
222            }
223            return connRetryInfo;
224        }
225    
226        private ConnectionRetryInfo queueTopicForRetry(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
227            LOG.info("Queueing topic {0} for {1} for retry", topic, connInfo);
228            ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo);
229            Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
230            topicsMap.put(topic, msgHandler);
231            return connRetryInfo;
232        }
233    
234        private void removeTopicFromRetryList(JMSConnectionInfo connInfo, String topic) {
235            LOG.info("Removing topic {0} from {1} from retry list", topic, connInfo);
236            ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
237            if (connRetryInfo != null) {
238                Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
239                topicsMap.remove(topic);
240            }
241        }
242    
243        private MessageReceiver registerForTopic(JMSConnectionInfo connInfo, ConnectionContext connCtxt, String topic,
244                MessageHandler msgHandler) {
245            try {
246                Session session = connCtxt.createSession(sessionOpts);
247                MessageConsumer consumer = connCtxt.createConsumer(session, topic);
248                MessageReceiver receiver = new MessageReceiver(msgHandler, session, consumer);
249                consumer.setMessageListener(receiver);
250                return receiver;
251            }
252            catch (JMSException e) {
253                LOG.warn("Error while registering to listen to topic {0} from {1}", topic, connInfo, e);
254                return null;
255            }
256        }
257    
258        public ConnectionContext createConnectionContext(JMSConnectionInfo connInfo) {
259            ConnectionContext connCtxt = connectionMap.get(connInfo);
260            if (connCtxt == null) {
261                try {
262                    connCtxt = getConnectionContextImpl();
263                    connCtxt.createConnection(connInfo.getJNDIProperties());
264                    connCtxt.setExceptionListener(new JMSExceptionListener(connInfo, connCtxt, true));
265                    connectionMap.put(connInfo, connCtxt);
266                    LOG.info("Connection established to JMS Server for [{0}]", connInfo);
267                }
268                catch (Exception e) {
269                    LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e);
270                    return null;
271                }
272            }
273            return connCtxt;
274        }
275    
276        public ConnectionContext createProducerConnectionContext(JMSConnectionInfo connInfo) {
277            if (jmsProducerConnContext != null && jmsProducerConnContext.isConnectionInitialized()) {
278                return jmsProducerConnContext;
279            }
280            else {
281                synchronized (this) {
282                    if (jmsProducerConnContext == null || !jmsProducerConnContext.isConnectionInitialized()) {
283                        try {
284                            jmsProducerConnContext = getConnectionContextImpl();
285                            jmsProducerConnContext.createConnection(connInfo.getJNDIProperties());
286                            jmsProducerConnContext.setExceptionListener(new JMSExceptionListener(connInfo,
287                                    jmsProducerConnContext, false));
288                            LOG.info("Connection established to JMS Server for [{0}]", connInfo);
289                        }
290                        catch (Exception e) {
291                            LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e);
292                            return null;
293                        }
294                    }
295                }
296            }
297            return jmsProducerConnContext;
298        }
299    
300        private ConnectionContext getConnectionContextImpl() {
301            Class<?> defaultClazz = conf.getClass(JMS_CONNECTION_CONTEXT_IMPL, DefaultConnectionContext.class);
302            ConnectionContext connCtx = null;
303            if (defaultClazz == DefaultConnectionContext.class) {
304                connCtx = new DefaultConnectionContext();
305            }
306            else {
307                connCtx = (ConnectionContext) ReflectionUtils.newInstance(defaultClazz, null);
308            }
309            return connCtx;
310        }
311    
312        @VisibleForTesting
313        MessageReceiver getMessageReceiver(JMSConnectionInfo connInfo, String topic) {
314            Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
315            if (topicsMap != null) {
316                return topicsMap.get(topic);
317            }
318            return null;
319        }
320    
321        @Override
322        public void destroy() {
323            LOG.info("Destroying JMSAccessor service ");
324            receiversMap.clear();
325    
326            LOG.info("Closing JMS connections");
327            for (ConnectionContext conn : connectionMap.values()) {
328                conn.close();
329            }
330            if (jmsProducerConnContext != null) {
331                jmsProducerConnContext.close();
332            }
333            connectionMap.clear();
334        }
335    
336        @Override
337        public Class<? extends Service> getInterface() {
338            return JMSAccessorService.class;
339        }
340    
341        /**
342         * Reestablish connection for the given JMS connect information
343         * @param connInfo JMS connection info
344         */
345        public void reestablishConnection(JMSConnectionInfo connInfo) {
346            // Queue the connection and topics for retry
347            connectionMap.remove(connInfo);
348            ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo);
349            Map<String, MessageReceiver> listeningTopicsMap = receiversMap.remove(connInfo);
350            if (listeningTopicsMap != null) {
351                Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry();
352                for (Entry<String, MessageReceiver> topicEntry : listeningTopicsMap.entrySet()) {
353                    MessageReceiver receiver = topicEntry.getValue();
354                    retryTopicsMap.put(topicEntry.getKey(), receiver.getMessageHandler());
355                }
356            }
357        }
358    
359        private void scheduleRetry(JMSConnectionInfo connInfo, long delay) {
360            LOG.info("Scheduling retry of connection [{0}] in [{1}] seconds", connInfo, delay);
361            JMSRetryRunnable runnable = new JMSRetryRunnable(connInfo);
362            SchedulerService scheduler = Services.get().get(SchedulerService.class);
363            scheduler.schedule(runnable, delay, SchedulerService.Unit.SEC);
364        }
365    
366        @VisibleForTesting
367        boolean retryConnection(JMSConnectionInfo connInfo) {
368            ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
369            if (connRetryInfo.getNumAttempt() >= retryMaxAttempts) {
370                LOG.info("Not attempting connection [{0}] again. Reached max attempts [{1}]", connInfo, retryMaxAttempts);
371                return false;
372            }
373            LOG.info("Attempting retry of connection [{0}]", connInfo);
374            connRetryInfo.setNumAttempt(connRetryInfo.getNumAttempt() + 1);
375            connRetryInfo.setNextDelay(connRetryInfo.getNextDelay() * retryMultiplier);
376            ConnectionContext connCtxt = createConnectionContext(connInfo);
377            boolean shouldRetry = false;
378            if (connCtxt == null) {
379                shouldRetry = true;
380            }
381            else {
382                Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry();
383                Map<String, MessageReceiver> listeningTopicsMap = getReceiversTopicsMap(connInfo);
384                List<String> topicsToRemoveList = new ArrayList<String>();
385                // For each topic in the retry list, try to register the MessageHandler for that topic
386                for (Entry<String, MessageHandler> topicEntry : retryTopicsMap.entrySet()) {
387                    String topic = topicEntry.getKey();
388                    if (listeningTopicsMap.containsKey(topic)) {
389                        continue;
390                    }
391                    synchronized (listeningTopicsMap) {
392                        if (!listeningTopicsMap.containsKey(topic)) {
393                            MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, topicEntry.getValue());
394                            if (receiver == null) {
395                                LOG.warn("Failed to register a listener for topic {0} on {1}", topic, connInfo);
396                            }
397                            else {
398                                listeningTopicsMap.put(topic, receiver);
399                                topicsToRemoveList.add(topic);
400                                LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
401                            }
402                        }
403                    }
404                }
405                for (String topic : topicsToRemoveList) {
406                    retryTopicsMap.remove(topic);
407                }
408                if (retryTopicsMap.isEmpty()) {
409                    shouldRetry = false;
410                }
411            }
412    
413            if (shouldRetry) {
414                scheduleRetry(connInfo, connRetryInfo.getNextDelay());
415            }
416            else {
417                retryConnectionsMap.remove(connInfo);
418            }
419            return true;
420        }
421    
422        private static class ConnectionRetryInfo {
423            private int numAttempt;
424            private int nextDelay;
425            private Map<String, MessageHandler> retryTopicsMap;
426    
427            public ConnectionRetryInfo(int numAttempt, int nextDelay) {
428                this.numAttempt = numAttempt;
429                this.nextDelay = nextDelay;
430                this.retryTopicsMap = new HashMap<String, MessageHandler>();
431            }
432    
433            public int getNumAttempt() {
434                return numAttempt;
435            }
436    
437            public void setNumAttempt(int numAttempt) {
438                this.numAttempt = numAttempt;
439            }
440    
441            public int getNextDelay() {
442                return nextDelay;
443            }
444    
445            public void setNextDelay(int nextDelay) {
446                this.nextDelay = nextDelay;
447            }
448    
449            public Map<String, MessageHandler> getTopicsToRetry() {
450                return retryTopicsMap;
451            }
452    
453        }
454    
455        public class JMSRetryRunnable implements Runnable {
456    
457            private JMSConnectionInfo connInfo;
458    
459            public JMSRetryRunnable(JMSConnectionInfo connInfo) {
460                this.connInfo = connInfo;
461            }
462    
463            public JMSConnectionInfo getJMSConnectionInfo() {
464                return connInfo;
465            }
466    
467            @Override
468            public void run() {
469                retryConnection(connInfo);
470            }
471    
472        }
473    
474    }