001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.event.messaging;
019    
020    import org.apache.hadoop.util.ReflectionUtils;
021    import org.apache.oozie.client.event.SLAEvent;
022    import org.apache.oozie.client.event.message.CoordinatorActionMessage;
023    import org.apache.oozie.client.event.message.WorkflowJobMessage;
024    import org.apache.oozie.client.event.message.SLAMessage;
025    import org.apache.oozie.event.CoordinatorActionEvent;
026    import org.apache.oozie.event.WorkflowJobEvent;
027    import org.apache.oozie.service.Services;
028    
029    /**
030     * Factory for constructing messages and retrieving the serializer
031     */
032    public class MessageFactory {
033    
034        public static final String OOZIE_MESSAGE_FORMAT = Services.get().getConf().get("message.format", "json");
035        public static final String OOZIE_MESSAGE_SERIALIZE = "oozie.jms.serialize.";
036    
037        private static class MessageSerializerHolder {
038            private static String messageSerializerInstance = Services
039                    .get()
040                    .getConf()
041                    .get(OOZIE_MESSAGE_SERIALIZE + OOZIE_MESSAGE_FORMAT,
042                            "org.apache.oozie.event.messaging.JSONMessageSerializer");
043            public static final MessageSerializer INSTANCE;
044            static {
045                try {
046                    INSTANCE = (MessageSerializer) ReflectionUtils.newInstance(Class.forName(messageSerializerInstance),
047                            null);
048                }
049                catch (ClassNotFoundException cnfe) {
050                    throw new IllegalStateException("Could not construct the serializer ", cnfe);
051                }
052            }
053        }
054    
055        /**
056         * Gets the configured serializer
057         *
058         * @return
059         */
060        public static MessageSerializer getMessageSerializer() {
061            return MessageSerializerHolder.INSTANCE;
062        }
063    
064        /**
065         * Constructs and returns the workflow job message for workflow job event
066         *
067         * @param wfJobEvent the workflow job event
068         * @return
069         */
070        public static WorkflowJobMessage createWorkflowJobMessage(WorkflowJobEvent wfJobEvent) {
071            WorkflowJobMessage wfJobMessage = new WorkflowJobMessage(wfJobEvent.getEventStatus(), wfJobEvent.getId(),
072                    wfJobEvent.getParentId(), wfJobEvent.getStartTime(), wfJobEvent.getEndTime(), wfJobEvent.getStatus(),
073                    wfJobEvent.getUser(), wfJobEvent.getAppName(), wfJobEvent.getErrorCode(), wfJobEvent.getErrorMessage());
074            return wfJobMessage;
075        }
076    
077        /**
078         * Constructs and returns the coordinator action message for coordinator
079         * action event
080         *
081         * @param coordActionEvent the coordinator action event
082         * @return
083         */
084        public static CoordinatorActionMessage createCoordinatorActionMessage(CoordinatorActionEvent coordActionEvent) {
085            CoordinatorActionMessage coordActionMessage = new CoordinatorActionMessage(coordActionEvent.getEventStatus(),
086                    coordActionEvent.getId(), coordActionEvent.getParentId(), coordActionEvent.getStartTime(),
087                    coordActionEvent.getEndTime(), coordActionEvent.getNominalTime(), coordActionEvent.getStatus(),
088                    coordActionEvent.getUser(), coordActionEvent.getAppName(), coordActionEvent.getMissingDeps(),
089                    coordActionEvent.getErrorCode(), coordActionEvent.getErrorMessage());
090            return coordActionMessage;
091        }
092    
093        /**
094         * Constructs and returns SLA notification message
095         * @param event SLA event
096         * @return
097         */
098        public static SLAMessage createSLAMessage(SLAEvent event) {
099            SLAMessage slaMessage = new SLAMessage(event.getEventStatus(), event.getSLAStatus(), event.getAppType(),
100                    event.getAppName(), event.getUser(), event.getId(), event.getParentId(), event.getNominalTime(),
101                    event.getExpectedStart(), event.getActualStart(), event.getExpectedEnd(), event.getActualEnd(),
102                    event.getExpectedDuration(), event.getActualDuration(), event.getNotificationMsg(), event.getUpstreamApps());
103            return slaMessage;
104        }
105    }