001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.jms;
019    
020    import java.util.Properties;
021    
022    import javax.jms.Connection;
023    import javax.jms.ConnectionFactory;
024    import javax.jms.ExceptionListener;
025    import javax.jms.JMSException;
026    import javax.jms.MessageConsumer;
027    import javax.jms.MessageProducer;
028    import javax.jms.Session;
029    import javax.jms.Topic;
030    import javax.naming.Context;
031    import javax.naming.InitialContext;
032    import javax.naming.NamingException;
033    
034    import org.apache.oozie.util.XLog;
035    
036    public class DefaultConnectionContext implements ConnectionContext {
037    
038        protected Connection connection;
039        protected String connectionFactoryName;
040        private static XLog LOG = XLog.getLog(ConnectionContext.class);
041    
042        @Override
043        public void createConnection(Properties props) throws NamingException, JMSException {
044            Context jndiContext = new InitialContext(props);
045            connectionFactoryName = (String) jndiContext.getEnvironment().get("connectionFactoryNames");
046            if (connectionFactoryName == null || connectionFactoryName.trim().length() == 0) {
047                connectionFactoryName = "ConnectionFactory";
048            }
049            ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup(connectionFactoryName);
050            LOG.info("Connecting with the following properties \n" + jndiContext.getEnvironment().toString());
051            try {
052                connection = connectionFactory.createConnection();
053                connection.start();
054                connection.setExceptionListener(new ExceptionListener() {
055                    @Override
056                    public void onException(JMSException je) {
057                        LOG.error("Error in JMS connection", je);
058                    }
059                });
060            }
061            catch (JMSException e1) {
062                LOG.error(e1.getMessage(), e1);
063                if (connection != null) {
064                    try {
065                        connection.close();
066                    }
067                    catch (Exception e2) {
068                        LOG.error(e2.getMessage(), e2);
069                    }
070                    finally {
071                        connection = null;
072                    }
073                }
074                throw e1;
075            }
076        }
077    
078        @Override
079        public boolean isConnectionInitialized() {
080            return connection != null;
081        }
082    
083        @Override
084        public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
085            connection.setExceptionListener(exceptionListener);
086        }
087    
088        @Override
089        public Session createSession(int sessionOpts) throws JMSException {
090            if (connection == null) {
091                throw new JMSException ("Connection is not initialized");
092            }
093            return connection.createSession(false, sessionOpts);
094        }
095    
096        @Override
097        public MessageConsumer createConsumer(Session session, String topicName) throws JMSException {
098            Topic topic = session.createTopic(topicName);
099            MessageConsumer consumer = session.createConsumer(topic);
100            return consumer;
101        }
102    
103        @Override
104        public MessageProducer createProducer(Session session, String topicName) throws JMSException {
105            Topic topic = session.createTopic(topicName);
106            MessageProducer producer = session.createProducer(topic);
107            return producer;
108        }
109    
110        @Override
111        public void close() {
112            if (connection != null) {
113                try {
114                    connection.close();
115                }
116                catch (JMSException e) {
117                    LOG.warn("Unable to close the connection " + connection, e);
118                }
119                finally {
120                    connection = null;
121                }
122            }
123            th = null;
124        }
125    
126        private ThreadLocal<Session> th = new ThreadLocal<Session>();
127    
128        @Override
129        public Session createThreadLocalSession(final int sessionOpts) throws JMSException {
130            Session session = th.get();
131            if (session != null) {
132                return session;
133            }
134            th.remove();
135            session = createSession(sessionOpts);
136            th.set(session);
137            return session;
138        }
139    
140        @Override
141        public MessageConsumer createConsumer(Session session, String topicName, String selector) throws JMSException {
142            Topic topic = session.createTopic(topicName);
143            MessageConsumer consumer = session.createConsumer(topic, selector);
144            return consumer;
145        }
146    
147    }