001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.jms; 020 021import java.util.Properties; 022 023import javax.jms.Connection; 024import javax.jms.ConnectionFactory; 025import javax.jms.ExceptionListener; 026import javax.jms.JMSException; 027import javax.jms.MessageConsumer; 028import javax.jms.MessageProducer; 029import javax.jms.Session; 030import javax.jms.Topic; 031import javax.naming.Context; 032import javax.naming.InitialContext; 033import javax.naming.NamingException; 034 035import org.apache.oozie.util.XLog; 036 037public class DefaultConnectionContext implements ConnectionContext { 038 039 protected Connection connection; 040 protected String connectionFactoryName; 041 private static XLog LOG = XLog.getLog(ConnectionContext.class); 042 043 @Override 044 public void createConnection(Properties props) throws NamingException, JMSException { 045 Context jndiContext = new InitialContext(props); 046 connectionFactoryName = (String) jndiContext.getEnvironment().get("connectionFactoryNames"); 047 if (connectionFactoryName == null || connectionFactoryName.trim().length() == 0) { 048 connectionFactoryName = "ConnectionFactory"; 049 } 050 ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup(connectionFactoryName); 051 LOG.info("Connecting with the following properties \n" + jndiContext.getEnvironment().toString()); 052 try { 053 connection = connectionFactory.createConnection(); 054 connection.start(); 055 connection.setExceptionListener(new ExceptionListener() { 056 @Override 057 public void onException(JMSException je) { 058 LOG.error("Error in JMS connection", je); 059 } 060 }); 061 } 062 catch (JMSException e1) { 063 LOG.error(e1.getMessage(), e1); 064 if (connection != null) { 065 try { 066 connection.close(); 067 } 068 catch (Exception e2) { 069 LOG.error(e2.getMessage(), e2); 070 } 071 finally { 072 connection = null; 073 } 074 } 075 throw e1; 076 } 077 } 078 079 @Override 080 public boolean isConnectionInitialized() { 081 return connection != null; 082 } 083 084 @Override 085 public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException { 086 connection.setExceptionListener(exceptionListener); 087 } 088 089 @Override 090 public Session createSession(int sessionOpts) throws JMSException { 091 if (connection == null) { 092 throw new JMSException ("Connection is not initialized"); 093 } 094 return connection.createSession(false, sessionOpts); 095 } 096 097 @Override 098 public MessageConsumer createConsumer(Session session, String topicName) throws JMSException { 099 Topic topic = session.createTopic(topicName); 100 MessageConsumer consumer = session.createConsumer(topic); 101 return consumer; 102 } 103 104 @Override 105 public MessageProducer createProducer(Session session, String topicName) throws JMSException { 106 Topic topic = session.createTopic(topicName); 107 MessageProducer producer = session.createProducer(topic); 108 return producer; 109 } 110 111 @Override 112 public void close() { 113 if (connection != null) { 114 try { 115 connection.close(); 116 } 117 catch (JMSException e) { 118 LOG.warn("Unable to close the connection " + connection, e); 119 } 120 finally { 121 connection = null; 122 } 123 } 124 th = null; 125 } 126 127 private ThreadLocal<Session> th = new ThreadLocal<Session>(); 128 129 @Override 130 public Session createThreadLocalSession(final int sessionOpts) throws JMSException { 131 Session session = th.get(); 132 if (session != null) { 133 return session; 134 } 135 th.remove(); 136 session = createSession(sessionOpts); 137 th.set(session); 138 return session; 139 } 140 141 @Override 142 public MessageConsumer createConsumer(Session session, String topicName, String selector) throws JMSException { 143 Topic topic = session.createTopic(topicName); 144 MessageConsumer consumer = session.createConsumer(topic, selector); 145 return consumer; 146 } 147 148}