001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.client.event.jms;
019
020import org.apache.oozie.client.event.Event.MessageType;
021import org.apache.oozie.client.event.message.CoordinatorActionMessage;
022import org.apache.oozie.client.event.message.EventMessage;
023import org.apache.oozie.client.event.message.WorkflowJobMessage;
024import org.apache.oozie.client.event.message.SLAMessage;
025import org.apache.oozie.AppType;
026import javax.jms.Message;
027import javax.jms.TextMessage;
028import javax.jms.JMSException;
029
030/**
031 * Class to deserialize the jms message to java object
032 */
033public abstract class MessageDeserializer {
034
035    /**
036     * Constructs the event message from JMS message
037     *
038     * @param message the JMS message
039     * @return EventMessage
040     * @throws JMSException
041     */
042    @SuppressWarnings("unchecked")
043    public <T extends EventMessage> T getEventMessage(Message message) throws JMSException {
044        TextMessage textMessage = (TextMessage) message;
045        String appTypeString = textMessage.getStringProperty(JMSHeaderConstants.APP_TYPE);
046        String msgType = textMessage.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE);
047        String messageBody = textMessage.getText();
048        T eventMsg = null;
049
050        if (appTypeString == null || appTypeString.isEmpty() || messageBody == null || messageBody.isEmpty()) {
051            throw new IllegalArgumentException("Could not extract OozieEventMessage. "
052                    + "AppType and/or MessageBody is null/empty." + "Apptype is " + appTypeString + " MessageBody is "
053                    + messageBody);
054        }
055
056        if (MessageType.valueOf(msgType) == MessageType.JOB) {
057            switch (AppType.valueOf(appTypeString)) {
058                case WORKFLOW_JOB:
059                    WorkflowJobMessage wfJobMsg = getDeserializedObject(messageBody, WorkflowJobMessage.class);
060                    wfJobMsg.setProperties(textMessage);
061                    eventMsg = (T) wfJobMsg;
062                    break;
063                case COORDINATOR_ACTION:
064                    CoordinatorActionMessage caActionMsg = getDeserializedObject(messageBody,
065                            CoordinatorActionMessage.class);
066                    caActionMsg.setProperties(textMessage);
067                    eventMsg = (T) caActionMsg;
068                    break;
069                default:
070                    throw new UnsupportedOperationException("Conversion of " + appTypeString
071                            + " to Event message is not supported");
072            }
073        }
074        else if (MessageType.valueOf(msgType) == MessageType.SLA) {
075            SLAMessage SLAMsg = getDeserializedObject(messageBody, SLAMessage.class);
076            SLAMsg.setProperties(textMessage);
077            eventMsg = (T) SLAMsg;
078        }
079
080        return eventMsg;
081    }
082
083    protected abstract <T> T getDeserializedObject(String s, Class<T> clazz);
084
085}