001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.jms; 020 021import javax.jms.DeliveryMode; 022import javax.jms.JMSException; 023import javax.jms.MessageProducer; 024import javax.jms.Session; 025import javax.jms.TextMessage; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.oozie.sla.listener.SLAEventListener; 029import org.apache.oozie.client.event.jms.JMSHeaderConstants; 030import org.apache.oozie.client.event.message.SLAMessage; 031import org.apache.oozie.client.event.SLAEvent; 032import org.apache.oozie.event.messaging.MessageFactory; 033import org.apache.oozie.event.messaging.MessageSerializer; 034import org.apache.oozie.service.JMSAccessorService; 035import org.apache.oozie.service.JMSTopicService; 036import org.apache.oozie.service.Services; 037import org.apache.oozie.util.XLog; 038 039public class JMSSLAEventListener extends SLAEventListener { 040 041 private JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class); 042 private JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class); 043 private JMSConnectionInfo connInfo; 044 private int jmsSessionOpts; 045 private int jmsDeliveryMode; 046 private int jmsExpirationDate; 047 private ConnectionContext jmsContext; 048 private static XLog LOG; 049 050 @Override 051 public void init(Configuration conf) throws Exception { 052 String jmsProps = conf.get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES); 053 connInfo = new JMSConnectionInfo(jmsProps); 054 LOG = XLog.getLog(getClass()); 055 jmsSessionOpts = conf.getInt(JMSJobEventListener.JMS_SESSION_OPTS, Session.AUTO_ACKNOWLEDGE); 056 jmsDeliveryMode = conf.getInt(JMSJobEventListener.JMS_DELIVERY_MODE, DeliveryMode.PERSISTENT); 057 jmsExpirationDate = conf.getInt(JMSJobEventListener.JMS_EXPIRATION_DATE, 0); 058 } 059 060 @Override 061 public void onStartMiss(SLAEvent event) { 062 sendSLANotification(event); 063 } 064 065 @Override 066 public void onEndMiss(SLAEvent event) { 067 sendSLANotification(event); 068 } 069 070 @Override 071 public void onDurationMiss(SLAEvent event) { 072 sendSLANotification(event); 073 } 074 075 @Override 076 public void onStartMet(SLAEvent event) { 077 sendSLANotification(event); 078 } 079 080 @Override 081 public void onEndMet(SLAEvent event) { 082 sendSLANotification(event); 083 } 084 085 @Override 086 public void onDurationMet(SLAEvent event) { 087 sendSLANotification(event); 088 } 089 090 protected void sendSLANotification(SLAEvent event) { 091 SLAMessage slaMsg = MessageFactory.createSLAMessage(event); 092 MessageSerializer serializer = MessageFactory.getMessageSerializer(); 093 String messageBody = serializer.getSerializedObject(slaMsg); 094 String serializerMessageFormat = serializer.getMessageFormat(); 095 String topicName = getTopic(event); 096 sendJMSMessage(slaMsg, messageBody, topicName, serializerMessageFormat); 097 } 098 099 protected void sendJMSMessage(SLAMessage slaMsg, String messageBody, String topicName, 100 String messageFormat) { 101 jmsContext = jmsService.createProducerConnectionContext(connInfo); 102 if (jmsContext != null) { 103 try { 104 Session session = jmsContext.createThreadLocalSession(jmsSessionOpts); 105 TextMessage textMessage = session.createTextMessage(messageBody); 106 textMessage.setStringProperty(JMSHeaderConstants.EVENT_STATUS, slaMsg.getEventStatus().toString()); 107 textMessage.setStringProperty(JMSHeaderConstants.SLA_STATUS, slaMsg.getSLAStatus().toString()); 108 textMessage.setStringProperty(JMSHeaderConstants.APP_TYPE, slaMsg.getAppType().toString()); 109 textMessage.setStringProperty(JMSHeaderConstants.MESSAGE_TYPE, slaMsg.getMessageType().toString()); 110 textMessage.setStringProperty(JMSHeaderConstants.APP_NAME, slaMsg.getAppName()); 111 textMessage.setStringProperty(JMSHeaderConstants.USER, slaMsg.getUser()); 112 textMessage.setStringProperty(JMSHeaderConstants.MESSAGE_FORMAT, messageFormat); 113 LOG.trace("Event related JMS text body [{0}]", textMessage.getText()); 114 LOG.trace("Event related JMS message [{0}]", textMessage.toString()); 115 MessageProducer producer = jmsContext.createProducer(session, topicName); 116 producer.setDeliveryMode(jmsDeliveryMode); 117 producer.setTimeToLive(jmsExpirationDate); 118 producer.send(textMessage); 119 producer.close(); 120 } 121 catch (JMSException jmse) { 122 LOG.error("Exception happened while sending event related jms message :" + messageBody, jmse); 123 } 124 } 125 else { 126 LOG.warn("No connection. Not sending message" + messageBody); 127 } 128 } 129 130 public String getTopic(SLAEvent event) { 131 if (jmsTopicService != null) { 132 return jmsTopicService.getTopic(event.getAppType(), event.getUser(), event.getId(), event.getParentId()); 133 } 134 else { 135 throw new RuntimeException("JMSTopicService is not initialized"); 136 } 137 } 138 139 @Override 140 public void destroy() { 141 } 142}