001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.jms;
019
020import java.util.Properties;
021
022import javax.jms.Connection;
023import javax.jms.ConnectionFactory;
024import javax.jms.ExceptionListener;
025import javax.jms.JMSException;
026import javax.jms.MessageConsumer;
027import javax.jms.MessageProducer;
028import javax.jms.Session;
029import javax.jms.Topic;
030import javax.naming.Context;
031import javax.naming.InitialContext;
032import javax.naming.NamingException;
033
034import org.apache.oozie.util.XLog;
035
036public class DefaultConnectionContext implements ConnectionContext {
037
038    protected Connection connection;
039    protected String connectionFactoryName;
040    private static XLog LOG = XLog.getLog(ConnectionContext.class);
041
042    @Override
043    public void createConnection(Properties props) throws NamingException, JMSException {
044        Context jndiContext = new InitialContext(props);
045        connectionFactoryName = (String) jndiContext.getEnvironment().get("connectionFactoryNames");
046        if (connectionFactoryName == null || connectionFactoryName.trim().length() == 0) {
047            connectionFactoryName = "ConnectionFactory";
048        }
049        ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup(connectionFactoryName);
050        LOG.info("Connecting with the following properties \n" + jndiContext.getEnvironment().toString());
051        try {
052            connection = connectionFactory.createConnection();
053            connection.start();
054            connection.setExceptionListener(new ExceptionListener() {
055                @Override
056                public void onException(JMSException je) {
057                    LOG.error("Error in JMS connection", je);
058                }
059            });
060        }
061        catch (JMSException e1) {
062            LOG.error(e1.getMessage(), e1);
063            if (connection != null) {
064                try {
065                    connection.close();
066                }
067                catch (Exception e2) {
068                    LOG.error(e2.getMessage(), e2);
069                }
070                finally {
071                    connection = null;
072                }
073            }
074            throw e1;
075        }
076    }
077
078    @Override
079    public boolean isConnectionInitialized() {
080        return connection != null;
081    }
082
083    @Override
084    public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
085        connection.setExceptionListener(exceptionListener);
086    }
087
088    @Override
089    public Session createSession(int sessionOpts) throws JMSException {
090        if (connection == null) {
091            throw new JMSException ("Connection is not initialized");
092        }
093        return connection.createSession(false, sessionOpts);
094    }
095
096    @Override
097    public MessageConsumer createConsumer(Session session, String topicName) throws JMSException {
098        Topic topic = session.createTopic(topicName);
099        MessageConsumer consumer = session.createConsumer(topic);
100        return consumer;
101    }
102
103    @Override
104    public MessageProducer createProducer(Session session, String topicName) throws JMSException {
105        Topic topic = session.createTopic(topicName);
106        MessageProducer producer = session.createProducer(topic);
107        return producer;
108    }
109
110    @Override
111    public void close() {
112        if (connection != null) {
113            try {
114                connection.close();
115            }
116            catch (JMSException e) {
117                LOG.warn("Unable to close the connection " + connection, e);
118            }
119            finally {
120                connection = null;
121            }
122        }
123        th = null;
124    }
125
126    private ThreadLocal<Session> th = new ThreadLocal<Session>();
127
128    @Override
129    public Session createThreadLocalSession(final int sessionOpts) throws JMSException {
130        Session session = th.get();
131        if (session != null) {
132            return session;
133        }
134        th.remove();
135        session = createSession(sessionOpts);
136        th.set(session);
137        return session;
138    }
139
140    @Override
141    public MessageConsumer createConsumer(Session session, String topicName, String selector) throws JMSException {
142        Topic topic = session.createTopic(topicName);
143        MessageConsumer consumer = session.createConsumer(topic, selector);
144        return consumer;
145    }
146
147}