001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.jms; 020 021import java.util.Map; 022 023import javax.jms.DeliveryMode; 024import javax.jms.JMSException; 025import javax.jms.MessageProducer; 026import javax.jms.Session; 027import javax.jms.TextMessage; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.oozie.client.event.jms.JMSHeaderConstants; 031import org.apache.oozie.client.event.message.CoordinatorActionMessage; 032import org.apache.oozie.client.event.message.JobMessage; 033import org.apache.oozie.client.event.message.WorkflowJobMessage; 034import org.apache.oozie.event.BundleJobEvent; 035import org.apache.oozie.event.CoordinatorActionEvent; 036import org.apache.oozie.event.CoordinatorJobEvent; 037import org.apache.oozie.event.WorkflowActionEvent; 038import org.apache.oozie.event.WorkflowJobEvent; 039import org.apache.oozie.event.listener.JobEventListener; 040import org.apache.oozie.event.messaging.MessageFactory; 041import org.apache.oozie.event.messaging.MessageSerializer; 042import org.apache.oozie.service.ConfigurationService; 043import org.apache.oozie.service.JMSAccessorService; 044import org.apache.oozie.service.JMSTopicService; 045import org.apache.oozie.service.Services; 046import org.apache.oozie.util.XLog; 047 048/** 049 * Class to send JMS notifications related to job events. 050 * 051 */ 052public class JMSJobEventListener extends JobEventListener { 053 private JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class); 054 private JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class); 055 private JMSConnectionInfo connInfo; 056 public static final String JMS_CONNECTION_PROPERTIES = "oozie.jms.producer.connection.properties"; 057 public static final String JMS_SESSION_OPTS = "oozie.jms.producer.session.opts"; 058 public static final String JMS_DELIVERY_MODE = "oozie.jms.delivery.mode"; 059 public static final String JMS_EXPIRATION_DATE = "oozie.jms.expiration.date"; 060 private int jmsSessionOpts; 061 private int jmsDeliveryMode; 062 private int jmsExpirationDate; 063 private ConnectionContext jmsContext; 064 private static XLog LOG; 065 066 @Override 067 public void init(Configuration conf) { 068 LOG = XLog.getLog(getClass()); 069 String jmsProps = ConfigurationService.get(conf, JMS_CONNECTION_PROPERTIES); 070 LOG.info("JMS producer connection properties [{0}]", jmsProps); 071 connInfo = new JMSConnectionInfo(jmsProps); 072 jmsSessionOpts = conf.getInt(JMS_SESSION_OPTS, Session.AUTO_ACKNOWLEDGE); 073 jmsDeliveryMode = conf.getInt(JMS_DELIVERY_MODE, DeliveryMode.PERSISTENT); 074 jmsExpirationDate = conf.getInt(JMS_EXPIRATION_DATE, 0); 075 076 } 077 078 protected void sendMessage(Map<String, String> messageProperties, String messageBody, String topicName, 079 String messageFormat) { 080 jmsContext = jmsService.createProducerConnectionContext(connInfo); 081 if (jmsContext != null) { 082 try { 083 Session session = jmsContext.createThreadLocalSession(jmsSessionOpts); 084 TextMessage textMessage = session.createTextMessage(messageBody); 085 for (Map.Entry<String, String> property : messageProperties.entrySet()) { 086 textMessage.setStringProperty(property.getKey(), property.getValue()); 087 } 088 textMessage.setStringProperty(JMSHeaderConstants.MESSAGE_FORMAT, messageFormat); 089 LOG.trace("Event related JMS text body [{0}]", textMessage.getText()); 090 LOG.trace("Event related JMS entire message [{0}]", textMessage.toString()); 091 MessageProducer producer = jmsContext.createProducer(session, topicName); 092 producer.setDeliveryMode(jmsDeliveryMode); 093 producer.setTimeToLive(jmsExpirationDate); 094 producer.send(textMessage); 095 producer.close(); 096 } 097 catch (JMSException jmse) { 098 LOG.error("Exception happened while sending event related jms message", jmse); 099 } 100 } 101 102 } 103 104 @Override 105 public void onWorkflowJobEvent(WorkflowJobEvent event) { 106 WorkflowJobMessage wfJobMessage = MessageFactory.createWorkflowJobMessage(event); 107 serializeJMSMessage(wfJobMessage, getTopic(event)); 108 109 } 110 111 @Override 112 public void onCoordinatorActionEvent(CoordinatorActionEvent event) { 113 CoordinatorActionMessage coordActionMessage = MessageFactory.createCoordinatorActionMessage(event); 114 serializeJMSMessage(coordActionMessage, getTopic(event)); 115 } 116 117 private void serializeJMSMessage(JobMessage jobMessage, String topicName) { 118 MessageSerializer serializer = MessageFactory.getMessageSerializer(); 119 String messageBody = serializer.getSerializedObject(jobMessage); 120 sendMessage(jobMessage.getMessageProperties(), messageBody, topicName, serializer.getMessageFormat()); 121 } 122 123 protected String getTopic(WorkflowJobEvent event) { 124 if (jmsTopicService != null) { 125 return jmsTopicService.getTopic(event.getAppType(), event.getUser(), event.getId(), event.getParentId()); 126 } 127 else { 128 throw new RuntimeException("JMSTopicService is not initialized"); 129 } 130 } 131 132 protected String getTopic(CoordinatorActionEvent event) { 133 if (jmsTopicService != null) { 134 return jmsTopicService.getTopic(event.getAppType(), event.getUser(), event.getId(), event.getParentId()); 135 } 136 else { 137 throw new RuntimeException("JMSTopicService is not initialized"); 138 } 139 } 140 141 @Override 142 public void onWorkflowActionEvent(WorkflowActionEvent wae) { 143 } 144 145 @Override 146 public void onCoordinatorJobEvent(CoordinatorJobEvent wje) { 147 } 148 149 @Override 150 public void onBundleJobEvent(BundleJobEvent wje) { 151 } 152 153 @Override 154 public void destroy() { 155} 156 157}