001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.jms; 019 020 import java.util.Map; 021 022 import javax.jms.DeliveryMode; 023 import javax.jms.JMSException; 024 import javax.jms.MessageProducer; 025 import javax.jms.Session; 026 import javax.jms.TextMessage; 027 028 import org.apache.hadoop.conf.Configuration; 029 import org.apache.oozie.client.event.jms.JMSHeaderConstants; 030 import org.apache.oozie.client.event.message.CoordinatorActionMessage; 031 import org.apache.oozie.client.event.message.JobMessage; 032 import org.apache.oozie.client.event.message.WorkflowJobMessage; 033 import org.apache.oozie.event.BundleJobEvent; 034 import org.apache.oozie.event.CoordinatorActionEvent; 035 import org.apache.oozie.event.CoordinatorJobEvent; 036 import org.apache.oozie.event.WorkflowActionEvent; 037 import org.apache.oozie.event.WorkflowJobEvent; 038 import org.apache.oozie.event.listener.JobEventListener; 039 import org.apache.oozie.event.messaging.MessageFactory; 040 import org.apache.oozie.event.messaging.MessageSerializer; 041 import org.apache.oozie.service.JMSAccessorService; 042 import org.apache.oozie.service.JMSTopicService; 043 import org.apache.oozie.service.Services; 044 import org.apache.oozie.util.XLog; 045 046 /** 047 * Class to send JMS notifications related to job events. 048 * 049 */ 050 public class JMSJobEventListener extends JobEventListener { 051 private JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class); 052 private JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class); 053 private JMSConnectionInfo connInfo; 054 public static final String JMS_CONNECTION_PROPERTIES = "oozie.jms.producer.connection.properties"; 055 public static final String JMS_SESSION_OPTS = "oozie.jms.producer.session.opts"; 056 public static final String JMS_DELIVERY_MODE = "oozie.jms.delivery.mode"; 057 public static final String JMS_EXPIRATION_DATE = "oozie.jms.expiration.date"; 058 private int jmsSessionOpts; 059 private int jmsDeliveryMode; 060 private int jmsExpirationDate; 061 private ConnectionContext jmsContext; 062 private static XLog LOG; 063 064 @Override 065 public void init(Configuration conf) { 066 LOG = XLog.getLog(getClass()); 067 String jmsProps = conf.get(JMS_CONNECTION_PROPERTIES); 068 LOG.info("JMS producer connection properties [{0}]", jmsProps); 069 connInfo = new JMSConnectionInfo(jmsProps); 070 jmsSessionOpts = conf.getInt(JMS_SESSION_OPTS, Session.AUTO_ACKNOWLEDGE); 071 jmsDeliveryMode = conf.getInt(JMS_DELIVERY_MODE, DeliveryMode.PERSISTENT); 072 jmsExpirationDate = conf.getInt(JMS_EXPIRATION_DATE, 0); 073 074 } 075 076 protected void sendMessage(Map<String, String> messageProperties, String messageBody, String topicName, 077 String messageFormat) { 078 jmsContext = jmsService.createProducerConnectionContext(connInfo); 079 if (jmsContext != null) { 080 try { 081 Session session = jmsContext.createThreadLocalSession(jmsSessionOpts); 082 TextMessage textMessage = session.createTextMessage(messageBody); 083 for (Map.Entry<String, String> property : messageProperties.entrySet()) { 084 textMessage.setStringProperty(property.getKey(), property.getValue()); 085 } 086 textMessage.setStringProperty(JMSHeaderConstants.MESSAGE_FORMAT, messageFormat); 087 LOG.trace("Event related JMS text body [{0}]", textMessage.getText()); 088 LOG.trace("Event related JMS entire message [{0}]", textMessage.toString()); 089 MessageProducer producer = jmsContext.createProducer(session, topicName); 090 producer.setDeliveryMode(jmsDeliveryMode); 091 producer.setTimeToLive(jmsExpirationDate); 092 producer.send(textMessage); 093 producer.close(); 094 } 095 catch (JMSException jmse) { 096 LOG.error("Exception happened while sending event related jms message", jmse); 097 } 098 } 099 100 } 101 102 @Override 103 public void onWorkflowJobEvent(WorkflowJobEvent event) { 104 WorkflowJobMessage wfJobMessage = MessageFactory.createWorkflowJobMessage(event); 105 serializeJMSMessage(wfJobMessage, getTopic(event)); 106 107 } 108 109 @Override 110 public void onCoordinatorActionEvent(CoordinatorActionEvent event) { 111 CoordinatorActionMessage coordActionMessage = MessageFactory.createCoordinatorActionMessage(event); 112 serializeJMSMessage(coordActionMessage, getTopic(event)); 113 } 114 115 private void serializeJMSMessage(JobMessage jobMessage, String topicName) { 116 MessageSerializer serializer = MessageFactory.getMessageSerializer(); 117 String messageBody = serializer.getSerializedObject(jobMessage); 118 sendMessage(jobMessage.getMessageProperties(), messageBody, topicName, serializer.getMessageFormat()); 119 } 120 121 protected String getTopic(WorkflowJobEvent event) { 122 if (jmsTopicService != null) { 123 return jmsTopicService.getTopic(event.getAppType(), event.getUser(), event.getId(), event.getParentId()); 124 } 125 else { 126 throw new RuntimeException("JMSTopicService is not initialized"); 127 } 128 } 129 130 protected String getTopic(CoordinatorActionEvent event) { 131 if (jmsTopicService != null) { 132 return jmsTopicService.getTopic(event.getAppType(), event.getUser(), event.getId(), event.getParentId()); 133 } 134 else { 135 throw new RuntimeException("JMSTopicService is not initialized"); 136 } 137 } 138 139 @Override 140 public void onWorkflowActionEvent(WorkflowActionEvent wae) { 141 } 142 143 @Override 144 public void onCoordinatorJobEvent(CoordinatorJobEvent wje) { 145 } 146 147 @Override 148 public void onBundleJobEvent(BundleJobEvent wje) { 149 } 150 151 @Override 152 public void destroy() { 153 } 154 155 }