This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.jms;
019
020 import java.util.Properties;
021
022 import javax.jms.Connection;
023 import javax.jms.ConnectionFactory;
024 import javax.jms.ExceptionListener;
025 import javax.jms.JMSException;
026 import javax.jms.MessageConsumer;
027 import javax.jms.MessageProducer;
028 import javax.jms.Session;
029 import javax.jms.Topic;
030 import javax.naming.Context;
031 import javax.naming.InitialContext;
032 import javax.naming.NamingException;
033
034 import org.apache.oozie.util.XLog;
035
036 public class DefaultConnectionContext implements ConnectionContext {
037
038 protected Connection connection;
039 protected String connectionFactoryName;
040 private static XLog LOG = XLog.getLog(ConnectionContext.class);
041
042 @Override
043 public void createConnection(Properties props) throws NamingException, JMSException {
044 Context jndiContext = new InitialContext(props);
045 connectionFactoryName = (String) jndiContext.getEnvironment().get("connectionFactoryNames");
046 if (connectionFactoryName == null || connectionFactoryName.trim().length() == 0) {
047 connectionFactoryName = "ConnectionFactory";
048 }
049 ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup(connectionFactoryName);
050 LOG.info("Connecting with the following properties \n" + jndiContext.getEnvironment().toString());
051 try {
052 connection = connectionFactory.createConnection();
053 connection.start();
054 connection.setExceptionListener(new ExceptionListener() {
055 @Override
056 public void onException(JMSException je) {
057 LOG.error("Error in JMS connection", je);
058 }
059 });
060 }
061 catch (JMSException e1) {
062 LOG.error(e1.getMessage(), e1);
063 if (connection != null) {
064 try {
065 connection.close();
066 }
067 catch (Exception e2) {
068 LOG.error(e2.getMessage(), e2);
069 }
070 finally {
071 connection = null;
072 }
073 }
074 throw e1;
075 }
076 }
077
078 @Override
079 public boolean isConnectionInitialized() {
080 return connection != null;
081 }
082
083 @Override
084 public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
085 connection.setExceptionListener(exceptionListener);
086 }
087
088 @Override
089 public Session createSession(int sessionOpts) throws JMSException {
090 if (connection == null) {
091 throw new JMSException ("Connection is not initialized");
092 }
093 return connection.createSession(false, sessionOpts);
094 }
095
096 @Override
097 public MessageConsumer createConsumer(Session session, String topicName) throws JMSException {
098 Topic topic = session.createTopic(topicName);
099 MessageConsumer consumer = session.createConsumer(topic);
100 return consumer;
101 }
102
103 @Override
104 public MessageProducer createProducer(Session session, String topicName) throws JMSException {
105 Topic topic = session.createTopic(topicName);
106 MessageProducer producer = session.createProducer(topic);
107 return producer;
108 }
109
110 @Override
111 public void close() {
112 if (connection != null) {
113 try {
114 connection.close();
115 }
116 catch (JMSException e) {
117 LOG.warn("Unable to close the connection " + connection, e);
118 }
119 finally {
120 connection = null;
121 }
122 }
123 th = null;
124 }
125
126 private ThreadLocal<Session> th = new ThreadLocal<Session>();
127
128 @Override
129 public Session createThreadLocalSession(final int sessionOpts) throws JMSException {
130 Session session = th.get();
131 if (session != null) {
132 return session;
133 }
134 th.remove();
135 session = createSession(sessionOpts);
136 th.set(session);
137 return session;
138 }
139
140 @Override
141 public MessageConsumer createConsumer(Session session, String topicName, String selector) throws JMSException {
142 Topic topic = session.createTopic(topicName);
143 MessageConsumer consumer = session.createConsumer(topic, selector);
144 return consumer;
145 }
146
147 }