001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.event.messaging; 020 021import org.apache.hadoop.util.ReflectionUtils; 022import org.apache.oozie.client.event.SLAEvent; 023import org.apache.oozie.client.event.message.CoordinatorActionMessage; 024import org.apache.oozie.client.event.message.WorkflowJobMessage; 025import org.apache.oozie.client.event.message.SLAMessage; 026import org.apache.oozie.event.CoordinatorActionEvent; 027import org.apache.oozie.event.WorkflowJobEvent; 028import org.apache.oozie.service.Services; 029 030/** 031 * Factory for constructing messages and retrieving the serializer 032 */ 033public class MessageFactory { 034 035 public static final String OOZIE_MESSAGE_FORMAT = Services.get().getConf().get("message.format", "json"); 036 public static final String OOZIE_MESSAGE_SERIALIZE = "oozie.jms.serialize."; 037 038 private static class MessageSerializerHolder { 039 private static String messageSerializerInstance = Services 040 .get() 041 .getConf() 042 .get(OOZIE_MESSAGE_SERIALIZE + OOZIE_MESSAGE_FORMAT, 043 "org.apache.oozie.event.messaging.JSONMessageSerializer"); 044 public static final MessageSerializer INSTANCE; 045 static { 046 try { 047 INSTANCE = (MessageSerializer) ReflectionUtils.newInstance(Class.forName(messageSerializerInstance), 048 null); 049 } 050 catch (ClassNotFoundException cnfe) { 051 throw new IllegalStateException("Could not construct the serializer ", cnfe); 052 } 053 } 054 } 055 056 /** 057 * Gets the configured serializer 058 * 059 * @return 060 */ 061 public static MessageSerializer getMessageSerializer() { 062 return MessageSerializerHolder.INSTANCE; 063 } 064 065 /** 066 * Constructs and returns the workflow job message for workflow job event 067 * 068 * @param wfJobEvent the workflow job event 069 * @return 070 */ 071 public static WorkflowJobMessage createWorkflowJobMessage(WorkflowJobEvent wfJobEvent) { 072 WorkflowJobMessage wfJobMessage = new WorkflowJobMessage(wfJobEvent.getEventStatus(), wfJobEvent.getId(), 073 wfJobEvent.getParentId(), wfJobEvent.getStartTime(), wfJobEvent.getEndTime(), wfJobEvent.getStatus(), 074 wfJobEvent.getUser(), wfJobEvent.getAppName(), wfJobEvent.getErrorCode(), wfJobEvent.getErrorMessage()); 075 return wfJobMessage; 076 } 077 078 /** 079 * Constructs and returns the coordinator action message for coordinator 080 * action event 081 * 082 * @param coordActionEvent the coordinator action event 083 * @return 084 */ 085 public static CoordinatorActionMessage createCoordinatorActionMessage(CoordinatorActionEvent coordActionEvent) { 086 CoordinatorActionMessage coordActionMessage = new CoordinatorActionMessage(coordActionEvent.getEventStatus(), 087 coordActionEvent.getId(), coordActionEvent.getParentId(), coordActionEvent.getStartTime(), 088 coordActionEvent.getEndTime(), coordActionEvent.getNominalTime(), coordActionEvent.getStatus(), 089 coordActionEvent.getUser(), coordActionEvent.getAppName(), coordActionEvent.getMissingDeps(), 090 coordActionEvent.getErrorCode(), coordActionEvent.getErrorMessage()); 091 return coordActionMessage; 092 } 093 094 /** 095 * Constructs and returns SLA notification message 096 * @param event SLA event 097 * @return 098 */ 099 public static SLAMessage createSLAMessage(SLAEvent event) { 100 SLAMessage slaMessage = new SLAMessage(event.getEventStatus(), event.getSLAStatus(), event.getAppType(), 101 event.getAppName(), event.getUser(), event.getId(), event.getParentId(), event.getNominalTime(), 102 event.getExpectedStart(), event.getActualStart(), event.getExpectedEnd(), event.getActualEnd(), 103 event.getExpectedDuration(), event.getActualDuration(), event.getNotificationMsg(), event.getUpstreamApps()); 104 return slaMessage; 105 } 106}