The goal of this document is to define a coordinator engine system specialized in submitting workflows based on time and data triggers.
Users typically run map-reduce, hadoop-streaming, hdfs and/or Pig jobs on the grid. Several of these jobs can be combined to form a workflow job. Oozie, the Hadoop Workflow System, defines a workflow system that runs such jobs.
Commonly, workflow jobs are run based on regular time intervals and/or data availability. And, in some cases, they can be triggered by an external event.
Expressing the condition(s) that trigger a workflow job can be modeled as a predicate that has to be satisfied. The workflow job is started after the predicate is satisfied. A predicate can reference data, time and/or external events. In the future, the model can be extended to support additional event types.
It is also necessary to connect workflow jobs that run regularly, but at different time intervals. The outputs of multiple subsequent runs of a workflow become the input to the next workflow. For example, the outputs of the last 4 runs of a workflow that runs every 15 minutes become the input of another workflow that runs every 60 minutes. The result of chaining these workflows together is referred to as a data application pipeline.
The Oozie Coordinator system allows the user to define and execute recurrent and interdependent workflow jobs (data application pipelines).
Real world data application pipelines have to account for reprocessing, late processing, catchup, partial processing, monitoring, notification and SLAs.
This document defines the functional specification for the Oozie Coordinator system.
Actual time: The actual time indicates the time when something actually happens.
Nominal time: The nominal time specifies the time when something should happen. In theory the nominal time and the actual time should match. In practice, due to delays, the actual time may occur later than the nominal time.
Dataset: Collection of data referred to by a logical name. A dataset normally has several instances of data and each one of them can be referred to individually. Each dataset instance is represented by a unique set of URIs.
Synchronous Dataset: Synchronous dataset instances are generated at fixed time intervals and there is a dataset instance associated with each time interval. Synchronous dataset instances are identified by their nominal time. For example, in the case of an HDFS based dataset, the nominal time would be somewhere in the file path of the dataset instance: hdfs://foo:8020/usr/logs/2009/04/15/23/30. In the case of HCatalog table partitions, the nominal time would be part of some partition values: hcat://bar:8020/mydb/mytable/year=2009;month=04;dt=15;region=us.
Coordinator Action: A coordinator action is a workflow job that is started when a set of conditions is met (input dataset instances are available).
Coordinator Application: A coordinator application defines the conditions under which coordinator actions should be created (the frequency) and when the actions can be started. The coordinator application also defines a start and an end time. Normally, coordinator applications are parameterized. A Coordinator application is written in XML.
Coordinator Job: A coordinator job is an executable instance of a coordination definition. A job submission is done by submitting a job configuration that resolves all parameters in the application definition.
Data pipeline: A data pipeline is a connected set of coordinator applications that consume and produce interdependent datasets.
Coordinator Definition Language: The language used to describe datasets and coordinator applications.
Coordinator Engine: A system that executes coordinator jobs.
Coordinator application definitions can be parameterized with variables, built-in constants and built-in functions.
At execution time all the parameters are resolved into concrete values.
The parameterization of workflow definitions is done using JSP Expression Language syntax from the JSP 2.0 Specification (JSP.2.3), which supports not only variables as parameters but also functions and complex expressions.
EL expressions can be used in XML attribute values and XML text element values. They cannot be used in XML element and XML attribute names.
Refer to section #6.5 'Parameterization of Coordinator Applications' for more details.
Oozie processes coordinator jobs in a fixed timezone with no DST (typically UTC). This timezone is referred to as the 'Oozie processing timezone'.
The Oozie processing timezone is used to resolve coordinator jobs start/end times, job pause times and the initial-instance of datasets. Also, all coordinator dataset instance URI templates are resolved to a datetime in the Oozie processing time-zone.
All the datetimes used in coordinator applications and job parameters to coordinator applications must be specified in the Oozie processing timezone. If the Oozie processing timezone is UTC, the qualifier is Z. If the Oozie processing timezone is other than UTC, the qualifier must be the GMT offset, (+/-)####.
For example, a datetime in UTC is 2012-08-12T00:00Z; the same datetime in GMT+5:30 is 2012-08-12T05:30+0530.
For simplicity, the rest of this specification uses UTC datetimes.
If the Oozie processing timezone is UTC, all datetime values are always in UTC down to a minute precision, 'YYYY-MM-DDTHH:mmZ'.
For example 2009-08-10T13:10Z is August 10th 2009 at 13:10 UTC.
If the Oozie processing timezone is a GMT offset GMT(+/-)####, all datetime values are always in ISO 8601 in the corresponding GMT offset down to a minute precision, 'YYYY-MM-DDTHH:mm(+/-)####'.
For example 2009-08-10T13:10+0530 is August 10th 2009 at 13:10 GMT+0530, India timezone.
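The two datetime forms described above can be illustrated with a short Python sketch. It is not part of Oozie; the function name `format_oozie_datetime` and the `offset_minutes` parameter are hypothetical names chosen for this illustration, and the processing timezone is assumed to be a fixed offset with no DST.

```python
from datetime import datetime, timedelta, timezone


def format_oozie_datetime(instant: datetime, offset_minutes: int = 0) -> str:
    """Render an aware datetime to minute precision in a fixed-offset timezone.

    offset_minutes is the (hypothetical) processing timezone offset from UTC.
    UTC is rendered with the 'Z' qualifier, any other offset as (+/-)####.
    """
    tz = timezone(timedelta(minutes=offset_minutes))
    local = instant.astimezone(tz)
    if offset_minutes == 0:
        return local.strftime("%Y-%m-%dT%H:%MZ")
    return local.strftime("%Y-%m-%dT%H:%M%z")


instant = datetime(2012, 8, 12, 0, 0, tzinfo=timezone.utc)
print(format_oozie_datetime(instant))       # 2012-08-12T00:00Z
print(format_oozie_datetime(instant, 330))  # 2012-08-12T05:30+0530
```

Both strings denote the same instant; only the qualifier and the wall-clock fields differ.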
There is no widely accepted standard to identify timezones.
Oozie Coordinator will understand the following timezone identifiers:
Oozie Coordinator must provide a tool for developers to list all supported timezone identifiers.
While the Oozie coordinator engine works in a fixed timezone with no DST (typically UTC), it provides DST support for coordinator applications.
The baseline datetime for datasets and coordinator applications is expressed in UTC. The baseline datetime is the time of the first occurrence.
Datasets and coordinator applications also contain a timezone indicator.
The use of UTC as baseline enables a simple way of mixing and matching datasets and coordinator applications that use different timezones by just adding the timezone offset.
The timezone indicator enables Oozie coordinator engine to properly compute frequencies that are daylight-saving sensitive. For example: a daily frequency can be 23, 24 or 25 hours for timezones that observe daylight-saving. Weekly and monthly frequencies are also affected by this as the number of hours in the day may change.
Section #7 'Handling Timezones and Daylight Saving Time' explains how coordinator applications can be written to handle timezones and daylight-saving-time properly.
Frequency is used to capture the periodic intervals at which datasets are produced and coordinator applications are scheduled to run.
This time-period representation is also used to specify non-recurrent time periods, for example a timeout interval.
For datasets and coordinator applications the frequency time-period is applied N times to the baseline datetime to compute recurrent times.
Frequency is always expressed in minutes.
Because the number of minutes in a day may vary for timezones that observe daylight saving time, constants cannot be used to express frequencies greater than a day for datasets and coordinator applications in such timezones. For such use cases, Oozie coordinator provides 2 EL functions, ${coord:days(int n)} and ${coord:months(int n)}.
Frequencies can be expressed using EL constants and EL functions that evaluate to a positive integer number.
Coordinator Frequencies can also be expressed using cron syntax.
Examples:
EL Constant | Value | Example |
---|---|---|
${coord:minutes(int n)} | n | ${coord:minutes(45)} --> 45 |
${coord:hours(int n)} | n * 60 | ${coord:hours(3)} --> 180 |
${coord:days(int n)} | variable | ${coord:days(2)} --> minutes in 2 full days from the current date |
${coord:months(int n)} | variable | ${coord:months(1)} --> minutes in 1 full month from the current date |
${cron syntax} | variable | ${0,10 15 * * 2-6} --> a job that runs every weekday at 3:00pm and 3:10pm UTC time |
The ${coord:days(int n)} and ${coord:endOfDays(int n)} EL functions should be used to handle day based frequencies.
Constant values should not be used to indicate a day based frequency (every 1 day, every 1 week, etc) because the number of hours in every day is not always the same for timezones that observe daylight-saving time.
It is good practice to always use these EL functions instead of a constant expression (e.g. 24 * 60), even if the timezone for which the application is being written does not observe daylight saving time. This makes the application robust against changes in country legislation and also makes applications portable across timezones.
The ${coord:days(int n)} EL function returns the number of minutes for 'n' complete days starting with the day of the specified nominal time for which the computation is being done.
The ${coord:days(int n)} EL function includes all the minutes of the current day, regardless of the time of the day of the current nominal time.
Examples:
Starting Nominal UTC time | Timezone | Usage | Value | First Occurrence | Comments |
---|---|---|---|---|---|
2009-01-01T08:00Z | UTC | ${coord:days(1)} | 1440 | 2009-01-01T08:00Z | total minutes on 2009JAN01 UTC time |
2009-01-01T08:00Z | America/Los_Angeles | ${coord:days(1)} | 1440 | 2009-01-01T08:00Z | total minutes in 2009JAN01 PST8PDT time |
2009-01-01T08:00Z | America/Los_Angeles | ${coord:days(2)} | 2880 | 2009-01-01T08:00Z | total minutes in 2009JAN01 and 2009JAN02 PST8PDT time |
2009-03-08T08:00Z | UTC | ${coord:days(1)} | 1440 | 2009-03-08T08:00Z | total minutes on 2009MAR08 UTC time |
2009-03-08T08:00Z | Europe/London | ${coord:days(1)} | 1440 | 2009-03-08T08:00Z | total minutes in 2009MAR08 BST1BDT time |
2009-03-08T08:00Z | America/Los_Angeles | ${coord:days(1)} | 1380 | 2009-03-08T08:00Z | total minutes in 2009MAR08 PST8PDT time (2009MAR08 is DST switch in the US) |
2009-03-08T08:00Z | UTC | ${coord:days(2)} | 2880 | 2009-03-08T08:00Z | total minutes in 2009MAR08 and 2009MAR09 UTC time |
2009-03-08T08:00Z | America/Los_Angeles | ${coord:days(2)} | 2820 | 2009-03-08T08:00Z | total minutes in 2009MAR08 and 2009MAR09 PST8PDT time (2009MAR08 is DST switch in the US) |
2009-03-09T08:00Z | America/Los_Angeles | ${coord:days(1)} | 1440 | 2009-03-09T07:00Z | total minutes in 2009MAR09 PST8PDT time (2009MAR08 is DST ON, frequency tick is earlier in UTC) |
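The values in the table above can be reproduced with a minimal Python sketch. It follows one reading of the description (count the minutes of 'n' whole calendar days in the given timezone, starting at local midnight of the nominal day) and is not Oozie's implementation. The function name `minutes_in_days` is hypothetical, and the standard `zoneinfo` module is assumed to have timezone data available.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo


def minutes_in_days(nominal_utc: datetime, tz_name: str, n: int) -> int:
    """Minutes in n complete local days, starting with the day of nominal_utc."""
    tz = ZoneInfo(tz_name)
    local = nominal_utc.astimezone(tz)
    # Local midnight of the nominal day, regardless of the time of day.
    day_start = local.replace(hour=0, minute=0, second=0, microsecond=0)
    # Adding days to an aware datetime moves the wall clock, not elapsed time.
    day_end = day_start + timedelta(days=n)
    # Convert to UTC before subtracting so a DST switch changes the result.
    elapsed = day_end.astimezone(timezone.utc) - day_start.astimezone(timezone.utc)
    return int(elapsed.total_seconds() // 60)


dst_switch = datetime(2009, 3, 8, 8, 0, tzinfo=timezone.utc)
print(minutes_in_days(dst_switch, "UTC", 1))                  # 1440
print(minutes_in_days(dst_switch, "America/Los_Angeles", 1))  # 1380
print(minutes_in_days(dst_switch, "America/Los_Angeles", 2))  # 2820
```

The conversion to UTC before subtracting matters: subtracting two datetimes that share the same timezone object compares wall-clock values and would report 1440 minutes even on the day of the switch.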
The ${coord:endOfDays(int n)} EL function is identical to the ${coord:days(int n)} except that it shifts the first occurrence to the end of the day for the specified timezone before computing the interval in minutes.
Examples:
Starting Nominal UTC time | Timezone | Usage | Value | First Occurrence | Comments |
---|---|---|---|---|---|
2009-01-01T08:00Z | UTC | ${coord:endOfDays(1)} | 1440 | 2009-01-02T00:00Z | first occurrence in 2009JAN02 00:00 UTC time, first occurrence shifted to the end of the UTC day |
2009-01-01T08:00Z | America/Los_Angeles | ${coord:endOfDays(1)} | 1440 | 2009-01-02T08:00Z | first occurrence in 2009JAN02 08:00 UTC time, first occurrence shifted to the end of the PST8PDT day |
2009-01-01T08:01Z | America/Los_Angeles | ${coord:endOfDays(1)} | 1440 | 2009-01-02T08:00Z | first occurrence in 2009JAN02 08:00 UTC time, first occurrence shifted to the end of the PST8PDT day |
2009-01-01T18:00Z | America/Los_Angeles | ${coord:endOfDays(1)} | 1440 | 2009-01-02T08:00Z | first occurrence in 2009JAN02 08:00 UTC time, first occurrence shifted to the end of the PST8PDT day |
2009-03-07T09:00Z | America/Los_Angeles | ${coord:endOfDays(1)} | 1380 | 2009-03-08T08:00Z | first occurrence in 2009MAR08 08:00 UTC time, first occurrence shifted to the end of the PST8PDT day |
2009-03-08T07:00Z | America/Los_Angeles | ${coord:endOfDays(1)} | 1440 | 2009-03-08T08:00Z | first occurrence in 2009MAR08 08:00 UTC time, first occurrence shifted to the end of the PST8PDT day |
2009-03-09T07:00Z | America/Los_Angeles | ${coord:endOfDays(1)} | 1440 | 2009-03-10T07:00Z | first occurrence in 2009MAR10 07:00 UTC time (2009MAR08 is DST switch in the US), first occurrence shifted to the end of the PST8PDT day |
<coordinator-app name="hello-coord" frequency="${coord:days(1)}"
                 start="2009-01-02T08:00Z" end="2009-01-04T08:00Z" timezone="America/Los_Angeles"
                 xmlns="uri:oozie:coordinator:0.1">
  <controls>
    <timeout>10</timeout>
    <concurrency>${concurrency_level}</concurrency>
    <execution>${execution_order}</execution>
    <throttle>${materialization_throttle}</throttle>
  </controls>
  <datasets>
    <dataset name="din" frequency="${coord:endOfDays(1)}"
             initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles">
      <uri-template>${baseFsURI}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
    </dataset>
    <dataset name="dout" frequency="${coord:minutes(30)}"
             initial-instance="2009-01-02T08:00Z" timezone="UTC">
      <uri-template>${baseFsURI}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
    </dataset>
  </datasets>
  <input-events>
    <data-in name="input" dataset="din">
      <instance>${coord:current(0)}</instance>
    </data-in>
  </input-events>
  <output-events>
    <data-out name="output" dataset="dout">
      <instance>${coord:current(1)}</instance>
    </data-out>
  </output-events>
  <action>
    <workflow>
      <app-path>${wf_app_path}</app-path>
      <configuration>
        <property>
          <name>wfInput</name>
          <value>${coord:dataIn('input')}</value>
        </property>
        <property>
          <name>wfOutput</name>
          <value>${coord:dataOut('output')}</value>
        </property>
      </configuration>
    </workflow>
  </action>
</coordinator-app>
The ${coord:months(int n)} and ${coord:endOfMonths(int n)} EL functions should be used to handle month based frequencies.
Constant values cannot be used to indicate a month based frequency because the number of days in a month changes from month to month and in leap years; in addition, the number of hours in every day of the month is not always the same for timezones that observe daylight-saving time.
The ${coord:months(int n)} EL function returns the number of minutes for 'n' complete months starting with the month of the current nominal time for which the computation is being done.
The ${coord:months(int n)} EL function includes all the minutes of the current month, regardless of the day of the month of the current nominal time.
Examples:
Starting Nominal UTC time | Timezone | Usage | Value | First Occurrence | Comments |
---|---|---|---|---|---|
2009-01-01T08:00Z | UTC | ${coord:months(1)} | 44640 | 2009-01-01T08:00Z | total minutes for 2009JAN UTC time |
2009-01-01T08:00Z | America/Los_Angeles | ${coord:months(1)} | 44640 | 2009-01-01T08:00Z | total minutes in 2009JAN PST8PDT time |
2009-01-01T08:00Z | America/Los_Angeles | ${coord:months(2)} | 84960 | 2009-01-01T08:00Z | total minutes in 2009JAN and 2009FEB PST8PDT time |
2009-03-08T08:00Z | UTC | ${coord:months(1)} | 44640 | 2009-03-08T08:00Z | total minutes on 2009MAR UTC time |
2009-03-08T08:00Z | Europe/London | ${coord:months(1)} | 44580 | 2009-03-08T08:00Z | total minutes in 2009MAR BST1BDT time (2009MAR29 is DST switch in Europe) |
2009-03-08T08:00Z | America/Los_Angeles | ${coord:months(1)} | 44580 | 2009-03-08T08:00Z | total minutes in 2009MAR PST8PDT time (2009MAR08 is DST switch in the US) |
2009-03-08T08:00Z | UTC | ${coord:months(2)} | 87840 | 2009-03-08T08:00Z | total minutes in 2009MAR and 2009APR UTC time |
2009-03-08T08:00Z | America/Los_Angeles | ${coord:months(2)} | 87780 | 2009-03-08T08:00Z | total minutes in 2009MAR and 2009APR PST8PDT time (2009MAR08 is DST switch in US) |
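The UTC rows of the table above follow directly from the calendar, as this small Python sketch shows. It covers only the UTC case, where every day has 1440 minutes; the function name `minutes_in_months_utc` is hypothetical and this is not Oozie's implementation.

```python
import calendar


def minutes_in_months_utc(year: int, month: int, n: int) -> int:
    """Minutes in n complete calendar months in UTC, starting with the given month."""
    total_days = 0
    for i in range(n):
        # Roll over into the following year(s) when the month index passes December.
        year_offset, month_index = divmod((month - 1) + i, 12)
        total_days += calendar.monthrange(year + year_offset, month_index + 1)[1]
    return total_days * 24 * 60


print(minutes_in_months_utc(2009, 1, 1))  # 44640
print(minutes_in_months_utc(2009, 1, 2))  # 84960
print(minutes_in_months_utc(2009, 3, 2))  # 87840
```

For a timezone that switches to daylight-saving time within the period, the result is 60 minutes smaller, which is why the March rows for America/Los_Angeles and Europe/London show 44580 instead of 44640.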
The ${coord:endOfMonths(int n)} EL function is identical to the ${coord:months(int n)} except that it shifts the first occurrence to the end of the month for the specified timezone before computing the interval in minutes.
Examples:
Starting Nominal UTC time | Timezone | Usage | Value | First Occurrence | Comments |
---|---|---|---|---|---|
2009-01-01T00:00Z | UTC | ${coord:endOfMonths(1)} | 40320 | 2009-02-01T00:00Z | first occurrence in 2009FEB 00:00 UTC time |
2009-01-01T08:00Z | UTC | ${coord:endOfMonths(1)} | 40320 | 2009-02-01T00:00Z | first occurrence in 2009FEB 00:00 UTC time |
2009-01-31T08:00Z | UTC | ${coord:endOfMonths(1)} | 40320 | 2009-02-01T00:00Z | first occurrence in 2009FEB 00:00 UTC time |
2009-01-01T08:00Z | America/Los_Angeles | ${coord:endOfMonths(1)} | 40320 | 2009-02-01T08:00Z | first occurrence in 2009FEB 08:00 UTC time |
2009-02-02T08:00Z | America/Los_Angeles | ${coord:endOfMonths(1)} | 44580 | 2009-03-01T08:00Z | first occurrence in 2009MAR 08:00 UTC time |
2009-02-01T08:00Z | America/Los_Angeles | ${coord:endOfMonths(1)} | 44580 | 2009-03-01T08:00Z | first occurrence in 2009MAR 08:00 UTC time |
<coordinator-app name="hello-coord" frequency="${coord:months(1)}"
                 start="2009-01-02T08:00Z" end="2009-04-02T08:00Z" timezone="America/Los_Angeles"
                 xmlns="uri:oozie:coordinator:0.1">
  <controls>
    <timeout>10</timeout>
    <concurrency>${concurrency_level}</concurrency>
    <execution>${execution_order}</execution>
    <throttle>${materialization_throttle}</throttle>
  </controls>
  <datasets>
    <dataset name="din" frequency="${coord:endOfMonths(1)}"
             initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles">
      <uri-template>${baseFsURI}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
    </dataset>
    <dataset name="dout" frequency="${coord:minutes(30)}"
             initial-instance="2009-01-02T08:00Z" timezone="UTC">
      <uri-template>${baseFsURI}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
    </dataset>
  </datasets>
  <input-events>
    <data-in name="input" dataset="din">
      <instance>${coord:current(0)}</instance>
    </data-in>
  </input-events>
  <output-events>
    <data-out name="output" dataset="dout">
      <instance>${coord:current(1)}</instance>
    </data-out>
  </output-events>
  <action>
    <workflow>
      <app-path>${wf_app_path}</app-path>
      <configuration>
        <property>
          <name>wfInput</name>
          <value>${coord:dataIn('input')}</value>
        </property>
        <property>
          <name>wfOutput</name>
          <value>${coord:dataOut('output')}</value>
        </property>
      </configuration>
    </workflow>
  </action>
</coordinator-app>
Oozie has historically allowed only very basic forms of scheduling: You could choose to run jobs separated by a certain number of minutes, hours, days or weeks. That's all. This works fine for processes that need to run continuously all year like building a search index to power an online website.
However, there are a lot of cases that don't fit this model. For example, maybe you want to export data to a reporting system used during the day by business analysts. It would be wasteful to run the jobs when no analyst is going to take advantage of the new information, such as overnight. You might want a policy that says "only run these jobs on weekdays between 6AM and 8PM". Previous versions of Oozie didn't support this kind of complex scheduling policy without requiring multiple identical coordinators. Cron-scheduling improves the user experience in this area, allowing for a lot more flexibility.
Cron is a standard time-based job scheduling mechanism in unix-like operating systems. It is used extensively by system administrators to set up jobs and maintain software environments. Cron syntax generally consists of five fields (minutes, hours, day of month, month, and day of week, in that order), although multiple variations exist.
<coordinator-app name="cron-coord" frequency="0/10 1/2 * * *"
                 start="${start}" end="${end}" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.2">
  <action>
    <workflow>
      <app-path>${workflowAppUri}</app-path>
      <configuration>
        <property>
          <name>jobTracker</name>
          <value>${jobTracker}</value>
        </property>
        <property>
          <name>nameNode</name>
          <value>${nameNode}</value>
        </property>
        <property>
          <name>queueName</name>
          <value>${queueName}</value>
        </property>
      </configuration>
    </workflow>
  </action>
</coordinator-app>
Cron expressions are comprised of 5 required fields. The fields respectively are described as follows:
Field name | Allowed Values | Allowed Special Characters |
---|---|---|
Minutes | 0-59 | , - * / |
Hours | 0-23 | , - * / |
Day-of-month | 1-31 | , - * ? / L W |
Month | 1-12 or JAN-DEC | , - * / |
Day-of-Week | 1-7 or SUN-SAT | , - * ? / L # |
The '?' character is allowed for the day-of-month and day-of-week fields. It is used to specify 'no specific value'. This is useful when you need to specify something in one of the two fields, but not the other.
The '-' character is used to specify ranges. For example, "10-12" in the hour field means "the hours 10, 11 and 12".
The ',' character is used to specify additional values. For example "MON,WED,FRI" in the day-of-week field means "the days Monday, Wednesday, and Friday".
The '/' character is used to specify increments. For example "0/15" in the minutes field means "the minutes 0, 15, 30, and 45". And "5/15" in the minutes field means "the minutes 5, 20, 35, and 50". Specifying '*' before the '/' is equivalent to specifying 0 as the value to start with. Essentially, for each field in the expression, there is a set of numbers that can be turned on or off. For minutes, the numbers range from 0 to 59; for hours, 0 to 23; for days of the month, 1 to 31; and for months, 1 to 12. The "/" character simply helps you turn on every "nth" value in the given set. Thus "7/6" in the month field only turns on month "7"; it does NOT mean every 6th month. Please note that subtlety.
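The "turn on every nth value in the set" behaviour can be sketched in a few lines of Python. This is an illustration of the rule as described, not the parser Oozie uses; the names `FIELD_RANGES` and `expand_increment` are hypothetical, and '*' is assumed to mean the lowest value of the field (0 for minutes and hours).

```python
# Allowed value range per field, taken from the field table above.
FIELD_RANGES = {
    "minutes": (0, 59),
    "hours": (0, 23),
    "day-of-month": (1, 31),
    "month": (1, 12),
}


def expand_increment(expr: str, field: str) -> list[int]:
    """Expand a 'start/step' expression into the values it turns on."""
    low, high = FIELD_RANGES[field]
    start_text, step_text = expr.split("/")
    start = low if start_text == "*" else int(start_text)
    step = int(step_text)
    # Values never wrap around: the expansion stops at the top of the range.
    return list(range(start, high + 1, step))


print(expand_increment("0/15", "minutes"))  # [0, 15, 30, 45]
print(expand_increment("5/15", "minutes"))  # [5, 20, 35, 50]
print(expand_increment("7/6", "month"))     # [7]
```

Because the expansion stops at the upper bound of the field, "7/6" in the month field yields only month 7: the next candidate, 13, is outside the range.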
The 'L' character is allowed for the day-of-month and day-of-week fields. This character is short-hand for "last", but it has different meaning in each of the two fields. For example, the value "L" in the day-of-month field means "the last day of the month" - day 31 for January, day 28 for February on non-leap years. If used in the day-of-week field by itself, it simply means "7" or "SAT". But if used in the day-of-week field after another value, it means "the last xxx day of the month" - for example "6L" means "the last Friday of the month". You can also specify an offset from the last day of the month, such as "L-3" which would mean the third-to-last day of the calendar month. When using the 'L' option, it is important not to specify lists, or ranges of values, as you'll get confusing/unexpected results.
The 'W' character is allowed for the day-of-month field. This character is used to specify the weekday (Monday-Friday) nearest the given day. As an example, if you were to specify "15W" as the value for the day-of-month field, the meaning is: "the nearest weekday to the 15th of the month". So if the 15th is a Saturday, the trigger will fire on Friday the 14th. If the 15th is a Sunday, the trigger will fire on Monday the 16th. If the 15th is a Tuesday, then it will fire on Tuesday the 15th. However if you specify "1W" as the value for day-of-month, and the 1st is a Saturday, the trigger will fire on Monday the 3rd, as it will not 'jump' over the boundary of a month's days. The 'W' character can only be specified when the day-of-month is a single day, not a range or list of days.
The 'L' and 'W' characters can also be combined for the day-of-month expression to yield 'LW', which translates to "last weekday of the month".
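The 'W' and 'LW' rules described above can be sketched in Python as follows. This is a reading of the prose, not Oozie's code; the function names `nearest_weekday` and `last_weekday` are hypothetical.

```python
import calendar
from datetime import date, timedelta


def nearest_weekday(year: int, month: int, day: int) -> date:
    """Weekday (Monday-Friday) nearest to the given day, staying inside the month."""
    target = date(year, month, day)
    last_day = calendar.monthrange(year, month)[1]
    weekday = target.weekday()  # Monday = 0 ... Sunday = 6
    if weekday == 5:  # Saturday: prefer Friday, unless that leaves the month
        return target - timedelta(days=1) if day > 1 else target + timedelta(days=2)
    if weekday == 6:  # Sunday: prefer Monday, unless that leaves the month
        return target + timedelta(days=1) if day < last_day else target - timedelta(days=2)
    return target


def last_weekday(year: int, month: int) -> date:
    """'LW': the last weekday of the month."""
    return nearest_weekday(year, month, calendar.monthrange(year, month)[1])


print(nearest_weekday(2009, 8, 15))  # 2009-08-14 (the 15th is a Saturday)
print(nearest_weekday(2009, 3, 15))  # 2009-03-16 (the 15th is a Sunday)
print(nearest_weekday(2009, 8, 1))   # 2009-08-03 (the 1st is a Saturday)
print(last_weekday(2009, 2))         # 2009-02-27 (the 28th is a Saturday)
```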
The '#' character is allowed for the day-of-week field. This character is used to specify "the nth" XXX day of the month. For example, the value of "6#3" in the day-of-week field means the third Friday of the month (day 6 = Friday and "#3" = the 3rd one in the month). Other examples: "2#1" = the first Monday of the month and "4#5" = the fifth Wednesday of the month. Note that if you specify "#5" and there are not 5 of the given day-of-week in the month, then no firing will occur that month. If the '#' character is used, there can only be one expression in the day-of-week field ("3#1,6#3" is not valid, since there are two expressions).
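A minimal Python sketch of the '#' rule, assuming the day-of-week numbering from the field table (1 = SUN through 7 = SAT). The function name `nth_day_of_week` is hypothetical and the code is an illustration, not Oozie's implementation.

```python
import calendar
from datetime import date
from typing import Optional


def nth_day_of_week(year: int, month: int, cron_dow: int, n: int) -> Optional[date]:
    """Date of the n-th given day-of-week in a month, or None if it does not exist.

    cron_dow uses the cron numbering 1 = SUN ... 7 = SAT.
    """
    # Convert to Python's numbering, where Monday = 0 and Sunday = 6.
    target = (cron_dow + 5) % 7
    first_weekday, days_in_month = calendar.monthrange(year, month)
    # Days from the 1st of the month to the first occurrence of the target day.
    offset = (target - first_weekday) % 7
    day = 1 + offset + 7 * (n - 1)
    return date(year, month, day) if day <= days_in_month else None


print(nth_day_of_week(2009, 3, 6, 3))  # "6#3": 2009-03-20, the third Friday
print(nth_day_of_week(2009, 3, 3, 2))  # "3#2": 2009-03-10, the second Tuesday
print(nth_day_of_week(2009, 3, 4, 5))  # "4#5": None, March 2009 has four Wednesdays
```

Returning `None` corresponds to the case where no firing occurs that month.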
The legal characters and the names of months and days of the week are not case sensitive.
If a user specifies an invalid cron syntax to run something on Feb, 30th for example: "0 10 30 2 *", the coordinator job will not be created and an invalid coordinator frequency parse exception will be thrown.
If a user has a coordinator job that materializes no action during run time, for example: frequency of "0 10 * * *" with start time of 2013-10-18T21:00Z and end time of 2013-10-18T22:00Z, the coordinator job submission will be rejected and an invalid coordinator attribute exception will be thrown.
Examples:
Cron Expression | Meaning |
---|---|
10 9 * * * | Runs every day at 9:10am |
10,30,45 9 * * * | Runs every day at 9:10am, 9:30am, and 9:45am |
0 * 30 JAN 2-6 | Runs at 0 minute of every hour on weekdays and 30th of January |
0/20 9-17 * * 2-5 | Runs every Mon, Tue, Wed, and Thurs at minutes 0, 20, 40 from 9am to 5pm |
1 2 L-3 * * | Runs every third-to-last day of month at 2:01am |
1 2 6W 3 ? | Runs on the nearest weekday to March, 6th every year at 2:01am |
1 2 * 3 3#2 | Runs every second Tuesday of March at 2:01am every year |
0 10,13 * * MON-FRI | Runs every weekday at 10am and 1pm |
NOTES:
Cron expression and syntax in Oozie are inspired by Quartz: http://quartz-scheduler.org/api/2.0.0/org/quartz/CronExpression.html. However, there is a major difference between Quartz cron and Oozie cron: Oozie cron doesn't have a "Seconds" field, since everything in Oozie functions at minute granularity at most. Anything related to Oozie cron syntax should be based on the Oozie documentation.
Cron expressions use the Oozie server processing timezone. Since the default Oozie processing timezone is UTC, if you want to run a job on every weekday at 10am in Tokyo, Japan (UTC+9), your cron expression should be "0 1 * * 2-6" instead of the "0 10 * * 2-6" which you might expect.
Overflowing ranges are supported but strongly discouraged, that is, ranges with a larger number on the left-hand side than the right. You might do 22-2 to catch 10 o'clock at night until 2 o'clock in the morning, or you might have NOV-FEB. It is very important to note that overuse of overflowing ranges creates ranges that don't make sense, and no effort has been made to determine which interpretation CronExpression chooses. An example would be "0 14-6 ? * FRI-MON".
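The hour shift in the Tokyo example can be worked out with a short Python sketch. It assumes a UTC processing timezone and a local timezone with a whole-hour offset and no DST; the function name `local_to_utc_hour` is hypothetical.

```python
def local_to_utc_hour(local_hour: int, utc_offset_hours: int) -> tuple[int, int]:
    """Return (utc_hour, day_shift) for a local hour at a whole-hour UTC offset.

    day_shift is -1, 0 or +1: how far the UTC calendar day is from the local day.
    """
    day_shift, utc_hour = divmod(local_hour - utc_offset_hours, 24)
    return utc_hour, day_shift


utc_hour, day_shift = local_to_utc_hour(10, 9)  # 10am in Tokyo (UTC+9)
print(f"0 {utc_hour} * * 2-6", day_shift)        # 0 1 * * 2-6 0
print(local_to_utc_hour(5, 9))                   # (20, -1)
```

When `day_shift` is not 0, as for 5am in Tokyo, the UTC day differs from the local day, so the day-of-week field has to be shifted as well; the sketch only reports that shift and does not rewrite the field.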
A dataset is a collection of data referred to by a logical name.
A dataset instance is a particular occurrence of a dataset and it is represented by a unique set of URIs. A dataset instance can be referred to individually. Dataset instances for datasets containing ranges are identified by a set of unique URIs; otherwise a dataset instance is identified by a single unique URI.
Datasets are typically defined in some central place for a business domain and can be accessed by the coordinator. Because of this, they can be defined once and used many times.
A dataset is a synchronous input: it is produced at regular time intervals and has an expected frequency.
A dataset instance is considered to be immutable while it is being consumed by coordinator jobs.
Instances of synchronous datasets are produced at regular time intervals, at an expected frequency. They are also referred to as "clocked datasets".
Synchronous dataset instances are identified by their nominal creation time. The nominal creation time is normally specified in the dataset instance URI.
A synchronous dataset definition contains the following information:
The following EL constants can be used within synchronous dataset URI templates:
EL Constant | Resulting Format | Comments |
---|---|---|
YEAR | YYYY | 4 digits representing the year |
MONTH | MM | 2 digits representing the month of the year, January = 1 |
DAY | DD | 2 digits representing the day of the month |
HOUR | HH | 2 digits representing the hour of the day, in 24 hour format, 0 - 23 |
MINUTE | mm | 2 digits representing the minute of the hour, 0 - 59 |
<dataset name="[NAME]" frequency="[FREQUENCY]"
         initial-instance="[DATETIME]" timezone="[TIMEZONE]">
  <uri-template>[URI TEMPLATE]</uri-template>
  <done-flag>[FILE NAME]</done-flag>
</dataset>
IMPORTANT: The values of the EL constants in the dataset URIs (in HDFS) are expected in UTC. Oozie Coordinator takes care of the timezone conversion when performing calculations.
Examples:
1. A dataset produced once every day at 00:15 PST8PDT and done-flag is set to empty:
<dataset name="logs" frequency="${coord:days(1)}"
         initial-instance="2009-02-15T08:15Z" timezone="America/Los_Angeles">
  <uri-template>
    hdfs://foo:8020/app/logs/${market}/${YEAR}${MONTH}/${DAY}/data
  </uri-template>
  <done-flag></done-flag>
</dataset>
The dataset would resolve to the following URIs and Coordinator looks for the existence of the directory itself:
[market] will be replaced with the user-given property.
hdfs://foo:8020/app/logs/[market]/200902/15/data
hdfs://foo:8020/app/logs/[market]/200902/16/data
hdfs://foo:8020/app/logs/[market]/200902/17/data
...
2. A dataset available on the 10th of each month and done-flag is default '_SUCCESS':
<dataset name="stats" frequency="${coord:months(1)}"
         initial-instance="2009-01-10T10:00Z" timezone="America/Los_Angeles">
  <uri-template>hdfs://foo:8020/usr/app/stats/${YEAR}/${MONTH}/data</uri-template>
</dataset>
The dataset would resolve to the following URIs:
hdfs://foo:8020/usr/app/stats/2009/01/data
hdfs://foo:8020/usr/app/stats/2009/02/data
hdfs://foo:8020/usr/app/stats/2009/03/data
...
The dataset instances are not ready until '_SUCCESS' exists in each path:
hdfs://foo:8020/usr/app/stats/2009/01/data/_SUCCESS
hdfs://foo:8020/usr/app/stats/2009/02/data/_SUCCESS
hdfs://foo:8020/usr/app/stats/2009/03/data/_SUCCESS
...
3. A dataset available at the end of every quarter and done-flag is 'trigger.dat':
<dataset name="stats" frequency="${coord:months(3)}"
         initial-instance="2009-01-31T20:00Z" timezone="America/Los_Angeles">
  <uri-template>
    hdfs://foo:8020/usr/app/stats/${YEAR}/${MONTH}/data
  </uri-template>
  <done-flag>trigger.dat</done-flag>
</dataset>
The dataset would resolve to the following URIs:
hdfs://foo:8020/usr/app/stats/2009/01/data
hdfs://foo:8020/usr/app/stats/2009/04/data
hdfs://foo:8020/usr/app/stats/2009/07/data
...
The dataset instances are not ready until 'trigger.dat' exists in each path:
hdfs://foo:8020/usr/app/stats/2009/01/data/trigger.dat
hdfs://foo:8020/usr/app/stats/2009/04/data/trigger.dat
hdfs://foo:8020/usr/app/stats/2009/07/data/trigger.dat
...
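The three done-flag cases in the examples above (element absent, element empty, element naming a file) can be summarized in a short Python sketch. The function name `readiness_path` is hypothetical; the sketch only computes which path would be checked and does not access HDFS.

```python
from typing import Optional

DEFAULT_DONE_FLAG = "_SUCCESS"  # used when no done-flag element is given


def readiness_path(instance_uri: str, done_flag: Optional[str] = None) -> str:
    """Path whose existence marks a dataset instance as ready.

    done_flag=None models a dataset without a <done-flag> element,
    done_flag="" models an empty <done-flag></done-flag> element.
    """
    if done_flag is None:
        done_flag = DEFAULT_DONE_FLAG
    if done_flag == "":
        # Empty done-flag: the existence of the directory itself is checked.
        return instance_uri
    return f"{instance_uri}/{done_flag}"


uri = "hdfs://foo:8020/usr/app/stats/2009/01/data"
print(readiness_path(uri))                 # .../2009/01/data/_SUCCESS
print(readiness_path(uri, ""))             # .../2009/01/data
print(readiness_path(uri, "trigger.dat"))  # .../2009/01/data/trigger.dat
```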
4. Normally the URI template of a dataset has a precision similar to the frequency:
<dataset name="logs" frequency="${coord:days(1)}"
         initial-instance="2009-01-01T10:30Z" timezone="America/Los_Angeles">
  <uri-template>
    hdfs://foo:8020/usr/app/logs/${YEAR}/${MONTH}/${DAY}/data
  </uri-template>
</dataset>
The dataset would resolve to the following URIs:
hdfs://foo:8020/usr/app/logs/2009/01/01/data
hdfs://foo:8020/usr/app/logs/2009/01/02/data
hdfs://foo:8020/usr/app/logs/2009/01/03/data
...
5. However, if the URI template has a finer precision than the dataset frequency:
<dataset name="logs" frequency="${coord:days(1)}"
         initial-instance="2009-01-01T10:30Z" timezone="America/Los_Angeles">
  <uri-template>
    hdfs://foo:8020/usr/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/data
  </uri-template>
</dataset>
The dataset resolves to the following URIs with fixed values for the finer precision template variables:
hdfs://foo:8020/usr/app/logs/2009/01/01/10/30/data
hdfs://foo:8020/usr/app/logs/2009/01/02/10/30/data
hdfs://foo:8020/usr/app/logs/2009/01/03/10/30/data
...
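How a URI template turns into a series of instance URIs can be sketched in Python. The sketch assumes a constant frequency in minutes (1440 for the daily example above, which holds while no DST switch occurs) and substitutes the UTC fields of each nominal time; the names `resolve_uri` and `instance_uris` are hypothetical and this is not Oozie's resolver.

```python
from datetime import datetime, timedelta, timezone


def resolve_uri(template: str, nominal: datetime) -> str:
    """Substitute the YEAR/MONTH/DAY/HOUR/MINUTE EL constants with UTC values."""
    values = {
        "YEAR": f"{nominal.year:04d}",
        "MONTH": f"{nominal.month:02d}",
        "DAY": f"{nominal.day:02d}",
        "HOUR": f"{nominal.hour:02d}",
        "MINUTE": f"{nominal.minute:02d}",
    }
    for name, value in values.items():
        template = template.replace("${" + name + "}", value)
    return template


def instance_uris(template: str, initial: datetime, frequency_minutes: int, count: int) -> list[str]:
    """URIs of the first `count` instances: the frequency applied 0..count-1 times."""
    return [
        resolve_uri(template, initial + timedelta(minutes=frequency_minutes * i))
        for i in range(count)
    ]


template = "hdfs://foo:8020/usr/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/data"
initial = datetime(2009, 1, 1, 10, 30, tzinfo=timezone.utc)
for uri in instance_uris(template, initial, 1440, 3):
    print(uri)
```

Because each instance is exactly one day after the previous one, the hour and minute fields stay fixed at 10 and 30, matching the URIs listed above.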
Each dataset URI could be an HDFS path URI denoting an HDFS directory: hdfs://foo:8020/usr/logs/20090415 or an HCatalog partition URI identifying a set of table partitions: hcat://bar:8020/logsDB/logsTable/dt=20090415;region=US.
HCatalog enables table and storage management for Pig, Hive and MapReduce. The format to specify a HCatalog table partition URI is hcat://[metastore server]:[port]/[database name]/[table name]/[partkey1]=[value];[partkey2]=[value];...
For example,
<dataset name="logs" frequency="${coord:days(1)}" initial-instance="2009-02-15T08:15Z" timezone="America/Los_Angeles"> <uri-template> hcat://myhcatmetastore:9080/database1/table1/myfirstpartitionkey=myfirstvalue;mysecondpartitionkey=mysecondvalue </uri-template> <done-flag></done-flag> </dataset>
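To make the HCatalog partition URI format concrete, here is a minimal Python sketch that splits such a URI into its parts. The function `parse_hcat_uri` and the keys of the returned dictionary are hypothetical; this is not how Oozie parses these URIs, only an illustration of the format described above.

```python
from urllib.parse import urlsplit


def parse_hcat_uri(uri):
    """Split hcat://server:port/db/table/key=value;key=value into its parts."""
    parts = urlsplit(uri)
    if parts.scheme != "hcat":
        raise ValueError(f"not an hcat URI: {uri}")
    # The path holds the database, the table and the partition specification.
    database, table, partition_spec = parts.path.lstrip("/").split("/", 2)
    partitions = dict(
        pair.split("=", 1) for pair in partition_spec.split(";") if pair
    )
    return {
        "server": parts.hostname,
        "port": parts.port,
        "database": database,
        "table": table,
        "partitions": partitions,
    }


parsed = parse_hcat_uri("hcat://bar:8020/logsDB/logsTable/dt=20090415;region=US")
print(parsed)
```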
Dataset definitions are grouped in XML files. IMPORTANT: Please note that if an XML namespace version is specified for the coordinator-app element in the coordinator.xml file, no namespace needs to be defined separately for the datasets element (even if the dataset is defined in a separate file). Specifying it in multiple places might result in XML errors while submitting the coordinator job.
Syntax:
<!-- Synchronous datasets -->
<datasets>
  <include>[SHARED_DATASETS]</include>
  ...
  <dataset name="[NAME]" frequency="[FREQUENCY]"
           initial-instance="[DATETIME]" timezone="[TIMEZONE]">
    <uri-template>[URI TEMPLATE]</uri-template>
  </dataset>
  ...
</datasets>
Example:
<datasets>
  <include>hdfs://foo:8020/app/dataset-definitions/globallogs.xml</include>
  <dataset name="logs" frequency="${coord:hours(12)}"
           initial-instance="2009-02-15T08:15Z" timezone="America/Los_Angeles">
    <uri-template>
      hdfs://foo:8020/app/logs/${market}/${YEAR}${MONTH}/${DAY}/${HOUR}/${MINUTE}/data
    </uri-template>
  </dataset>
  <dataset name="stats" frequency="${coord:months(1)}"
           initial-instance="2009-01-10T10:00Z" timezone="America/Los_Angeles">
    <uri-template>hdfs://foo:8020/usr/app/stats/${YEAR}/${MONTH}/data</uri-template>
  </dataset>
</datasets>
A coordinator application is a program that triggers actions (commonly workflow jobs) when a set of conditions is met. Conditions can be a time frequency, the availability of new dataset instances or other external events.
Types of coordinator applications:
Coordinator applications are normally parameterized.
To create a coordinator job, a job configuration that resolves all coordinator application parameters must be provided to the coordinator engine.
A coordinator job is a running instance of a coordinator application running from a start time to an end time. The start time must be earlier than the end time.
At any time, a coordinator job is in one of the following statuses: PREP, RUNNING, RUNNINGWITHERROR, PREPSUSPENDED, SUSPENDED, SUSPENDEDWITHERROR, PREPPAUSED, PAUSED, PAUSEDWITHERROR, SUCCEEDED, DONEWITHERROR, KILLED, FAILED.
Valid coordinator job status transitions are:
When a coordinator job is submitted, oozie parses the coordinator job XML. Oozie then creates a record for the coordinator with status PREP and returns a unique ID. The coordinator is also started immediately if pause time is not set.
When a user requests to suspend a coordinator job that is in PREP status, oozie puts the job in status PREPSUSPENDED. Similarly, when the pause time is reached for a coordinator job in PREP status, oozie puts the job in status PREPPAUSED.
Conversely, when a user requests to resume a PREPSUSPENDED coordinator job, oozie puts the job in status PREP . And when pause time is reset for a coordinator job and job status is PREPPAUSED , oozie puts the job in status PREP .
When a coordinator job starts, oozie puts the job in status RUNNING and starts materializing workflow jobs based on the job frequency. If any workflow job goes to FAILED, KILLED or TIMEDOUT state, the coordinator job is put in RUNNINGWITHERROR status.
When a user requests to kill a coordinator job, oozie puts the job in status KILLED and it sends kill to all submitted workflow jobs.
When a user requests to suspend a coordinator job that is in RUNNING status, oozie puts the job in status SUSPENDED and it suspends all submitted workflow jobs. Similarly, when a user requests to suspend a coordinator job that is in RUNNINGWITHERROR status, oozie puts the job in status SUSPENDEDWITHERROR and it suspends all submitted workflow jobs.
When the pause time is reached for a coordinator job that is in RUNNING status, oozie puts the job in status PAUSED. Similarly, when the pause time is reached for a coordinator job that is in RUNNINGWITHERROR status, oozie puts the job in status PAUSEDWITHERROR.
Conversely, when a user requests to resume a SUSPENDED coordinator job, oozie puts the job in status RUNNING. Also, when a user requests to resume a SUSPENDEDWITHERROR coordinator job, oozie puts the job in status RUNNINGWITHERROR. When the pause time is reset for a coordinator job whose status is PAUSED, oozie puts the job in status RUNNING. Likewise, when the pause time is reset for a coordinator job whose status is PAUSEDWITHERROR, oozie puts the job in status RUNNINGWITHERROR.
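The suspend, resume and pause rules described so far can be collected into a small lookup table. The Python sketch below is an illustration only: the event names (`suspend`, `resume`, `pause`, `unpause`, `start`, `action_error`) are hypothetical labels chosen for this example, and the kill transition is left out because the text does not list the statuses it applies from.

```python
# (current status, event) -> next status, as described in the text above.
TRANSITIONS = {
    ("PREP", "suspend"): "PREPSUSPENDED",
    ("PREP", "pause"): "PREPPAUSED",
    ("PREPSUSPENDED", "resume"): "PREP",
    ("PREPPAUSED", "unpause"): "PREP",
    ("PREP", "start"): "RUNNING",
    ("RUNNING", "action_error"): "RUNNINGWITHERROR",
    ("RUNNING", "suspend"): "SUSPENDED",
    ("RUNNINGWITHERROR", "suspend"): "SUSPENDEDWITHERROR",
    ("RUNNING", "pause"): "PAUSED",
    ("RUNNINGWITHERROR", "pause"): "PAUSEDWITHERROR",
    ("SUSPENDED", "resume"): "RUNNING",
    ("SUSPENDEDWITHERROR", "resume"): "RUNNINGWITHERROR",
    ("PAUSED", "unpause"): "RUNNING",
    ("PAUSEDWITHERROR", "unpause"): "RUNNINGWITHERROR",
}


def next_status(status, event):
    """Return the status that follows `status` when `event` occurs."""
    try:
        return TRANSITIONS[(status, event)]
    except KeyError:
        raise ValueError(f"no transition from {status} on {event!r}") from None


# Suspending and resuming a job with failed actions keeps the error marker.
status = next_status("RUNNINGWITHERROR", "suspend")
print(status)
print(next_status(status, "resume"))
```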
A coordinator job creates workflow jobs (commonly coordinator actions) only for the duration of the coordinator job and only if the coordinator job is in RUNNING status. If the coordinator job has been suspended, when resumed it will create all the coordinator actions that should have been created during the time it was suspended. Actions will not be lost; they will be delayed.
When the coordinator job materialization finishes and all workflow jobs finish, oozie updates the coordinator status accordingly. For example, if all workflows are SUCCEEDED, oozie puts the coordinator job into SUCCEEDED status. If all workflows are FAILED, oozie puts the coordinator job into FAILED status. If all workflows are KILLED, the coordinator job status changes to KILLED. However, if any workflow job finishes in a status other than SUCCEEDED, with the final statuses being a combination of KILLED, FAILED or TIMEDOUT, oozie puts the coordinator job into DONEWITHERROR. If all coordinator actions are TIMEDOUT, oozie puts the coordinator job into DONEWITHERROR.
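As a rough illustration of these roll-up rules, the following Python sketch derives the final job status from the final statuses of its actions. The function name `final_job_status` is hypothetical, and the sketch only covers the four final statuses named in this paragraph.

```python
FINAL_ACTION_STATUSES = {"SUCCEEDED", "FAILED", "KILLED", "TIMEDOUT"}


def final_job_status(action_statuses):
    """Derive the coordinator job status once all actions have finished."""
    statuses = set(action_statuses)
    if not statuses:
        raise ValueError("at least one action status is required")
    unknown = statuses - FINAL_ACTION_STATUSES
    if unknown:
        raise ValueError(f"not a final action status: {sorted(unknown)}")
    # A uniform outcome maps to the same job status ...
    for uniform in ("SUCCEEDED", "FAILED", "KILLED"):
        if statuses == {uniform}:
            return uniform
    # ... anything else (mixed outcomes, or all TIMEDOUT) is DONEWITHERROR.
    return "DONEWITHERROR"


print(final_job_status(["SUCCEEDED", "SUCCEEDED"]))
print(final_job_status(["SUCCEEDED", "FAILED", "TIMEDOUT"]))
```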
A coordinator job in FAILED or KILLED status can be changed to IGNORED status. A coordinator job in IGNORED status can be changed to RUNNING status.
A coordinator job creates and executes coordinator actions.
A coordinator action is normally a workflow job that consumes and produces dataset instances.
Once a coordinator action is created (this is also referred to as the action being materialized), the coordinator action will be in WAITING status until all required inputs for execution are satisfied or until the wait times out.
A coordinator job has one driver event that determines the creation (materialization) of its coordinator actions (typically a workflow job).
Once a coordinator action has been created (materialized) the coordinator action qualifies for execution. At this point, the action status is WAITING .
A coordinator action in WAITING status must wait until all its input events are available before it is ready for execution. When a coordinator action is ready for execution its status is READY.
A coordinator action in WAITING status may timeout before it becomes ready for execution. Then the action status is TIMEDOUT .
A coordinator action may remain in READY status for a while, without starting execution, due to the concurrency execution policies of the coordinator job.
A coordinator action in READY or WAITING status changes to SKIPPED status if the execution strategy is LAST_ONLY and the current time is past the next action's nominal time. See section 6.3 for more details.
A coordinator action in READY or WAITING status changes to SKIPPED status if the execution strategy is NONE and the current time is past the action's nominal time + 1 minute. See section 6.3 for more details.
A coordinator action in READY status changes to SUBMITTED status if total current RUNNING and SUBMITTED actions are less than concurrency execution limit.
A coordinator action in SUBMITTED status changes to RUNNING status when the workflow engine starts execution of the coordinator action.
A coordinator action is in RUNNING status until the associated workflow job completes its execution. Depending on the workflow job completion status, the coordinator action will be in SUCCEEDED , KILLED or FAILED status.
A coordinator action in WAITING , READY , SUBMITTED or RUNNING status can be killed, changing to KILLED status.
A coordinator action in SUBMITTED or RUNNING status can also fail, changing to FAILED status.
A coordinator action in FAILED , KILLED , or TIMEDOUT status can be changed to IGNORED status. A coordinator action in IGNORED status can be rerun, changing to WAITING status.
Valid coordinator action status transitions are:
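The following Python sketch collects only the action transitions stated in the paragraphs above into a lookup table. It is an illustrative summary under that assumption, not an authoritative list; the names `ACTION_TRANSITIONS` and `can_transition` are hypothetical.

```python
# Current action status -> statuses it may change to, per the text above.
ACTION_TRANSITIONS = {
    "WAITING": {"READY", "TIMEDOUT", "SKIPPED", "KILLED"},
    "READY": {"SUBMITTED", "SKIPPED", "KILLED"},
    "SUBMITTED": {"RUNNING", "KILLED", "FAILED"},
    "RUNNING": {"SUCCEEDED", "KILLED", "FAILED"},
    "FAILED": {"IGNORED"},
    "KILLED": {"IGNORED"},
    "TIMEDOUT": {"IGNORED"},
    "IGNORED": {"WAITING"},  # a rerun puts the action back in WAITING
    "SUCCEEDED": set(),
    "SKIPPED": set(),
}


def can_transition(current, target):
    """Return True if the text above describes a change from current to target."""
    return target in ACTION_TRANSITIONS.get(current, set())


print(can_transition("WAITING", "READY"))
print(can_transition("READY", "RUNNING"))
```

For example, a READY action cannot jump straight to RUNNING; it first has to be SUBMITTED, which is subject to the concurrency limit.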
The Input events of a coordinator application specify the input conditions that are required in order to execute a coordinator action.
In the current specification input events are restricted to dataset instances availability.
All the datasets instances defined as input events must be available for the coordinator action to be ready for execution ( READY status).
Input events are normally parameterized. For example, the last 24 hourly instances of the 'searchlogs' dataset.
Input events can refer to multiple instances of multiple datasets. For example, the last 24 hourly instances of the 'searchlogs' dataset and the last weekly instance of the 'celebrityRumours' dataset.
A coordinator action can produce one or more dataset(s) instances as output.
Dataset instances produced as output by one coordinator action may be consumed as input by other coordinator action(s) of other coordinator job(s).
The chaining of coordinator jobs via the datasets they produce and consume is referred to as a data pipeline.
In the current specification coordinator job output events are restricted to dataset instances.
The execution policies for the actions of a coordinator job can be defined in the coordinator application.
Commonly, multiple workflow applications are chained together to form a more complex application.
Workflow applications are run on a regular basis, each one of them at its own frequency. The data consumed and produced by these workflow applications is relative to the nominal time of the workflow job that is processing the data. This is a coordinator application.
The output of multiple workflow jobs of a single workflow application is then consumed by a single workflow job of another workflow application, this is done on regular basis as well. These workflow jobs are triggered by recurrent actions of coordinator jobs. This is a set of coordinator jobs that inter-depend on each other via the data they produce and consume.
This set of interdependent coordinator applications is referred to as a data pipeline application.
This example describes all the components that make up a data pipeline: datasets, coordinator jobs and coordinator actions (workflows).
The coordinator actions (the workflows) are completely agnostic of datasets and their frequencies, they just use them as input and output data (i.e. HDFS files or directories). Furthermore, as the example shows, the same workflow can be used to process similar datasets of different frequencies.
The frequency of the hourlyRevenue-coord coordinator job is 1 hour, which means that a coordinator action is created every hour. A coordinator action will be executed only when the 4 checkouts dataset instances for the corresponding last hour are available. Until then, the coordinator action will remain as created (materialized), in WAITING status. Once the 4 dataset instances for the corresponding last hour are available, the coordinator action will be executed and it will start a revenueCalculator-wf workflow job.
A synchronous coordinator definition is defined by a name, start time and end time, the frequency of creation of its coordinator actions, the input events, the output events and action control information:
LAST_ONLY: While FIFO and LIFO simply specify the order in which READY actions should be executed, LAST_ONLY can actually cause some actions to be SKIPPED and is a little harder to understand. When LAST_ONLY is set, an action that is WAITING or READY will be SKIPPED when the current time is past the next action's nominal time. For example, suppose action 1 and 2 are both WAITING , the current time is 5:00pm, and action 2's nominal time is 5:10pm. In 10 minutes from now, at 5:10pm, action 1 will become SKIPPED, assuming it doesn't transition to SUBMITTED (or a terminal state) before then. Another way of thinking about this is to view it as similar to setting the timeout equal to the frequency , except that the SKIPPED status doesn't cause the coordinator job to eventually become DONEWITHERROR and can actually become SUCCEEDED (i.e. it's a "good" version of TIMEDOUT ). LAST_ONLY is useful if you want a recurring job, but do not actually care about the individual instances and just always want the latest action. For example, if you have a coordinator running every 10 minutes and take Oozie down for 1 hour, when Oozie comes back, there would normally be 6 actions READY to run. However, with LAST_ONLY , only the current one will go to SUBMITTED and RUNNING ; the others will go to SKIPPED.
NONE: Similar to LAST_ONLY except all older materializations are skipped. When NONE is set, an action that is WAITING or READY will be SKIPPED when the current time is more than a certain configured number of minutes (tolerance) past the action's nominal time. By default, the threshold is 1 minute. For example, suppose action 1 and 2 are both WAITING , the current time is 5:20pm, and both actions' nominal times are before 5:19pm. Both actions will become SKIPPED, assuming they don't transition to SUBMITTED (or a terminal state) before then. Another way of thinking about this is to view it as similar to setting the timeout equal to 1 minute which is the smallest time unit, except that the SKIPPED status doesn't cause the coordinator job to eventually become DONEWITHERROR and can actually become SUCCEEDED (i.e. it's a "good" version of TIMEDOUT ).
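The two skipping rules can be compared with a short Python sketch. It is a simplified model under stated assumptions: all listed actions are still WAITING or READY, the function names are hypothetical, and the scenario (a 10-minute coordinator whose six pending actions are evaluated 55 minutes after the first nominal time) is made up for illustration.

```python
from datetime import datetime, timedelta


def skipped_by_last_only(nominal_times, frequency, now):
    """Actions whose next action's nominal time has already passed."""
    return [t for t in nominal_times if now > t + frequency]


def skipped_by_none(nominal_times, now, tolerance=timedelta(minutes=1)):
    """Actions whose own nominal time is more than `tolerance` in the past."""
    return [t for t in nominal_times if now > t + tolerance]


frequency = timedelta(minutes=10)
first = datetime(2009, 1, 1, 16, 0)
pending = [first + i * frequency for i in range(6)]  # 16:00 ... 16:50
now = datetime(2009, 1, 1, 16, 55)

last_only = skipped_by_last_only(pending, frequency, now)
none = skipped_by_none(pending, now)
print(len(last_only), "skipped with LAST_ONLY")
print(len(none), "skipped with NONE")
```

In this scenario LAST_ONLY keeps only the latest action (nominal time 16:50), while NONE skips every action because even the latest one is more than 1 minute old.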
Syntax:
<coordinator-app name="[NAME]" frequency="[FREQUENCY]"
                 start="[DATETIME]" end="[DATETIME]" timezone="[TIMEZONE]"
                 xmlns="uri:oozie:coordinator:0.1">
  <controls>
    <timeout>[TIME_PERIOD]</timeout>
    <concurrency>[CONCURRENCY]</concurrency>
    <execution>[EXECUTION_STRATEGY]</execution>
  </controls>
  <datasets>
    <include>[SHARED_DATASETS]</include>
    ...
    <!-- Synchronous datasets -->
    <dataset name="[NAME]" frequency="[FREQUENCY]"
             initial-instance="[DATETIME]" timezone="[TIMEZONE]">
      <uri-template>[URI_TEMPLATE]</uri-template>
    </dataset>
    ...
  </datasets>
  <input-events>
    <data-in name="[NAME]" dataset="[DATASET]">
      <instance>[INSTANCE]</instance>
      ...
    </data-in>
    ...
    <data-in name="[NAME]" dataset="[DATASET]">
      <start-instance>[INSTANCE]</start-instance>
      <end-instance>[INSTANCE]</end-instance>
    </data-in>
    ...
  </input-events>
  <output-events>
    <data-out name="[NAME]" dataset="[DATASET]">
      <instance>[INSTANCE]</instance>
    </data-out>
    ...
  </output-events>
  <action>
    <workflow>
      <app-path>[WF-APPLICATION-PATH]</app-path>
      <configuration>
        <property>
          <name>[PROPERTY-NAME]</name>
          <value>[PROPERTY-VALUE]</value>
        </property>
        ...
      </configuration>
    </workflow>
  </action>
</coordinator-app>
Examples:
1. A Coordinator Job that creates and executes a single coordinator action:
The following example describes a synchronous coordinator application that runs once a day for 1 day at the end of the day. It consumes an instance of a daily 'logs' dataset and produces an instance of a daily 'siteAccessStats' dataset.
Coordinator application definition:
<coordinator-app name="hello-coord" frequency="${coord:days(1)}" start="2009-01-02T08:00Z" end="2009-01-02T08:00Z" timezone="America/Los_Angeles" xmlns="uri:oozie:coordinator:0.1"> <datasets> <dataset name="logs" frequency="${coord:days(1)}" initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}${MONTH}/${DAY}/data</uri-template> </dataset> <dataset name="siteAccessStats" frequency="${coord:days(1)}" initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles"> <uri-template>hdfs://bar:8020/app/stats/${YEAR}/${MONTH}/${DAY}/data</uri-template> </dataset> </datasets> <input-events> <data-in name="input" dataset="logs"> <instance>2009-01-02T08:00Z</instance> </data-in> </input-events> <output-events> <data-out name="output" dataset="siteAccessStats"> <instance>2009-01-02T08:00Z</instance> </data-out> </output-events> <action> <workflow> <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path> <configuration> <property> <name>wfInput</name> <value>${coord:dataIn('input')}</value> </property> <property> <name>wfOutput</name> <value>${coord:dataOut('output')}</value> </property> </configuration> </workflow> </action> </coordinator-app>
There are 2 synchronous datasets with a daily frequency and they are expected at the end of each PST8PDT day.
This coordinator job runs for 1 day on January 1st 2009 at 24:00 PST8PDT.
The workflow job invocation for the single coordinator action would resolve to:
<workflow> <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path> <configuration> <property> <name>wfInput</name> <value>hdfs://bar:8020/app/logs/200901/02/data</value> </property> <property> <name>wfOutput</name> <value>hdfs://bar:8020/app/stats/2009/01/02/data</value> </property> </configuration> </workflow>
IMPORTANT: Oozie works in UTC datetimes, so all URI templates resolve to UTC datetime values. Because of the timezone difference between UTC and PST8PDT, the URIs resolve to 2009-01-02T08:00Z (UTC), which is equivalent to 2009-01-01T24:00 PST8PDT (PST).
There is a single input event, which resolves to the January 1st PST8PDT instance of the 'logs' dataset. There is a single output event, which resolves to the January 1st PST8PDT instance of the 'siteAccessStats' dataset.
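The equivalence between the two datetimes can be checked with a few lines of Python. This sketch assumes a fixed UTC-8 offset for PST, which holds in January when daylight saving time is not in effect.

```python
from datetime import datetime, timedelta, timezone

# Assumption: a fixed UTC-8 offset stands in for PST8PDT outside of DST.
PST = timezone(timedelta(hours=-8), "PST")

nominal_utc = datetime(2009, 1, 2, 8, 0, tzinfo=timezone.utc)
nominal_pst = nominal_utc.astimezone(PST)

# Midnight at the start of January 2nd is the same instant as 24:00 on January 1st.
print(nominal_pst.isoformat())  # 2009-01-02T00:00:00-08:00
```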
The ${coord:dataIn(String name)} and ${coord:dataOut(String name)} EL functions resolve to the dataset instance URIs of the corresponding dataset instances. These EL functions are properly defined in a subsequent section.
Because the ${coord:dataIn(String name)} and ${coord:dataOut(String name)} EL functions resolve to URIs, which are HDFS URIs, the workflow job itself does not deal with dataset instances, just HDFS URIs.
2. A Coordinator Job that executes its coordinator action multiple times:
A more realistic version of the previous example would be a coordinator job that runs for a year, creating a daily action and consuming the daily 'logs' dataset instance and producing the daily 'siteAccessStats' dataset instance.
The coordinator application is identical, except for the 'end' date and the parameterization in the input and output events sections:
<coordinator-app name="hello-coord" frequency="${coord:days(1)}" start="2009-01-02T08:00Z" end="2010-01-02T08:00Z" timezone="America/Los_Angeles" xmlns="uri:oozie:coordinator:0.1"> <datasets> <dataset name="logs" frequency="${coord:days(1)}" initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}${MONTH}/${DAY}/data</uri-template> </dataset> <dataset name="siteAccessStats" frequency="${coord:days(1)}" initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles"> <uri-template>hdfs://bar:8020/app/stats/${YEAR}/${MONTH}/${DAY}/data</uri-template> </dataset> </datasets> <input-events> <data-in name="input" dataset="logs"> <instance>${coord:current(0)}</instance> </data-in> </input-events> <output-events> <data-out name="output" dataset="siteAccessStats"> <instance>${coord:current(0)}</instance> </data-out> </output-events> <action> <workflow> <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path> <configuration> <property> <name>wfInput</name> <value>${coord:dataIn('input')}</value> </property> <property> <name>wfOutput</name> <value>${coord:dataOut('output')}</value> </property> </configuration> </workflow> </action> </coordinator-app>
The ${coord:current(int offset)} EL function resolves to coordinator action creation time, that would be the current day at the time the coordinator action is created: 2009-01-02T08:00 ... 2010-01-01T08:00 . This EL function is properly defined in a subsequent section.
There is a single input event, which resolves to the current day instance of the 'logs' dataset.
There is a single output event, which resolves to the current day instance of the 'siteAccessStats' dataset.
The workflow job invocation for the first coordinator action would resolve to:
<workflow> <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path> <configuration> <property> <name>wfInput</name> <value>hdfs://bar:8020/app/logs/200901/02/data</value> </property> <property> <name>wfOutput</name> <value>hdfs://bar:8020/app/stats/2009/01/02/data</value> </property> </configuration> </workflow>
For the second coordinator action it would resolve to:
<workflow> <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path> <configuration> <property> <name>wfInput</name> <value>hdfs://bar:8020/app/logs/200901/03/data</value> </property> <property> <name>wfOutput</name> <value>hdfs://bar:8020/app/stats/2009/01/03/data</value> </property> </configuration> </workflow>
And so on.
3. A Coordinator Job that executes its coordinator action multiple times and as input takes multiple dataset instances:
The following example is a variation of example #2 where the synchronous coordinator application runs weekly. It consumes the last 7 instances of a daily 'logs' dataset and produces an instance of a weekly 'weeklySiteAccessStats' dataset.
'logs' is a synchronous dataset with a daily frequency and it is expected at the end of each day (24:00).
'weeklySiteAccessStats' is a synchronous dataset with a weekly frequency and it is expected at the end (24:00) of every 7th day.
The coordinator application frequency is weekly and it starts on the 7th day of the year:
<coordinator-app name="hello2-coord" frequency="${coord:days(7)}"
                 start="2009-01-07T24:00Z" end="2009-12-12T24:00Z" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.1">
  <datasets>
    <dataset name="logs" frequency="${coord:days(1)}"
             initial-instance="2009-01-01T24:00Z" timezone="UTC">
      <uri-template>hdfs://bar:8020/app/logs/${YEAR}${MONTH}/${DAY}</uri-template>
    </dataset>
    <dataset name="weeklySiteAccessStats" frequency="${coord:days(7)}"
             initial-instance="2009-01-07T24:00Z" timezone="UTC">
      <uri-template>hdfs://bar:8020/app/weeklystats/${YEAR}/${MONTH}/${DAY}</uri-template>
    </dataset>
  </datasets>
  <input-events>
    <data-in name="input" dataset="logs">
      <start-instance>${coord:current(-6)}</start-instance>
      <end-instance>${coord:current(0)}</end-instance>
    </data-in>
  </input-events>
  <output-events>
    <data-out name="output" dataset="weeklySiteAccessStats">
      <instance>${coord:current(0)}</instance>
    </data-out>
  </output-events>
  <action>
    <workflow>
      <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path>
      <configuration>
        <property>
          <name>wfInput</name>
          <value>${coord:dataIn('input')}</value>
        </property>
        <property>
          <name>wfOutput</name>
          <value>${coord:dataOut('output')}</value>
        </property>
      </configuration>
    </workflow>
  </action>
</coordinator-app>
The ${coord:current(int offset)} EL function resolves to the coordinator action creation time plus the specified offset multiplied by the dataset frequency, so a negative offset refers to earlier instances. This EL function is properly defined in a subsequent section.
The input event, instead of resolving to a single 'logs' dataset instance, refers to a range of 7 dataset instances: the instance for 6 days ago, 5 days ago, ... and today's instance.
The output event resolves to the current day instance of the 'weeklySiteAccessStats' dataset. As the coordinator job will create a coordinator action every 7 days, dataset instances for the 'weeklySiteAccessStats' dataset will be created every 7 days.
The workflow job invocation for the first coordinator action would resolve to:
<workflow>
  <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path>
  <configuration>
    <property>
      <name>wfInput</name>
      <value>
        hdfs://bar:8020/app/logs/200901/01,hdfs://bar:8020/app/logs/200901/02,
        hdfs://bar:8020/app/logs/200901/03,hdfs://bar:8020/app/logs/200901/04,
        hdfs://bar:8020/app/logs/200901/05,hdfs://bar:8020/app/logs/200901/06,
        hdfs://bar:8020/app/logs/200901/07
      </value>
    </property>
    <property>
      <name>wfOutput</name>
      <value>hdfs://bar:8020/app/weeklystats/2009/01/07</value>
    </property>
  </configuration>
</workflow>
For the second coordinator action it would resolve to:
<workflow>
  <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path>
  <configuration>
    <property>
      <name>wfInput</name>
      <value>
        hdfs://bar:8020/app/logs/200901/08,hdfs://bar:8020/app/logs/200901/09,
        hdfs://bar:8020/app/logs/200901/10,hdfs://bar:8020/app/logs/200901/11,
        hdfs://bar:8020/app/logs/200901/12,hdfs://bar:8020/app/logs/200901/13,
        hdfs://bar:8020/app/logs/200901/14
      </value>
    </property>
    <property>
      <name>wfOutput</name>
      <value>hdfs://bar:8020/app/weeklystats/2009/01/14</value>
    </property>
  </configuration>
</workflow>
And so on.
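The range from ${coord:current(-6)} to ${coord:current(0)} can be enumerated with a minimal Python sketch. It assumes, as the example does, that an instance whose nominal time is written as T24:00Z is identified by that calendar day; the helper `weekly_inputs` is hypothetical.

```python
from datetime import date, timedelta


def weekly_inputs(action_day, first=-6, last=0):
    """URIs of the daily 'logs' instances from current(first) to current(last)."""
    days = (action_day + timedelta(days=n) for n in range(first, last + 1))
    return [
        "hdfs://bar:8020/app/logs/{:04d}{:02d}/{:02d}".format(d.year, d.month, d.day)
        for d in days
    ]


first_action = date(2009, 1, 7)
second_action = first_action + timedelta(days=7)

print(",".join(weekly_inputs(first_action)))
print(",".join(weekly_inputs(second_action)))
```

Each action consumes exactly 7 consecutive daily instances, and consecutive weekly actions do not share any instance.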
When a coordinator job is submitted to Oozie, the submitter may specify as many coordinator job configuration properties as required (similar to Hadoop JobConf properties).
Configuration properties that are a valid Java identifier, [A-Za-z_][0-9A-Za-z_]*, are available as ${NAME} variables within the coordinator application definition.
Configuration Properties that are not a valid Java identifier, for example job.tracker , are available via the ${coord:conf(String name)} function. Valid Java identifier properties are available via this function as well.
Using properties that are valid Java identifiers results in a more readable and compact definition.
Dataset definitions can also be parameterized; the parameters are resolved using the configuration properties of the job configuration used to submit the coordinator job.
If a configuration property used in the definitions is not provided with the job configuration used to submit a coordinator job, the value of the parameter will be undefined and the job submission will fail.
Example:
Coordinator application definition:
<coordinator-app name="app-coord" frequency="${coord:days(1)}" start="${jobStart}" end="${jobEnd}" timezone="${timezone}" xmlns="uri:oozie:coordinator:0.1"> <datasets> <dataset name="logs" frequency="${coord:hours(1)}" initial-instance="${logsInitialInstance}" timezone="${timezone}"> <uri-template> hdfs://bar:8020/app/logs/${market}/${language}/${YEAR}${MONTH}/${DAY}/${HOUR} </uri-template> </dataset> </datasets> <input-events> <data-in name="input" dataset="logs"> <start-instance>${coord:current(-23)}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <action> <workflow> ... </workflow> </action> </coordinator-app>
In the above example there are 6 configuration parameters (variables) that have to be provided when submitting a job: jobStart, jobEnd, logsInitialInstance, timezone, market and language.
IMPORTANT: Note that this example is not completely correct as it always consumes the last 24 instances of the 'logs' dataset. It is assumed that all days have 24 hours. For timezones that observe daylight saving this application will not work as expected as it will consume the wrong number of dataset instances in DST switch days. To be able to handle these scenarios, the ${coord:hoursInDays(int n)} and ${coord:daysInMonths(int n)} EL functions must be used (refer to section #6.6.2 and #6.6.3).
If the above 6 properties are not specified, the job will fail.
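A submission-time check of this kind can be sketched in Python as follows. This is an illustration only, not Oozie's implementation: `missing_properties` and `is_el_variable` are hypothetical helpers, and the sketch assumes that YEAR, MONTH, DAY, HOUR and MINUTE are the only variables supplied by the URI template mechanism rather than by the job configuration.

```python
import re

# A valid Java identifier, as defined above: [A-Za-z_][0-9A-Za-z_]*
IDENTIFIER = re.compile(r"[A-Za-z_][0-9A-Za-z_]*")
# ${NAME} references; EL function calls such as ${coord:days(1)} do not match.
VARIABLE = re.compile(r"\$\{([A-Za-z_][0-9A-Za-z_]*)\}")
TEMPLATE_VARIABLES = {"YEAR", "MONTH", "DAY", "HOUR", "MINUTE"}


def is_el_variable(name):
    """True if the property name can be used directly as ${NAME}."""
    return IDENTIFIER.fullmatch(name) is not None


def missing_properties(definition, job_conf):
    """Names referenced as ${NAME} that the job configuration does not provide."""
    names = set(VARIABLE.findall(definition))
    return sorted(names - TEMPLATE_VARIABLES - set(job_conf))


definition = (
    'frequency="${coord:days(1)}" start="${jobStart}" end="${jobEnd}" '
    'timezone="${timezone}" initial-instance="${logsInitialInstance}" '
    "hdfs://bar:8020/app/logs/${market}/${language}/${YEAR}${MONTH}/${DAY}/${HOUR}"
)
job_conf = {"jobStart": "2009-01-01T08:00Z", "jobEnd": "2009-12-31T08:00Z",
            "timezone": "UTC"}

print(missing_properties(definition, job_conf))
print(is_el_variable("job.tracker"))
```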
As of schema 0.4, a list of formal parameters can be provided which will allow Oozie to verify, at submission time, that said properties are actually specified (i.e. before the job is executed and fails). Default values can also be provided.
Example:
The previous parameterized coordinator application definition with formal parameters:
<coordinator-app name="app-coord" frequency="${coord:days(1)}"
                 start="${jobStart}" end="${jobEnd}" timezone="${timezone}"
                 xmlns="uri:oozie:coordinator:0.4">
  <parameters>
    <property>
      <name>jobStart</name>
    </property>
    <property>
      <name>jobEnd</name>
      <value>2012-12-01T22:00Z</value>
    </property>
  </parameters>
  <datasets>
    <dataset name="logs" frequency="${coord:hours(1)}"
             initial-instance="${logsInitialInstance}" timezone="${timezone}">
      <uri-template>
        hdfs://bar:8020/app/logs/${market}/${language}/${YEAR}${MONTH}/${DAY}/${HOUR}
      </uri-template>
    </dataset>
  </datasets>
  <input-events>
    <data-in name="input" dataset="logs">
      <start-instance>${coord:current(-23)}</start-instance>
      <end-instance>${coord:current(0)}</end-instance>
    </data-in>
  </input-events>
  <action>
    <workflow>
      ...
    </workflow>
  </action>
</coordinator-app>
In the above example, if jobStart is not specified, Oozie will print an error message instead of submitting the job. If jobEnd is not specified, Oozie will use the default value, 2012-12-01T22:00Z.
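The behavior of formal parameters can be modeled with a small Python sketch. The function `apply_formal_parameters` is hypothetical and only mirrors the two rules stated above: a parameter without a default must be supplied, and a parameter with a default falls back to it.

```python
def apply_formal_parameters(formal, job_conf):
    """Resolve formal parameters; `formal` maps a name to its default or None."""
    resolved = dict(job_conf)
    missing = []
    for name, default in formal.items():
        if name in resolved:
            continue  # value supplied at submission time wins
        if default is None:
            missing.append(name)  # required, no default available
        else:
            resolved[name] = default
    if missing:
        raise ValueError("missing required parameters: " + ", ".join(missing))
    return resolved


formal = {"jobStart": None, "jobEnd": "2012-12-01T22:00Z"}

resolved = apply_formal_parameters(formal, {"jobStart": "2012-01-01T22:00Z"})
print(resolved["jobEnd"])  # falls back to the default value
```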
A coordinator application job typically launches several coordinator actions during its lifetime. A coordinator action typically uses its creation (materialization) time to resolve the specific datasets instances required for its input and output events.
The following EL functions are the means for binding the coordinator action creation time to the datasets instances of its input and output events.
${coord:current(int n)} represents the nth dataset instance for a synchronous dataset, relative to the coordinator action creation (materialization) time. The coordinator action creation (materialization) time is computed based on the coordinator job start time and its frequency. The nth dataset instance is computed based on the dataset's initial-instance datetime, its frequency and the (current) coordinator action creation (materialization) time.
n can be a negative integer, zero or a positive integer.
${coord:current(int n)} returns the nominal datetime for nth dataset instance relative to the coordinator action creation (materialization) time.
${coord:current(int n)} performs the following calculation:
DS_II   : dataset initial-instance (datetime)
DS_FREQ : dataset frequency (minutes)
CA_NT   : coordinator action creation (materialization) nominal time
coord:current(int n) = DS_II + DS_FREQ * ( (CA_NT - DS_II) div DS_FREQ + n)
NOTE: The formula above is not 100% correct: because of DST changes, the calculation has to account for hour shifts. Oozie Coordinator must make the correct calculation accounting for DST hour shifts.
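The formula can be written directly in Python. This is a minimal sketch that, like the formula itself, ignores DST hour shifts; the function name `coord_current` is hypothetical, and a datetime written as T24:00Z in this document is represented as 00:00 of the following day because Python has no hour 24.

```python
from datetime import datetime, timedelta, timezone


def coord_current(n, ds_ii, ds_freq, ca_nt):
    """DS_II + DS_FREQ * ((CA_NT - DS_II) div DS_FREQ + n), without DST handling."""
    elapsed_instances = (ca_nt - ds_ii) // ds_freq  # integer division
    return ds_ii + ds_freq * (elapsed_instances + n)


# Weekly dataset whose initial instance is 2009-01-07T24:00Z.
ds_ii = datetime(2009, 1, 8, 0, 0, tzinfo=timezone.utc)
ds_freq = timedelta(days=7)
# Coordinator action nominal time 2009-05-29T24:00Z.
ca_nt = datetime(2009, 5, 30, 0, 0, tzinfo=timezone.utc)

for n in (0, 1, -1, -3):
    print(n, coord_current(n, ds_ii, ds_freq, ca_nt).isoformat())
```

Because of the integer division, the result always lands on a dataset instance boundary: here current(0) is the latest weekly instance at or before the action's nominal time.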
When a positive integer is used with the ${coord:current(int n)} , it refers to a dataset instance in the future from the coordinator action creation (materialization) time. This can be useful when creating dataset instances for future use by other systems.
${coord:current(int n)} returns the exact datetime for the computed dataset instance.
IMPORTANT: The coordinator engine does use output events to keep track of new dataset instances. Workflow jobs triggered from coordinator actions can leverage the coordinator engine capability to synthesize dataset instances URIs to create output directories.
Examples:
1. ${coord:current(int n)} datetime calculation:
Datasets Definition:
<datasets> . <dataset name="logs" frequency="${coord:days(1)}" initial-instance="2009-01-01T24:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}${MONTH}/${DAY}</uri-template> </dataset> . <dataset name="weeklySiteAccessStats" frequency="${coord:days(7)}" initial-instance="2009-01-07T24:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/weeklystats/${YEAR}/${MONTH}/${DAY}</uri-template> </dataset> . </datasets>
For a coordinator action creation time of 2009-05-29T24:00Z, the ${coord:current(int n)} EL function would resolve to the following datetime values for the 'logs' and 'weeklySiteAccessStats' datasets:
${coord:current(int offset)} | Dataset 'logs' | Dataset 'weeklySiteAccessStats' |
---|---|---|
${coord:current(0)} | 2009-05-29T24:00Z | 2009-05-27T24:00Z |
${coord:current(1)} | 2009-05-30T24:00Z | 2009-06-03T24:00Z |
${coord:current(-1)} | 2009-05-28T24:00Z | 2009-05-20T24:00Z |
${coord:current(-3)} | 2009-05-26T24:00Z | 2009-05-06T24:00Z |
Datasets Definition file 'datasets.xml':
<datasets> <dataset name="logs" frequency="${coord:hours(1)}" initial-instance="2009-01-01T01:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}${MONTH}/${DAY}/${HOUR}</uri-template> </dataset> </datasets>
a. Coordinator application definition that creates a coordinator action once a day for a year, that is 365 coordinator actions:
<coordinator-app name="app-coord" frequency="${coord:days(1)}" start="2009-01-01T24:00Z" end="2009-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <datasets> <include>hdfs://foo:8020/app/dataset-definitions/datasets.xml</include> </datasets> <input-events> <data-in name="input" dataset="logs"> <start-instance>${coord:current(-23)}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <action> <workflow> ... </workflow> </action> </coordinator-app>
Each coordinator action will require as input events the last 24 (-23 to 0) dataset instances for the 'logs' dataset. Because the dataset 'logs' is an hourly dataset, it means all its instances for the last 24 hours.
In this case, the dataset instances are used in a rolling window fashion.
b. Coordinator application definition that creates a coordinator action once an hour for a year, that is 8760 (24*365) coordinator actions:
<coordinator-app name="app-coord" frequency="${coord:hours(1)}" start="2009-01-01T24:00Z" end="2009-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <datasets> <include>hdfs://foo:8020/app/dataset-definitions/datasets.xml</include> </datasets> <input-events> <data-in name="input" dataset="logs"> <start-instance>${coord:current(-23)}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <action> <workflow> ... </workflow> </action> </coordinator-app>
Each coordinator action will require as input events the last 24 (-23 to 0) dataset instances for the 'logs' dataset. Similarly to the previous coordinator application example, it means all its instances for the last 24 hours.
However, because the frequency is hourly instead of daily, each coordinator action will use the last 23 dataset instances used by the previous coordinator action plus a new one.
In this case, the dataset instances are used in a sliding window fashion.
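The difference between the rolling and the sliding window can be checked by counting how many dataset instances two consecutive coordinator actions share. The sketch below is illustrative only: the helper names are hypothetical, the dataset is assumed to be hourly and aligned with the nominal time, and DST is ignored.

```python
from datetime import datetime, timedelta

HOUR = timedelta(hours=1)


def input_window(nominal_time, first=-23, last=0):
    """Hourly instances from current(first) to current(last), inclusive."""
    return [nominal_time + HOUR * n for n in range(first, last + 1)]


def shared_instances(nominal_time, coordinator_frequency):
    """Count the instances used by both an action and the action that follows it."""
    this_run = set(input_window(nominal_time))
    next_run = set(input_window(nominal_time + coordinator_frequency))
    return len(this_run & next_run)


nominal = datetime(2009, 1, 2)  # 2009-01-01T24:00Z
print(shared_instances(nominal, timedelta(days=1)))   # daily coordinator: 0 shared
print(shared_instances(nominal, timedelta(hours=1)))  # hourly coordinator: 23 shared
```

With a daily coordinator the windows do not overlap (rolling window); with an hourly coordinator each window reuses 23 instances of the previous one (sliding window).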
3. Using ${coord:current(int n)} to specify dataset instances created by a coordinator application:
Datasets Definition file 'datasets.xml':
<datasets> . <dataset name="logs" frequency="${coord:hours(1)}" initial-instance="2009-01-01T01:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template> </dataset> . <dataset name="stats" frequency="${coord:days(1)}" initial-instance="2009-01-01T24:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}</uri-template> </dataset> . </datasets>
Coordinator application definition:
<coordinator-app name="app-coord" frequency="${coord:days(1)}" start="2009-01-01T24:00Z" end="2009-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <datasets> <include>hdfs://foo:8020/app/dataset-definitions/datasets.xml</include> </datasets> <input-events> <data-in name="input" dataset="logs"> <start-instance>${coord:current(-23)}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <output-events> <data-out name="output" dataset="stats"> <instance>${coord:current(0)}</instance> </data-out> </output-events> <action> <workflow> ... </workflow> </action> </coordinator-app>
This coordinator application creates a coordinator action once a day for a year, that is 365 coordinator actions.
Each coordinator action will require as input events the last 24 (-23 to 0) dataset instances for the 'logs' dataset.
Each coordinator action will create as output event a new dataset instance for the 'stats' dataset.
Note that the 'stats' dataset initial-instance and frequency match the coordinator application start and frequency.
4. Using ${coord:current(int n)} to create a data-pipeline using a coordinator application:
This example shows how to chain together coordinator applications to create a data pipeline.
Dataset definitions file 'datasets.xml':
<!--- Dataset A - produced every 15 minutes. --> . <dataset name="15MinLogs" frequency="${coord:minutes(15)}" initial-instance="2009-01-01T00:15:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template> </dataset> . <dataset name="1HourLogs" frequency="${coord:hours(1)}" initial-instance="2009-01-01T01:00:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template> </dataset> . <dataset name="1DayLogs" frequency="${coord:hours(24)}" initial-instance="2009-01-01T24:00:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}</uri-template> </dataset>
Coordinator application definitions. A data-pipeline with two coordinator-applications, one scheduled to run every hour, and another scheduled to run every day:
<coordinator-app name="app-coord-hourly" frequency="${coord:hours(1)}" start="2009-01-01T01:00Z" end="2009-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <datasets> <include>hdfs://foo:8020/app/dataset-definitions/datasets.xml</include> </datasets> <input-events> <data-in name="input" dataset="15MinLogs"> <start-instance>${coord:current(-3)}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <output-events> <data-out name="output" dataset="1HourLogs"> <instance>${coord:current(0)}</instance> </data-out> </output-events> <action> <workflow> ... </workflow> </action> </coordinator-app>
<coordinator-app name="app-coord-daily" frequency="${coord:days(1)}" start="2009-01-01T24:00Z" end="2009-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <datasets> <include>hdfs://foo:8020/app/dataset-definitions/datasets.xml</include> </datasets> <input-events> <data-in name="input" dataset="1HourLogs"> <start-instance>${coord:current(-23)}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <output-events> <data-out name="output" dataset="1DayLogs"> <instance>${coord:current(0)}</instance> </data-out> </output-events> <action> <workflow> ... </workflow> </action> </coordinator-app>
The 'app-coord-hourly' coordinator application runs every hour and uses 4 instances of the dataset "15MinLogs" to create one instance of the dataset "1HourLogs".
The 'app-coord-daily' coordinator application runs every day and uses 24 instances of "1HourLogs" to create one instance of "1DayLogs".
The output datasets from the 'app-coord-hourly' coordinator application are the input to the 'app-coord-daily' coordinator application thereby forming a simple data-pipeline application.
${coord:offset(int n, String timeUnit)} represents the nth timeUnit, relative to the coordinator action creation (materialization) time. The coordinator action creation (materialization) time is computed based on the coordinator job start time and its frequency.
It is an alternative to the ${coord:current(int n)} function (see previous section) and can be used anywhere ${coord:current(int n)} can be used. The difference between the two functions is that ${coord:current(int n)} computes an offset based on the nth multiple of the frequency, while ${coord:offset(int n, String timeUnit)} computes an offset based on the nth multiple of timeUnit.
n can be a negative integer, zero or a positive integer.
timeUnit can be any one of the following constants: "MINUTE" , "HOUR" , "DAY" , "MONTH" , "YEAR"
${coord:offset(int n, String timeUnit)} returns the nominal datetime for nth timeUnit relative to the coordinator action creation (materialization) time.
When used directly, ${coord:offset(int n, String timeUnit)} performs the following calculation:
DS_FREQ: dataset frequency (minutes)
CA_NT: coordinator action creation (materialization) nominal time
coord:offset(int n, String timeUnit) = CA_NT + floor(timeUnit * n div DS_FREQ) * DS_FREQ
NOTE: The formula above is not 100% correct: because of DST changes, the calculation has to account for hour shifts. Oozie Coordinator must make the correct calculation accounting for DST hour shifts.
When used in 'instance' or 'end-instance' XML elements, the above equation is used; the effect of the floor function is to "rewind" the resolved datetime to match the latest instance before the resolved time. When used in 'start-instance' XML elements, a slight modification to the above equation is used; instead of being "rewound", the resolved datetime is "fastforwarded" to match the earliest instance after the resolved time. See the next two examples for more information.
Examples:
1. ${coord:offset(int n, String timeUnit)} datetime calculation:
Datasets Definition:
<datasets> . <dataset name="logs" frequency="${coord:days(1)}" initial-instance="2009-01-01T24:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}${MONTH}/${DAY}</uri-template> </dataset> . <dataset name="weeklySiteAccessStats" frequency="${coord:days(7)}" initial-instance="2009-01-07T24:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/weeklystats/${YEAR}/${MONTH}/${DAY}</uri-template> </dataset> . </datasets>
For a coordinator action creation time of 2009-05-29T24:00Z, the ${coord:offset(int n, String timeUnit)} EL function would resolve to the following datetime values for the 'logs' and 'weeklySiteAccessStats' datasets:
${coord:offset(int n, String timeUnit)} | Dataset 'logs' | Dataset 'weeklySiteAccessStats' |
---|---|---|
${coord:offset(0, "MINUTE")}, ${coord:offset(0, "HOUR")}, ${coord:offset(0, "DAY")}, ${coord:offset(0, "MONTH")}, ${coord:offset(0, "YEAR")} | 2009-05-29T24:00Z | 2009-05-27T24:00Z |
${coord:offset(1440, "MINUTE")}, ${coord:offset(24, "HOUR")}, ${coord:offset(1, "DAY")} | 2009-05-30T24:00Z | 2009-05-27T24:00Z |
${coord:offset(-1440, "MINUTE")}, ${coord:offset(-24, "HOUR")}, ${coord:offset(-1, "DAY")} | 2009-05-28T24:00Z | 2009-05-20T24:00Z |
${coord:offset(-4320, "MINUTE")}, ${coord:offset(-72, "HOUR")}, ${coord:offset(-3, "DAY")} | 2009-05-26T24:00Z | 2009-05-20T24:00Z |
${coord:offset(11520, "MINUTE")}, ${coord:offset(192, "HOUR")}, ${coord:offset(8, "DAY")} | 2009-06-06T24:00Z | 2009-06-03T24:00Z |
${coord:offset(10, "MINUTE")} | 2009-05-29T24:00Z | 2009-05-27T24:00Z |
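The rows of this table can be reproduced with the sketch below. It is one reading of the formula, not Oozie's implementation: the base is taken to be the dataset instance that ${coord:current(0)} resolves to (an assumption chosen because it matches the table), only the fixed-length units MINUTE, HOUR and DAY are handled, and DST is ignored. The names `coord_offset` and `UNIT_MINUTES` are hypothetical.

```python
from datetime import datetime, timedelta

# Fixed-length units only; MONTH and YEAR vary in length and are left out.
UNIT_MINUTES = {"MINUTE": 1, "HOUR": 60, "DAY": 1440}


def coord_offset(n, time_unit, current_instance, ds_freq_minutes):
    """Apply floor(timeUnit * n div DS_FREQ) * DS_FREQ to a dataset-aligned base."""
    # Python's // is floor division, so negative offsets are "rewound".
    periods = (UNIT_MINUTES[time_unit] * n) // ds_freq_minutes
    return current_instance + timedelta(minutes=periods * ds_freq_minutes)


# Assumption: 24:00Z is represented as 00:00Z of the next day.
logs_current = datetime(2009, 5, 30)     # 2009-05-29T24:00Z, daily dataset
weekly_current = datetime(2009, 5, 28)   # 2009-05-27T24:00Z, 7-day dataset

for n, unit in ((1, "DAY"), (-1, "DAY"), (-72, "HOUR"), (11520, "MINUTE"), (10, "MINUTE")):
    logs = coord_offset(n, unit, logs_current, 1440)
    weekly = coord_offset(n, unit, weekly_current, 7 * 1440)
    print(n, unit, logs.isoformat(), weekly.isoformat())
```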
2. "Fastforwarding" in 'start-instance' ${coord:offset(int n, String timeUnit)} calculation:
When specifying dataset instances, keep in mind that the resolved value of ${coord:offset(int n, String timeUnit)} must line up with an offset of a multiple of the frequency when used in an 'instance' XML element. However, when used in 'start-instance' and 'end-instance' XML elements, this is not a requirement. In this case, the function will automatically resolve the range of instances to match the offset of a multiple of the frequency that would fall between the 'start-instance' and 'end-instance' XML elements; in other words, the 'start-instance' XML element is "fastforwarded" while the 'end-instance' XML element is "rewound". So, in the example below, the frequency of the "logs" dataset is 1 hour while the 'start-instance' XML element is ${coord:offset(-90, "MINUTE")} (-1.5 hours). If this were in an 'instance' XML element, it would be "rewound", but here it is effectively equivalent to ${coord:offset(-60, "MINUTE")} or ${coord:current(-1)} as we are dealing with a range.
Datasets Definition file 'datasets.xml':
<datasets> . <dataset name="logs" frequency="${coord:hours(1)}" initial-instance="2009-01-01T01:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template> </dataset> . <dataset name="stats" frequency="${coord:days(1)}" initial-instance="2009-01-01T24:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}</uri-template> </dataset> . </datasets>
Coordinator application definition:
<coordinator-app name="app-coord" frequency="${coord:days(1)}" start="2009-01-01T24:00Z" end="2009-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <datasets> <include>hdfs://foo:8020/app/dataset-definitions/datasets.xml</include> </datasets> <input-events> <data-in name="input" dataset="logs"> <start-instance>${coord:offset(-90, "MINUTE")}</start-instance> <end-instance>${coord:offset(0, "DAY")}</end-instance> </data-in> </input-events> <output-events> <data-out name="output" dataset="stats"> <instance>${coord:offset(0, "DAY")}</instance> </data-out> </output-events> <action> <workflow> ... </workflow> </action> </coordinator-app>
The ${coord:hoursInDay(int n)} EL function returns the number of hours for the specified day, in a timezone/daylight-saving sensitive way.
n is offset (in days) from the current nominal time. A negative value is the nth previous day. Zero is the current day. A positive number is the nth next day.
The returned value is calculated taking into account timezone daylight-saving information.
Normally it returns 24; only on DST switch days for the timezone in question will it return either 23 or 25.
Examples:
Nominal UTC time | Timezone | EndOfFlag | Usage | Value | Comments |
---|---|---|---|---|---|
2009-01-01T08:00Z | UTC | NO | ${coord:hoursInDay(0)} | 24 | hours in 2009JAN01 UTC |
2009-01-01T08:00Z | America/Los_Angeles | NO | ${coord:hoursInDay(0)} | 24 | hours in 2009JAN01 PST8PDT time |
2009-01-01T08:00Z | America/Los_Angeles | NO | ${coord:hoursInDay(-1)} | 24 | hours in 2008DEC31 PST8PDT time |
2009-03-08T08:00Z | UTC | NO | ${coord:hoursInDay(0)} | 24 | hours in 2009MAR08 UTC time |
2009-03-08T08:00Z | Europe/London | NO | ${coord:hoursInDay(0)} | 24 | hours in 2009MAR08 BST1BDT time |
2009-03-08T08:00Z | America/Los_Angeles | NO | ${coord:hoursInDay(0)} | 23 | hours in 2009MAR08 PST8PDT time (2009MAR08 is DST switch in the US) |
2009-03-08T08:00Z | America/Los_Angeles | NO | ${coord:hoursInDay(1)} | 24 | hours in 2009MAR09 PST8PDT time |
2009-03-07T08:00Z | America/Los_Angeles | EndOfDay | ${coord:hoursInDay(0)} | 24 | hours in 2009MAR07 PST8PDT time |
2009-03-07T08:00Z | America/Los_Angeles | EndOfDay | ${coord:hoursInDay(1)} | 23 | hours in 2009MAR08 PST8PDT time (2009MAR08 is DST switch in the US) |
Coordinator application definition:
<coordinator-app name="app-coord" frequency="${coord:days(1)}" start="${jobStart}" end="${jobEnd}" timezone="${timezone}" xmlns="uri:oozie:coordinator:0.1"> <datasets> <dataset name="logs" frequency="${coord:hours(1)}" initial-instance="${logsInitialInstance}" timezone="${timezone}"> <uri-template> hdfs://bar:8020/app/logs/${market}/${language}/${YEAR}${MONTH}/${DAY}/${HOUR} </uri-template> </dataset> </datasets> <input-events> <data-in name="input" dataset="logs"> <start-instance>${coord:current( -(coord:hoursInDay(0) - 1) )}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <action> <workflow> ... </workflow> </action> </coordinator-app>
This example is the example of section #6.5 with a minor change: the argument for the ${coord:current(int n)} function in the 'start-instance' element is now -(coord:hoursInDay(0) - 1) instead of -23.
This simple change fully enables this coordinator application to handle daily data (produced hourly) for any timezone, whether or not the timezone observes daylight saving.
For timezones observing daylight saving, on the days of DST switch, the function will resolve to 23 or 25, thus the dataset instances used will be for the day in the DST sense.
For timezones not observing daylight saving, it always returns 24 .
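As a rough illustration of the timezone-sensitive calculation, the sketch below measures the elapsed time between two consecutive local midnights. It assumes Python 3.9+ with the standard `zoneinfo` module and an available timezone database; the function name `hours_in_day` is hypothetical, and the EndOfFlag column of the table is not modeled.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo


def hours_in_day(nominal_utc, tz_name, n=0):
    """Hours in the local calendar day that is n days from the nominal time."""
    tz = ZoneInfo(tz_name)
    # Calendar day of the nominal time in the target timezone, shifted by n days.
    day = nominal_utc.astimezone(tz).date() + timedelta(days=n)
    next_day = day + timedelta(days=1)
    start = datetime(day.year, day.month, day.day, tzinfo=tz)
    end = datetime(next_day.year, next_day.month, next_day.day, tzinfo=tz)
    # Subtract in UTC so that the DST shift is part of the difference.
    elapsed = end.astimezone(timezone.utc) - start.astimezone(timezone.utc)
    return int(elapsed.total_seconds() // 3600)


nominal = datetime(2009, 3, 8, 8, 0, tzinfo=timezone.utc)  # 2009-03-08T08:00Z
print(hours_in_day(nominal, "UTC"))                     # 24
print(hours_in_day(nominal, "America/Los_Angeles"))     # 23, US DST switch day
print(hours_in_day(nominal, "America/Los_Angeles", 1))  # 24
```

The conversion to UTC before subtracting matters: subtracting two datetimes that share the same tzinfo object compares wall-clock values only and would always yield 24.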
The ${coord:daysInMonth(int n)} EL function returns the number of days in the month of the specified day.
n is the offset (in months) from the current nominal time. A negative value is the nth previous month. Zero is the current month. A positive number is the nth next month.
The returned value is calculated taking into account leap-year information.
The ${coord:daysInMonth(int n)} EL function can be used to express monthly ranges for dataset instances.
Examples:
Nominal UTC time | Timezone | EndOfFlag | Usage | Value | Comments |
---|---|---|---|---|---|
2008-02-01T00:00Z | UTC | NO | ${coord:daysInMonth(0)} | 29 | days in 2008FEB UTC time |
2009-02-01T00:00Z | UTC | NO | ${coord:daysInMonth(0)} | 28 | days in 2009FEB UTC time |
2009-02-01T00:00Z | UTC | NO | ${coord:daysInMonth(-1)} | 31 | days in 2009JAN UTC time |
2009-03-01T00:00Z | UTC | NO | ${coord:daysInMonth(1)} | 30 | days in 2009APR UTC time |
2009-02-01T00:00Z | America/Los_Angeles | NO | ${coord:daysInMonth(0)} | 31 | days in 2009JAN PST8PDT time, note that the nominal time is UTC |
2008-02-01T00:00Z | UTC | EndOfMonth | ${coord:daysInMonth(0)} | 29 | days in 2008FEB UTC time |
2008-02-01T00:00Z | UTC | EndOfMonth | ${coord:daysInMonth(-1)} | 31 | days in 2008JAN UTC time |
2009-02-01T00:00Z | UTC | EndOfMonth | ${coord:daysInMonth(0)} | 28 | days in 2009FEB UTC time |
2009-02-01T00:00Z | UTC | EndOfMonth | ${coord:daysInMonth(-1)} | 31 | days in 2009JAN UTC time |
2009-03-01T00:00Z | UTC | EndOfMonth | ${coord:daysInMonth(1)} | 30 | days in 2009APR UTC time |
2009-02-01T00:00Z | America/Los_Angeles | EndOfMonth | ${coord:daysInMonth(0)} | 31 | days in 2009JAN PST8PDT time, note that the nominal time is UTC |
Coordinator application definition:
<coordinator-app name="app-coord" frequency="${coord:months(1)}" start="2009-01-31T24:00Z" end="2009-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <datasets> <dataset name="logs" frequency="${coord:days(1)}" initial-instance="2009-01-01T24:00Z" timezone="UTC"> <uri-template> hdfs://bar:8020/app/logs/${market}/${language}/${YEAR}${MONTH}/${DAY} </uri-template> </dataset> </datasets> <input-events> <data-in name="input" dataset="logs"> <start-instance>${coord:current( -(coord:daysInMonth(0) - 1) )}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <action> <workflow> ... </workflow> </action> </coordinator-app>
This example is a coordinator application that runs monthly, and consumes the daily feeds for the last month.
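The month-length lookup can be sketched as follows. This is an illustration, not Oozie code: it assumes Python 3.9+ with `zoneinfo`, the function name `days_in_month` is hypothetical, and the EndOfFlag column of the table is not modeled.

```python
import calendar
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def days_in_month(nominal_utc, tz_name, n=0):
    """Days in the local calendar month that is n months from the nominal time."""
    local = nominal_utc.astimezone(ZoneInfo(tz_name))
    # Shift a zero-based month counter by n, carrying into the year as needed.
    month_index = local.year * 12 + (local.month - 1) + n
    year, zero_based_month = divmod(month_index, 12)
    # monthrange returns (weekday of the first day, number of days in the month).
    return calendar.monthrange(year, zero_based_month + 1)[1]


feb_2009 = datetime(2009, 2, 1, 0, 0, tzinfo=timezone.utc)  # 2009-02-01T00:00Z
print(days_in_month(feb_2009, "UTC"))                  # 28
print(days_in_month(feb_2009, "UTC", -1))              # 31, January 2009
print(days_in_month(feb_2009, "America/Los_Angeles"))  # 31, still January locally
```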
The ${coord:tzOffset()} EL function returns the difference in minutes between a dataset timezone and the coordinator job timezone at the current nominal time. This EL function is useful when dealing with datasets from multiple timezones that are processed by a coordinator job executing in a different timezone.
DS_TZ: dataset TZ offset in minutes at the current nominal time (UTC offset)
JOB_TZ: coordinator job TZ offset in minutes at the current nominal time (UTC offset)
coord:tzOffset() = DS_TZ - JOB_TZ
For example: Los Angeles Winter offset (no DST) is -480 (-08:00 hours). India offset is 330 (+05:30 hours).
The value returned by this function may change because of the daylight saving rules of the 2 timezones. For example, between Continental Europe and the U.S. West Coast, the timezone difference is 9 hours for most of the year, but there are a few days or weeks each year when it is 8 hours, because the two regions switch to and from DST on different dates.
IMPORTANT: While the offset is a multiple of 60 for most timezones, it can be a multiple of 30 minutes when one of the timezones has a ##:30 offset (e.g. India).
Refer to section #7, 3rd use case for a detailed example.
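The DS_TZ - JOB_TZ calculation can be illustrated with the sketch below. It assumes Python 3.9+ with `zoneinfo`; the function names and the choice of example timezones are illustrative and not part of Oozie.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def utc_offset_minutes(tz_name, nominal_utc):
    """UTC offset of a timezone, in minutes, at the given nominal time."""
    offset = nominal_utc.astimezone(ZoneInfo(tz_name)).utcoffset()
    return int(offset.total_seconds() // 60)


def tz_offset(dataset_tz, job_tz, nominal_utc):
    """DS_TZ - JOB_TZ, both evaluated at the nominal time."""
    return utc_offset_minutes(dataset_tz, nominal_utc) - utc_offset_minutes(job_tz, nominal_utc)


winter = datetime(2009, 1, 15, tzinfo=timezone.utc)
mid_march = datetime(2009, 3, 15, tzinfo=timezone.utc)  # US already on DST, Europe not yet

print(utc_offset_minutes("America/Los_Angeles", winter))           # -480
print(utc_offset_minutes("Asia/Kolkata", winter))                  # 330
print(tz_offset("Europe/Paris", "America/Los_Angeles", winter))    # 540 (9 hours)
print(tz_offset("Europe/Paris", "America/Los_Angeles", mid_march)) # 480 (8 hours)
```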
${coord:latest(int n)} represents the nth latest currently available instance of a synchronous dataset.
${coord:latest(int n)} is not relative to the coordinator action creation (materialization) time, it is the nth latest instance available when the action is started (when the workflow job is started).
If a coordinator job is suspended, when resumed, all usages of ${coord:latest(int n)} will be resolved to the currently existent instances.
Finally, it is not possible to represent the latest dataset when execution reaches a node in the workflow job. The resolution of latest dataset instances happens at action start time (workflow job start time).
The parameter n can be a negative integer or zero. Where 0 means the latest instance available, -1 means the second latest instance available, etc.
The ${coord:latest(int n)} function ignores gaps in dataset instances; it just looks for the latest nth instance available.
Example:
Coordinator application definition:
<coordinator-app name="app-coord" frequency="${coord:hours(1)}" start="2009-01-01T24:00Z" end="2009-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <datasets> <dataset name="logs" frequency="${coord:hours(1)}" initial-instance="2009-01-01T01:00Z" timezone="UTC"> <uri-template> hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR} </uri-template> </dataset> </datasets> <input-events> <data-in name="input" dataset="logs"> <instance>${coord:latest(-2)}</instance> <instance>${coord:latest(0)}</instance> </data-in> </input-events> <action> <workflow> ... </workflow> </action> </coordinator-app>
If the available dataset instances in HDFS at time of a coordinator action being executed are:
hdfs://bar:8020/app/logs/2009/01/01
hdfs://bar:8020/app/logs/2009/01/02
hdfs://bar:8020/app/logs/2009/01/03
(missing)
hdfs://bar:8020/app/logs/2009/01/05
(missing)
hdfs://bar:8020/app/logs/2009/01/07
(missing)
(missing)
hdfs://bar:8020/app/logs/2009/01/10
Then, the dataset instances for the input events for the coordinator action will be:
hdfs://bar:8020/app/logs/2009/01/05 hdfs://bar:8020/app/logs/2009/01/10
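The selection in this example can be sketched as an index into the list of instances that actually exist. This is a minimal illustration with hypothetical names; returning `None` when too few instances exist is an assumption of the sketch, not a statement about how Oozie behaves in that case.

```python
def coord_latest(n, available_instances):
    """Pick the nth latest available instance; n is zero or negative.

    available_instances holds only existing instances, sorted oldest to newest,
    so gaps in the dataset are ignored automatically.
    """
    if n > 0:
        raise ValueError("n must be zero or a negative integer")
    index = len(available_instances) - 1 + n
    if index < 0:
        return None  # assumption: not enough instances are available yet
    return available_instances[index]


available = [
    "hdfs://bar:8020/app/logs/2009/01/01",
    "hdfs://bar:8020/app/logs/2009/01/02",
    "hdfs://bar:8020/app/logs/2009/01/03",
    "hdfs://bar:8020/app/logs/2009/01/05",
    "hdfs://bar:8020/app/logs/2009/01/07",
    "hdfs://bar:8020/app/logs/2009/01/10",
]

print(coord_latest(-2, available))  # .../2009/01/05
print(coord_latest(0, available))   # .../2009/01/10
```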
${coord:future(int n, int limit)} represents the nth currently available future instance of a synchronous dataset while looking ahead for 'limit' number of instances.
${coord:future(int n, int limit)} is relative to the coordinator action creation (materialization) time. The coordinator action creation (materialization) time is computed based on the coordinator job start time and its frequency. The nth dataset instance is computed based on the dataset's initial-instance datetime, its frequency and the (current) coordinator action creation (materialization) time.
n can be a zero or a positive integer. Where 0 means the immediate instance available, 1 means the second next instance available, etc.
limit should be a positive integer. For example, a limit of 3 means that the search for the nth next instance should not check beyond 3 instances.
The ${coord:future(int n, int limit)} function ignores gaps in dataset instances; it just looks for the next nth instance available.
Example:
Coordinator application definition:
<coordinator-app name="app-coord" frequency="${coord:hours(1)}" start="2009-01-01T24:00Z" end="2009-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <datasets> <dataset name="logs" frequency="${coord:hours(1)}" initial-instance="2009-01-01T01:00Z" timezone="UTC"> <uri-template> hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR} </uri-template> </dataset> </datasets> <input-events> <data-in name="input" dataset="logs"> <instance>${coord:future(0, 10)}</instance> <instance>${coord:future(2, 10)}</instance> </data-in> </input-events> <action> <workflow> ... </workflow> </action> </coordinator-app>
If the available dataset instances in HDFS at time of a coordinator action being executed are:
hdfs://bar:8020/app/logs/2009/02/01
(missing)
(missing)
(missing)
hdfs://bar:8020/app/logs/2009/02/04
(missing)
(missing)
hdfs://bar:8020/app/logs/2009/02/07
(missing)
(missing)
(missing)
hdfs://bar:8020/app/logs/2009/02/11
(missing)
(missing)
hdfs://bar:8020/app/logs/2009/02/14
(missing)
hdfs://bar:8020/app/logs/2009/02/16
Then, the dataset instances for the input events for the coordinator action will be:
hdfs://bar:8020/app/logs/2009/02/01 hdfs://bar:8020/app/logs/2009/02/07
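A look-ahead with a limit can be sketched as a bounded scan. The sketch assumes that 'limit' counts candidate instances, including missing ones, starting at the instance for the nominal time; that reading and the names used are assumptions for illustration only.

```python
def coord_future(n, limit, instances_ahead):
    """Return the nth available instance within the first `limit` candidates.

    instances_ahead lists consecutive candidate instances starting at the
    nominal time; a missing instance is represented by None.
    """
    found = -1
    for uri in instances_ahead[:limit]:
        if uri is not None:
            found += 1
            if found == n:
                return uri
    return None  # assumption: nothing suitable within the limit


base = "hdfs://bar:8020/app/logs/2009/02/"
instances_ahead = [
    base + "01", None, None, None,
    base + "04", None, None,
    base + "07", None, None, None,
    base + "11", None, None,
    base + "14", None,
    base + "16",
]

print(coord_future(0, 10, instances_ahead))  # .../2009/02/01
print(coord_future(2, 10, instances_ahead))  # .../2009/02/07
```

Under this reading, `coord_future(3, 10, instances_ahead)` finds nothing, because the fourth available instance is the twelfth candidate and lies beyond the limit of 10.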
${coord:absolute(String timeStamp)} represents an absolute dataset instance time. coord:absolute is only supported in a range where start-instance is coord:absolute and end-instance is coord:current. Specifying a fixed date as the start instance is useful if your processing needs to process all dataset instances from a specific instance to the current instance.
Example:
Coordinator application definition:
<coordinator-app name="app-coord" frequency="${coord:months(1)}" start="2009-01-01T01:00Z" end="2009-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.4"> <input-events> <data-in name="input" dataset="logs"> <dataset name='a' frequency='7' initial-instance="2009-01-01T01:00Z" timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'> <uri-template>hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR} </uri-template> </dataset> <start-instance>${coord:absolute("2009-01-01T01:00Z")}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <action> <workflow> ............. </workflow> </action> </coordinator-app>
Then, the dataset instances for the input events for the coordinator action at first run will be:
hdfs://bar:8020/app/logs/2009/02/01
The dataset instances for the input events for the coordinator action at second run will be:
hdfs://bar:8020/app/logs/2009/02/01 hdfs://bar:8020/app/logs/2009/02/07
6.6.9. coord:version(int n) EL Function for Asynchronous Datasets
In the case of the synchronous 'logs' dataset, for the first action of this coordinator job, the instances referred to in the input events will resolve to just 1 instance. For the second action they will resolve to 2 instances, and so on. Only after the 24th action will the input events resolve constantly to 24 instances. In other words, while ${coord:current(-23)} resolves to datetimes prior to the 'initial-instance', the required range will start from the 'initial-instance', '2009-01-01T00:00Z' in this example.
Actions started by a coordinator application normally require access to the dataset instances resolved by the input and output events to be able to propagate them to the workflow job as parameters.
The following EL functions are the mechanism that enables this propagation.
The ${coord:dataIn(String name)} EL function resolves to all the URIs for the dataset instances specified in an input event dataset section.
The ${coord:dataIn(String name)} is commonly used to pass the URIs of dataset instances that will be consumed by a workflow job triggered by a coordinator action.
Example:
Coordinator application definition:
<coordinator-app name="app-coord" frequency="${coord:days(1)}" start="2009-01-01T24:00Z" end="2009-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <datasets> <dataset name="logs" frequency="${coord:hours(1)}" initial-instance="2009-01-01T01:00Z" timezone="UTC"> <uri-template> hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR} </uri-template> </dataset> </datasets> <input-events> <data-in name="inputLogs" dataset="logs"> <start-instance>${coord:current( -(coord:hoursInDay(0) - 1) )}</start-instance> <end-instance>${coord:current(-1)}</end-instance> </data-in> </input-events> <action> <workflow> <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path> <configuration> <property> <name>wfInput</name> <value>${coord:dataIn('inputLogs')}</value> </property> </configuration> </workflow> </action> </coordinator-app>
In this example, each coordinator action will use as input events the hourly instances of the 'logs' dataset for the last day.
The ${coord:dataIn(String name)} function enables the coordinator application to pass the URIs of all the dataset instances for the last day to the workflow job triggered by the coordinator action. For the 2009-01-02T00:00Z run, the ${coord:dataIn('inputLogs')} function will resolve to:
hdfs://bar:8020/app/logs/2009/01/01/01, hdfs://bar:8020/app/logs/2009/01/01/02, ... hdfs://bar:8020/app/logs/2009/01/01/23, hdfs://bar:8020/app/logs/2009/01/02/00
The ${coord:dataIn('inputLogs')} is used for workflow job configuration property 'wfInput' for the workflow job that will be submitted by the coordinator action on January 2nd 2009. Thus, when the workflow job gets started, the 'wfInput' workflow job configuration property will contain all the above URIs.
Note that all the URIs form a single string value and the URIs are separated by commas. Multiple HDFS URIs separated by commas can be specified as input data to a Map/Reduce job.
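How a range of instances becomes a single comma-separated value can be sketched as follows. The helper names are hypothetical and the three-hour range is chosen only to keep the output short; DST is ignored.

```python
from datetime import datetime, timedelta
from string import Template


def resolve_uri(uri_template, instant):
    """Fill the ${YEAR}, ${MONTH}, ${DAY}, ${HOUR} and ${MINUTE} placeholders."""
    return Template(uri_template).substitute(
        YEAR=f"{instant:%Y}", MONTH=f"{instant:%m}", DAY=f"{instant:%d}",
        HOUR=f"{instant:%H}", MINUTE=f"{instant:%M}",
    )


def data_in(uri_template, start, end, frequency):
    """Join the URIs of all instances from start to end (inclusive) with commas."""
    uris = []
    instant = start
    while instant <= end:
        uris.append(resolve_uri(uri_template, instant))
        instant += frequency
    return ",".join(uris)


wf_input = data_in(
    "hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}",
    datetime(2009, 1, 1, 1),   # start-instance
    datetime(2009, 1, 1, 3),   # end-instance
    timedelta(hours=1),
)
print(wf_input)
```

The resulting string is what a property such as 'wfInput' would carry into the workflow job.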
The ${coord:dataOut(String name)} EL function resolves to all the URIs for the dataset instance specified in an output event dataset section.
The ${coord:dataOut(String name)} is commonly used to pass the URIs of a dataset instance that will be produced by a workflow job triggered by a coordinator action.
Example:
Datasets Definition file 'datasets.xml'
<datasets> . <dataset name="hourlyLogs" frequency="${coord:hours(1)}" initial-instance="2009-01-01T01:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template> </dataset> . <dataset name="dailyLogs" frequency="${coord:days(1)}" initial-instance="2009-01-01T24:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/daily-logs/${YEAR}/${MONTH}/${DAY}</uri-template> </dataset> </datasets>
Coordinator application definition:
<coordinator-app name="app-coord" frequency="${coord:days(1)}" start="2009-01-01T24:00Z" end="2009-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <datasets> <include>hdfs://foo:8020/app/dataset-definitions/datasets.xml</include> </datasets> <input-events> <data-in name="inputLogs" dataset="hourlyLogs"> <start-instance>${coord:current( -(coord:hoursInDay(0) -1) )}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <output-events> <data-out name="outputLogs" dataset="dailyLogs"> <instance>${coord:current(0)}</instance> </data-out> </output-events> <action> <workflow> <app-path>hdfs://bar:8020/usr/joe/logsaggretor-wf</app-path> <configuration> <property> <name>wfInput</name> <value>${coord:dataIn('inputLogs')}</value> </property> <property> <name>wfOutput</name> <value>${coord:dataOut('outputLogs')}</value> </property> </configuration> </workflow> </action> </coordinator-app>
In this example, each coordinator action will use as input events the last 24 hourly instances of the 'hourlyLogs' dataset to create a 'dailyLogs' dataset instance.
The ${coord:dataOut(String name)} function enables the coordinator application to pass the URIs of the dataset instance that will be created by the workflow job triggered by the coordinator action. For the 2009-01-01T24:00Z run, the ${coord:dataOut('outputLogs')} function will resolve to:
hdfs://bar:8020/app/daily-logs/2009/01/02
NOTE: The use of 24:00 as hour is useful for humans to denote end of the day, but internally Oozie handles it as the zero hour of the next day.
The ${coord:dataOut('outputLogs')} is used for workflow job configuration property 'wfOutput' for the workflow job that will be submitted by the coordinator action on January 2nd 2009. Thus, when the workflow job gets started, the 'wfOutput' workflow job configuration property will contain the above URI.
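The treatment of 24:00 and the resolution of the output URI can be illustrated with a small sketch. The parsing helper is hypothetical and only handles the 'YYYY-MM-DDTHH:MMZ' form used in this document; it is not how Oozie parses dates.

```python
from datetime import datetime, timedelta
from string import Template


def parse_nominal_time(text):
    """Parse 'YYYY-MM-DDTHH:MMZ', reading hour 24 as 00:00 of the next day."""
    date_part, time_part = text.rstrip("Z").split("T")
    hour, minute = (int(part) for part in time_part.split(":"))
    # Adding the time as a duration lets hour 24 roll over to the next day.
    return datetime.strptime(date_part, "%Y-%m-%d") + timedelta(hours=hour, minutes=minute)


def resolve_uri(uri_template, instant):
    """Fill the ${YEAR}, ${MONTH} and ${DAY} placeholders of a daily dataset."""
    return Template(uri_template).substitute(
        YEAR=f"{instant:%Y}", MONTH=f"{instant:%m}", DAY=f"{instant:%d}",
    )


nominal = parse_nominal_time("2009-01-01T24:00Z")
wf_output = resolve_uri("hdfs://bar:8020/app/daily-logs/${YEAR}/${MONTH}/${DAY}", nominal)
print(nominal.isoformat())  # 2009-01-02T00:00:00
print(wf_output)            # hdfs://bar:8020/app/daily-logs/2009/01/02
```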
The ${coord:nominalTime()} EL function resolves to the coordinator action creation (materialization) datetime.
The nominal time is always the coordinator job start datetime plus a multiple of the coordinator job frequency.
That is, it is the time for which the coordinator action was created based on the driver event. For synchronous coordinator applications this would be every tick of the frequency.
Example:
Coordinator application definition:
<coordinator-app name="app-coord" frequency="${coord:days(1)}" start="2009-01-01T24:00Z" end="2009-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <datasets> <dataset name="hourlyLogs" frequency="${coord:hours(1)}" initial-instance="2009-01-01T01:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template> </dataset> </datasets> <input-events> <data-in name="inputLogs" dataset="hourlyLogs"> <start-instance>${coord:current(-23)}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <action> <app-path>${nameNode}/user/${coord:user()}/examples/apps/aggregator</app-path> <configuration> <property> <name>nextInstance</name> <value>${coord:dateOffset(coord:nominalTime(), 1, 'DAY')}</value> </property> <property> <name>previousInstance</name> <value>${coord:dateOffset(coord:nominalTime(), -1, 'DAY')}</value> </property> </configuration> </action> </coordinator-app>
The nominal times for the coordinator actions of this coordinator application example are:
2009-01-02T00:00Z 2009-01-03T00:00Z 2009-01-04T00:00Z ... 2010-01-01T00:00Z
These are the times the actions were created (materialized).
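The list of nominal times is simply the start datetime plus successive multiples of the frequency. The sketch below treats the end datetime as inclusive so that it reproduces the list above; that choice, the fixed 24-hour day and the function name are assumptions of the sketch.

```python
from datetime import datetime, timedelta


def nominal_times(start, end, frequency):
    """Start datetime plus every multiple of the frequency, up to and including end."""
    times = []
    current = start
    while current <= end:
        times.append(current)
        current += frequency
    return times


# Assumption: 24:00Z is represented as 00:00Z of the next day.
times = nominal_times(
    datetime(2009, 1, 2),    # start="2009-01-01T24:00Z"
    datetime(2010, 1, 1),    # end="2009-12-31T24:00Z"
    timedelta(days=1),
)
print(times[0].isoformat(), times[-1].isoformat(), len(times))
```

With these inputs the sketch yields 365 nominal times, from 2009-01-02T00:00 to 2010-01-01T00:00.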
The ${coord:actualTime()} EL function resolves to the coordinator action actual creation datetime.
When the coordinator action is created based on the driver event, the current time is saved to the action. An action's actual time is less than the nominal time if the coordinator job is running in current mode. If the job is running in catch-up mode (the job's start time is in the past), the actual time is greater than the nominal time.
Example:
Coordinator application definition:
<coordinator-app name="app-coord" frequency="${coord:days(1)}" start="2011-05-01T24:00Z" end="2011-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <datasets> <dataset name="hourlyLogs" frequency="${coord:hours(1)}" initial-instance="2011-04-01T01:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template> </dataset> </datasets> <input-events> <data-in name="inputLogs" dataset="hourlyLogs"> <start-instance>${coord:current(-23)}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <action> <app-path>${nameNode}/user/${coord:user()}/examples/apps/aggregator</app-path> <configuration> <property> <name>actualTime</name> <value>${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd')}</value> </property> </configuration> </action> </coordinator-app>
The actual times for the coordinator actions of this coordinator application example depend on when the job is run.
If the coordinator job was started at 2011-05-01, then the actions' actualTime values are:
2011-05-01 2011-05-02 2011-05-03 ... 2011-12-31
This section describes the different EL functions that work with HCatalog data dependencies, in order to write Coordinator applications that use HCatalog data dependencies.
The functions ${coord:databaseIn(String name)} and ${coord:databaseOut(String name)} are used to pass the database name of HCat dataset instances, input and output respectively, that will be consumed by a workflow job triggered by a coordinator action.
For input database, you should pass the "data-in" name attribute of your 'input-events' configured in the coordinator. Similarly for output database, pass the "data-out" name attribute of your 'output-events'.
To illustrate it better:
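If data belongs to 'input-events' and the name attribute of your data-in element is "raw-logs", use ${coord:databaseIn('raw-logs')}. If it belongs to 'output-events' and the name attribute of your data-out element is "processed-logs", use ${coord:databaseOut('processed-logs')}.
Pitfall: Please note NOT to pass the dataset name itself (as defined under datasets), as these functions work on the 'data-in' and 'data-out' names.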
Refer to the Example below for usage.
The functions ${coord:tableIn(String name)} and ${coord:tableOut(String name)} are used to pass the table name of HCat dataset instances, input and output respectively, that will be consumed by a workflow job triggered by a coordinator action.
For input table, you should pass the "data-in" name attribute of your 'input-events' configured in the coordinator. Similarly for output table, pass the "data-out" name attribute of your 'output-events'.
To illustrate it better:
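If data belongs to 'input-events' and the name attribute of your data-in element is "raw-logs", use ${coord:tableIn('raw-logs')}. If it belongs to 'output-events' and the name attribute of your data-out element is "processed-logs", use ${coord:tableOut('processed-logs')}.
Pitfall: Please note NOT to pass the dataset name itself (as defined under datasets), as these functions work on the 'data-in' and 'data-out' names.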
Refer to the Example below for usage.
The ${coord:dataInPartitionFilter(String name, String type)} EL function resolves to a filter clause to filter all the partitions corresponding to the dataset instances specified in an input event dataset section. This EL function takes two arguments - the name of the input dataset, and the type of the workflow action which will be consuming this filter. There are 3 types - 'pig', 'hive' and 'java'. This filter clause from the EL function is to be passed as a parameter in the respective action in the workflow.
The evaluated value of the filter clause will vary based on the action type passed to the EL function. In case of pig, the filter will have "==" as the equality operator in the condition. In case of hive and java, the filter will have "=" as the equality operator in the condition. The type java is for java actions, which use HCatInputFormat directly and launch jobs. The filter clause in that case can be used to construct the InputJobInfo in HCatInputFormat.setInput(Job job, InputJobInfo inputJobInfo).
Refer to the Example below for usage.
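The operator difference between action types can be sketched in Python as follows. This is an illustration only, not Oozie's code: `partition_filter` is a hypothetical name, and the values are left unquoted simply to mirror the resolved Pig example later in this section; the exact quoting produced by Oozie is not reproduced here.

```python
def partition_filter(instances, action_type):
    """Build one AND group per dataset instance and OR the groups together.

    instances: list of dicts mapping partition key -> value.
    action_type: 'pig' uses '==', 'hive' and 'java' use '='.
    """
    if action_type not in ("pig", "hive", "java"):
        raise ValueError("unsupported action type: " + action_type)
    operator = "==" if action_type == "pig" else "="
    groups = []
    for partitions in instances:
        conditions = " AND ".join(
            key + operator + value for key, value in partitions.items()
        )
        groups.append("(" + conditions + ")")
    return "(" + " OR ".join(groups) + ")"

# Two hourly instances of a hypothetical input dataset.
instances = [
    {"datestamp": "2009010101", "region": "USA"},
    {"datestamp": "2009010102", "region": "USA"},
]
print(partition_filter(instances, "pig"))
print(partition_filter(instances, "hive"))
```

The same list of instances yields a Pig-style or a Hive-style clause depending only on the second argument, which is why the type must match the workflow action that consumes the filter.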
The ${coord:dataOutPartitions(String name)} EL function resolves to a comma-separated list of partition key-value pairs for the output-event dataset. This can be passed as an argument to HCatStorer in Pig scripts. In the case of java actions that directly use HCatOutputFormat and launch jobs, the partitions list can be parsed to construct the partition values map for OutputJobInfo in HCatOutputFormat.setOutput(Job job, OutputJobInfo outputJobInfo).
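Parsing the comma-separated list into a partition values map could look like the following Python sketch. The function name `parse_partitions` is hypothetical, and the sketch assumes that keys and values contain neither ',' nor '='; a java action would do the equivalent in Java before building its OutputJobInfo.

```python
def parse_partitions(partitions):
    """Turn 'k1=v1,k2=v2' into {'k1': 'v1', 'k2': 'v2'}."""
    result = {}
    for pair in partitions.split(","):
        if not pair.strip():
            continue  # tolerate empty input or a trailing comma
        key, _, value = pair.partition("=")
        result[key.strip()] = value.strip()
    return result

# Sample value in the comma-separated form described above.
print(parse_partitions("datestamp=20090102,region=USA"))
```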
The example below illustrates a pig job triggered by a coordinator, using the EL functions for HCat database, table, input partitions filter and output partitions. The example takes as input previous day's hourly data to produce aggregated daily output.
Example:
Coordinator application definition:
<coordinator-app name="app-coord" frequency="${coord:days(1)}" start="2009-01-01T24:00Z" end="2009-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.3"> <datasets> <dataset name="Click-data" frequency="${coord:hours(1)}" initial-instance="2009-01-01T01:00Z" timezone="UTC"> <uri-template> hcat://foo:11002/myInputDatabase/myInputTable/datestamp=${YEAR}${MONTH}${DAY}${HOUR};region=USA </uri-template> </dataset> <dataset name="Stats" frequency="${coord:days(1)}" initial-instance="2009-01-01T01:00Z" timezone="UTC"> <uri-template> hcat://foo:11002/myOutputDatabase/myOutputTable/datestamp=${YEAR}${MONTH}${DAY} </uri-template> </dataset> </datasets> <input-events> <data-in name="raw-logs" dataset="Click-data"> <start-instance>${coord:current(-23)}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <output-events> <data-out name="processed-logs" dataset="Stats"> <instance>${coord:current(0)}</instance> </data-out> </output-events> <action> <workflow> <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path> <configuration> <property> <name>IN_DB</name> <value>${coord:databaseIn('raw-logs')}</value> </property> <property> <name>IN_TABLE</name> <value>${coord:tableIn('raw-logs')}</value> </property> <property> <name>FILTER</name> <value>${coord:dataInPartitionFilter('raw-logs', 'pig')}</value> </property> <property> <name>OUT_DB</name> <value>${coord:databaseOut('processed-logs')}</value> </property> <property> <name>OUT_TABLE</name> <value>${coord:tableOut('processed-logs')}</value> </property> <property> <name>OUT_PARTITIONS</name> <value>${coord:dataOutPartitions('processed-logs')}</value> </property> </configuration> </workflow> </action> </coordinator-app>
Parameterizing the input/output databases and tables using the corresponding EL function as shown will make them available in the pig action of the workflow 'logsprocessor-wf'.
Each coordinator action will use as input events the last 24 hourly instances of the 'Click-data' dataset. The ${coord:dataInPartitionFilter(String name, String type)} function enables the coordinator application to pass the Partition Filter corresponding to all the dataset instances for the last 24 hours to the workflow job triggered by the coordinator action. The ${coord:dataOutPartitions(String name)} function enables the coordinator application to pass the partition key-value string needed by the HCatStorer in Pig job when the workflow is triggered by the coordinator action.
Workflow definition:
<workflow-app xmlns="uri:oozie:workflow:0.3" name="logsprocessor-wf"> <credentials> <credential name='hcatauth' type='hcat'> <property> <name>hcat.metastore.uri</name> <value>${HCAT_URI}</value> </property> <property> <name>hcat.metastore.principal</name> <value>${HCAT_PRINCIPAL}</value> </property> </credential> </credentials> <start to="pig-node"/> <action name="pig-node" cred="hcatauth"> <pig> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <prepare> <delete path="hcat://foo:11002/${OUT_DB}/${OUT_TABLE}/date=${OUT_PARTITION_VAL_DATE}"/> </prepare> ... <script>id.pig</script> <param>HCAT_IN_DB=${IN_DB}</param> <param>HCAT_IN_TABLE=${IN_TABLE}</param> <param>HCAT_OUT_DB=${OUT_DB}</param> <param>HCAT_OUT_TABLE=${OUT_TABLE}</param> <param>PARTITION_FILTER=${FILTER}</param> <param>OUTPUT_PARTITIONS=${OUT_PARTITIONS}</param> <file>lib/hive-site.xml</file> </pig> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message>Pig failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> <end name="end"/> </workflow-app>
Important: Ensure that the required hcatalog jars and hive-site.xml are in the classpath, with versions corresponding to the hcatalog installation. Refer to HCatalog Libraries for the different ways to place them in the hadoop job classpath.
Important: See Action Authentication for more information about how to access a secure HCatalog from any workflow action.
Example usage in Pig:
A = load '$HCAT_IN_DB.$HCAT_IN_TABLE' using org.apache.hive.hcatalog.pig.HCatLoader(); B = FILTER A BY $PARTITION_FILTER; C = foreach B generate foo, bar; store C into '$HCAT_OUT_DB.$HCAT_OUT_TABLE' using org.apache.hive.hcatalog.pig.HCatStorer('$OUTPUT_PARTITIONS');
For the 2009-01-02T00:00Z run with the given dataset instances, the above Pig script with resolved values would look like:
A = load 'myInputDatabase.myInputTable' using org.apache.hive.hcatalog.pig.HCatLoader(); B = FILTER A BY ((datestamp==2009010101 AND region==USA) OR (datestamp==2009010102 AND region==USA) OR ... (datestamp==2009010123 AND region==USA) OR (datestamp==2009010200 AND region==USA)); C = foreach B generate foo, bar; store C into 'myOutputDatabase.myOutputTable' using org.apache.hive.hcatalog.pig.HCatStorer('datestamp=20090102');
The ${coord:dataInPartitionMin(String name, String partition)} EL function resolves to the minimum value of the specified partition for all the dataset instances specified in an input event dataset section. It can be used to do range based filtering of partitions in pig scripts together with dataInPartitionMax EL function.
Refer to the Example below for usage.
The ${coord:dataInPartitionMax(String name, String partition)} EL function resolves to the maximum value of the specified partition for all the dataset instances specified in an input event dataset section. It is better practice to use dataInPartitionMin and dataInPartitionMax to form a range filter wherever possible instead of dataInPartitionFilter, as a range filter is more efficient.
Refer to the Example below for usage.
The ${coord:dataOutPartitionValue(String name, String partition)} EL function resolves to the value of the specified partition for the output-event dataset that will be consumed by a workflow job, e.g. a Pig job triggered by a coordinator action. This is another convenience function for using a single partition key's value if required, in addition to dataOutPartitions(); either one can be used.
The example below illustrates a pig job triggered by a coordinator, using the aforementioned EL functions for input partition max/min values, output partition value, and database and table.
Example:
Coordinator application definition:
<coordinator-app name="app-coord" frequency="${coord:days(1)}" start="2009-01-01T24:00Z" end="2009-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <datasets> <dataset name="Click-data" frequency="${coord:hours(1)}" initial-instance="2009-01-01T01:00Z" timezone="UTC"> <uri-template> hcat://foo:11002/myInputDatabase/myInputTable/datestamp=${YEAR}${MONTH}${DAY}${HOUR};region=USA </uri-template> </dataset> <dataset name="Stats" frequency="${coord:days(1)}" initial-instance="2009-01-01T01:00Z" timezone="UTC"> <uri-template> hcat://foo:11002/myOutputDatabase/myOutputTable/datestamp=${YEAR}${MONTH}${DAY};region=USA </uri-template> </dataset> </datasets> <input-events> <data-in name="raw-logs" dataset="Click-data"> <start-instance>${coord:current(-23)}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <output-events> <data-out name="processed-logs" dataset="Stats"> <instance>${coord:current(0)}</instance> </data-out> </output-events> <action> <workflow> <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path> <configuration> <property> <name>IN_DB</name> <value>${coord:databaseIn('raw-logs')}</value> </property> <property> <name>IN_TABLE</name> <value>${coord:tableIn('raw-logs')}</value> </property> <property> <name>DATE_MIN</name> <value>${coord:dataInPartitionMin('raw-logs','datestamp')}</value> </property> <property> <name>DATE_MAX</name> <value>${coord:dataInPartitionMax('raw-logs','datestamp')}</value> </property> <property> <name>OUT_DB</name> <value>${coord:databaseOut('processed-logs')}</value> </property> <property> <name>OUT_TABLE</name> <value>${coord:tableOut('processed-logs')}</value> </property> <property> <name>OUT_PARTITION_VAL_REGION</name> <value>${coord:dataOutPartitionValue('processed-logs','region')}</value> </property> <property> <name>OUT_PARTITION_VAL_DATE</name> <value>${coord:dataOutPartitionValue('processed-logs','datestamp')}</value> </property> </configuration> </workflow> 
</action> </coordinator-app>
In this example, each coordinator action will use as input events the last 24 hourly instances of the 'Click-data' dataset.
For the 2009-01-02T00:00Z run, the ${coord:dataInPartitionMin('raw-logs','datestamp')} function will resolve to the minimum of the 24 dataset instances for partition 'datestamp', i.e. among 2009010101, 2009010102, ..., 2009010123, 2009010200, the minimum would be "2009010101".
Similarly, the ${coord:dataInPartitionMax('raw-logs','datestamp')} function will resolve to the maximum of the 24 dataset instances for partition 'datestamp', i.e. among 2009010101, 2009010102, ..., 2009010123, 2009010200, the maximum would be "2009010200".
Finally, the ${coord:dataOutPartitionValue(String name, String partition)} function enables the coordinator application to pass a specified partition's value string needed by the HCatStorer in the Pig job. With the 'Stats' uri-template above, the ${coord:dataOutPartitionValue('processed-logs','region')} function will resolve to "USA" and the ${coord:dataOutPartitionValue('processed-logs','datestamp')} function will resolve to "20090102".
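The minimum and maximum values for the 2009-01-02T00:00Z run can be checked with a small Python sketch. The helper `hourly_datestamps` is hypothetical; it assumes an hourly dataset whose current(0) instance coincides with the nominal time, as in this example, and relies on the fixed-width yyyyMMddHH values sorting the same way as strings and as dates.

```python
from datetime import datetime, timedelta

def hourly_datestamps(nominal, start_offset, end_offset):
    """Return the 'datestamp' partition values for current(start)..current(end)."""
    return [
        (nominal + timedelta(hours=offset)).strftime("%Y%m%d%H")
        for offset in range(start_offset, end_offset + 1)
    ]

# Input event range from the example: current(-23) .. current(0).
stamps = hourly_datestamps(datetime(2009, 1, 2, 0, 0), -23, 0)

print(len(stamps))   # number of dataset instances
print(min(stamps))   # value a dataInPartitionMin-style lookup would give
print(max(stamps))   # value a dataInPartitionMax-style lookup would give
```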
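For the workflow definition, refer to the pig action of the previous example, with the following params passed to the pig script in addition to the database and table params: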
... <param>PARTITION_DATE_MIN=${DATE_MIN}</param> <param>PARTITION_DATE_MAX=${DATE_MAX}</param> <param>REGION=${region}</param> <param>OUT_PARTITION_VAL_REGION=${OUT_PARTITION_VAL_REGION}</param> <param>OUT_PARTITION_VAL_DATE=${OUT_PARTITION_VAL_DATE}</param> ...
Example usage in Pig: This illustrates another pig script which filters partitions based on range, with range limits parameterized with the EL functions
A = load '$HCAT_IN_DB.$HCAT_IN_TABLE' using org.apache.hive.hcatalog.pig.HCatLoader(); B = FILTER A BY datestamp >= '$PARTITION_DATE_MIN' AND datestamp <= '$PARTITION_DATE_MAX' AND region=='$REGION'; C = foreach B generate foo, bar; store C into '$HCAT_OUT_DB.$HCAT_OUT_TABLE' using org.apache.hive.hcatalog.pig.HCatStorer('region=$OUT_PARTITION_VAL_REGION,datestamp=$OUT_PARTITION_VAL_DATE');
For example, for the 2009-01-02T00:00Z run with the given dataset instances, the above Pig script with resolved values would look like:
A = load 'myInputDatabase.myInputTable' using org.apache.hive.hcatalog.pig.HCatLoader(); B = FILTER A BY datestamp >= '2009010101' AND datestamp <= '2009010200' AND region=='USA'; C = foreach B generate foo, bar; store C into 'myOutputDatabase.myOutputTable' using org.apache.hive.hcatalog.pig.HCatStorer('region=USA,datestamp=20090102');
The ${coord:dataInPartitions(String name, String type)} EL function resolves to a list of partition key-value pairs for the input-event dataset. Currently the only type supported is 'hive-export'. The 'hive-export' type supports only one partition instance and it can be used to create the complete partition value string that can be used in a hive query for partition export/import.
The example below illustrates a hive export-import job triggered by a coordinator, using the EL functions for HCat database, table, input partitions. The example replicates the hourly processed data across hive tables.
Example:
Coordinator application definition:
<coordinator-app xmlns="uri:oozie:coordinator:0.3" name="app-coord" frequency="${coord:hours(1)}" start="2014-03-28T08:00Z" end="2030-01-01T00:00Z" timezone="UTC"> <datasets> <dataset name="Stats-1" frequency="${coord:hours(1)}" initial-instance="2014-03-28T08:00Z" timezone="UTC"> <uri-template>hcat://foo:11002/myInputDatabase1/myInputTable1/year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR} </uri-template> </dataset> <dataset name="Stats-2" frequency="${coord:hours(1)}" initial-instance="2014-03-28T08:00Z" timezone="UTC"> <uri-template>hcat://foo:11002/myInputDatabase2/myInputTable2/year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR} </uri-template> </dataset> </datasets> <input-events> <data-in name="processed-logs-1" dataset="Stats-1"> <instance>${coord:current(0)}</instance> </data-in> </input-events> <output-events> <data-out name="processed-logs-2" dataset="Stats-2"> <instance>${coord:current(0)}</instance> </data-out> </output-events> <action> <workflow> <app-path>hdfs://bar:8020/usr/joe/logsreplicator-wf</app-path> <configuration> <property> <name>EXPORT_DB</name> <value>${coord:databaseIn('processed-logs-1')}</value> </property> <property> <name>EXPORT_TABLE</name> <value>${coord:tableIn('processed-logs-1')}</value> </property> <property> <name>IMPORT_DB</name> <value>${coord:databaseOut('processed-logs-2')}</value> </property> <property> <name>IMPORT_TABLE</name> <value>${coord:tableOut('processed-logs-2')}</value> </property> <property> <name>EXPORT_PARTITION</name> <value>${coord:dataInPartitions('processed-logs-1', 'hive-export')}</value> </property> <property> <name>EXPORT_PATH</name> <value>hdfs://bar:8020/staging/${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH')}/data</value> </property> </configuration> </workflow> </action> </coordinator-app>
Parameterizing the input/output databases and tables using the corresponding EL function as shown will make them available in the hive action of the workflow 'logsreplicator-wf'.
Each coordinator action will use as input event the hourly instance of the 'Stats-1' dataset (data-in 'processed-logs-1'). The ${coord:dataInPartitions(String name, String type)} function enables the coordinator application to pass the partition corresponding to the hourly dataset instance to the workflow job triggered by the coordinator action. The workflow passes this partition value to the hive export script, which exports the hourly partition from the source database to the staging location referred to as EXPORT_PATH. The hive import script imports the hourly partition from the EXPORT_PATH staging location into the target database.
Workflow definition:
<workflow-app xmlns="uri:oozie:workflow:0.3" name="logsreplicator-wf"> <start to="table-export"/> <action name="table-export"> <hive:hive xmlns:hive="uri:oozie:hive-action:0.2" xmlns="uri:oozie:hive-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> <property> <name>oozie.launcher.mapred.job.priority</name> <value>${jobPriority}</value> </property> </configuration> <script>${wf:appPath()}/scripts/table-export.hql</script> <param>sourceDatabase=${EXPORT_DB}</param> <param>sourceTable=${EXPORT_TABLE}</param> <param>sourcePartition=${EXPORT_PARTITION}</param> <param>sourceStagingDir=${EXPORT_PATH}</param> </hive:hive> <ok to="table-import"/> <error to="fail"/> </action> <action name="table-import"> <hive:hive xmlns:hive="uri:oozie:hive-action:0.2" xmlns="uri:oozie:hive-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> <property> <name>oozie.launcher.mapred.job.priority</name> <value>${jobPriority}</value> </property> </configuration> <script>${wf:appPath()}/scripts/table-import.hql</script> <param>targetDatabase=${IMPORT_DB}</param> <param>targetTable=${IMPORT_TABLE}</param> <param>targetPartition=${EXPORT_PARTITION}</param> <param>sourceStagingDir=${EXPORT_PATH}</param> </hive:hive> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message> Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}] </message> </kill> <end name="end"/> </workflow-app>
Ensure that the following jars are in classpath, with versions corresponding to hcatalog installation: hcatalog-core.jar, webhcat-java-client.jar, hive-common.jar, hive-exec.jar, hive-metastore.jar, hive-serde.jar, libfb303.jar. The hive-site.xml needs to be present in classpath as well.
Example Hive Export script: The following script exports a particular Hive table partition into staging location, where the partition value is computed through ${coord:dataInPartitions(String name, String type)} EL function.
export table ${sourceDatabase}.${sourceTable} partition (${sourcePartition}) to '${sourceStagingDir}';
For example, for the 2014-03-28T08:00Z run with the given dataset instances and ${coord:dataInPartitions('processed-logs-1', 'hive-export')}, the above Hive script with resolved values would look like:
export table myInputDatabase1.myInputTable1 partition (year='2014',month='03',day='28',hour='08') to 'hdfs://bar:8020/staging/2014-03-28-08/data';
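How the database, table and 'hive-export' partition string relate to an hcat dataset URI can be sketched in Python. This is an illustration under stated assumptions, not Oozie's implementation: `hive_export_parts` is a hypothetical helper, and it assumes the URI layout used in this document (hcat://host:port/database/table/key=value;key=value).

```python
def hive_export_parts(hcat_uri):
    """Split a resolved hcat URI into (database, table, partition string)."""
    scheme, _, rest = hcat_uri.partition("://")
    if scheme != "hcat":
        raise ValueError("not an hcat URI: " + hcat_uri)
    # rest looks like: host:port/database/table/k1=v1;k2=v2
    _authority, database, table, spec = rest.strip().split("/", 3)
    pairs = [item.split("=", 1) for item in spec.split(";") if item]
    partition = ",".join(key + "='" + value + "'" for key, value in pairs)
    return database, table, partition

# Resolved 'Stats-1' instance for the 2014-03-28T08:00Z run.
uri = "hcat://foo:11002/myInputDatabase1/myInputTable1/year=2014;month=03;day=28;hour=08"
database, table, partition = hive_export_parts(uri)
print("export table " + database + "." + table + " partition (" + partition + ")")
```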
Example Hive Import script: The following script imports a particular Hive table partition from staging location, where the partition value is computed through ${coord:dataInPartitions(String name, String type)} EL function.
use ${targetDatabase}; alter table ${targetTable} drop if exists partition (${targetPartition}); import table ${targetTable} partition (${targetPartition}) from '${sourceStagingDir}';
For example, for the 2014-03-28T08:00Z run with the given dataset instances and ${coord:dataInPartitions('processed-logs-1', 'hive-export')}, the above Hive script with resolved values would look like:
use myInputDatabase2; alter table myInputTable2 drop if exists partition (year='2014',month='03',day='28',hour='08'); import table myInputTable2 partition (year='2014',month='03',day='28',hour='08') from 'hdfs://bar:8020/staging/2014-03-28-08/data';
This section describes the EL functions that can be used to parameterize both datasets and coordinator application actions.
The ${coord:dateOffset(String baseDate, int instance, String timeUnit)} EL function calculates the date based on the following equation: newDate = baseDate + (instance * timeUnit). In other words, it offsets the baseDate by the amount specified by instance and timeUnit.
The timeUnit argument accepts one of 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE'.
For example, if baseDate is '2009-01-01T00:00Z', instance is '2' and timeUnit is 'MONTH', the return date will be '2009-03-01T00:00Z'. If baseDate is '2009-01-01T00:00Z', instance is '1' and timeUnit is 'YEAR', the return date will be '2010-01-01T00:00Z'.
Example:
<coordinator-app name="app-coord" frequency="${coord:days(1)}" start="2009-01-01T23:00Z" end="2009-12-31T23:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> ...... <action> <workflow> <app-path>hdfs://bar:8020/usr/joe/logsaggretor-wf</app-path> <configuration> <property> <name>nextInstance</name> <value>${coord:dateOffset(coord:nominalTime(), 1, 'DAY')}</value> </property> <property> <name>previousInstance</name> <value>${coord:dateOffset(coord:nominalTime(), -1, 'DAY')}</value> </property> </configuration> </workflow> </action> </coordinator-app>
In this example, the 'nextInstance' will be '2009-01-02T23:00Z' for the first action. And the value of 'previousInstance' will be '2008-12-31T23:00Z' for the same instance.
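The dateOffset arithmetic can be illustrated with the Python sketch below. It is not Oozie's implementation: `date_offset` is a hypothetical name, the computation is done in UTC, and clamping the day of month when a MONTH or YEAR offset lands in a shorter month is an assumption of this sketch.

```python
import calendar
from datetime import datetime, timedelta

FORMAT = "%Y-%m-%dT%H:%MZ"

def date_offset(base_date, instance, time_unit):
    """Return base_date shifted by instance units of time_unit."""
    base = datetime.strptime(base_date, FORMAT)
    fixed = {"MINUTE": timedelta(minutes=1), "HOUR": timedelta(hours=1),
             "DAY": timedelta(days=1)}
    if time_unit in fixed:
        shifted = base + instance * fixed[time_unit]
    elif time_unit in ("MONTH", "YEAR"):
        months = instance * (12 if time_unit == "YEAR" else 1)
        total = base.year * 12 + (base.month - 1) + months
        year, month_index = divmod(total, 12)
        month = month_index + 1
        # Assumption: clamp to the last day of a shorter target month.
        day = min(base.day, calendar.monthrange(year, month)[1])
        shifted = base.replace(year=year, month=month, day=day)
    else:
        raise ValueError("unsupported time unit: " + time_unit)
    return shifted.strftime(FORMAT)

print(date_offset("2009-01-01T00:00Z", 2, "MONTH"))
print(date_offset("2009-01-01T00:00Z", 1, "YEAR"))
print(date_offset("2009-01-01T23:00Z", 1, "DAY"))   # 'nextInstance'
print(date_offset("2009-01-01T23:00Z", -1, "DAY"))  # 'previousInstance'
```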
The ${coord:dateTzOffset(String baseDate, String timezone)} EL function calculates the date based on the following equation: newDate = baseDate + (Oozie processing timezone - timezone). In other words, it offsets the baseDate by the difference from the Oozie processing timezone to the given timezone. It will account for daylight saving time based on the given baseDate and timezone.
The timezone argument accepts any timezone or GMT offset that is returned by the "info -timezones" command. For example, "America/Los_Angeles" or "PST".
For example, if baseDate is '2012-06-13T00:00Z' and timezone is 'America/Los_Angeles', the return date will be '2012-06-12T17:00Z'. But if baseDate is '2012-12-13T00:00Z', then the return date will be '2012-12-12T16:00Z'. The difference in return dates occurs because the former occurs during Summer when DST is in effect (UTC-0700) and the latter occurs during Winter when DST is not in effect (UTC-0800).
Example:
<coordinator-app name="app-coord" frequency="${coord:days(1)}" start="2009-01-01T24:00Z" end="2009-12-31T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> ...... <action> <workflow> <app-path>hdfs://bar:8020/usr/joe/logsaggretor-wf</app-path> <configuration> <property> <name>myDate</name> <value>${coord:dateTzOffset(coord:nominalTime(), "America/Los_Angeles")}</value> </property> </configuration> </workflow> </action> </coordinator-app>
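In this example, the 'myDate' will be '2009-01-01T16:00Z' for the first action: the nominal time 2009-01-01T24:00Z is 2009-01-02T00:00Z, and Los Angeles is at UTC-0800 in January.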
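The daylight-saving behaviour of dateTzOffset can be reproduced with Python's standard zoneinfo module (Python 3.9 or later, with a timezone database available on the system). The sketch assumes that the Oozie processing timezone is UTC; `date_tz_offset` is a hypothetical name, not an Oozie API.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

FORMAT = "%Y-%m-%dT%H:%MZ"

def date_tz_offset(base_date, tz_name):
    """Shift a UTC timestamp by the UTC offset of tz_name at that instant."""
    base = datetime.strptime(base_date, FORMAT).replace(tzinfo=timezone.utc)
    # utcoffset() already reflects whether DST is in effect at 'base'.
    offset = base.astimezone(ZoneInfo(tz_name)).utcoffset()
    return (base + offset).strftime(FORMAT)

print(date_tz_offset("2012-06-13T00:00Z", "America/Los_Angeles"))  # summer
print(date_tz_offset("2012-12-13T00:00Z", "America/Los_Angeles"))  # winter
```

The two calls differ by one hour in their result only because the offset of the timezone changes between the two dates.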
The ${coord:formatTime(String timeStamp, String format)} function allows transformation of the standard ISO8601 timestamp strings into other desired formats.
The format string should be in Java's SimpleDateFormat format.
For example, if timeStamp is '2009-01-01T00:00Z' and format is 'yyyy', the returned date string will be '2009'.
As mentioned in section #4.1.1 'Timezones and Daylight-Saving', the coordinator engine works exclusively in UTC, and dataset and application definitions are always expressed in UTC.
For timezones that don't observe daylight saving time, handling timezone offsets is trivial.
For these timezones, it suffices to express the datetimes in dataset and application definitions taking the timezone offset into account.
Example:
Coordinator application definition: A daily coordinator job for India timezone (+05:30) that consumes 24 hourly dataset instances from the previous day starting at the beginning of 2009 for a full year.
<coordinator-app name="app-coord" frequency="${coord:days(1)}" start="2008-12-31T19:30Z" end="2009-12-30T19:30Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <datasets> <dataset name="hourlyLogs" frequency="${coord:hours(1)}" initial-instance="2008-12-31T19:30Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template> </dataset> </datasets> <input-events> <data-in name="inputLogs" dataset="hourlyLogs"> <start-instance>${coord:current(-23)}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <action> ... </action> </coordinator-app>
Oozie Coordinator provides all the necessary functionality to write coordinator applications that work properly when data and processing spans across multiple timezones and different daylight saving rules.
The following 3 use cases will be used to show how Oozie Coordinator built-in functionality can be used to handle such cases:
1. Process logs hourly data from the last day from US East-coast
2. Process logs hourly data from the last day from US East-coast and US West-coast
3. Process logs hourly data from the last day from US East-coast and Continental Europe
1. Process logs hourly data from the last day from US East-coast:
<coordinator-app name="eastcoast-processing" frequency="${coord:days(1)}" start="2009-01-02T05:00Z" end="2010-01-02T05:00Z" timezone="America/New_York" xmlns="uri:oozie:coordinator:0.1"> <datasets> <dataset name="eastlogs" frequency="${coord:hours(1)}" initial-instance="2009-01-01T06:00Z" timezone="America/New_York"> <uri-template> hdfs://bar:8020/app/logs/eastcoast/${YEAR}/${MONTH}/${DAY}/${HOUR} </uri-template> </dataset> </datasets> <input-events> <data-in name="EC" dataset="eastlogs"> <start-instance>${coord:current( -(coord:hoursInDay(0) - 1) )}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <action> <workflow> <app-path>hdfs://bar:8020/usr/joe/logsaggretor-wf</app-path> <configuration> <property> <name>wfInput</name> <value>${coord:dataIn('EC')}</value> </property> </configuration> </workflow> </action> </coordinator-app>
Because the ${coord:days(1)} EL function is used to specify the job frequency, each coordinator action will be materialized (created) at 00:00 EST5EDT regardless of timezone daylight-saving adjustments (05:00 UTC in Winter and 04:00 UTC in Summer).
The ${coord:hoursInDay(int n)} EL function resolves to the number of hours of the day at offset n from the day of the nominal time, taking into account daylight-saving changes if any. It resolves to 24 (on regular days), 23 (on the spring-forward day) or 25 (on the fall-back day).
Because the start instance is computed with the hoursInDay EL function and the end instance is ${coord:current(0)}, the dataset instance range resolves to [-23 .. 0], [-22 .. 0] or [-24 .. 0]. Thus, it resolves to the exact number of dataset instances for the day, taking daylight-saving adjustments into account.
Note that because the coordinator application and the dataset are in the same timezone, there is no need to do any hour offset corrections in the dataset instances being used as input for each coordinator action.
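The 24, 23 and 25 hour days mentioned above can be verified with a short Python sketch using the standard zoneinfo module (Python 3.9 or later, with a timezone database available). The helper `hours_in_day` is hypothetical and only mirrors the idea of the hoursInDay EL function; the dates are the 2009 US daylight-saving transitions.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

def hours_in_day(year, month, day, tz_name):
    """Count the hours between local midnight and the next local midnight."""
    tz = ZoneInfo(tz_name)
    start = datetime(year, month, day, tzinfo=tz)
    # Adding a timedelta to an aware datetime is wall-clock arithmetic,
    # so this is the next local midnight.
    next_midnight = start + timedelta(days=1)
    # Convert to UTC before subtracting; subtraction between datetimes
    # sharing the same tzinfo would ignore the offset change.
    elapsed = next_midnight.astimezone(timezone.utc) - start.astimezone(timezone.utc)
    return elapsed // timedelta(hours=1)

print(hours_in_day(2009, 1, 15, "America/New_York"))  # regular day
print(hours_in_day(2009, 3, 8, "America/New_York"))   # spring forward
print(hours_in_day(2009, 11, 1, "America/New_York"))  # fall back
```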
2. Process logs hourly data from the last day from US East-coast and the US West-coast:
<coordinator-app name="eastcoast-westcoast-processing" frequency="${coord:days(1)}" start="2009-01-02T09:00Z" end="2010-01-02T09:00Z" timezone="America/Los_Angeles" xmlns="uri:oozie:coordinator:0.1"> <datasets> <dataset name="eastlogs" frequency="${coord:hours(1)}" initial-instance="2009-01-01T06:00Z" timezone="America/New_York"> <uri-template> hdfs://bar:8020/app/logs/eastcoast/${YEAR}/${MONTH}/${DAY}/${HOUR} </uri-template> </dataset> <dataset name="westlogs" frequency="${coord:hours(1)}" initial-instance="2009-01-01T09:00Z" timezone="America/Los_Angeles"> <uri-template> hdfs://bar:8020/app/logs/westcoast/${YEAR}/${MONTH}/${DAY}/${HOUR} </uri-template> </dataset> </datasets> <input-events> <data-in name="EC" dataset="eastlogs"> <start-instance>${coord:current( -(coord:hoursInDay(0) - 1) -3)}</start-instance> <end-instance>${coord:current(-3)}</end-instance> </data-in> <data-in name="WC" dataset="westlogs"> <start-instance>${coord:current( -(coord:hoursInDay(0) - 1) )}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <action> <workflow> <app-path>hdfs://bar:8020/usr/joe/logsaggretor-wf</app-path> <configuration> <property> <name>wfInput</name> <value>${coord:dataIn('EC')},${coord:dataIn('WC')}</value> </property> </configuration> </workflow> </action> </coordinator-app>
The additional complexity of this use case over the first use case is because the job and the datasets are not all in the same timezone. The corresponding timezone offset has to be accounted for.
As the use case requires processing all the daily data for the East coast and the West coast, the processing has to be adjusted to the West coast end of the day, because the day there finishes 3 hours later and processing has to wait until then.
The data input range for the East coast dataset must be adjusted (with -3) in order to take the data for the previous EST5EDT day.
3. Process logs hourly data from the last day from US East-coast and Continental Europe:
<coordinator-app name="eastcoast-europe-processing" frequency="${coord:days(1)}" start="2009-01-02T05:00Z" end="2010-01-02T05:00Z" timezone="America/New_York" xmlns="uri:oozie:coordinator:0.1"> <datasets> <dataset name="eastlogs" frequency="${coord:hours(1)}" initial-instance="2009-01-01T06:00Z" timezone="America/New_York"> <uri-template> hdfs://bar:8020/app/logs/eastcoast/${YEAR}/${MONTH}/${DAY}/${HOUR} </uri-template> </dataset> <dataset name="europelogs" frequency="${coord:hours(1)}" initial-instance="2009-01-01T01:00Z" timezone="Europe/Berlin"> <uri-template> hdfs://bar:8020/app/logs/europe/${YEAR}/${MONTH}/${DAY}/${HOUR} </uri-template> </dataset> </datasets> <input-events> <data-in name="EC" dataset="eastlogs"> <start-instance>${coord:current( -(coord:hoursInDay(0) - 1) )}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> <data-in name="EU" dataset="europelogs"> <start-instance>${coord:current( -(coord:hoursInDay(0) -1) - coord:tzOffset()/60)}</start-instance> <end-instance>${coord:current( - coord:tzOffset()/60)}</end-instance> </data-in> </input-events> <action> <workflow> <app-path>hdfs://bar:8020/usr/joe/logsaggretor-wf</app-path> <configuration> <property> <name>wfInput</name> <value>${coord:dataIn('EC')},${coord:dataIn('EU')}</value> </property> </configuration> </workflow> </action> </coordinator-app>
The additional complexity of this use case over the second use case is because the timezones used for the job and the datasets do not follow the same daylight saving rules (Europe and the US apply the DST changes on different days).
Because of this, the timezone offset between Europe and the US is not constant. To obtain the current timezone offset between the coordinator job and a dataset, the ${coord:tzOffset()} EL function must be used.
As the use case requires processing all the daily data for the East coast and continental Europe, the processing happens on East coast time (thus having daily data already available for both Europe and the East coast).
The data input range for the Europe dataset must be adjusted with the ${coord:tzOffset()} EL function in order to take the data for the previous EST5EDT day.
IMPORTANT: The ${coord:tzOffset()} function returns the offset in minutes, and the datasets in the example are hourly datasets. Because of this, the offset must be divided by 60 to compute the instance offset.
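The division by 60 can be checked with a little arithmetic. The sketch below is illustrative Python, not Oozie code: it uses fixed UTC offsets for the two zones (hard-coded assumptions, so no timezone database is needed) and the hypothetical helpers `tz_offset_minutes` and `instance_offset`. It only shows how an offset in minutes converts into a number of hourly dataset instances, and why that number changes while only one side has switched to DST. The sign convention of `${coord:tzOffset()}` itself is not modeled.

```python
from datetime import timedelta

# Fixed UTC offsets, hard-coded for illustration (assumptions, not read from a tz database).
EST = timedelta(hours=-5)   # America/New_York, standard time
EDT = timedelta(hours=-4)   # America/New_York, daylight saving time
CET = timedelta(hours=1)    # Europe/Berlin, standard time
CEST = timedelta(hours=2)   # Europe/Berlin, daylight saving time


def tz_offset_minutes(dataset_utc_offset, job_utc_offset):
    """Difference between two UTC offsets, in minutes (hypothetical helper)."""
    return int((dataset_utc_offset - job_utc_offset).total_seconds() // 60)


def instance_offset(offset_minutes, dataset_frequency_minutes=60):
    """Convert an offset in minutes into a number of dataset instances."""
    return offset_minutes // dataset_frequency_minutes


# Both zones on standard time: 360 minutes apart, i.e. 6 hourly instances.
winter = tz_offset_minutes(CET, EST)
# US already on DST, Europe not yet: 300 minutes apart, i.e. 5 hourly instances.
spring_gap = tz_offset_minutes(CET, EDT)
# Both zones on DST: 360 minutes apart again.
summer = tz_offset_minutes(CEST, EDT)

print(winter, instance_offset(winter))
print(spring_gap, instance_offset(spring_gap))
print(summer, instance_offset(summer))
```

Because the offset is not constant over the year, a hard-coded instance shift in the coordinator definition would select the wrong hours during the weeks in which only one of the two regions is on DST.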
When submitting a coordinator job, the configuration must contain a user.name property. If security is enabled, Oozie must ensure that the value of the user.name property in the configuration matches the user credentials present in the protocol (web services) request.
When submitting a coordinator job, the configuration may contain the oozie.job.acl property (the group.name property has been deprecated). If authorization is enabled, this property is treated as the ACL for the job; it can contain user and group IDs separated by commas.
The specified user and ACL are assigned to the created coordinator job.
Oozie must propagate the specified user and ACL to the system executing the actions (workflow jobs).
Coordinator applications consist exclusively of dataset definitions and coordinator application definitions. They must be installed in an HDFS directory. To submit a job for a coordinator application, the full HDFS path to the coordinator application definition must be specified.
The usage of Oozie Coordinator can be categorized in 3 different segments:
Systems that fall in the medium and (especially) in the large categories are usually referred to as data pipeline systems.
Oozie Coordinator definition XML schemas provide a convenient and flexible mechanism for all 3 system categories defined above.
For small systems: All dataset definitions and the coordinator application definition can be defined in a single XML file. The XML definition file is commonly in its own HDFS directory.
For medium systems: A single datasets XML file defines all shared/public datasets. Each coordinator application has its own definition file; it may have embedded/private datasets and it may refer, via inclusion, to the shared datasets XML file. All the XML definition files are grouped in a single HDFS directory.
For large systems: Multiple datasets XML files define all shared/public datasets. Each coordinator application has its own definition file; it may have embedded/private datasets and it may refer, via inclusion, to multiple shared datasets XML files. XML definition files are logically grouped in different HDFS directories.
NOTE: Oozie Coordinator does not enforce any specific organization, grouping or naming for datasets and coordinator application definition files.
The fact that each coordinator application is in a separate XML definition file simplifies coordinator job submission, monitoring and managing of jobs. Tools to support groups of jobs can be built on top of the basic, per job, commands provided by the Oozie coordinator engine.
Embedded dataset definitions within a coordinator application cannot have the same name.
Dataset definitions within a dataset definition XML file cannot have the same name.
If a coordinator application includes one or more dataset definition XML files, there cannot be datasets with the same name in any 2 of the included dataset definition XML files.
If any of these dataset name collisions occurs, the coordinator job submission must fail.
If a coordinator application includes one or more dataset definition XML files and it has embedded dataset definitions, in case of dataset name collision between the included and the embedded definition files, the embedded dataset takes precedence over the included dataset.
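The naming rules above can be sketched as a small resolution routine. This is an illustrative Python sketch, not Oozie's implementation: `resolve_datasets`, `DatasetNameCollision` and the input shapes (lists of `(name, definition)` pairs, and a dict keyed by the path of each included file) are hypothetical names introduced here. Collisions inside the embedded definitions, inside one included file, or across included files fail the submission; an embedded dataset overrides an included dataset of the same name.

```python
class DatasetNameCollision(Exception):
    """Raised when a name collision must make the job submission fail."""


def resolve_datasets(embedded, included_files):
    """Return a dict of dataset name -> definition, applying the naming rules.

    embedded:       list of (name, definition) pairs from the coordinator application
    included_files: dict of file path -> list of (name, definition) pairs
    """
    # Rule: embedded dataset definitions cannot share a name.
    embedded_by_name = {}
    for name, definition in embedded:
        if name in embedded_by_name:
            raise DatasetNameCollision(f"embedded dataset {name!r} defined twice")
        embedded_by_name[name] = definition

    # Rules: no duplicate names within one included file or across included files.
    resolved = {}
    owner = {}  # dataset name -> path of the included file that defines it
    for path, datasets in included_files.items():
        for name, definition in datasets:
            if name in owner:
                raise DatasetNameCollision(
                    f"dataset {name!r} defined in both {owner[name]} and {path}"
                )
            owner[name] = path
            resolved[name] = definition

    # Rule: an embedded dataset takes precedence over an included one.
    resolved.update(embedded_by_name)
    return resolved


datasets = resolve_datasets(
    [("logs", "embedded-logs")],
    {"shared.xml": [("logs", "shared-logs"), ("clicks", "shared-clicks")]},
)
print(datasets)
```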
When a coordinator job is submitted to Oozie Coordinator, the submitter must specify all the required job properties plus the HDFS path to the coordinator application definition for the job.
The coordinator application definition HDFS path must be specified in the 'oozie.coord.application.path' job property.
All the coordinator job properties, the HDFS path for the coordinator application, the 'user.name' and 'oozie.job.acl' must be submitted to the Oozie coordinator engine using an XML configuration file (Hadoop XML configuration file).
Example:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <property>
    <name>user.name</name>
    <value>joe</value>
  </property>
  <property>
    <name>oozie.coord.application.path</name>
    <value>hdfs://foo:8020/user/joe/myapps/hello-coord.xml</value>
  </property>
  ...
</configuration>
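A configuration file of this shape can also be generated rather than written by hand. The following is a minimal Python sketch using only the standard library; `build_job_configuration` is a hypothetical helper introduced for illustration, and the property values are the ones from the example above.

```python
import xml.etree.ElementTree as ET


def build_job_configuration(properties):
    """Return a Hadoop-style XML configuration document as a string."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    body = ET.tostring(root, encoding="unicode")
    return '<?xml version="1.0" encoding="UTF-8"?>\n' + body


job_xml = build_job_configuration({
    "user.name": "joe",
    "oozie.coord.application.path": "hdfs://foo:8020/user/joe/myapps/hello-coord.xml",
})
print(job_xml)
```

Any additional job properties required by the coordinator application would be added to the same dictionary.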
Oozie 2.0 is integrated with GMS (Grid Monitoring System).
If you add sla tags to the Coordinator or Workflow XML files, then the SLA information will be propagated to the GMS system.
<coordinator-app name="hello-coord" frequency="${coord:days(1)}"
                 start="2009-01-02T08:01Z" end="2010-01-01T08:01Z" timezone="America/Los_Angeles"
                 xmlns="uri:oozie:coordinator:0.1" xmlns:sla="uri:oozie:sla:0.1">
  <datasets>
    <dataset name="logs" frequency="${1 * HOURS}"
             initial-instance="2009-01-01T09:00Z" timezone="America/Los_Angeles">
      <uri-template>
        hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}/data
      </uri-template>
    </dataset>
  </datasets>
  <input-events>
    <data-in name="input" dataset="logs">
      <start-instance>${coord:current( -(coord:hoursInDay(0) - 1) )}</start-instance>
      <end-instance>${coord:current(0)}</end-instance>
    </data-in>
  </input-events>
  <action>
    <workflow>
      <app-path>hdfs://bar:8020/usr/joe/hello-wf</app-path>
      <configuration>
        <property>
          <name>input</name>
          <value>${coord:dataIn('input')}</value>
        </property>
      </configuration>
    </workflow>
    <sla:info>
      <sla:nominal-time>${coord:nominalTime()}</sla:nominal-time>
      <sla:should-start>${5 * MINUTES}</sla:should-start>
      <sla:should-end>${55 * MINUTES}</sla:should-end>
      <sla:message>log processor run for: ${coord:nominalTime()}</sla:message>
      <sla:alert-contact>joe@example.com</sla:alert-contact>
      <sla:dev-contact>abc@example.com</sla:dev-contact>
      <sla:qa-contact>abc@example.com</sla:qa-contact>
      <sla:se-contact>abc@example.com</sla:se-contact>
      <sla:upstream-apps>application-a,application-b</sla:upstream-apps>
      <sla:alert-percentage>99</sla:alert-percentage>
      <sla:alert-frequency>${24 * LAST_HOUR}</sla:alert-frequency>
    </sla:info>
  </action>
</coordinator-app>
<workflow-app name="hello-wf" xmlns="uri:oozie:workflow:0.2" xmlns:sla="uri:oozie:sla:0.1">
  <start to="grouper"/>
  <action name="grouper">
    <map-reduce>
      <job-tracker>${jobtracker}</job-tracker>
      <name-node>${namenode}</name-node>
      <configuration>
        <property>
          <name>mapred.input.dir</name>
          <value>${input}</value>
        </property>
        <property>
          <name>mapred.output.dir</name>
          <value>/usr/foo/${wf:id()}/temp1</value>
        </property>
      </configuration>
    </map-reduce>
    <ok to="end"/>
    <error to="end"/>
  </action>
  <sla:info>
    <sla:nominal-time>${nominal-time}</sla:nominal-time>
    <sla:should-start>${10 * MINUTES}</sla:should-start>
    <sla:should-end>${30 * MINUTES}</sla:should-end>
    <sla:message>abc.grouper for input ${input}</sla:message>
    <sla:alert-contact>joe@example.com</sla:alert-contact>
    <sla:dev-contact>abc@example.com</sla:dev-contact>
    <sla:qa-contact>abc@example.com</sla:qa-contact>
    <sla:se-contact>abc@example.com</sla:se-contact>
    <sla:upstream-apps>application-a,application-b</sla:upstream-apps>
    <sla:alert-percentage>99</sla:alert-percentage>
    <sla:alert-frequency>${24 * LAST_HOUR}</sla:alert-frequency>
  </sla:info>
  <end name="end"/>
</workflow-app>
* TBD
Example:
$oozie job -rerun <coord_Job_id> [-nocleanup] [-refresh] [-failed]
[-config <arg> (job configuration file '.xml' or '.properties', this file can be used to supply properties, which can be used for workflow)]
[-action 1, 3-4, 7-40] (-action or -date is required to rerun.)
[-date 2009-01-01T01:00Z::2009-05-31T23:59Z, 2009-11-10T01:00Z, 2009-12-31T22:00Z]
(if neither -action nor -date is given, an exception will be thrown.)
The rerun option reruns a terminated (TIMEDOUT, SUCCEEDED, KILLED, FAILED) coordinator action when the coordinator job is not in FAILED or KILLED state.
After the command is executed, the rerun coordinator action will be in WAITING status.
Refer to Rerunning Coordinator Actions for details on rerun.
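The -action value in the example above mixes single action numbers and inclusive ranges. As a rough illustration of how such a value reads, the Python sketch below expands it into individual action numbers. `expand_action_scope` is a hypothetical helper written for this document; how Oozie itself parses the option is not specified here.

```python
def expand_action_scope(scope):
    """Expand a value such as '1, 3-4, 7-40' into a sorted list of action numbers."""
    numbers = set()
    for part in scope.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            # 'first-last' is treated as an inclusive range (assumption of this sketch).
            first, last = (int(x) for x in part.split("-", 1))
            if first > last:
                raise ValueError(f"invalid range: {part!r}")
            numbers.update(range(first, last + 1))
        else:
            numbers.add(int(part))
    return sorted(numbers)


print(expand_action_scope("1, 3-4, 7-10"))
```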
Coordinator jobs can be configured to make an HTTP GET notification whenever a coordinator action changes its status.
Oozie will make a best effort to deliver the notifications; in case of failure it will retry the notification a pre-configured number of times at a pre-configured interval before giving up.
See also Workflow Notifications
If the oozie.coord.workflow.notification.url property is present in the coordinator job properties when submitting the job, Oozie will make a notification to the provided URL when any of the coordinator's actions changes its status. The oozie.coord.action.notification.proxy property can be used to configure either an HTTP or a SOCKS proxy. The format is proxyHostname:port or proxyType@proxyHostname:port. If the proxy type is not specified, it defaults to http. For example: myhttpproxyhost.mydomain.com:80 or socks@mysockshost.mydomain.com:1080.
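The two accepted proxy formats can be told apart by the presence of the `@` separator. The Python sketch below is an illustration of that format only, not Oozie's parser; `parse_notification_proxy` is a hypothetical helper, and rejecting proxy types other than http and socks is an assumption of this sketch.

```python
def parse_notification_proxy(value):
    """Return (proxy_type, host, port) from 'host:port' or 'type@host:port'."""
    proxy_type = "http"  # default when no proxy type is given
    address = value.strip()
    if "@" in address:
        proxy_type, address = address.split("@", 1)
    if proxy_type not in ("http", "socks"):
        raise ValueError(f"unsupported proxy type: {proxy_type!r}")
    # Split on the last ':' so that the port is always the final component.
    host, separator, port = address.rpartition(":")
    if not separator or not host or not port.isdigit():
        raise ValueError(f"expected proxyHostname:port, got {address!r}")
    return proxy_type, host, int(port)


print(parse_notification_proxy("myhttpproxyhost.mydomain.com:80"))
print(parse_notification_proxy("socks@mysockshost.mydomain.com:1080"))
```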
If the URL contains any of the following tokens, they will be replaced with the actual values by Oozie before making the notification:
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:coordinator="uri:oozie:coordinator:0.2"
           elementFormDefault="qualified" targetNamespace="uri:oozie:coordinator:0.2">
  <xs:element name="coordinator-app" type="coordinator:COORDINATOR-APP"/>
  <xs:element name="datasets" type="coordinator:DATASETS"/>
  <xs:simpleType name="IDENTIFIER">
    <xs:restriction base="xs:string">
      <xs:pattern value="([a-zA-Z]([\-_a-zA-Z0-9])*){1,39}"/>
    </xs:restriction>
  </xs:simpleType>
  <xs:complexType name="COORDINATOR-APP">
    <xs:sequence>
      <xs:element name="parameters" type="coordinator:PARAMETERS" minOccurs="0" maxOccurs="1"/>
      <xs:element name="controls" type="coordinator:CONTROLS" minOccurs="0" maxOccurs="1"/>
      <xs:element name="datasets" type="coordinator:DATASETS" minOccurs="0" maxOccurs="1"/>
      <xs:element name="input-events" type="coordinator:INPUTEVENTS" minOccurs="0" maxOccurs="1"/>
      <xs:element name="output-events" type="coordinator:OUTPUTEVENTS" minOccurs="0" maxOccurs="1"/>
      <xs:element name="action" type="coordinator:ACTION" minOccurs="1" maxOccurs="1"/>
    </xs:sequence>
    <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
    <xs:attribute name="frequency" type="xs:string" use="required"/>
    <xs:attribute name="start" type="xs:string" use="required"/>
    <xs:attribute name="end" type="xs:string" use="required"/>
    <xs:attribute name="timezone" type="xs:string" use="required"/>
  </xs:complexType>
  <xs:complexType name="PARAMETERS">
    <xs:sequence>
      <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
        <xs:complexType>
          <xs:sequence>
            <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
            <xs:element name="value" minOccurs="0" maxOccurs="1" type="xs:string"/>
            <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
          </xs:sequence>
        </xs:complexType>
      </xs:element>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="CONTROLS">
    <xs:sequence minOccurs="0" maxOccurs="1">
      <xs:element name="timeout" type="xs:string" minOccurs="0" maxOccurs="1"/>
      <xs:element name="concurrency" type="xs:string" minOccurs="0" maxOccurs="1"/>
      <xs:element name="execution" type="xs:string" minOccurs="0" maxOccurs="1"/>
      <xs:element name="throttle" type="xs:string" minOccurs="0" maxOccurs="1"/>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="DATASETS">
    <xs:sequence minOccurs="0" maxOccurs="1">
      <xs:element name="include" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
      <xs:choice minOccurs="0" maxOccurs="unbounded">
        <xs:element name="dataset" type="coordinator:SYNCDATASET" minOccurs="0" maxOccurs="1"/>
      </xs:choice>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="SYNCDATASET">
    <xs:sequence>
      <xs:element name="uri-template" type="xs:string" minOccurs="1" maxOccurs="1"/>
      <xs:element name="done-flag" type="xs:string" minOccurs="0" maxOccurs="1"/>
    </xs:sequence>
    <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
    <xs:attribute name="frequency" type="xs:string" use="required"/>
    <xs:attribute name="initial-instance" type="xs:string" use="required"/>
    <xs:attribute name="timezone" type="xs:string" use="required"/>
  </xs:complexType>
  <xs:complexType name="INPUTEVENTS">
    <xs:sequence minOccurs="1" maxOccurs="1">
      <xs:element name="data-in" type="coordinator:DATAIN" minOccurs="1" maxOccurs="unbounded"/>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="DATAIN">
    <xs:choice minOccurs="1" maxOccurs="1">
      <xs:element name="instance" type="xs:string" minOccurs="1" maxOccurs="unbounded"/>
      <xs:sequence minOccurs="1" maxOccurs="1">
        <xs:element name="start-instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
        <xs:element name="end-instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
      </xs:sequence>
    </xs:choice>
    <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
    <xs:attribute name="dataset" type="coordinator:IDENTIFIER" use="required"/>
  </xs:complexType>
  <xs:complexType name="OUTPUTEVENTS">
    <xs:sequence minOccurs="1" maxOccurs="1">
      <xs:element name="data-out" type="coordinator:DATAOUT" minOccurs="1" maxOccurs="unbounded"/>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="DATAOUT">
    <xs:sequence minOccurs="1" maxOccurs="1">
      <xs:element name="instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
    </xs:sequence>
    <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
    <xs:attribute name="dataset" type="coordinator:IDENTIFIER" use="required"/>
  </xs:complexType>
  <xs:complexType name="ACTION">
    <xs:sequence minOccurs="1" maxOccurs="1">
      <xs:element name="workflow" type="coordinator:WORKFLOW" minOccurs="1" maxOccurs="1"/>
      <xs:any namespace="uri:oozie:sla:0.1" minOccurs="0" maxOccurs="1"/>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="WORKFLOW">
    <xs:sequence>
      <xs:element name="app-path" type="xs:string" minOccurs="1" maxOccurs="1"/>
      <xs:element name="configuration" type="coordinator:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="FLAG"/>
  <xs:complexType name="CONFIGURATION">
    <xs:sequence>
      <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
        <xs:complexType>
          <xs:sequence>
            <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
            <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
            <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
          </xs:sequence>
        </xs:complexType>
      </xs:element>
    </xs:sequence>
  </xs:complexType>
</xs:schema>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:coordinator="uri:oozie:coordinator:0.2"
           elementFormDefault="qualified" targetNamespace="uri:oozie:coordinator:0.2">
  <xs:element name="coordinator-app" type="coordinator:COORDINATOR-APP"/>
  <xs:element name="datasets" type="coordinator:DATASETS"/>
  <xs:simpleType name="IDENTIFIER">
    <xs:restriction base="xs:string">
      <xs:pattern value="([a-zA-Z]([\-_a-zA-Z0-9])*){1,39}"/>
    </xs:restriction>
  </xs:simpleType>
  <xs:complexType name="COORDINATOR-APP">
    <xs:sequence>
      <xs:element name="controls" type="coordinator:CONTROLS" minOccurs="0" maxOccurs="1"/>
      <xs:element name="datasets" type="coordinator:DATASETS" minOccurs="0" maxOccurs="1"/>
      <xs:element name="input-events" type="coordinator:INPUTEVENTS" minOccurs="0" maxOccurs="1"/>
      <xs:element name="output-events" type="coordinator:OUTPUTEVENTS" minOccurs="0" maxOccurs="1"/>
      <xs:element name="action" type="coordinator:ACTION" minOccurs="1" maxOccurs="1"/>
    </xs:sequence>
    <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
    <xs:attribute name="frequency" type="xs:string" use="required"/>
    <xs:attribute name="start" type="xs:string" use="required"/>
    <xs:attribute name="end" type="xs:string" use="required"/>
    <xs:attribute name="timezone" type="xs:string" use="required"/>
  </xs:complexType>
  <xs:complexType name="CONTROLS">
    <xs:sequence minOccurs="0" maxOccurs="1">
      <xs:element name="timeout" type="xs:string" minOccurs="0" maxOccurs="1"/>
      <xs:element name="concurrency" type="xs:string" minOccurs="0" maxOccurs="1"/>
      <xs:element name="execution" type="xs:string" minOccurs="0" maxOccurs="1"/>
      <xs:element name="throttle" type="xs:string" minOccurs="0" maxOccurs="1"/>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="DATASETS">
    <xs:sequence minOccurs="0" maxOccurs="1">
      <xs:element name="include" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
      <xs:choice minOccurs="0" maxOccurs="unbounded">
        <xs:element name="dataset" type="coordinator:SYNCDATASET" minOccurs="0" maxOccurs="1"/>
      </xs:choice>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="SYNCDATASET">
    <xs:sequence>
      <xs:element name="uri-template" type="xs:string" minOccurs="1" maxOccurs="1"/>
      <xs:element name="done-flag" type="xs:string" minOccurs="0" maxOccurs="1"/>
    </xs:sequence>
    <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
    <xs:attribute name="frequency" type="xs:string" use="required"/>
    <xs:attribute name="initial-instance" type="xs:string" use="required"/>
    <xs:attribute name="timezone" type="xs:string" use="required"/>
  </xs:complexType>
  <xs:complexType name="INPUTEVENTS">
    <xs:sequence minOccurs="1" maxOccurs="1">
      <xs:element name="data-in" type="coordinator:DATAIN" minOccurs="1" maxOccurs="unbounded"/>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="DATAIN">
    <xs:choice minOccurs="1" maxOccurs="1">
      <xs:element name="instance" type="xs:string" minOccurs="1" maxOccurs="unbounded"/>
      <xs:sequence minOccurs="1" maxOccurs="1">
        <xs:element name="start-instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
        <xs:element name="end-instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
      </xs:sequence>
    </xs:choice>
    <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
    <xs:attribute name="dataset" type="coordinator:IDENTIFIER" use="required"/>
  </xs:complexType>
  <xs:complexType name="OUTPUTEVENTS">
    <xs:sequence minOccurs="1" maxOccurs="1">
      <xs:element name="data-out" type="coordinator:DATAOUT" minOccurs="1" maxOccurs="unbounded"/>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="DATAOUT">
    <xs:sequence minOccurs="1" maxOccurs="1">
      <xs:element name="instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
    </xs:sequence>
    <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
    <xs:attribute name="dataset" type="coordinator:IDENTIFIER" use="required"/>
  </xs:complexType>
  <xs:complexType name="ACTION">
    <xs:sequence minOccurs="1" maxOccurs="1">
      <xs:element name="workflow" type="coordinator:WORKFLOW" minOccurs="1" maxOccurs="1"/>
      <xs:any namespace="uri:oozie:sla:0.1" minOccurs="0" maxOccurs="1"/>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="WORKFLOW">
    <xs:sequence>
      <xs:element name="app-path" type="xs:string" minOccurs="1" maxOccurs="1"/>
      <xs:element name="configuration" type="coordinator:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="FLAG"/>
  <xs:complexType name="CONFIGURATION">
    <xs:sequence>
      <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
        <xs:complexType>
          <xs:sequence>
            <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
            <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
            <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
          </xs:sequence>
        </xs:complexType>
      </xs:element>
    </xs:sequence>
  </xs:complexType>
</xs:schema>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:coordinator="uri:oozie:coordinator:0.1"
           elementFormDefault="qualified" targetNamespace="uri:oozie:coordinator:0.1">
  <xs:element name="coordinator-app" type="coordinator:COORDINATOR-APP"/>
  <xs:element name="datasets" type="coordinator:DATASETS"/>
  <xs:simpleType name="IDENTIFIER">
    <xs:restriction base="xs:string">
      <xs:pattern value="([a-zA-Z]([\-_a-zA-Z0-9])*){1,39}"/>
    </xs:restriction>
  </xs:simpleType>
  <xs:complexType name="COORDINATOR-APP">
    <xs:sequence>
      <xs:element name="controls" type="coordinator:CONTROLS" minOccurs="0" maxOccurs="1"/>
      <xs:element name="datasets" type="coordinator:DATASETS" minOccurs="0" maxOccurs="1"/>
      <xs:element name="input-events" type="coordinator:INPUTEVENTS" minOccurs="0" maxOccurs="1"/>
      <xs:element name="output-events" type="coordinator:OUTPUTEVENTS" minOccurs="0" maxOccurs="1"/>
      <xs:element name="action" type="coordinator:ACTION" minOccurs="1" maxOccurs="1"/>
    </xs:sequence>
    <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
    <xs:attribute name="frequency" type="xs:string" use="required"/>
    <xs:attribute name="start" type="xs:string" use="required"/>
    <xs:attribute name="end" type="xs:string" use="required"/>
    <xs:attribute name="timezone" type="xs:string" use="required"/>
  </xs:complexType>
  <xs:complexType name="CONTROLS">
    <xs:sequence minOccurs="0" maxOccurs="1">
      <xs:element name="timeout" type="xs:string" minOccurs="0" maxOccurs="1"/>
      <xs:element name="concurrency" type="xs:string" minOccurs="0" maxOccurs="1"/>
      <xs:element name="execution" type="xs:string" minOccurs="0" maxOccurs="1"/>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="DATASETS">
    <xs:sequence minOccurs="0" maxOccurs="1">
      <xs:element name="include" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
      <xs:choice minOccurs="0" maxOccurs="unbounded">
        <xs:element name="dataset" type="coordinator:SYNCDATASET" minOccurs="0" maxOccurs="1"/>
      </xs:choice>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="SYNCDATASET">
    <xs:sequence>
      <xs:element name="uri-template" type="xs:string" minOccurs="1" maxOccurs="1"/>
      <xs:element name="done-flag" type="xs:string" minOccurs="0" maxOccurs="1"/>
    </xs:sequence>
    <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
    <xs:attribute name="frequency" type="xs:string" use="required"/>
    <xs:attribute name="initial-instance" type="xs:string" use="required"/>
    <xs:attribute name="timezone" type="xs:string" use="required"/>
  </xs:complexType>
  <xs:complexType name="INPUTEVENTS">
    <xs:sequence minOccurs="1" maxOccurs="1">
      <xs:element name="data-in" type="coordinator:DATAIN" minOccurs="1" maxOccurs="unbounded"/>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="DATAIN">
    <xs:choice minOccurs="1" maxOccurs="1">
      <xs:element name="instance" type="xs:string" minOccurs="1" maxOccurs="unbounded"/>
      <xs:sequence minOccurs="1" maxOccurs="1">
        <xs:element name="start-instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
        <xs:element name="end-instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
      </xs:sequence>
    </xs:choice>
    <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
    <xs:attribute name="dataset" type="coordinator:IDENTIFIER" use="required"/>
  </xs:complexType>
  <xs:complexType name="OUTPUTEVENTS">
    <xs:sequence minOccurs="1" maxOccurs="1">
      <xs:element name="data-out" type="coordinator:DATAOUT" minOccurs="1" maxOccurs="unbounded"/>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="DATAOUT">
    <xs:sequence minOccurs="1" maxOccurs="1">
      <xs:element name="instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
    </xs:sequence>
    <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
    <xs:attribute name="dataset" type="coordinator:IDENTIFIER" use="required"/>
  </xs:complexType>
  <xs:complexType name="ACTION">
    <xs:sequence minOccurs="1" maxOccurs="1">
      <xs:element name="workflow" type="coordinator:WORKFLOW" minOccurs="1" maxOccurs="1"/>
      <xs:any namespace="uri:oozie:sla:0.1" minOccurs="0" maxOccurs="1"/>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="WORKFLOW">
    <xs:sequence>
      <xs:element name="app-path" type="xs:string" minOccurs="1" maxOccurs="1"/>
      <xs:element name="configuration" type="coordinator:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
    </xs:sequence>
  </xs:complexType>
  <xs:complexType name="FLAG"/>
  <xs:complexType name="CONFIGURATION">
    <xs:sequence>
      <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
        <xs:complexType>
          <xs:sequence>
            <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
            <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
            <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
          </xs:sequence>
        </xs:complexType>
      </xs:element>
    </xs:sequence>
  </xs:complexType>
</xs:schema>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:sla="uri:oozie:sla:0.2"
           elementFormDefault="qualified" targetNamespace="uri:oozie:sla:0.2">
  <xs:element name="info" type="sla:SLA-INFO"/>
  <xs:complexType name="SLA-INFO">
    <xs:sequence>
      <xs:element name="nominal-time" type="xs:string" minOccurs="1" maxOccurs="1"/>
      <xs:element name="should-start" type="xs:string" minOccurs="0" maxOccurs="1"/>
      <xs:element name="should-end" type="xs:string" minOccurs="1" maxOccurs="1"/>
      <xs:element name="max-duration" type="xs:string" minOccurs="0" maxOccurs="1"/>
      <xs:element name="alert-events" type="xs:string" minOccurs="0" maxOccurs="1"/>
      <xs:element name="alert-contact" type="xs:string" minOccurs="0" maxOccurs="1"/>
      <xs:element name="notification-msg" type="xs:string" minOccurs="0" maxOccurs="1"/>
      <xs:element name="upstream-apps" type="xs:string" minOccurs="0" maxOccurs="1"/>
    </xs:sequence>
  </xs:complexType>
</xs:schema>
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:sla="uri:oozie:sla:0.1"
           elementFormDefault="qualified" targetNamespace="uri:oozie:sla:0.1">
  <xs:element name="info" type="sla:SLA-INFO" />
  <xs:complexType name="SLA-INFO">
    <xs:sequence>
      <xs:element name="app-name" type="xs:string" minOccurs="1" maxOccurs="1" />
      <xs:element name="nominal-time" type="xs:string" minOccurs="1" maxOccurs="1" />
      <xs:element name="should-start" type="xs:string" minOccurs="1" maxOccurs="1" />
      <xs:element name="should-end" type="xs:string" minOccurs="1" maxOccurs="1" />
      <xs:element name="parent-client-id" type="xs:string" minOccurs="0" maxOccurs="1" />
      <xs:element name="parent-sla-id" type="xs:string" minOccurs="0" maxOccurs="1" />
      <xs:element name="notification-msg" type="xs:string" minOccurs="0" maxOccurs="1" />
      <xs:element name="alert-contact" type="xs:string" minOccurs="1" maxOccurs="1" />
      <xs:element name="dev-contact" type="xs:string" minOccurs="1" maxOccurs="1" />
      <xs:element name="qa-contact" type="xs:string" minOccurs="1" maxOccurs="1" />
      <xs:element name="se-contact" type="xs:string" minOccurs="1" maxOccurs="1" />
      <xs:element name="alert-frequency" type="sla:alert-frequencyType" minOccurs="0" maxOccurs="1" />
      <xs:element name="alert-percentage" type="sla:alert-percentageType" minOccurs="0" maxOccurs="1" />
      <xs:element name="upstream-apps" type="xs:string" minOccurs="0" maxOccurs="1" />
    </xs:sequence>
  </xs:complexType>
  <xs:simpleType name="alert-percentageType">
    <xs:restriction base="xs:integer">
      <xs:minInclusive value="0"/>
      <xs:maxInclusive value="100"/>
    </xs:restriction>
  </xs:simpleType>
  <xs:simpleType name="alert-frequencyType">
    <xs:restriction base="xs:string">
      <xs:enumeration value="NONE"></xs:enumeration>
      <xs:enumeration value="LAST_HOUR"></xs:enumeration>
      <xs:enumeration value="LAST_DAY"></xs:enumeration>
      <xs:enumeration value="LAST_MONTH"></xs:enumeration>
    </xs:restriction>
  </xs:simpleType>
</xs:schema>