Since Oozie 4.0, Oozie supports publishing notifications to a JMS Provider for job status changes and SLA met and miss events. This provides an alternative to polling Oozie for job or SLA related information: consumers receive events as they happen, without delay. Clients can be written to consume these notifications and integrate with different monitoring and alerting systems.
Refer to the Notifications Configuration section of the Oozie Install documentation for the Oozie server-side configuration required to support publishing notifications to a JMS Provider. The JNDI properties for the JMS provider, the topics to publish to, and the notification types to publish (Job and/or SLA) need to be configured.
Job and SLA notifications are published to the configured JMS Provider on the configured topics.
Job status change notifications include job start, success, failure, suspended, etc. Currently only workflow job and coordinator action status change notifications are published.
SLA notifications include START_MET, END_MET, DURATION_MET, START_MISS, END_MISS, DURATION_MISS events and are published for a workflow job, workflow action or coordinator action for which SLA information is configured in the job xml. Refer to SLA Configuration for information on configuring SLA for a workflow or coordinator.
Consumers interested in notifications need to know the JNDI properties to connect to the JMS provider. They also need to know the JMS topic on which notifications for a particular job are published.
Oozie Client provides the following APIs:
public JMSConnectionInfo getJMSConnectionInfo()
public String getJMSTopicName(String jobId)
The JMSConnectionInfo exposes 3 methods:
Properties getJNDIProperties();
String getTopicPattern(AppType appType);
String getTopicPrefix();
The topic is obtained by concatenating the topic prefix and the substituted value of the topic pattern. The topic pattern can be a constant value that the administrator has configured, such as workflow or coordinator, or a variable (either ${username} or ${jobId}). If it is ${username}, it has to be substituted with the name of the user who submitted the job; if it is ${jobId}, it has to be substituted with the job id. Administrators can choose to publish messages to topics containing user names, to avoid having one topic that contains all messages and forces every user to apply selectors to filter the messages they are interested in.
The getJMSTopicName API can be used if the job id is already known and will give the exact topic name to which the notifications for that job are published.
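The concatenation rule described above can be sketched as a small helper. This is a minimal illustration, not part of the Oozie client API: the class name TopicNameResolver, its method, and the prefix "oozie." used in main are hypothetical, and the inputs are assumed to be the values returned by getTopicPrefix() and getTopicPattern().

```java
import java.util.Objects;

/** Hypothetical helper for illustration; not part of the Oozie client API. */
final class TopicNameResolver {

    private TopicNameResolver() {}

    /**
     * Builds a topic name from the prefix and pattern, substituting the
     * ${username} or ${jobId} variable when the pattern is one of them.
     */
    static String resolve(String topicPrefix, String topicPattern, String user, String jobId) {
        Objects.requireNonNull(topicPattern, "topicPattern");
        String suffix;
        if (topicPattern.equals("${username}")) {
            suffix = Objects.requireNonNull(user, "user");
        } else if (topicPattern.equals("${jobId}")) {
            suffix = Objects.requireNonNull(jobId, "jobId");
        } else {
            // Constant pattern configured by the administrator, e.g. "workflow"
            suffix = topicPattern;
        }
        // Treat a missing prefix as empty (an assumption of this sketch)
        return (topicPrefix == null ? "" : topicPrefix) + suffix;
    }

    public static void main(String[] args) {
        String jobId = "0000004-140328125200198-oozie-oozi-W";
        // "oozie." is an assumed prefix used only for this demonstration
        System.out.println(resolve("oozie.", "${username}", "john", jobId));
        System.out.println(resolve("oozie.", "${jobId}", "john", jobId));
        System.out.println(resolve("oozie.", "workflow", "john", jobId));
    }
}
```

When the job id is already known, calling getJMSTopicName(jobId) avoids this substitution on the client side altogether.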
JMS messages published are javax.jms.TextMessage. The body contains JSON and the header contains multiple properties that can be used as selectors. The header properties are not repeated in the body of the message to keep the messages small.
Message Header:
The header properties include msgType, appType, appName, user, msgFormat and eventStatus (see the sample ActiveMQ message header later in this document). For job notifications, eventStatus takes one of the following values:
WAITING = When a Coordinator Action is waiting for input dependency
STARTED = When the Workflow Job or Coordinator Action is in RUNNING state
SUCCESS = When the Workflow Job or Coordinator Action is in SUCCEEDED state
SUSPEND = When the Workflow Job or Coordinator Action is in SUSPENDED state
FAILURE = When the Workflow Job or Coordinator Action is in a terminal state other than SUCCEEDED
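The mapping listed above can be expressed as a lookup from job status to event status. The sketch below is illustrative only and is not how Oozie computes the header: the class EventStatusMapper is hypothetical, and the set of terminal states treated as failures (FAILED and KILLED) is an assumption made for the example.

```java
import java.util.Optional;
import java.util.Set;

/** Hypothetical consumer-side helper; not part of Oozie. */
final class EventStatusMapper {

    // Assumed terminal states other than SUCCEEDED (illustration only)
    private static final Set<String> FAILED_TERMINAL_STATES = Set.of("FAILED", "KILLED");

    private EventStatusMapper() {}

    /** Returns the event status for a job status, or empty if none applies. */
    static Optional<String> toEventStatus(String jobStatus) {
        switch (jobStatus) {
            case "WAITING":
                return Optional.of("WAITING");
            case "RUNNING":
                return Optional.of("STARTED");
            case "SUCCEEDED":
                return Optional.of("SUCCESS");
            case "SUSPENDED":
                return Optional.of("SUSPEND");
            default:
                return FAILED_TERMINAL_STATES.contains(jobStatus)
                        ? Optional.of("FAILURE")
                        : Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(toEventStatus("RUNNING").orElse("(none)"));
        System.out.println(toEventStatus("KILLED").orElse("(none)"));
        System.out.println(toEventStatus("PREP").orElse("(none)"));
    }
}
```

A consumer rarely needs this mapping directly, since the event status arrives in the header; it is shown only to make the relationship between the two sets of values explicit.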
Message Body for Job Notifications:
Sample JSON message bodies for different job and SLA events are shown below.
Workflow Job in RUNNING state: {"status":"RUNNING","id":"0000042-130618221729631-oozie-oozi-W","startTime":1342915200000}
Workflow Job in FAILED state: {"status":"FAILED","errorCode":"EL_ERROR","errorMessage":"variable [dummyvalue] cannot be resolved", "id":"0000042-130618221729631-oozie-oozi-W","startTime":1342915200000,"endTime":1366672183543}
Workflow Job in SUCCEEDED state: {"status":"SUCCEEDED","id":"0000039-130618221729631-oozie-oozi-W","startTime":1342915200000, "parentId":"0000025-130618221729631-oozie-oozi-C@1","endTime":1366676224154}
Workflow Job in SUSPENDED state: {"status":"SUSPENDED","id":"0000039-130618221729631-oozie-oozi-W","startTime":1342915200000, "parentId":"0000025-130618221729631-oozie-oozi-C@1"}
Coordinator Action in WAITING state: {"status":"WAITING","nominalTime":1310342400000,"missingDependency":"hdfs://gsbl90107.blue.com:8020/user/john/dir1/file1", "id":"0000025-130618221729631-oozie-oozi-C@1","startTime":1342915200000,"parentId":"0000025-130618221729631-oozie-oozi-C"}
Coordinator Action in RUNNING state: {"status":"RUNNING","nominalTime":1310342400000,"id":"0000025-130618221729631-oozie-oozi-C@1", "startTime":1342915200000,"parentId":"0000025-130618221729631-oozie-oozi-C"}
Coordinator Action in SUCCEEDED state: {"status":"SUCCEEDED","nominalTime":1310342400000,"id":"0000025-130618221729631-oozie-oozi-C@1", "startTime":1342915200000,"parentId":"0000025-130618221729631-oozie-oozi-C","endTime":1366677082799}
Coordinator Action in FAILED state: {"status":"FAILED","errorCode":"E0101","errorMessage":"dummyError","nominalTime":1310342400000, "id":"0000025-130618221729631-oozie-oozi-C@1","startTime":1342915200000, "parentId":"0000025-130618221729631-oozie-oozi-C","endTime":1366677140818}
Message Body for SLA Notifications:
Workflow Job in SLA END_MISS state: {"id":"0000000-000000000000001-oozie-wrkf-C@1","parentId":"0000000-000000000000001-oozie-wrkf-C", "expectedStartTime":1356998400000,"notificationMessage":"notification of start miss","actualStartTime":1357002000000, "expectedDuration":-1, "actualDuration":3600,"expectedEndTime":1356998400000,"actualEndTime":1357002000000}
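The time fields in the samples above appear to be milliseconds since the Unix epoch; that reading is an assumption based on their magnitude, not something stated here. Under that assumption, a consumer could convert them and compute how late a job started, as in this minimal sketch. The class NotificationTimes and its methods are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper; assumes time fields are epoch milliseconds. */
final class NotificationTimes {

    private NotificationTimes() {}

    /** Converts an epoch-millisecond field such as startTime to an Instant. */
    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    /** Time elapsed between an expected and an actual timestamp. */
    static Duration delay(long expectedMillis, long actualMillis) {
        return Duration.between(toInstant(expectedMillis), toInstant(actualMillis));
    }

    public static void main(String[] args) {
        // startTime from the sample workflow job messages
        System.out.println(toInstant(1342915200000L)); // 2012-07-22T00:00:00Z

        // expectedStartTime and actualStartTime from the sample SLA message
        Duration startDelay = delay(1356998400000L, 1357002000000L);
        System.out.println(startDelay.toMinutes()); // 60
    }
}
```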
Oozie provides a helper class, JMSMessagingUtils, for consumers to deserialize the JMS messages back to Java objects. The getEventMessage() method below returns the message as a subtype of EventMessage, determined by the type the caller assigns the result to. There are different implementations of EventMessage: WorkflowJobMessage, CoordinatorActionMessage and SLAMessage.
<T extends EventMessage> T JMSMessagingUtils.getEventMessage(Message jmsMessage)
Below is sample code to consume notifications.
First, create the Oozie client and retrieve the JNDI properties to make a connection to the JMS server.
OozieClient oc = new OozieClient("http://localhost:11000/oozie");
JMSConnectionInfo jmsInfo = oc.getJMSConnectionInfo();
Properties jndiProperties = jmsInfo.getJNDIProperties();
Context jndiContext = new InitialContext(jndiProperties);
String connectionFactoryName = (String) jndiContext.getEnvironment().get("connectionFactoryNames");
ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup(connectionFactoryName);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicPrefix = jmsInfo.getTopicPrefix();
String topicPattern = jmsInfo.getTopicPattern(AppType.WORKFLOW_JOB);
String topicName;
if (topicPattern.equals("${username}")) {
    // Pattern is the user name: use the user who submitted the job
    topicName = "john";
} else if (topicPattern.equals("${jobId}")) {
    // Pattern is the job id: use the id of the job of interest
    topicName = "0000004-140328125200198-oozie-oozi-W";
} else {
    // Pattern is a constant configured by the administrator
    topicName = topicPattern;
}
Destination topic = session.createTopic(topicPrefix + topicName);
MessageConsumer consumer = session.createConsumer(topic);
// 'this' must implement javax.jms.MessageListener
consumer.setMessageListener(this);
connection.start();
To start receiving messages, implement the JMS MessageListener interface and its onMessage() method. This method is called whenever a message is available on the JMS bus.
@Override
public void onMessage(Message message) {
    try {
        if (message.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE).equals(MessageType.SLA.name())) {
            SLAMessage slaMessage = JMSMessagingUtils.getEventMessage(message);
            // Further processing
        } else if (message.getStringProperty(JMSHeaderConstants.APP_TYPE).equals(AppType.WORKFLOW_JOB.name())) {
            WorkflowJobMessage wfJobMessage = JMSMessagingUtils.getEventMessage(message);
            // Further processing
        }
    } catch (Exception e) {
        // getStringProperty() declares JMSException, which onMessage() cannot rethrow; log or handle it here
    }
}
Below is the header properties section of a sample ActiveMQ text message.
ActiveMQTextMessage {properties {appName=map-reduce-wf, msgType=JOB, appType=WORKFLOW_JOB, user=john, msgFormat=json, eventStatus=STARTED} ...}
Consumers can apply JMS selectors on the header properties to filter messages from the JMS provider. The property names are listed in JMSHeaderConstants.
Sample use of a selector to filter for events of failed jobs that have a particular app-name:
String selector = JMSHeaderConstants.EVENT_STATUS + "='FAILURE' AND " + JMSHeaderConstants.APP_NAME + "='app-name'";
MessageConsumer consumer = session.createConsumer(topic, selector);
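When selector values come from user input or job metadata, building the expression by plain string concatenation breaks if a value contains a single quote. JMS selector syntax represents a single quote inside a string literal by doubling it. The sketch below shows one way to build such selectors safely; the class SelectorBuilder is hypothetical, and the property names eventStatus and appName are taken from the sample message header and assumed to match the JMSHeaderConstants values.

```java
/** Hypothetical helper for composing JMS message selectors. */
final class SelectorBuilder {

    private SelectorBuilder() {}

    /** Wraps a value in single quotes, doubling any embedded single quote. */
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /** Builds a clause of the form: property = 'value'. */
    static String equalsClause(String property, String value) {
        return property + " = " + quote(value);
    }

    /** Joins clauses with AND. */
    static String and(String... clauses) {
        return String.join(" AND ", clauses);
    }

    public static void main(String[] args) {
        // Property names assumed from the sample header shown earlier
        String selector = and(
                equalsClause("eventStatus", "FAILURE"),
                equalsClause("appName", "map-reduce-wf"));
        System.out.println(selector);
    }
}
```

The resulting string can be passed as the second argument to session.createConsumer(topic, selector).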