::Go back o Oozie Documentation Index::

JMS Notifications

Overview

Since Oozie 4.0, Oozie supports publishing notifications to a JMS Provider for job status changes and SLA met and miss events. This provides an alternative to polling Oozie for Job or SLA related information and getting events as they happen without any delay. Clients can be written to consume these notifications and integrate with different monitoring and alerting systems.

Oozie Server Configuration

Refer to Notifications Configuration section of Oozie Install documentation for the Oozie server side configuration required to support publishing notifications to a JMS Provider. The JNDI properties for the JMS provider, the topics to publish to and the notification types to publish (Job and/or SLA) need to be configured.

Consuming Notifications

Notification types

Job and SLA notifications are published to the configured JMS Provider on the configured topics.

Job status change notifications include job start, success, failure, suspended, etc. Currently only workflow job and coordinator action status change notifications are published.

SLA notifications include START_MET, END_MET, DURATION_MET, START_MISS, END_MISS, DURATION_MISS events and are published for a workflow job, workflow action or coordinator action for which SLA information is configured in the job xml. Refer to SLA Configuration for information on configuring SLA for a workflow or coordinator.

JMS Topic

Consumers interested in notification on events will require to know the JNDI properties to connect to the JMS provider. They will also need to know the JMS topic on which notifications for a particular job are published.

Oozie Client provides the following APIs :

public JMSConnectionInfo getJMSConnectionInfo()
public String getJMSTopicName(String jobId)

The JMSConnectionInfo exposes 3 methods:

Properties getJNDIProperties();
String getTopicPattern(AppType appType);
String getTopicPrefix();

The topic is obtained by concatenating topic prefix and the substituted value for topic pattern. The topic pattern can be a constant value like workflow or coordinator which the administrator has configured or ${username}. If ${username}, it has to be substitued with the name of the user who has submitted the job. Administrators can chose to publish messages to topics containing user names to avoid having one topic containing all messages and all users having to apply selectors to filter the message they are interested in.

The getJMSTopicName API can be used if the job id is already known and will give the exact topic name to which the notifications for that job are published.

JMS Message Format

JMS messages published are javax.jms.TextMessage . The body contains JSON and the header contains multiple properties that can be used as selectors. The header properties are not repeated in the body of the message to keep the messages small.

Message Header:
The different header properties are:

  • msgType - Value can be JOB or SLA.
  • user - The user who submitted the job.
  • appName - Application name of the job.
  • appType - Type of the job. One of WORKFLOW_JOB, WORKFLOW_ACTION, COORDINATOR_ACTION
  • slaStatus - Applicable only to SLA messages. Value is one of NOT_STARTED, IN_PROCESS, MET, MISS
  • eventStatus - It takes one of the following values- START_MET, START_MISS, DURATION_MET, DURATION_MISS, END_MET, END_MISS for SLA notifications. It can take any of the following values- WAITING, STARTED, SUCCESS, SUSPEND, FAILURE for job notifications. Note that event status is different from the Job status. It is included in the header to provide better filtering. Below is the mapping of event status to the actual status of workflow job or coordinator action.

WAITING = When a Coordinator Action is waiting for input dependency

STARTED = When the Workflow Job or Coordinator Action is in RUNNING state

SUCCESS = When the Workflow Job or Coordinator Action is in SUCCEEDED state

SUSPEND = When the Workflow Job or Coordinator Action is in SUSPENDED state

FAILURE = When the Workflow Job or Coordinator Action is in terminal state other than SUCCEEDED.

Message Body for Job Notifications:
Sample JSON response for different job and sla events as below.

Workflow Job in RUNNING state:
{"status":"RUNNING","id":"0000042-130618221729631-oozie-oozi-W","startTime":1342915200000}

Workflow Job in FAILED state:
{"status":"FAILED","errorCode":"EL_ERROR","errorMessage":"variable [dummyvalue] cannot be resolved",
"id":"0000042-130618221729631-oozie-oozi-W","startTime":1342915200000,"endTime":1366672183543}

Workflow Job in SUCCEEDED state:
{"status":"SUCCEEDED","id":"0000039-130618221729631-oozie-oozi-W","startTime":1342915200000,
"parentId":"0000025-130618221729631-oozie-oozi-C@1","endTime":1366676224154}

Workflow Job in SUSPENDED state:
{"status":"SUSPENDED","id":"0000039-130618221729631-oozie-oozi-W","startTime":1342915200000,
"parentId":"0000025-130618221729631-oozie-oozi-C@1"}

Coordinator Action in WAITING state:
{"status":"WAITING","nominalTime":1310342400000,"missingDependency":"hdfs://gsbl90107.blue.com:8020/user/john/dir1/file1",
"id":"0000025-130618221729631-oozie-oozi-C@1","startTime":1342915200000,"parentId":"0000025-130618221729631-oozie-oozi-C"}

Coordinator Action in RUNNING state:
{"status":"RUNNING","nominalTime":1310342400000,"id":"0000025-130618221729631-oozie-oozi-C@1",
"startTime":1342915200000,"parentId":"0000025-130618221729631-oozie-oozi-C"}

Coordinator Action in SUCCEEDED state:
{"status":"SUCCEEDED","nominalTime":1310342400000,"id":"0000025-130618221729631-oozie-oozi-C@1",
"startTime":1342915200000,"parentId":"0000025-130618221729631-oozie-oozi-C","endTime":1366677082799}

Coordinator Action in FAILED state:
{"status":"FAILED","errorCode":"E0101","errorMessage":"dummyError","nominalTime":1310342400000,
"id":"0000025-130618221729631-oozie-oozi-C@1","startTime":1342915200000,
"parentId":"0000025-130618221729631-oozie-oozi-C","endTime":1366677140818}

Message Body for SLA Notifications:

Workflow Job in sla END_MISS state:
{"id":"0000000-000000000000001-oozie-wrkf-C@1","parentId":"0000000-000000000000001-oozie-wrkf-C",
"expectedStartTime":1356998400000,"notificationMessage":"notification of start miss","actualStartTime":1357002000000,
"expectedDuration":-1, "actualDuration":3600,"expectedEndTime":1356998400000,"actualEndTime":1357002000000}

JMS Client

Oozie provides a helper class JMSMessagingUtils for consumers to deserialize the JMS messages back to Java objects. The below method getEventMessage() expects a sub type of EventMessage. There are different implementations of EventMessage - WorkflowJobMessage, CoordinatorActionMessage and SLAMessage.

<T extends EventMessage> T JMSMessagingUtils.getEventMessage(Message jmsMessage)

Example

Below is a sample code to consume notifications.

First, create the Oozie client and retrieve the JNDI properties to make a connection to the JMS server.

   OozieClient oc = new OozieClient("http://localhost:11000/oozie");
   JMSConnectionInfo jmsInfo = oc.getJMSConnectionInfo();
   Properties jndiProperties = jmsInfo.getJNDIProperties();
   Context jndiContext = new InitialContext(jndiProperties);
   String connectionFactoryName = (String) jndiContext.getEnvironment().get("connectionFactoryNames");
   ConnectionFactory connectionFactory = (ConnectionFactory);
   jndiContext.lookup(connectionFactoryName);
   Connection connection = connectionFactory.createConnection();
   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   String topicPrefix = jmsInfo.getTopicPrefix();
   String topicPattern = jmsInfo.getTopicPattern(AppType.WORKFLOW_JOB);
   // Following code checks if the topic pattern is
   //'username', then the topic name is set to the actual user submitting the job
   String topicName = null;
   if (topicPattern.equals("${username}")) {
       topicName = "john";
   }
   Destination topic = session.createTopic(topicPrefix+topicName);
   MessageConsumer consumer = session.createConsumer(topic);
   consumer.setMessageListener(this);
   connection.start();

To start receiving messages, the JMS MessageListener interface needs to be implemented. Also, its onMessage() method needs to be implemented. This method will be called whenever a message is available on the JMS bus.

    public void onMessage(Message m) {
       if (message.getStringProperty(JMSHeaderConsants.MSG_TYPE).equals(MessageType.SLA.name()){
          SLAMessage slaMessage = JMSMessagingUtils.getEventMessage(message);
          // Further processing
       }
       else if (message.getStringProperty(JMSHeaderConsants.APP_TYPE).equals(AppType.WORKFLOW_JOB.name()){
          WorkflowJobMessage wfJobMessage = JMSMessagingUtils.getEventMessage(message);
          // Further processing
       }
    }

Applying Selectors

Below is a sample ActiveMQ text message header properties section.

ActiveMQTextMessage
{properties  {appName = map-reduce-wf, msgType
JOB, appType=WORKFLOW_JOB, user=john, msgFormat=json, eventStatus=STARTED} ...}

On the header properties, consumers can apply JMS selectors to filter messages from JMS provider. They are listed at JMSHeaderConstants

Sample use of selector to filter events related to Job which have failed and has a particular app-name

String selector=JMSHeaderConstants.EVENT_STATUS + "='FAILURE' AND " + JMSHeaderConstants.APP_NAME + "='app-name'";
MessageConsumer consumer = session.createConsumer(topic, selector);