001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.client.event.jms; 020 021import org.apache.oozie.client.event.Event.MessageType; 022import org.apache.oozie.client.event.message.CoordinatorActionMessage; 023import org.apache.oozie.client.event.message.EventMessage; 024import org.apache.oozie.client.event.message.WorkflowJobMessage; 025import org.apache.oozie.client.event.message.SLAMessage; 026import org.apache.oozie.AppType; 027import javax.jms.Message; 028import javax.jms.TextMessage; 029import javax.jms.JMSException; 030 031/** 032 * Class to deserialize the jms message to java object 033 */ 034public abstract class MessageDeserializer { 035 036 /** 037 * Constructs the event message from JMS message 038 * 039 * @param message the JMS message 040 * @return EventMessage 041 * @throws JMSException 042 */ 043 @SuppressWarnings("unchecked") 044 public <T extends EventMessage> T getEventMessage(Message message) throws JMSException { 045 TextMessage textMessage = (TextMessage) message; 046 String appTypeString = textMessage.getStringProperty(JMSHeaderConstants.APP_TYPE); 047 String msgType = textMessage.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE); 048 String messageBody = textMessage.getText(); 049 T eventMsg = null; 050 051 if (appTypeString == null || appTypeString.isEmpty() || messageBody == null || messageBody.isEmpty()) { 052 throw new IllegalArgumentException("Could not extract OozieEventMessage. " 053 + "AppType and/or MessageBody is null/empty." + "Apptype is " + appTypeString + " MessageBody is " 054 + messageBody); 055 } 056 057 if (MessageType.valueOf(msgType) == MessageType.JOB) { 058 switch (AppType.valueOf(appTypeString)) { 059 case WORKFLOW_JOB: 060 WorkflowJobMessage wfJobMsg = getDeserializedObject(messageBody, WorkflowJobMessage.class); 061 wfJobMsg.setProperties(textMessage); 062 eventMsg = (T) wfJobMsg; 063 break; 064 case COORDINATOR_ACTION: 065 CoordinatorActionMessage caActionMsg = getDeserializedObject(messageBody, 066 CoordinatorActionMessage.class); 067 caActionMsg.setProperties(textMessage); 068 eventMsg = (T) caActionMsg; 069 break; 070 default: 071 throw new UnsupportedOperationException("Conversion of " + appTypeString 072 + " to Event message is not supported"); 073 } 074 } 075 else if (MessageType.valueOf(msgType) == MessageType.SLA) { 076 SLAMessage SLAMsg = getDeserializedObject(messageBody, SLAMessage.class); 077 SLAMsg.setProperties(textMessage); 078 eventMsg = (T) SLAMsg; 079 } 080 081 return eventMsg; 082 } 083 084 protected abstract <T> T getDeserializedObject(String s, Class<T> clazz); 085 086}