This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.client.event.jms;
019
020 import org.apache.oozie.client.event.Event.MessageType;
021 import org.apache.oozie.client.event.message.CoordinatorActionMessage;
022 import org.apache.oozie.client.event.message.EventMessage;
023 import org.apache.oozie.client.event.message.WorkflowJobMessage;
024 import org.apache.oozie.client.event.message.SLAMessage;
025 import org.apache.oozie.AppType;
026 import javax.jms.Message;
027 import javax.jms.TextMessage;
028 import javax.jms.JMSException;
029
030 /**
031 * Class to deserialize the jms message to java object
032 */
033 public abstract class MessageDeserializer {
034
035 /**
036 * Constructs the event message from JMS message
037 *
038 * @param message the JMS message
039 * @return EventMessage
040 * @throws JMSException
041 */
042 @SuppressWarnings("unchecked")
043 public <T extends EventMessage> T getEventMessage(Message message) throws JMSException {
044 TextMessage textMessage = (TextMessage) message;
045 String appTypeString = textMessage.getStringProperty(JMSHeaderConstants.APP_TYPE);
046 String msgType = textMessage.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE);
047 String messageBody = textMessage.getText();
048 T eventMsg = null;
049
050 if (appTypeString == null || appTypeString.isEmpty() || messageBody == null || messageBody.isEmpty()) {
051 throw new IllegalArgumentException("Could not extract OozieEventMessage. "
052 + "AppType and/or MessageBody is null/empty." + "Apptype is " + appTypeString + " MessageBody is "
053 + messageBody);
054 }
055
056 if (MessageType.valueOf(msgType) == MessageType.JOB) {
057 switch (AppType.valueOf(appTypeString)) {
058 case WORKFLOW_JOB:
059 WorkflowJobMessage wfJobMsg = getDeserializedObject(messageBody, WorkflowJobMessage.class);
060 wfJobMsg.setProperties(textMessage);
061 eventMsg = (T) wfJobMsg;
062 break;
063 case COORDINATOR_ACTION:
064 CoordinatorActionMessage caActionMsg = getDeserializedObject(messageBody,
065 CoordinatorActionMessage.class);
066 caActionMsg.setProperties(textMessage);
067 eventMsg = (T) caActionMsg;
068 break;
069 default:
070 throw new UnsupportedOperationException("Conversion of " + appTypeString
071 + " to Event message is not supported");
072 }
073 }
074 else if (MessageType.valueOf(msgType) == MessageType.SLA) {
075 SLAMessage SLAMsg = getDeserializedObject(messageBody, SLAMessage.class);
076 SLAMsg.setProperties(textMessage);
077 eventMsg = (T) SLAMsg;
078 }
079
080 return eventMsg;
081 }
082
083 protected abstract <T> T getDeserializedObject(String s, Class<T> clazz);
084
085 }