/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.jms;

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.oozie.util.XLog;

/**
 * {@link ConnectionContext} implementation that obtains a JMS {@link ConnectionFactory} through JNDI, opens a
 * single {@link Connection} from it, and creates sessions, consumers and producers on that connection.
 */
public class DefaultConnectionContext implements ConnectionContext {

    /** JNDI environment key holding the name of the connection factory to look up. */
    private static final String CONNECTION_FACTORY_NAMES_KEY = "connectionFactoryNames";

    /** Connection factory name used when the JNDI environment does not provide one. */
    private static final String DEFAULT_CONNECTION_FACTORY_NAME = "ConnectionFactory";

    /** Placeholder written to the log in place of the JNDI credentials value. */
    private static final String MASKED_VALUE = "****";

    protected Connection connection;
    protected String connectionFactoryName;
    // The logger category is intentionally left as ConnectionContext so existing log configuration keeps matching.
    private static final XLog LOG = XLog.getLog(ConnectionContext.class);

    /**
     * Per-thread cache used by {@link #createThreadLocalSession(int)}. Replaced (never set to {@code null}) by
     * {@link #close()}.
     */
    private ThreadLocal<Session> th = new ThreadLocal<Session>();

    /**
     * Looks up the connection factory through JNDI, creates a connection from it, starts the connection and
     * installs an exception listener that logs connection errors.
     * <p>
     * The factory name is read from the {@code connectionFactoryNames} entry of the JNDI environment; when it is
     * missing or blank, {@code ConnectionFactory} is used. The JNDI context is closed once the lookup is done.
     * If creating or starting the connection fails, any partially created connection is closed and the
     * {@link #connection} field is reset to {@code null} before the exception is rethrown.
     *
     * @param props environment used to build the JNDI {@link InitialContext}
     * @throws NamingException if the JNDI context cannot be created or the factory lookup fails
     * @throws JMSException if the connection cannot be created, started or configured
     */
    @Override
    public void createConnection(Properties props) throws NamingException, JMSException {
        ConnectionFactory connectionFactory;
        Context jndiContext = new InitialContext(props);
        try {
            connectionFactoryName = (String) jndiContext.getEnvironment().get(CONNECTION_FACTORY_NAMES_KEY);
            if (connectionFactoryName == null || connectionFactoryName.trim().length() == 0) {
                connectionFactoryName = DEFAULT_CONNECTION_FACTORY_NAME;
            }
            connectionFactory = (ConnectionFactory) jndiContext.lookup(connectionFactoryName);
            LOG.info("Connecting with the following properties \n" + describeEnvironment(jndiContext));
        }
        finally {
            // The context is only needed for the lookup; release it so it does not leak.
            closeQuietly(jndiContext);
        }
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            connection.setExceptionListener(new ExceptionListener() {
                @Override
                public void onException(JMSException je) {
                    LOG.error("Error in JMS connection", je);
                }
            });
        }
        catch (JMSException e1) {
            LOG.error(e1.getMessage(), e1);
            if (connection != null) {
                try {
                    connection.close();
                }
                catch (Exception e2) {
                    LOG.error(e2.getMessage(), e2);
                }
                finally {
                    connection = null;
                }
            }
            throw e1;
        }
    }

    /**
     * Renders the JNDI environment for logging, with the value stored under
     * {@link Context#SECURITY_CREDENTIALS} replaced by a placeholder so it is not written to the log.
     * Only that standard JNDI key is masked; provider-specific keys are printed as they are.
     */
    private static String describeEnvironment(Context jndiContext) throws NamingException {
        Properties printable = new Properties();
        printable.putAll(jndiContext.getEnvironment());
        if (printable.containsKey(Context.SECURITY_CREDENTIALS)) {
            printable.put(Context.SECURITY_CREDENTIALS, MASKED_VALUE);
        }
        return printable.toString();
    }

    /** Closes the JNDI context, logging (not propagating) a failure to do so. */
    private static void closeQuietly(Context jndiContext) {
        try {
            jndiContext.close();
        }
        catch (NamingException ne) {
            LOG.warn("Unable to close the JNDI context " + jndiContext, ne);
        }
    }

    /**
     * Tells whether a connection is currently held.
     *
     * @return {@code true} if the {@link #connection} field is not {@code null}
     */
    @Override
    public boolean isConnectionInitialized() {
        return connection != null;
    }

    /**
     * Installs the given exception listener on the connection.
     *
     * @param exceptionListener listener to register
     * @throws JMSException if no connection has been created, or if the connection rejects the listener
     */
    @Override
    public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
        // Same guard as createSession: report a missing connection as a JMSException, not a NullPointerException.
        if (connection == null) {
            throw new JMSException("Connection is not initialized");
        }
        connection.setExceptionListener(exceptionListener);
    }

    /**
     * Creates a new non-transacted session on the connection.
     *
     * @param sessionOpts acknowledge mode passed to {@link Connection#createSession(boolean, int)}
     * @return the newly created session
     * @throws JMSException if no connection has been created, or if session creation fails
     */
    @Override
    public Session createSession(int sessionOpts) throws JMSException {
        if (connection == null) {
            throw new JMSException("Connection is not initialized");
        }
        return connection.createSession(false, sessionOpts);
    }

    /**
     * Creates a consumer on the named topic.
     *
     * @param session session used to create the topic and the consumer
     * @param topicName name of the topic to consume from
     * @return the consumer
     * @throws JMSException if the topic or the consumer cannot be created
     */
    @Override
    public MessageConsumer createConsumer(Session session, String topicName) throws JMSException {
        Topic topic = session.createTopic(topicName);
        return session.createConsumer(topic);
    }

    /**
     * Creates a producer on the named topic.
     *
     * @param session session used to create the topic and the producer
     * @param topicName name of the topic to publish to
     * @return the producer
     * @throws JMSException if the topic or the producer cannot be created
     */
    @Override
    public MessageProducer createProducer(Session session, String topicName) throws JMSException {
        Topic topic = session.createTopic(topicName);
        return session.createProducer(topic);
    }

    /**
     * Closes the connection, if any, and discards all cached thread-local sessions.
     * <p>
     * A failure to close the connection is logged as a warning and not propagated; the {@link #connection}
     * field is reset to {@code null} either way.
     */
    @Override
    public void close() {
        if (connection != null) {
            try {
                connection.close();
            }
            catch (JMSException e) {
                LOG.warn("Unable to close the connection " + connection, e);
            }
            finally {
                connection = null;
            }
        }
        // Swap in a fresh ThreadLocal instead of nulling the field: this drops the sessions cached by every
        // thread (they belonged to the connection just closed) while keeping createThreadLocalSession usable,
        // so a later call fails with the JMSException from createSession rather than a NullPointerException.
        th = new ThreadLocal<Session>();
    }

    /**
     * Returns the session cached for the calling thread, creating and caching one on first use.
     *
     * @param sessionOpts acknowledge mode used when a new session has to be created; ignored when a session is
     *        already cached for the calling thread
     * @return the calling thread's session
     * @throws JMSException if no connection has been created, or if session creation fails
     */
    @Override
    public Session createThreadLocalSession(final int sessionOpts) throws JMSException {
        // Read the field once so the get and set below operate on the same ThreadLocal instance.
        ThreadLocal<Session> cache = th;
        Session session = cache.get();
        if (session != null) {
            return session;
        }
        cache.remove();
        session = createSession(sessionOpts);
        cache.set(session);
        return session;
    }

    /**
     * Creates a consumer on the named topic that only receives messages matching the selector.
     *
     * @param session session used to create the topic and the consumer
     * @param topicName name of the topic to consume from
     * @param selector message selector passed to {@link Session#createConsumer(javax.jms.Destination, String)}
     * @return the consumer
     * @throws JMSException if the topic or the consumer cannot be created
     */
    @Override
    public MessageConsumer createConsumer(Session session, String topicName, String selector) throws JMSException {
        Topic topic = session.createTopic(topicName);
        return session.createConsumer(topic, selector);
    }

}