001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.event.messaging;
020
021import org.apache.hadoop.util.ReflectionUtils;
022import org.apache.oozie.client.event.SLAEvent;
023import org.apache.oozie.client.event.message.CoordinatorActionMessage;
024import org.apache.oozie.client.event.message.WorkflowJobMessage;
025import org.apache.oozie.client.event.message.SLAMessage;
026import org.apache.oozie.event.CoordinatorActionEvent;
027import org.apache.oozie.event.WorkflowJobEvent;
028import org.apache.oozie.service.Services;
029
030/**
031 * Factory for constructing messages and retrieving the serializer
032 */
033public class MessageFactory {
034
035    public static final String OOZIE_MESSAGE_FORMAT = Services.get().getConf().get("message.format", "json");
036    public static final String OOZIE_MESSAGE_SERIALIZE = "oozie.jms.serialize.";
037
038    private static class MessageSerializerHolder {
039        private static String messageSerializerInstance = Services
040                .get()
041                .getConf()
042                .get(OOZIE_MESSAGE_SERIALIZE + OOZIE_MESSAGE_FORMAT,
043                        "org.apache.oozie.event.messaging.JSONMessageSerializer");
044        public static final MessageSerializer INSTANCE;
045        static {
046            try {
047                INSTANCE = (MessageSerializer) ReflectionUtils.newInstance(Class.forName(messageSerializerInstance),
048                        null);
049            }
050            catch (ClassNotFoundException cnfe) {
051                throw new IllegalStateException("Could not construct the serializer ", cnfe);
052            }
053        }
054    }
055
056    /**
057     * Gets the configured serializer
058     *
059     * @return
060     */
061    public static MessageSerializer getMessageSerializer() {
062        return MessageSerializerHolder.INSTANCE;
063    }
064
065    /**
066     * Constructs and returns the workflow job message for workflow job event
067     *
068     * @param wfJobEvent the workflow job event
069     * @return
070     */
071    public static WorkflowJobMessage createWorkflowJobMessage(WorkflowJobEvent wfJobEvent) {
072        WorkflowJobMessage wfJobMessage = new WorkflowJobMessage(wfJobEvent.getEventStatus(), wfJobEvent.getId(),
073                wfJobEvent.getParentId(), wfJobEvent.getStartTime(), wfJobEvent.getEndTime(), wfJobEvent.getStatus(),
074                wfJobEvent.getUser(), wfJobEvent.getAppName(), wfJobEvent.getErrorCode(), wfJobEvent.getErrorMessage());
075        return wfJobMessage;
076    }
077
078    /**
079     * Constructs and returns the coordinator action message for coordinator
080     * action event
081     *
082     * @param coordActionEvent the coordinator action event
083     * @return
084     */
085    public static CoordinatorActionMessage createCoordinatorActionMessage(CoordinatorActionEvent coordActionEvent) {
086        CoordinatorActionMessage coordActionMessage = new CoordinatorActionMessage(coordActionEvent.getEventStatus(),
087                coordActionEvent.getId(), coordActionEvent.getParentId(), coordActionEvent.getStartTime(),
088                coordActionEvent.getEndTime(), coordActionEvent.getNominalTime(), coordActionEvent.getStatus(),
089                coordActionEvent.getUser(), coordActionEvent.getAppName(), coordActionEvent.getMissingDeps(),
090                coordActionEvent.getErrorCode(), coordActionEvent.getErrorMessage());
091        return coordActionMessage;
092    }
093
094    /**
095     * Constructs and returns SLA notification message
096     * @param event SLA event
097     * @return
098     */
099    public static SLAMessage createSLAMessage(SLAEvent event) {
100        SLAMessage slaMessage = new SLAMessage(event.getEventStatus(), event.getSLAStatus(), event.getAppType(),
101                event.getAppName(), event.getUser(), event.getId(), event.getParentId(), event.getNominalTime(),
102                event.getExpectedStart(), event.getActualStart(), event.getExpectedEnd(), event.getActualEnd(),
103                event.getExpectedDuration(), event.getActualDuration(), event.getNotificationMsg(), event.getUpstreamApps());
104        return slaMessage;
105    }
106}