001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.client.event.jms;
019    
020    import org.apache.oozie.client.event.Event.MessageType;
021    import org.apache.oozie.client.event.message.CoordinatorActionMessage;
022    import org.apache.oozie.client.event.message.EventMessage;
023    import org.apache.oozie.client.event.message.WorkflowJobMessage;
024    import org.apache.oozie.client.event.message.SLAMessage;
025    import org.apache.oozie.AppType;
026    import javax.jms.Message;
027    import javax.jms.TextMessage;
028    import javax.jms.JMSException;
029    
030    /**
031     * Class to deserialize the jms message to java object
032     */
033    public abstract class MessageDeserializer {
034    
035        /**
036         * Constructs the event message from JMS message
037         *
038         * @param message the JMS message
039         * @return EventMessage
040         * @throws JMSException
041         */
042        @SuppressWarnings("unchecked")
043        public <T extends EventMessage> T getEventMessage(Message message) throws JMSException {
044            TextMessage textMessage = (TextMessage) message;
045            String appTypeString = textMessage.getStringProperty(JMSHeaderConstants.APP_TYPE);
046            String msgType = textMessage.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE);
047            String messageBody = textMessage.getText();
048            T eventMsg = null;
049    
050            if (appTypeString == null || appTypeString.isEmpty() || messageBody == null || messageBody.isEmpty()) {
051                throw new IllegalArgumentException("Could not extract OozieEventMessage. "
052                        + "AppType and/or MessageBody is null/empty." + "Apptype is " + appTypeString + " MessageBody is "
053                        + messageBody);
054            }
055    
056            if (MessageType.valueOf(msgType) == MessageType.JOB) {
057                switch (AppType.valueOf(appTypeString)) {
058                    case WORKFLOW_JOB:
059                        WorkflowJobMessage wfJobMsg = getDeserializedObject(messageBody, WorkflowJobMessage.class);
060                        wfJobMsg.setProperties(textMessage);
061                        eventMsg = (T) wfJobMsg;
062                        break;
063                    case COORDINATOR_ACTION:
064                        CoordinatorActionMessage caActionMsg = getDeserializedObject(messageBody,
065                                CoordinatorActionMessage.class);
066                        caActionMsg.setProperties(textMessage);
067                        eventMsg = (T) caActionMsg;
068                        break;
069                    default:
070                        throw new UnsupportedOperationException("Conversion of " + appTypeString
071                                + " to Event message is not supported");
072                }
073            }
074            else if (MessageType.valueOf(msgType) == MessageType.SLA) {
075                SLAMessage SLAMsg = getDeserializedObject(messageBody, SLAMessage.class);
076                SLAMsg.setProperties(textMessage);
077                eventMsg = (T) SLAMsg;
078            }
079    
080            return eventMsg;
081        }
082    
083        protected abstract <T> T getDeserializedObject(String s, Class<T> clazz);
084    
085    }