001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.jms; 019 020 import javax.jms.DeliveryMode; 021 import javax.jms.JMSException; 022 import javax.jms.MessageProducer; 023 import javax.jms.Session; 024 import javax.jms.TextMessage; 025 026 import org.apache.hadoop.conf.Configuration; 027 import org.apache.oozie.sla.listener.SLAEventListener; 028 import org.apache.oozie.client.event.jms.JMSHeaderConstants; 029 import org.apache.oozie.client.event.message.SLAMessage; 030 import org.apache.oozie.client.event.SLAEvent; 031 import org.apache.oozie.event.messaging.MessageFactory; 032 import org.apache.oozie.event.messaging.MessageSerializer; 033 import org.apache.oozie.service.JMSAccessorService; 034 import org.apache.oozie.service.JMSTopicService; 035 import org.apache.oozie.service.Services; 036 import org.apache.oozie.util.XLog; 037 038 public class JMSSLAEventListener extends SLAEventListener { 039 040 private JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class); 041 private JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class); 042 private JMSConnectionInfo connInfo; 043 private int jmsSessionOpts; 044 private int jmsDeliveryMode; 045 private int jmsExpirationDate; 046 private ConnectionContext jmsContext; 047 private static XLog LOG; 048 049 @Override 050 public void init(Configuration conf) throws Exception { 051 String jmsProps = conf.get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES); 052 connInfo = new JMSConnectionInfo(jmsProps); 053 LOG = XLog.getLog(getClass()); 054 jmsSessionOpts = conf.getInt(JMSJobEventListener.JMS_SESSION_OPTS, Session.AUTO_ACKNOWLEDGE); 055 jmsDeliveryMode = conf.getInt(JMSJobEventListener.JMS_DELIVERY_MODE, DeliveryMode.PERSISTENT); 056 jmsExpirationDate = conf.getInt(JMSJobEventListener.JMS_EXPIRATION_DATE, 0); 057 } 058 059 @Override 060 public void onStartMiss(SLAEvent event) { 061 sendSLANotification(event); 062 } 063 064 @Override 065 public void onEndMiss(SLAEvent event) { 066 sendSLANotification(event); 067 } 068 069 @Override 070 public void onDurationMiss(SLAEvent event) { 071 sendSLANotification(event); 072 } 073 074 @Override 075 public void onStartMet(SLAEvent event) { 076 sendSLANotification(event); 077 } 078 079 @Override 080 public void onEndMet(SLAEvent event) { 081 sendSLANotification(event); 082 } 083 084 @Override 085 public void onDurationMet(SLAEvent event) { 086 sendSLANotification(event); 087 } 088 089 protected void sendSLANotification(SLAEvent event) { 090 SLAMessage slaMsg = MessageFactory.createSLAMessage(event); 091 MessageSerializer serializer = MessageFactory.getMessageSerializer(); 092 String messageBody = serializer.getSerializedObject(slaMsg); 093 String serializerMessageFormat = serializer.getMessageFormat(); 094 String topicName = getTopic(event); 095 sendJMSMessage(slaMsg, messageBody, topicName, serializerMessageFormat); 096 } 097 098 protected void sendJMSMessage(SLAMessage slaMsg, String messageBody, String topicName, 099 String messageFormat) { 100 jmsContext = jmsService.createProducerConnectionContext(connInfo); 101 if (jmsContext != null) { 102 try { 103 Session session = jmsContext.createThreadLocalSession(jmsSessionOpts); 104 TextMessage textMessage = session.createTextMessage(messageBody); 105 textMessage.setStringProperty(JMSHeaderConstants.EVENT_STATUS, slaMsg.getEventStatus().toString()); 106 textMessage.setStringProperty(JMSHeaderConstants.SLA_STATUS, slaMsg.getSLAStatus().toString()); 107 textMessage.setStringProperty(JMSHeaderConstants.APP_TYPE, slaMsg.getAppType().toString()); 108 textMessage.setStringProperty(JMSHeaderConstants.MESSAGE_TYPE, slaMsg.getMessageType().toString()); 109 textMessage.setStringProperty(JMSHeaderConstants.APP_NAME, slaMsg.getAppName()); 110 textMessage.setStringProperty(JMSHeaderConstants.USER, slaMsg.getUser()); 111 textMessage.setStringProperty(JMSHeaderConstants.MESSAGE_FORMAT, messageFormat); 112 LOG.trace("Event related JMS text body [{0}]", textMessage.getText()); 113 LOG.trace("Event related JMS message [{0}]", textMessage.toString()); 114 MessageProducer producer = jmsContext.createProducer(session, topicName); 115 producer.setDeliveryMode(jmsDeliveryMode); 116 producer.setTimeToLive(jmsExpirationDate); 117 producer.send(textMessage); 118 producer.close(); 119 } 120 catch (JMSException jmse) { 121 LOG.error("Exception happened while sending event related jms message :" + messageBody, jmse); 122 } 123 } 124 else { 125 LOG.warn("No connection. Not sending message" + messageBody); 126 } 127 } 128 129 public String getTopic(SLAEvent event) { 130 if (jmsTopicService != null) { 131 return jmsTopicService.getTopic(event.getAppType(), event.getUser(), event.getId(), event.getParentId()); 132 } 133 else { 134 throw new RuntimeException("JMSTopicService is not initialized"); 135 } 136 } 137 138 @Override 139 public void destroy() { 140 } 141 }