001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.client.event.jms;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.util.Properties;
024
025import javax.jms.JMSException;
026import javax.jms.Message;
027import javax.jms.TextMessage;
028
029import org.apache.oozie.client.event.message.EventMessage;
030
031/**
032 * Client utility to convert JMS message to EventMessage object
033 */
034public class JMSMessagingUtils {
035    private static final String DESERIALIZER_PROP = "oozie.msg.deserializer.";
036    private static MessageDeserializer deserializer;
037    private static Properties jmsDeserializerInfo;
038    private static final String CLIENT_PROPERTIES = "oozie_client.properties";
039
040    static {
041        InputStream is = JMSMessagingUtils.class.getClassLoader().getResourceAsStream(CLIENT_PROPERTIES);
042        if (is == null) {
043            System.out.println("Using default JSON Deserializer");
044            deserializer = new JSONMessageDeserializer();
045        }
046        else {
047            jmsDeserializerInfo = new Properties();
048            try {
049                jmsDeserializerInfo.load(is);
050                is.close();
051            }
052            catch (IOException ioe) {
053                throw new RuntimeException("I/O error occured for " + CLIENT_PROPERTIES, ioe);
054            }
055        }
056
057    }
058
059    /**
060     * Constructs the EventMessage object from JMS message
061     *
062     * @param <T>
063     * @param msg the JMS message
064     * @return the EventMessage
065     * @throws IOException
066     * @throws JMSException
067     */
068    public static <T extends EventMessage> T getEventMessage(Message msg) throws IOException, JMSException {
069        if (msg == null) {
070            throw new IllegalArgumentException("Could not extract EventMessage as JMS message is null");
071        }
072        if (deserializer == null) {
073            String msgFormat = msg.getStringProperty(JMSHeaderConstants.MESSAGE_FORMAT);
074            deserializer = getDeserializer(msgFormat);
075        }
076        return deserializer.getEventMessage(msg);
077    }
078
079    private static MessageDeserializer getDeserializer(String msgFormat) throws IOException {
080        String deserializerString = (String) jmsDeserializerInfo.get(DESERIALIZER_PROP + msgFormat);
081        if (deserializerString == null) {
082            return new JSONMessageDeserializer();
083        }
084        else {
085            try {
086                MessageDeserializer msgDeserializer = (MessageDeserializer) Class.forName(deserializerString)
087                        .newInstance();
088                return msgDeserializer;
089            }
090            catch (Exception cnfe) {
091                throw new IllegalArgumentException("Could not access class " + deserializerString, cnfe);
092            }
093
094        }
095    }
096}