001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.event.messaging;
019
020import org.apache.hadoop.util.ReflectionUtils;
021import org.apache.oozie.client.event.SLAEvent;
022import org.apache.oozie.client.event.message.CoordinatorActionMessage;
023import org.apache.oozie.client.event.message.WorkflowJobMessage;
024import org.apache.oozie.client.event.message.SLAMessage;
025import org.apache.oozie.event.CoordinatorActionEvent;
026import org.apache.oozie.event.WorkflowJobEvent;
027import org.apache.oozie.service.Services;
028
029/**
030 * Factory for constructing messages and retrieving the serializer
031 */
032public class MessageFactory {
033
034    public static final String OOZIE_MESSAGE_FORMAT = Services.get().getConf().get("message.format", "json");
035    public static final String OOZIE_MESSAGE_SERIALIZE = "oozie.jms.serialize.";
036
037    private static class MessageSerializerHolder {
038        private static String messageSerializerInstance = Services
039                .get()
040                .getConf()
041                .get(OOZIE_MESSAGE_SERIALIZE + OOZIE_MESSAGE_FORMAT,
042                        "org.apache.oozie.event.messaging.JSONMessageSerializer");
043        public static final MessageSerializer INSTANCE;
044        static {
045            try {
046                INSTANCE = (MessageSerializer) ReflectionUtils.newInstance(Class.forName(messageSerializerInstance),
047                        null);
048            }
049            catch (ClassNotFoundException cnfe) {
050                throw new IllegalStateException("Could not construct the serializer ", cnfe);
051            }
052        }
053    }
054
055    /**
056     * Gets the configured serializer
057     *
058     * @return
059     */
060    public static MessageSerializer getMessageSerializer() {
061        return MessageSerializerHolder.INSTANCE;
062    }
063
064    /**
065     * Constructs and returns the workflow job message for workflow job event
066     *
067     * @param wfJobEvent the workflow job event
068     * @return
069     */
070    public static WorkflowJobMessage createWorkflowJobMessage(WorkflowJobEvent wfJobEvent) {
071        WorkflowJobMessage wfJobMessage = new WorkflowJobMessage(wfJobEvent.getEventStatus(), wfJobEvent.getId(),
072                wfJobEvent.getParentId(), wfJobEvent.getStartTime(), wfJobEvent.getEndTime(), wfJobEvent.getStatus(),
073                wfJobEvent.getUser(), wfJobEvent.getAppName(), wfJobEvent.getErrorCode(), wfJobEvent.getErrorMessage());
074        return wfJobMessage;
075    }
076
077    /**
078     * Constructs and returns the coordinator action message for coordinator
079     * action event
080     *
081     * @param coordActionEvent the coordinator action event
082     * @return
083     */
084    public static CoordinatorActionMessage createCoordinatorActionMessage(CoordinatorActionEvent coordActionEvent) {
085        CoordinatorActionMessage coordActionMessage = new CoordinatorActionMessage(coordActionEvent.getEventStatus(),
086                coordActionEvent.getId(), coordActionEvent.getParentId(), coordActionEvent.getStartTime(),
087                coordActionEvent.getEndTime(), coordActionEvent.getNominalTime(), coordActionEvent.getStatus(),
088                coordActionEvent.getUser(), coordActionEvent.getAppName(), coordActionEvent.getMissingDeps(),
089                coordActionEvent.getErrorCode(), coordActionEvent.getErrorMessage());
090        return coordActionMessage;
091    }
092
093    /**
094     * Constructs and returns SLA notification message
095     * @param event SLA event
096     * @return
097     */
098    public static SLAMessage createSLAMessage(SLAEvent event) {
099        SLAMessage slaMessage = new SLAMessage(event.getEventStatus(), event.getSLAStatus(), event.getAppType(),
100                event.getAppName(), event.getUser(), event.getId(), event.getParentId(), event.getNominalTime(),
101                event.getExpectedStart(), event.getActualStart(), event.getExpectedEnd(), event.getActualEnd(),
102                event.getExpectedDuration(), event.getActualDuration(), event.getNotificationMsg(), event.getUpstreamApps());
103        return slaMessage;
104    }
105}