This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.coord;
019
020 import java.io.IOException;
021 import java.util.ArrayList;
022 import java.util.Calendar;
023 import java.util.Date;
024 import java.util.List;
025 import java.util.TimeZone;
026
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.hadoop.fs.Path;
029
030 import org.apache.oozie.client.OozieClient;
031 import org.apache.oozie.util.DateUtils;
032 import org.apache.oozie.util.ELEvaluator;
033 import org.apache.oozie.util.ParamChecker;
034 import org.apache.oozie.util.XLog;
035 import org.apache.oozie.service.HadoopAccessorException;
036 import org.apache.oozie.service.Services;
037 import org.apache.oozie.service.HadoopAccessorService;
038
039 /**
040 * This class implements the EL function related to coordinator
041 */
042
043 public class CoordELFunctions {
044 final private static XLog LOG = XLog.getLog(CoordELFunctions.class);
045 final private static String DATASET = "oozie.coord.el.dataset.bean";
046 final private static String COORD_ACTION = "oozie.coord.el.app.bean";
047 final public static String CONFIGURATION = "oozie.coord.el.conf";
048 final public static String LATEST_EL_USE_CURRENT_TIME = "oozie.service.ELService.latest-el.use-current-time";
049 // INSTANCE_SEPARATOR is used to separate multiple directories into one tag.
050 final public static String INSTANCE_SEPARATOR = "#";
051 final public static String DIR_SEPARATOR = ",";
052 // TODO: in next release, support flexibility
053 private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS";
054
055 /**
056 * Used in defining the frequency in 'day' unit. <p/> domain: <code> val > 0</code> and should be integer.
057 *
058 * @param val frequency in number of days.
059 * @return number of days and also set the frequency timeunit to "day"
060 */
061 public static int ph1_coord_days(int val) {
062 val = ParamChecker.checkGTZero(val, "n");
063 ELEvaluator eval = ELEvaluator.getCurrent();
064 eval.setVariable("timeunit", TimeUnit.DAY);
065 eval.setVariable("endOfDuration", TimeUnit.NONE);
066 return val;
067 }
068
069 /**
070 * Used in defining the frequency in 'month' unit. <p/> domain: <code> val > 0</code> and should be integer.
071 *
072 * @param val frequency in number of months.
073 * @return number of months and also set the frequency timeunit to "month"
074 */
075 public static int ph1_coord_months(int val) {
076 val = ParamChecker.checkGTZero(val, "n");
077 ELEvaluator eval = ELEvaluator.getCurrent();
078 eval.setVariable("timeunit", TimeUnit.MONTH);
079 eval.setVariable("endOfDuration", TimeUnit.NONE);
080 return val;
081 }
082
083 /**
084 * Used in defining the frequency in 'hour' unit. <p/> parameter value domain: <code> val > 0</code> and should
085 * be integer.
086 *
087 * @param val frequency in number of hours.
088 * @return number of minutes and also set the frequency timeunit to "minute"
089 */
090 public static int ph1_coord_hours(int val) {
091 val = ParamChecker.checkGTZero(val, "n");
092 ELEvaluator eval = ELEvaluator.getCurrent();
093 eval.setVariable("timeunit", TimeUnit.MINUTE);
094 eval.setVariable("endOfDuration", TimeUnit.NONE);
095 return val * 60;
096 }
097
098 /**
099 * Used in defining the frequency in 'minute' unit. <p/> domain: <code> val > 0</code> and should be integer.
100 *
101 * @param val frequency in number of minutes.
102 * @return number of minutes and also set the frequency timeunit to "minute"
103 */
104 public static int ph1_coord_minutes(int val) {
105 val = ParamChecker.checkGTZero(val, "n");
106 ELEvaluator eval = ELEvaluator.getCurrent();
107 eval.setVariable("timeunit", TimeUnit.MINUTE);
108 eval.setVariable("endOfDuration", TimeUnit.NONE);
109 return val;
110 }
111
112 /**
113 * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p/> Every instance will
114 * start at 00:00 hour of each day. <p/> domain: <code> val > 0</code> and should be integer.
115 *
116 * @param val frequency in number of days.
117 * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day"
118 */
119 public static int ph1_coord_endOfDays(int val) {
120 val = ParamChecker.checkGTZero(val, "n");
121 ELEvaluator eval = ELEvaluator.getCurrent();
122 eval.setVariable("timeunit", TimeUnit.DAY);
123 eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY);
124 return val;
125 }
126
127 /**
128 * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p/> Every instance will
129 * start at first day of each month at 00:00 hour. <p/> domain: <code> val > 0</code> and should be integer.
130 *
131 * @param val: frequency in number of months.
132 * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month"
133 */
134 public static int ph1_coord_endOfMonths(int val) {
135 val = ParamChecker.checkGTZero(val, "n");
136 ELEvaluator eval = ELEvaluator.getCurrent();
137 eval.setVariable("timeunit", TimeUnit.MONTH);
138 eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH);
139 return val;
140 }
141
142 /**
143 * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p/> Depends on: <p/>
144 * 1. Timezone of both dataset and job <p/> 2. Action creation Time
145 *
146 * @return difference in minutes (DataSet TZ Offset - Application TZ offset)
147 */
148 public static int ph2_coord_tzOffset() {
149 Date actionCreationTime = getActionCreationtime();
150 TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ");
151 TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ");
152 // Apply the TZ into Calendar object
153 Calendar dsTime = Calendar.getInstance(dsTZ);
154 dsTime.setTime(actionCreationTime);
155 Calendar jobTime = Calendar.getInstance(jobTZ);
156 jobTime.setTime(actionCreationTime);
157 return (dsTime.get(Calendar.ZONE_OFFSET) - jobTime.get(Calendar.ZONE_OFFSET)) / (1000 * 60);
158 }
159
160 public static int ph3_coord_tzOffset() {
161 return ph2_coord_tzOffset();
162 }
163
164 /**
165 * Returns the a date string while given a base date in 'strBaseDate',
166 * offset and unit (e.g. DAY, MONTH, HOUR, MINUTE, MONTH).
167 *
168 * @param strBaseDate -- base date
169 * @param offset -- any number
170 * @param unit -- DAY, MONTH, HOUR, MINUTE, MONTH
171 * @return date string
172 * @throws Exception
173 */
174 public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
175 Calendar baseCalDate = DateUtils.getCalendar(strBaseDate);
176 StringBuilder buffer = new StringBuilder();
177 baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset);
178 buffer.append(DateUtils.formatDateOozieTZ(baseCalDate));
179 return buffer.toString();
180 }
181
182 public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
183 return ph2_coord_dateOffset(strBaseDate, offset, unit);
184 }
185
186 /**
187 * Determine the date-time in Oozie processing timezone of n-th future available dataset instance
188 * from nominal Time but not beyond the instance specified as 'instance.
189 * <p/>
190 * It depends on:
191 * <p/>
192 * 1. Data set frequency
193 * <p/>
194 * 2. Data set Time unit (day, month, minute)
195 * <p/>
196 * 3. Data set Time zone/DST
197 * <p/>
198 * 4. End Day/Month flag
199 * <p/>
200 * 5. Data set initial instance
201 * <p/>
202 * 6. Action Creation Time
203 * <p/>
204 * 7. Existence of dataset's directory
205 *
206 * @param n :instance count
207 * <p/>
208 * domain: n >= 0, n is integer
209 * @param instance: How many future instance it should check? value should
210 * be >=0
211 * @return date-time in Oozie processing timezone of the n-th instance
212 * <p/>
213 * @throws Exception
214 */
215 public static String ph3_coord_future(int n, int instance) throws Exception {
216 ParamChecker.checkGEZero(n, "future:n");
217 ParamChecker.checkGTZero(instance, "future:instance");
218 if (isSyncDataSet()) {// For Sync Dataset
219 return coord_future_sync(n, instance);
220 }
221 else {
222 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
223 }
224 }
225
226 /**
227 * Determine the date-time in Oozie processing timezone of the future available dataset instances
228 * from start to end offsets from nominal Time but not beyond the instance specified as 'instance'.
229 * <p/>
230 * It depends on:
231 * <p/>
232 * 1. Data set frequency
233 * <p/>
234 * 2. Data set Time unit (day, month, minute)
235 * <p/>
236 * 3. Data set Time zone/DST
237 * <p/>
238 * 4. End Day/Month flag
239 * <p/>
240 * 5. Data set initial instance
241 * <p/>
242 * 6. Action Creation Time
243 * <p/>
244 * 7. Existence of dataset's directory
245 *
246 * @param start : start instance offset
247 * <p/>
248 * domain: start >= 0, start is integer
249 * @param end : end instance offset
250 * <p/>
251 * domain: end >= 0, end is integer
252 * @param instance: How many future instance it should check? value should
253 * be >=0
254 * @return date-time in Oozie processing timezone of the instances from start to end offsets
255 * delimited by comma.
256 * <p/>
257 * @throws Exception
258 */
259 public static String ph3_coord_futureRange(int start, int end, int instance) throws Exception {
260 ParamChecker.checkGEZero(start, "future:n");
261 ParamChecker.checkGEZero(end, "future:n");
262 ParamChecker.checkGTZero(instance, "future:instance");
263 if (isSyncDataSet()) {// For Sync Dataset
264 return coord_futureRange_sync(start, end, instance);
265 }
266 else {
267 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
268 }
269 }
270
271 private static String coord_future_sync(int n, int instance) throws Exception {
272 return coord_futureRange_sync(n, n, instance);
273 }
274
275 private static String coord_futureRange_sync(int startOffset, int endOffset, int instance) throws Exception {
276 ELEvaluator eval = ELEvaluator.getCurrent();
277 String retVal = "";
278 int datasetFrequency = (int) getDSFrequency();// in minutes
279 TimeUnit dsTimeUnit = getDSTimeUnit();
280 int[] instCount = new int[1];
281 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
282 StringBuilder resolvedInstances = new StringBuilder();
283 StringBuilder resolvedURIPaths = new StringBuilder();
284 if (nominalInstanceCal != null) {
285 Calendar initInstance = getInitialInstanceCal();
286 nominalInstanceCal = (Calendar) initInstance.clone();
287 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
288
289 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
290 if (ds == null) {
291 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
292 }
293 String uriTemplate = ds.getUriTemplate();
294 Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
295 if (conf == null) {
296 throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
297 }
298 int available = 0, checkedInstance = 0;
299 boolean resolved = false;
300 String user = ParamChecker
301 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
302 String doneFlag = ds.getDoneFlag();
303 while (instance >= checkedInstance) {
304 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
305 String uriPath = uriEval.evaluate(uriTemplate, String.class);
306 String pathWithDoneFlag = uriPath;
307 if (doneFlag.length() > 0) {
308 pathWithDoneFlag += "/" + doneFlag;
309 }
310 if (isPathAvailable(pathWithDoneFlag, user, null, conf)) {
311 LOG.debug("Found future(" + available + "): " + pathWithDoneFlag);
312 if (available == endOffset) {
313 LOG.debug("Matched future(" + available + "): " + pathWithDoneFlag);
314 resolved = true;
315 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
316 resolvedURIPaths.append(uriPath);
317 retVal = resolvedInstances.toString();
318 eval.setVariable("resolved_path", resolvedURIPaths.toString());
319 break;
320 } else if (available >= startOffset) {
321 LOG.debug("Matched future(" + available + "): " + pathWithDoneFlag);
322 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(INSTANCE_SEPARATOR);
323 resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
324 }
325 available++;
326 }
327 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
328 // -datasetFrequency);
329 nominalInstanceCal = (Calendar) initInstance.clone();
330 instCount[0]++;
331 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
332 checkedInstance++;
333 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
334 }
335 if (!resolved) {
336 // return unchanged future function with variable 'is_resolved'
337 // to 'false'
338 eval.setVariable("is_resolved", Boolean.FALSE);
339 if (startOffset == endOffset) {
340 retVal = "${coord:future(" + startOffset + ", " + instance + ")}";
341 }
342 else {
343 retVal = "${coord:futureRange(" + startOffset + ", " + endOffset + ", " + instance + ")}";
344 }
345 }
346 else {
347 eval.setVariable("is_resolved", Boolean.TRUE);
348 }
349 }
350 else {// No feasible nominal time
351 eval.setVariable("is_resolved", Boolean.TRUE);
352 retVal = "";
353 }
354 return retVal;
355 }
356
357 /**
358 * Return nominal time or Action Creation Time.
359 * <p/>
360 *
361 * @return coordinator action creation or materialization date time
362 * @throws Exception if unable to format the Date object to String
363 */
364 public static String ph2_coord_nominalTime() throws Exception {
365 ELEvaluator eval = ELEvaluator.getCurrent();
366 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
367 "Coordinator Action");
368 return DateUtils.formatDateOozieTZ(action.getNominalTime());
369 }
370
371 public static String ph3_coord_nominalTime() throws Exception {
372 return ph2_coord_nominalTime();
373 }
374
375 /**
376 * Convert from standard date-time formatting to a desired format.
377 * <p/>
378 * @param dateTimeStr - A timestamp in standard (ISO8601) format.
379 * @param format - A string representing the desired format.
380 * @return coordinator action creation or materialization date time
381 * @throws Exception if unable to format the Date object to String
382 */
383 public static String ph2_coord_formatTime(String dateTimeStr, String format)
384 throws Exception {
385 Date dateTime = DateUtils.parseDateOozieTZ(dateTimeStr);
386 return DateUtils.formatDateCustom(dateTime, format);
387 }
388
389 public static String ph3_coord_formatTime(String dateTimeStr, String format)
390 throws Exception {
391 return ph2_coord_formatTime(dateTimeStr, format);
392 }
393
394 /**
395 * Return Action Id. <p/>
396 *
397 * @return coordinator action Id
398 */
399 public static String ph2_coord_actionId() throws Exception {
400 ELEvaluator eval = ELEvaluator.getCurrent();
401 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
402 "Coordinator Action");
403 return action.getActionId();
404 }
405
406 public static String ph3_coord_actionId() throws Exception {
407 return ph2_coord_actionId();
408 }
409
410 /**
411 * Return Job Name. <p/>
412 *
413 * @return coordinator name
414 */
415 public static String ph2_coord_name() throws Exception {
416 ELEvaluator eval = ELEvaluator.getCurrent();
417 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
418 "Coordinator Action");
419 return action.getName();
420 }
421
422 public static String ph3_coord_name() throws Exception {
423 return ph2_coord_name();
424 }
425
426 /**
427 * Return Action Start time. <p/>
428 *
429 * @return coordinator action start time
430 * @throws Exception if unable to format the Date object to String
431 */
432 public static String ph2_coord_actualTime() throws Exception {
433 ELEvaluator eval = ELEvaluator.getCurrent();
434 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
435 if (coordAction == null) {
436 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
437 }
438 return DateUtils.formatDateOozieTZ(coordAction.getActualTime());
439 }
440
441 public static String ph3_coord_actualTime() throws Exception {
442 return ph2_coord_actualTime();
443 }
444
445 /**
446 * Used to specify a list of URI's that are used as input dir to the workflow job. <p/> Look for two evaluator-level
447 * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
448 * URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
449 * unresolved, this function will echo back the original function <p/> otherwise it sends the uris.
450 *
451 * @param dataInName : Datain name
452 * @return the list of URI's separated by INSTANCE_SEPARATOR <p/> if there are unresolved EL function (i.e. latest)
453 * , echo back <p/> the function without resolving the function.
454 */
455 public static String ph3_coord_dataIn(String dataInName) {
456 String uris = "";
457 ELEvaluator eval = ELEvaluator.getCurrent();
458 uris = (String) eval.getVariable(".datain." + dataInName);
459 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
460 if (unresolved != null && unresolved.booleanValue() == true) {
461 return "${coord:dataIn('" + dataInName + "')}";
462 }
463 return uris;
464 }
465
466 /**
467 * Used to specify a list of URI's that are output dir of the workflow job. <p/> Look for one evaluator-level
468 * variable <p/> dataout.<DATAOUT_NAME> <p/> It defines the current list of URI. <p/> otherwise it sends the uris.
469 *
470 * @param dataOutName : Dataout name
471 * @return the list of URI's separated by INSTANCE_SEPARATOR
472 */
473 public static String ph3_coord_dataOut(String dataOutName) {
474 String uris = "";
475 ELEvaluator eval = ELEvaluator.getCurrent();
476 uris = (String) eval.getVariable(".dataout." + dataOutName);
477 return uris;
478 }
479
480 /**
481 * Determine the date-time in Oozie processing timezone of n-th dataset instance. <p/> It depends on: <p/> 1.
482 * Data set frequency <p/> 2.
483 * Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. Data
484 * set initial instance <p/> 6. Action Creation Time
485 *
486 * @param n instance count domain: n is integer
487 * @return date-time in Oozie processing timezone of the n-th instance returns 'null' means n-th instance is
488 * earlier than Initial-Instance of DS
489 * @throws Exception
490 */
491 public static String ph2_coord_current(int n) throws Exception {
492 if (isSyncDataSet()) { // For Sync Dataset
493 return coord_current_sync(n);
494 }
495 else {
496 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
497 }
498 }
499
500 /**
501 * Determine the date-time in Oozie processing timezone of the given offset from the dataset effective nominal time. <p/> It
502 * depends on: <p> 1. Data set frequency <p/> 2. Data set Time Unit <p/> 3. Data set Time zone/DST
503 * <p/> 4. Data set initial instance <p/> 5. Action Creation Time
504 *
505 * @param n offset amount (integer)
506 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
507 * @return date-time in Oozie processing timezone of the given offset from the dataset effective nominal time
508 * @throws Exception if there was a problem formatting
509 */
510 public static String ph2_coord_offset(int n, String timeUnit) throws Exception {
511 if (isSyncDataSet()) { // For Sync Dataset
512 return coord_offset_sync(n, timeUnit);
513 }
514 else {
515 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
516 }
517 }
518
519 /**
520 * Determine how many hours is on the date of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency
521 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
522 * Data set initial instance <p/> 6. Action Creation Time
523 *
524 * @param n instance count <p/> domain: n is integer
525 * @return number of hours on that day <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
526 * @throws Exception
527 */
528 public static int ph2_coord_hoursInDay(int n) throws Exception {
529 int datasetFrequency = (int) getDSFrequency();
530 // /Calendar nominalInstanceCal =
531 // getCurrentInstance(getActionCreationtime());
532 Calendar nominalInstanceCal = getEffectiveNominalTime();
533 if (nominalInstanceCal == null) {
534 return -1;
535 }
536 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
537 /*
538 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
539 * { return -1; }
540 */
541 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
542 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
543 return DateUtils.hoursInDay(nominalInstanceCal);
544 }
545
546 public static int ph3_coord_hoursInDay(int n) throws Exception {
547 return ph2_coord_hoursInDay(n);
548 }
549
550 /**
551 * Calculate number of days in one month for n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency .
552 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
553 * Data set initial instance <p/> 6. Action Creation Time
554 *
555 * @param n instance count. domain: n is integer
556 * @return number of days in that month <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
557 * @throws Exception
558 */
559 public static int ph2_coord_daysInMonth(int n) throws Exception {
560 int datasetFrequency = (int) getDSFrequency();// in minutes
561 // Calendar nominalInstanceCal =
562 // getCurrentInstance(getActionCreationtime());
563 Calendar nominalInstanceCal = getEffectiveNominalTime();
564 if (nominalInstanceCal == null) {
565 return -1;
566 }
567 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
568 /*
569 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
570 * { return -1; }
571 */
572 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
573 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
574 return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH);
575 }
576
577 public static int ph3_coord_daysInMonth(int n) throws Exception {
578 return ph2_coord_daysInMonth(n);
579 }
580
581 /**
582 * Determine the date-time in Oozie processing timezone of n-th latest available dataset instance. <p/> It depends
583 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
584 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of
585 * dataset's directory
586 *
587 * @param n :instance count <p/> domain: n > 0, n is integer
588 * @return date-time in Oozie processing timezone of the n-th instance <p/> returns 'null' means n-th instance is
589 * earlier than Initial-Instance of DS
590 * @throws Exception
591 */
592 public static String ph3_coord_latest(int n) throws Exception {
593 if (n > 0) {
594 throw new IllegalArgumentException("paramter should be <= 0 but it is " + n);
595 }
596 if (isSyncDataSet()) {// For Sync Dataset
597 return coord_latest_sync(n);
598 }
599 else {
600 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
601 }
602 }
603
604 /**
605 * Determine the date-time in Oozie processing timezone of latest available dataset instances
606 * from start to end offsets from the nominal time. <p/> It depends
607 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
608 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of
609 * dataset's directory
610 *
611 * @param start :start instance offset <p/> domain: start > 0, start is integer
612 * @param end :end instance offset <p/> domain: end > 0, end is integer
613 * @return date-time in Oozie processing timezone of the instances from start to end offsets
614 * delimited by comma. <p/> returns 'null' means start offset instance is
615 * earlier than Initial-Instance of DS
616 * @throws Exception
617 */
618 public static String ph3_coord_latestRange(int start, int end) throws Exception {
619 if (isSyncDataSet()) {// For Sync Dataset
620 return coord_latestRange_sync(start, end);
621 }
622 else {
623 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
624 }
625 }
626
627 /**
628 * Configure an evaluator with data set and application specific information. <p/> Helper method of associating
629 * dataset and application object
630 *
631 * @param evaluator : to set variables
632 * @param ds : Data Set object
633 * @param coordAction : Application instance
634 */
635 public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) {
636 evaluator.setVariable(COORD_ACTION, coordAction);
637 evaluator.setVariable(DATASET, ds);
638 }
639
640 /**
641 * Helper method to wrap around with "${..}". <p/>
642 *
643 *
644 * @param eval :EL evaluator
645 * @param expr : expression to evaluate
646 * @return Resolved expression or echo back the same expression
647 * @throws Exception
648 */
649 public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception {
650 try {
651 eval.setVariable(".wrap", null);
652 String result = eval.evaluate(expr, String.class);
653 if (eval.getVariable(".wrap") != null) {
654 return "${" + result + "}";
655 }
656 else {
657 return result;
658 }
659 }
660 catch (Exception e) {
661 throw new Exception("Unable to evaluate :" + expr + ":\n", e);
662 }
663 }
664
665 // Set of echo functions
666
667 public static String ph1_coord_current_echo(String n) {
668 return echoUnResolved("current", n);
669 }
670
671 public static String ph1_coord_offset_echo(String n, String timeUnit) {
672 return echoUnResolved("offset", n + " , " + timeUnit);
673 }
674
675 public static String ph2_coord_current_echo(String n) {
676 return echoUnResolved("current", n);
677 }
678
679 public static String ph2_coord_offset_echo(String n, String timeUnit) {
680 return echoUnResolved("offset", n + " , " + timeUnit);
681 }
682
683 public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) {
684 return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit);
685 }
686
687 public static String ph1_coord_formatTime_echo(String dateTime, String format) {
688 // Quote the dateTime value since it would contain a ':'.
689 return echoUnResolved("formatTime", "'"+dateTime+"'" + " , " + format);
690 }
691
692 public static String ph1_coord_latest_echo(String n) {
693 return echoUnResolved("latest", n);
694 }
695
696 public static String ph2_coord_latest_echo(String n) {
697 return ph1_coord_latest_echo(n);
698 }
699
700 public static String ph1_coord_future_echo(String n, String instance) {
701 return echoUnResolved("future", n + ", " + instance + "");
702 }
703
704 public static String ph2_coord_future_echo(String n, String instance) {
705 return ph1_coord_future_echo(n, instance);
706 }
707
708 public static String ph1_coord_latestRange_echo(String start, String end) {
709 return echoUnResolved("latestRange", start + ", " + end);
710 }
711
712 public static String ph2_coord_latestRange_echo(String start, String end) {
713 return ph1_coord_latestRange_echo(start, end);
714 }
715
716 public static String ph1_coord_futureRange_echo(String start, String end, String instance) {
717 return echoUnResolved("futureRange", start + ", " + end + ", " + instance);
718 }
719
720 public static String ph2_coord_futureRange_echo(String start, String end, String instance) {
721 return ph1_coord_futureRange_echo(start, end, instance);
722 }
723
724 public static String ph1_coord_dataIn_echo(String n) {
725 ELEvaluator eval = ELEvaluator.getCurrent();
726 String val = (String) eval.getVariable("oozie.dataname." + n);
727 if (val == null || val.equals("data-in") == false) {
728 LOG.error("data_in_name " + n + " is not valid");
729 throw new RuntimeException("data_in_name " + n + " is not valid");
730 }
731 return echoUnResolved("dataIn", "'" + n + "'");
732 }
733
734 public static String ph1_coord_dataOut_echo(String n) {
735 ELEvaluator eval = ELEvaluator.getCurrent();
736 String val = (String) eval.getVariable("oozie.dataname." + n);
737 if (val == null || val.equals("data-out") == false) {
738 LOG.error("data_out_name " + n + " is not valid");
739 throw new RuntimeException("data_out_name " + n + " is not valid");
740 }
741 return echoUnResolved("dataOut", "'" + n + "'");
742 }
743
744 public static String ph1_coord_nominalTime_echo() {
745 return echoUnResolved("nominalTime", "");
746 }
747
748 public static String ph1_coord_nominalTime_echo_wrap() {
749 // return "${coord:nominalTime()}"; // no resolution
750 return echoUnResolved("nominalTime", "");
751 }
752
753 public static String ph1_coord_nominalTime_echo_fixed() {
754 return "2009-03-06T010:00"; // Dummy resolution
755 }
756
757 public static String ph1_coord_actualTime_echo_wrap() {
758 // return "${coord:actualTime()}"; // no resolution
759 return echoUnResolved("actualTime", "");
760 }
761
762 public static String ph1_coord_actionId_echo() {
763 return echoUnResolved("actionId", "");
764 }
765
766 public static String ph1_coord_name_echo() {
767 return echoUnResolved("name", "");
768 }
769
770 // The following echo functions are not used in any phases yet
771 // They are here for future purpose.
772 public static String coord_minutes_echo(String n) {
773 return echoUnResolved("minutes", n);
774 }
775
776 public static String coord_hours_echo(String n) {
777 return echoUnResolved("hours", n);
778 }
779
780 public static String coord_days_echo(String n) {
781 return echoUnResolved("days", n);
782 }
783
784 public static String coord_endOfDay_echo(String n) {
785 return echoUnResolved("endOfDay", n);
786 }
787
788 public static String coord_months_echo(String n) {
789 return echoUnResolved("months", n);
790 }
791
792 public static String coord_endOfMonth_echo(String n) {
793 return echoUnResolved("endOfMonth", n);
794 }
795
796 public static String coord_actualTime_echo() {
797 return echoUnResolved("actualTime", "");
798 }
799
800 // This echo function will always return "24" for validation only.
801 // This evaluation ****should not**** replace the original XML
802 // Create a temporary string and validate the function
803 // This is **required** for evaluating an expression like
804 // coord:HoursInDay(0) + 3
805 // actual evaluation will happen in phase 2 or phase 3.
806 public static String ph1_coord_hoursInDay_echo(String n) {
807 return "24";
808 // return echoUnResolved("hoursInDay", n);
809 }
810
811 // This echo function will always return "30" for validation only.
812 // This evaluation ****should not**** replace the original XML
813 // Create a temporary string and validate the function
814 // This is **required** for evaluating an expression like
815 // coord:daysInMonth(0) + 3
816 // actual evaluation will happen in phase 2 or phase 3.
817 public static String ph1_coord_daysInMonth_echo(String n) {
818 // return echoUnResolved("daysInMonth", n);
819 return "30";
820 }
821
822 // This echo function will always return "3" for validation only.
823 // This evaluation ****should not**** replace the original XML
824 // Create a temporary string and validate the function
825 // This is **required** for evaluating an expression like coord:tzOffset + 2
826 // actual evaluation will happen in phase 2 or phase 3.
827 public static String ph1_coord_tzOffset_echo() {
828 // return echoUnResolved("tzOffset", "");
829 return "3";
830 }
831
832 // Local methods
833 /**
834 * @param n
835 * @return n-th instance Date-Time from current instance for data-set <p/> return empty string ("") if the
836 * Action_Creation_time or the n-th instance <p/> is earlier than the Initial_Instance of dataset.
837 * @throws Exception
838 */
839 private static String coord_current_sync(int n) throws Exception {
840 int datasetFrequency = getDSFrequency();// in minutes
841 TimeUnit dsTimeUnit = getDSTimeUnit();
842 int[] instCount = new int[1];// used as pass by ref
843 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
844 if (nominalInstanceCal == null) {
845 LOG.warn("If the initial instance of the dataset is later than the nominal time, an empty string is"
846 + " returned. This means that no data is available at the current-instance specified by the user"
847 + " and the user could try modifying his initial-instance to an earlier time.");
848 return "";
849 }
850 nominalInstanceCal = getInitialInstanceCal();
851 int absInstanceCount = instCount[0] + n;
852 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency * absInstanceCount);
853
854 if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) {
855 LOG.warn("If the initial instance of the dataset is later than the current-instance specified, such as"
856 + " coord:current({0}) in this case, an empty string is returned. This means that no data is"
857 + " available at the current-instance specified by the user and the user could try modifying his"
858 + " initial-instance to an earlier time.", n);
859 return "";
860 }
861 String str = DateUtils.formatDateOozieTZ(nominalInstanceCal);
862 return str;
863 }
864
865 /**
866 *
867 * @param n offset amount (integer)
868 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
869 * @return the offset time from the effective nominal time <p/> return empty string ("") if the Action_Creation_time or the
870 * offset instance <p/> is earlier than the Initial_Instance of dataset.
871 * @throws Exception
872 */
873 private static String coord_offset_sync(int n, String timeUnit) throws Exception {
874 Calendar rawCal = resolveOffsetRawTime(n, TimeUnit.valueOf(timeUnit), null);
875 if (rawCal == null) {
876 // warning already logged by resolveOffsetRawTime()
877 return "";
878 }
879
880 int freq = getDSFrequency();
881 TimeUnit freqUnit = getDSTimeUnit();
882 int freqCount = 0;
883 // We're going to manually turn back/forward cal by decrements/increments of freq and then check that it gives the same
884 // time as rawCal; this is to check that the offset time resolves to a frequency offset of the effective nominal time
885 // In other words, that there exists an integer x, such that coord:offset(n, timeUnit) == coord:current(x) is true
886 // If not, then we'll "rewind" rawCal to the latest instance earlier than rawCal and use that.
887 Calendar cal = getInitialInstanceCal();
888 if (rawCal.before(cal)) {
889 while (cal.after(rawCal)) {
890 cal.add(freqUnit.getCalendarUnit(), -freq);
891 freqCount--;
892 }
893 }
894 else if (rawCal.after(cal)) {
895 while (cal.before(rawCal)) {
896 cal.add(freqUnit.getCalendarUnit(), freq);
897 freqCount++;
898 }
899 }
900 if (cal.before(rawCal)) {
901 rawCal = cal;
902 }
903 else if (cal.after(rawCal)) {
904 cal.add(freqUnit.getCalendarUnit(), -freq);
905 rawCal = cal;
906 freqCount--;
907 }
908 String rawCalStr = DateUtils.formatDateOozieTZ(rawCal);
909
910 Calendar nominalInstanceCal = getInitialInstanceCal();
911 nominalInstanceCal.add(freqUnit.getCalendarUnit(), freq * freqCount);
912 if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) {
913 LOG.warn("If the initial instance of the dataset is later than the offset instance"
914 + " specified, such as coord:offset({0}, {1}) in this case, an empty string is returned. This means that no"
915 + " data is available at the offset instance specified by the user and the user could try modifying his"
916 + " initial-instance to an earlier time.", n, timeUnit);
917 return "";
918 }
919 String nominalCalStr = DateUtils.formatDateOozieTZ(nominalInstanceCal);
920
921 if (!rawCalStr.equals(nominalCalStr)) {
922 throw new RuntimeException("Shouldn't happen");
923 }
924 return rawCalStr;
925 }
926
927 /**
928 * @param offset
929 * @return n-th available latest instance Date-Time for SYNC data-set
930 * @throws Exception
931 */
932 private static String coord_latest_sync(int offset) throws Exception {
933 return coord_latestRange_sync(offset, offset);
934 }
935
936 private static String coord_latestRange_sync(int startOffset, int endOffset) throws Exception {
937 if (startOffset > 0) {
938 throw new RuntimeException("For latest there is no meaning " + "of positive instance. n should be <=0"
939 + startOffset);
940 }
941 if (endOffset > 0) {
942 throw new RuntimeException("For latest there is no meaning " + "of positive instance. n should be <=0"
943 + endOffset);
944 }
945 ELEvaluator eval = ELEvaluator.getCurrent();
946 String retVal = "";
947 int datasetFrequency = (int) getDSFrequency();// in minutes
948 TimeUnit dsTimeUnit = getDSTimeUnit();
949 int[] instCount = new int[1];
950 boolean useCurrentTime = Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false);
951 Calendar nominalInstanceCal;
952 if (useCurrentTime) {
953 nominalInstanceCal = getCurrentInstance(new Date(), instCount);
954 }
955 else {
956 nominalInstanceCal = getCurrentInstance(getActualTime(), instCount);
957 }
958 StringBuilder resolvedInstances = new StringBuilder();
959 StringBuilder resolvedURIPaths = new StringBuilder();
960 if (nominalInstanceCal != null) {
961 Calendar initInstance = getInitialInstanceCal();
962 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
963 if (ds == null) {
964 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
965 }
966 String uriTemplate = ds.getUriTemplate();
967 Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
968 if (conf == null) {
969 throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
970 }
971 int available = 0;
972 boolean resolved = false;
973 String user = ParamChecker
974 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
975 String doneFlag = ds.getDoneFlag();
976 while (nominalInstanceCal.compareTo(initInstance) >= 0) {
977 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
978 String uriPath = uriEval.evaluate(uriTemplate, String.class);
979 String pathWithDoneFlag = uriPath;
980 if (doneFlag.length() > 0) {
981 pathWithDoneFlag += "/" + doneFlag;
982 }
983 if (isPathAvailable(pathWithDoneFlag, user, null, conf)) {
984 LOG.debug("Found latest(" + available + "): " + pathWithDoneFlag);
985 if (available == startOffset) {
986 LOG.debug("Matched latest(" + available + "): " + pathWithDoneFlag);
987 resolved = true;
988 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
989 resolvedURIPaths.append(uriPath);
990 retVal = resolvedInstances.toString();
991 eval.setVariable("resolved_path", resolvedURIPaths.toString());
992 break;
993 } else if (available <= endOffset) {
994 LOG.debug("Matched latest(" + available + "): " + pathWithDoneFlag);
995 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(INSTANCE_SEPARATOR);
996 resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
997 }
998
999 available--;
1000 }
1001 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
1002 // -datasetFrequency);
1003 nominalInstanceCal = (Calendar) initInstance.clone();
1004 instCount[0]--;
1005 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
1006 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
1007 }
1008 if (!resolved) {
1009 // return unchanged latest function with variable 'is_resolved'
1010 // to 'false'
1011 eval.setVariable("is_resolved", Boolean.FALSE);
1012 if (startOffset == endOffset) {
1013 retVal = "${coord:latest(" + startOffset + ")}";
1014 }
1015 else {
1016 retVal = "${coord:latestRange(" + startOffset + "," + endOffset + ")}";
1017 }
1018 }
1019 else {
1020 eval.setVariable("is_resolved", Boolean.TRUE);
1021 }
1022 }
1023 else {// No feasible nominal time
1024 eval.setVariable("is_resolved", Boolean.FALSE);
1025 }
1026 return retVal;
1027 }
1028
1029 // TODO : Not an efficient way. In a loop environment, we could do something
1030 // outside the loop
1031 /**
1032 * Check whether a URI path exists
1033 *
1034 * @param sPath
1035 * @param conf
1036 * @return
1037 * @throws IOException
1038 */
1039
1040 private static boolean isPathAvailable(String sPath, String user, String group, Configuration conf)
1041 throws IOException, HadoopAccessorException {
1042 // sPath += "/" + END_OF_OPERATION_INDICATOR_FILE;
1043 Path path = new Path(sPath);
1044 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
1045 Configuration fsConf = has.createJobConf(path.toUri().getAuthority());
1046 return has.createFileSystem(user, path.toUri(), fsConf).exists(path);
1047 }
1048
1049 /**
1050 * @param tm
1051 * @return a new Evaluator to be used for URI-template evaluation
1052 */
1053 private static ELEvaluator getUriEvaluator(Calendar tm) {
1054 tm.setTimeZone(DateUtils.getOozieProcessingTimeZone());
1055 ELEvaluator retEval = new ELEvaluator();
1056 retEval.setVariable("YEAR", tm.get(Calendar.YEAR));
1057 retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm
1058 .get(Calendar.MONTH) + 1));
1059 retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm
1060 .get(Calendar.DAY_OF_MONTH));
1061 retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm
1062 .get(Calendar.HOUR_OF_DAY));
1063 retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? "0" + tm.get(Calendar.MINUTE) : tm
1064 .get(Calendar.MINUTE));
1065 return retEval;
1066 }
1067
1068 /**
1069 * @return whether a data set is SYNCH or ASYNC
1070 */
1071 private static boolean isSyncDataSet() {
1072 ELEvaluator eval = ELEvaluator.getCurrent();
1073 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1074 if (ds == null) {
1075 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1076 }
1077 return ds.getType().equalsIgnoreCase("SYNC");
1078 }
1079
1080 /**
1081 * Check whether a function should be resolved.
1082 *
1083 * @param functionName
1084 * @param n
1085 * @return null if the functionName needs to be resolved otherwise return the calling function unresolved.
1086 */
1087 private static String checkIfResolved(String functionName, String n) {
1088 ELEvaluator eval = ELEvaluator.getCurrent();
1089 String replace = (String) eval.getVariable("resolve_" + functionName);
1090 if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't
1091 // resolve
1092 // return "${coord:" + functionName + "(" + n +")}"; //Unresolved
1093 eval.setVariable(".wrap", "true");
1094 return "coord:" + functionName + "(" + n + ")"; // Unresolved
1095 }
1096 return null; // Resolved it
1097 }
1098
1099 private static String echoUnResolved(String functionName, String n) {
1100 return echoUnResolvedPre(functionName, n, "coord:");
1101 }
1102
1103 private static String echoUnResolvedPre(String functionName, String n, String prefix) {
1104 ELEvaluator eval = ELEvaluator.getCurrent();
1105 eval.setVariable(".wrap", "true");
1106 return prefix + functionName + "(" + n + ")"; // Unresolved
1107 }
1108
1109 /**
1110 * @return the initial instance of a DataSet in DATE
1111 */
1112 private static Date getInitialInstance() {
1113 ELEvaluator eval = ELEvaluator.getCurrent();
1114 return getInitialInstance(eval);
1115 }
1116
1117 /**
1118 * @return the initial instance of a DataSet in DATE
1119 */
1120 private static Date getInitialInstance(ELEvaluator eval) {
1121 return getInitialInstanceCal(eval).getTime();
1122 // return ds.getInitInstance();
1123 }
1124
1125 /**
1126 * @return the initial instance of a DataSet in Calendar
1127 */
1128 private static Calendar getInitialInstanceCal() {
1129 ELEvaluator eval = ELEvaluator.getCurrent();
1130 return getInitialInstanceCal(eval);
1131 }
1132
1133 /**
1134 * @return the initial instance of a DataSet in Calendar
1135 */
1136 private static Calendar getInitialInstanceCal(ELEvaluator eval) {
1137 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1138 if (ds == null) {
1139 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1140 }
1141 Calendar effInitTS = Calendar.getInstance();
1142 effInitTS.setTime(ds.getInitInstance());
1143 effInitTS.setTimeZone(ds.getTimeZone());
1144 // To adjust EOD/EOM
1145 DateUtils.moveToEnd(effInitTS, getDSEndOfFlag(eval));
1146 return effInitTS;
1147 // return ds.getInitInstance();
1148 }
1149
1150 /**
1151 * @return Nominal or action creation Time when all the dependencies of an application instance are met.
1152 */
1153 private static Date getActionCreationtime() {
1154 ELEvaluator eval = ELEvaluator.getCurrent();
1155 return getActionCreationtime(eval);
1156 }
1157
1158 /**
1159 * @return Nominal or action creation Time when all the dependencies of an application instance are met.
1160 */
1161 private static Date getActionCreationtime(ELEvaluator eval) {
1162 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1163 if (coordAction == null) {
1164 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1165 }
1166 return coordAction.getNominalTime();
1167 }
1168
1169 /**
1170 * @return Actual Time when all the dependencies of an application instance are met.
1171 */
1172 private static Date getActualTime() {
1173 ELEvaluator eval = ELEvaluator.getCurrent();
1174 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1175 if (coordAction == null) {
1176 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1177 }
1178 return coordAction.getActualTime();
1179 }
1180
1181 /**
1182 * @return TimeZone for the application or job.
1183 */
1184 private static TimeZone getJobTZ() {
1185 ELEvaluator eval = ELEvaluator.getCurrent();
1186 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1187 if (coordAction == null) {
1188 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1189 }
1190 return coordAction.getTimeZone();
1191 }
1192
1193 /**
1194 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
1195 *
1196 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
1197 * the dataset.
1198 */
1199 private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) {
1200 ELEvaluator eval = ELEvaluator.getCurrent();
1201 return getCurrentInstance(effectiveTime, instanceCount, eval);
1202 }
1203
1204 /**
1205 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
1206 *
1207 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
1208 * the dataset.
1209 */
1210 private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[], ELEvaluator eval) {
1211 Date datasetInitialInstance = getInitialInstance(eval);
1212 TimeUnit dsTimeUnit = getDSTimeUnit(eval);
1213 TimeZone dsTZ = getDatasetTZ(eval);
1214 int dsFreq = getDSFrequency(eval);
1215 // Convert Date to Calendar for corresponding TZ
1216 Calendar current = Calendar.getInstance();
1217 current.setTime(datasetInitialInstance);
1218 current.setTimeZone(dsTZ);
1219
1220 Calendar calEffectiveTime = Calendar.getInstance();
1221 calEffectiveTime.setTime(effectiveTime);
1222 calEffectiveTime.setTimeZone(dsTZ);
1223 if (instanceCount == null) { // caller doesn't care about this value
1224 instanceCount = new int[1];
1225 }
1226 instanceCount[0] = 0;
1227 if (current.compareTo(calEffectiveTime) > 0) {
1228 return null;
1229 }
1230 Calendar origCurrent = (Calendar) current.clone();
1231 while (current.compareTo(calEffectiveTime) <= 0) {
1232 current = (Calendar) origCurrent.clone();
1233 instanceCount[0]++;
1234 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
1235 }
1236 instanceCount[0]--;
1237
1238 current = (Calendar) origCurrent.clone();
1239 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
1240 return current;
1241 }
1242
1243 private static Calendar getEffectiveNominalTime() {
1244 Date datasetInitialInstance = getInitialInstance();
1245 TimeZone dsTZ = getDatasetTZ();
1246 // Convert Date to Calendar for corresponding TZ
1247 Calendar current = Calendar.getInstance();
1248 current.setTime(datasetInitialInstance);
1249 current.setTimeZone(dsTZ);
1250
1251 Calendar calEffectiveTime = Calendar.getInstance();
1252 calEffectiveTime.setTime(getActionCreationtime());
1253 calEffectiveTime.setTimeZone(dsTZ);
1254 if (current.compareTo(calEffectiveTime) > 0) {
1255 // Nominal Time < initial Instance
1256 // TODO: getClass() call doesn't work from static method.
1257 // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+
1258 // current.getTime());
1259 return null;
1260 }
1261 return calEffectiveTime;
1262 }
1263
1264 /**
1265 * @return dataset frequency in minutes
1266 */
1267 private static int getDSFrequency() {
1268 ELEvaluator eval = ELEvaluator.getCurrent();
1269 return getDSFrequency(eval);
1270 }
1271
1272 /**
1273 * @return dataset frequency in minutes
1274 */
1275 private static int getDSFrequency(ELEvaluator eval) {
1276 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1277 if (ds == null) {
1278 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1279 }
1280 return ds.getFrequency();
1281 }
1282
1283 /**
1284 * @return dataset TimeUnit
1285 */
1286 private static TimeUnit getDSTimeUnit() {
1287 ELEvaluator eval = ELEvaluator.getCurrent();
1288 return getDSTimeUnit(eval);
1289 }
1290
1291 /**
1292 * @return dataset TimeUnit
1293 */
1294 private static TimeUnit getDSTimeUnit(ELEvaluator eval) {
1295 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1296 if (ds == null) {
1297 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1298 }
1299 return ds.getTimeUnit();
1300 }
1301
1302 /**
1303 * @return dataset TimeZone
1304 */
1305 private static TimeZone getDatasetTZ() {
1306 ELEvaluator eval = ELEvaluator.getCurrent();
1307 return getDatasetTZ(eval);
1308 }
1309
1310 /**
1311 * @return dataset TimeZone
1312 */
1313 private static TimeZone getDatasetTZ(ELEvaluator eval) {
1314 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1315 if (ds == null) {
1316 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1317 }
1318 return ds.getTimeZone();
1319 }
1320
1321 /**
1322 * @return dataset TimeUnit
1323 */
1324 private static TimeUnit getDSEndOfFlag() {
1325 ELEvaluator eval = ELEvaluator.getCurrent();
1326 return getDSEndOfFlag(eval);
1327 }
1328
1329 /**
1330 * @return dataset TimeUnit
1331 */
1332 private static TimeUnit getDSEndOfFlag(ELEvaluator eval) {
1333 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1334 if (ds == null) {
1335 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1336 }
1337 return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration();
1338 }
1339
1340 /**
1341 * Return a job configuration property for the coordinator.
1342 *
1343 * @param property property name.
1344 * @return the value of the property, <code>null</code> if the property is undefined.
1345 */
1346 public static String coord_conf(String property) {
1347 ELEvaluator eval = ELEvaluator.getCurrent();
1348 return (String) eval.getVariable(property);
1349 }
1350
1351 /**
1352 * Return the user that submitted the coordinator job.
1353 *
1354 * @return the user that submitted the coordinator job.
1355 */
1356 public static String coord_user() {
1357 ELEvaluator eval = ELEvaluator.getCurrent();
1358 return (String) eval.getVariable(OozieClient.USER_NAME);
1359 }
1360
1361 /**
1362 * Takes two offset times and returns a list of multiples of the frequency offset from the effective nominal time that occur
1363 * between them. The caller should make sure that startCal is earlier than endCal.
1364 * <p>
1365 * As a simple example, assume its the same day: startCal is 1:00, endCal is 2:00, frequency is 20min, and effective nominal
1366 * time is 1:20 -- then this method would return a list containing: -20, 0, 20, 40, 60
1367 *
1368 * @param startCal The earlier offset time
1369 * @param endCal The later offset time
1370 * @param eval The ELEvaluator to use; cannot be null
1371 * @return A list of multiple of the frequency offset from the effective nominal time that occur between the startCal and endCal
1372 */
1373 public static List<Integer> expandOffsetTimes(Calendar startCal, Calendar endCal, ELEvaluator eval) {
1374 List<Integer> expandedFreqs = new ArrayList<Integer>();
1375 // Use eval because the "current" eval isn't set
1376 int freq = getDSFrequency(eval);
1377 TimeUnit freqUnit = getDSTimeUnit(eval);
1378 Calendar cal = getCurrentInstance(getActionCreationtime(eval), null, eval);
1379 int totalFreq = 0;
1380 if (startCal.before(cal)) {
1381 while (cal.after(startCal)) {
1382 cal.add(freqUnit.getCalendarUnit(), -freq);
1383 totalFreq += -freq;
1384 }
1385 if (cal.before(startCal)) {
1386 cal.add(freqUnit.getCalendarUnit(), freq);
1387 totalFreq += freq;
1388 }
1389 }
1390 else if (startCal.after(cal)) {
1391 while (cal.before(startCal)) {
1392 cal.add(freqUnit.getCalendarUnit(), freq);
1393 totalFreq += freq;
1394 }
1395 }
1396 // At this point, cal is the smallest multiple of the dataset frequency that is >= to the startCal and offset from the
1397 // effective nominal time. Now we can find all of the instances that occur between startCal and endCal, inclusive.
1398 while (cal.before(endCal) || cal.equals(endCal)) {
1399 expandedFreqs.add(totalFreq);
1400 cal.add(freqUnit.getCalendarUnit(), freq);
1401 totalFreq += freq;
1402 }
1403 return expandedFreqs;
1404 }
1405
1406 /**
1407 * Resolve the offset time from the effective nominal time
1408 *
1409 * @param n offset amount (integer)
1410 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
1411 * @param eval The ELEvaluator to use; or null to use the "current" eval
1412 * @return A Calendar of the offset time
1413 */
1414 public static Calendar resolveOffsetRawTime(int n, TimeUnit timeUnit, ELEvaluator eval) {
1415 // Use eval if given (for when the "current" eval isn't set)
1416 Calendar cal;
1417 if (eval == null) {
1418 cal = getCurrentInstance(getActionCreationtime(), null);
1419 }
1420 else {
1421 cal = getCurrentInstance(getActionCreationtime(eval), null, eval);
1422 }
1423 if (cal == null) {
1424 LOG.warn("If the initial instance of the dataset is later than the nominal time, an"
1425 + " empty string is returned. This means that no data is available at the offset instance specified by the user"
1426 + " and the user could try modifying his or her initial-instance to an earlier time.");
1427 return null;
1428 }
1429 cal.add(timeUnit.getCalendarUnit(), n);
1430 return cal;
1431 }
1432 }