001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.client.event.jms;
020
021import org.apache.oozie.client.event.Event.MessageType;
022import org.apache.oozie.client.event.message.CoordinatorActionMessage;
023import org.apache.oozie.client.event.message.EventMessage;
024import org.apache.oozie.client.event.message.WorkflowJobMessage;
025import org.apache.oozie.client.event.message.SLAMessage;
026import org.apache.oozie.AppType;
027import javax.jms.Message;
028import javax.jms.TextMessage;
029import javax.jms.JMSException;
030
031/**
032 * Class to deserialize the jms message to java object
033 */
034public abstract class MessageDeserializer {
035
036    /**
037     * Constructs the event message from JMS message
038     *
039     * @param message the JMS message
040     * @return EventMessage
041     * @throws JMSException
042     */
043    @SuppressWarnings("unchecked")
044    public <T extends EventMessage> T getEventMessage(Message message) throws JMSException {
045        TextMessage textMessage = (TextMessage) message;
046        String appTypeString = textMessage.getStringProperty(JMSHeaderConstants.APP_TYPE);
047        String msgType = textMessage.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE);
048        String messageBody = textMessage.getText();
049        T eventMsg = null;
050
051        if (appTypeString == null || appTypeString.isEmpty() || messageBody == null || messageBody.isEmpty()) {
052            throw new IllegalArgumentException("Could not extract OozieEventMessage. "
053                    + "AppType and/or MessageBody is null/empty." + "Apptype is " + appTypeString + " MessageBody is "
054                    + messageBody);
055        }
056
057        if (MessageType.valueOf(msgType) == MessageType.JOB) {
058            switch (AppType.valueOf(appTypeString)) {
059                case WORKFLOW_JOB:
060                    WorkflowJobMessage wfJobMsg = getDeserializedObject(messageBody, WorkflowJobMessage.class);
061                    wfJobMsg.setProperties(textMessage);
062                    eventMsg = (T) wfJobMsg;
063                    break;
064                case COORDINATOR_ACTION:
065                    CoordinatorActionMessage caActionMsg = getDeserializedObject(messageBody,
066                            CoordinatorActionMessage.class);
067                    caActionMsg.setProperties(textMessage);
068                    eventMsg = (T) caActionMsg;
069                    break;
070                default:
071                    throw new UnsupportedOperationException("Conversion of " + appTypeString
072                            + " to Event message is not supported");
073            }
074        }
075        else if (MessageType.valueOf(msgType) == MessageType.SLA) {
076            SLAMessage SLAMsg = getDeserializedObject(messageBody, SLAMessage.class);
077            SLAMsg.setProperties(textMessage);
078            eventMsg = (T) SLAMsg;
079        }
080
081        return eventMsg;
082    }
083
084    protected abstract <T> T getDeserializedObject(String s, Class<T> clazz);
085
086}