001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.jms;
020
021import java.util.Properties;
022
023import javax.jms.Connection;
024import javax.jms.ConnectionFactory;
025import javax.jms.ExceptionListener;
026import javax.jms.JMSException;
027import javax.jms.MessageConsumer;
028import javax.jms.MessageProducer;
029import javax.jms.Session;
030import javax.jms.Topic;
031import javax.naming.Context;
032import javax.naming.InitialContext;
033import javax.naming.NamingException;
034
035import org.apache.oozie.util.XLog;
036
037public class DefaultConnectionContext implements ConnectionContext {
038
039    protected Connection connection;
040    protected String connectionFactoryName;
041    private static XLog LOG = XLog.getLog(ConnectionContext.class);
042
043    @Override
044    public void createConnection(Properties props) throws NamingException, JMSException {
045        Context jndiContext = new InitialContext(props);
046        connectionFactoryName = (String) jndiContext.getEnvironment().get("connectionFactoryNames");
047        if (connectionFactoryName == null || connectionFactoryName.trim().length() == 0) {
048            connectionFactoryName = "ConnectionFactory";
049        }
050        ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup(connectionFactoryName);
051        LOG.info("Connecting with the following properties \n" + jndiContext.getEnvironment().toString());
052        try {
053            connection = connectionFactory.createConnection();
054            connection.start();
055            connection.setExceptionListener(new ExceptionListener() {
056                @Override
057                public void onException(JMSException je) {
058                    LOG.error("Error in JMS connection", je);
059                }
060            });
061        }
062        catch (JMSException e1) {
063            LOG.error(e1.getMessage(), e1);
064            if (connection != null) {
065                try {
066                    connection.close();
067                }
068                catch (Exception e2) {
069                    LOG.error(e2.getMessage(), e2);
070                }
071                finally {
072                    connection = null;
073                }
074            }
075            throw e1;
076        }
077    }
078
079    @Override
080    public boolean isConnectionInitialized() {
081        return connection != null;
082    }
083
084    @Override
085    public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
086        connection.setExceptionListener(exceptionListener);
087    }
088
089    @Override
090    public Session createSession(int sessionOpts) throws JMSException {
091        if (connection == null) {
092            throw new JMSException ("Connection is not initialized");
093        }
094        return connection.createSession(false, sessionOpts);
095    }
096
097    @Override
098    public MessageConsumer createConsumer(Session session, String topicName) throws JMSException {
099        Topic topic = session.createTopic(topicName);
100        MessageConsumer consumer = session.createConsumer(topic);
101        return consumer;
102    }
103
104    @Override
105    public MessageProducer createProducer(Session session, String topicName) throws JMSException {
106        Topic topic = session.createTopic(topicName);
107        MessageProducer producer = session.createProducer(topic);
108        return producer;
109    }
110
111    @Override
112    public void close() {
113        if (connection != null) {
114            try {
115                connection.close();
116            }
117            catch (JMSException e) {
118                LOG.warn("Unable to close the connection " + connection, e);
119            }
120            finally {
121                connection = null;
122            }
123        }
124        th = null;
125    }
126
127    private ThreadLocal<Session> th = new ThreadLocal<Session>();
128
129    @Override
130    public Session createThreadLocalSession(final int sessionOpts) throws JMSException {
131        Session session = th.get();
132        if (session != null) {
133            return session;
134        }
135        th.remove();
136        session = createSession(sessionOpts);
137        th.set(session);
138        return session;
139    }
140
141    @Override
142    public MessageConsumer createConsumer(Session session, String topicName, String selector) throws JMSException {
143        Topic topic = session.createTopic(topicName);
144        MessageConsumer consumer = session.createConsumer(topic, selector);
145        return consumer;
146    }
147
148}