This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.coord;
019
020 import java.net.URI;
021 import java.util.ArrayList;
022 import java.util.Calendar;
023 import java.util.Date;
024 import java.util.List;
025 import java.util.TimeZone;
026
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.oozie.client.OozieClient;
029 import org.apache.oozie.dependency.URIHandler.Context;
030 import org.apache.oozie.dependency.URIHandler;
031 import org.apache.oozie.util.DateUtils;
032 import org.apache.oozie.util.ELEvaluator;
033 import org.apache.oozie.util.ParamChecker;
034 import org.apache.oozie.util.XLog;
035 import org.apache.oozie.service.Services;
036 import org.apache.oozie.service.URIHandlerService;
037
038 /**
039 * This class implements the EL function related to coordinator
040 */
041
042 public class CoordELFunctions {
043 final public static String DATASET = "oozie.coord.el.dataset.bean";
044 final public static String COORD_ACTION = "oozie.coord.el.app.bean";
045 final public static String CONFIGURATION = "oozie.coord.el.conf";
046 final public static String LATEST_EL_USE_CURRENT_TIME = "oozie.service.ELService.latest-el.use-current-time";
047 // INSTANCE_SEPARATOR is used to separate multiple directories into one tag.
048 final public static String INSTANCE_SEPARATOR = "#";
049 final public static String DIR_SEPARATOR = ",";
050 // TODO: in next release, support flexibility
051 private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS";
052
053 /**
054 * Used in defining the frequency in 'day' unit. <p/> domain: <code> val > 0</code> and should be integer.
055 *
056 * @param val frequency in number of days.
057 * @return number of days and also set the frequency timeunit to "day"
058 */
059 public static int ph1_coord_days(int val) {
060 val = ParamChecker.checkGTZero(val, "n");
061 ELEvaluator eval = ELEvaluator.getCurrent();
062 eval.setVariable("timeunit", TimeUnit.DAY);
063 eval.setVariable("endOfDuration", TimeUnit.NONE);
064 return val;
065 }
066
067 /**
068 * Used in defining the frequency in 'month' unit. <p/> domain: <code> val > 0</code> and should be integer.
069 *
070 * @param val frequency in number of months.
071 * @return number of months and also set the frequency timeunit to "month"
072 */
073 public static int ph1_coord_months(int val) {
074 val = ParamChecker.checkGTZero(val, "n");
075 ELEvaluator eval = ELEvaluator.getCurrent();
076 eval.setVariable("timeunit", TimeUnit.MONTH);
077 eval.setVariable("endOfDuration", TimeUnit.NONE);
078 return val;
079 }
080
081 /**
082 * Used in defining the frequency in 'hour' unit. <p/> parameter value domain: <code> val > 0</code> and should
083 * be integer.
084 *
085 * @param val frequency in number of hours.
086 * @return number of minutes and also set the frequency timeunit to "minute"
087 */
088 public static int ph1_coord_hours(int val) {
089 val = ParamChecker.checkGTZero(val, "n");
090 ELEvaluator eval = ELEvaluator.getCurrent();
091 eval.setVariable("timeunit", TimeUnit.MINUTE);
092 eval.setVariable("endOfDuration", TimeUnit.NONE);
093 return val * 60;
094 }
095
096 /**
097 * Used in defining the frequency in 'minute' unit. <p/> domain: <code> val > 0</code> and should be integer.
098 *
099 * @param val frequency in number of minutes.
100 * @return number of minutes and also set the frequency timeunit to "minute"
101 */
102 public static int ph1_coord_minutes(int val) {
103 val = ParamChecker.checkGTZero(val, "n");
104 ELEvaluator eval = ELEvaluator.getCurrent();
105 eval.setVariable("timeunit", TimeUnit.MINUTE);
106 eval.setVariable("endOfDuration", TimeUnit.NONE);
107 return val;
108 }
109
110 /**
111 * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p/> Every instance will
112 * start at 00:00 hour of each day. <p/> domain: <code> val > 0</code> and should be integer.
113 *
114 * @param val frequency in number of days.
115 * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day"
116 */
117 public static int ph1_coord_endOfDays(int val) {
118 val = ParamChecker.checkGTZero(val, "n");
119 ELEvaluator eval = ELEvaluator.getCurrent();
120 eval.setVariable("timeunit", TimeUnit.DAY);
121 eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY);
122 return val;
123 }
124
125 /**
126 * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p/> Every instance will
127 * start at first day of each month at 00:00 hour. <p/> domain: <code> val > 0</code> and should be integer.
128 *
129 * @param val: frequency in number of months.
130 * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month"
131 */
132 public static int ph1_coord_endOfMonths(int val) {
133 val = ParamChecker.checkGTZero(val, "n");
134 ELEvaluator eval = ELEvaluator.getCurrent();
135 eval.setVariable("timeunit", TimeUnit.MONTH);
136 eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH);
137 return val;
138 }
139
140 /**
141 * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p/> Depends on: <p/>
142 * 1. Timezone of both dataset and job <p/> 2. Action creation Time
143 *
144 * @return difference in minutes (DataSet TZ Offset - Application TZ offset)
145 */
146 public static int ph2_coord_tzOffset() {
147 Date actionCreationTime = getActionCreationtime();
148 TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ");
149 TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ");
150 // Apply the TZ into Calendar object
151 Calendar dsTime = Calendar.getInstance(dsTZ);
152 dsTime.setTime(actionCreationTime);
153 Calendar jobTime = Calendar.getInstance(jobTZ);
154 jobTime.setTime(actionCreationTime);
155 return (dsTime.get(Calendar.ZONE_OFFSET) - jobTime.get(Calendar.ZONE_OFFSET)) / (1000 * 60);
156 }
157
158 public static int ph3_coord_tzOffset() {
159 return ph2_coord_tzOffset();
160 }
161
162 /**
163 * Returns the a date string while given a base date in 'strBaseDate',
164 * offset and unit (e.g. DAY, MONTH, HOUR, MINUTE, MONTH).
165 *
166 * @param strBaseDate -- base date
167 * @param offset -- any number
168 * @param unit -- DAY, MONTH, HOUR, MINUTE, MONTH
169 * @return date string
170 * @throws Exception
171 */
172 public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
173 Calendar baseCalDate = DateUtils.getCalendar(strBaseDate);
174 StringBuilder buffer = new StringBuilder();
175 baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset);
176 buffer.append(DateUtils.formatDateOozieTZ(baseCalDate));
177 return buffer.toString();
178 }
179
180 public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
181 return ph2_coord_dateOffset(strBaseDate, offset, unit);
182 }
183
184 /**
185 * Determine the date-time in Oozie processing timezone of n-th future available dataset instance
186 * from nominal Time but not beyond the instance specified as 'instance.
187 * <p/>
188 * It depends on:
189 * <p/>
190 * 1. Data set frequency
191 * <p/>
192 * 2. Data set Time unit (day, month, minute)
193 * <p/>
194 * 3. Data set Time zone/DST
195 * <p/>
196 * 4. End Day/Month flag
197 * <p/>
198 * 5. Data set initial instance
199 * <p/>
200 * 6. Action Creation Time
201 * <p/>
202 * 7. Existence of dataset's directory
203 *
204 * @param n :instance count
205 * <p/>
206 * domain: n >= 0, n is integer
207 * @param instance: How many future instance it should check? value should
208 * be >=0
209 * @return date-time in Oozie processing timezone of the n-th instance
210 * <p/>
211 * @throws Exception
212 */
213 public static String ph3_coord_future(int n, int instance) throws Exception {
214 ParamChecker.checkGEZero(n, "future:n");
215 ParamChecker.checkGTZero(instance, "future:instance");
216 if (isSyncDataSet()) {// For Sync Dataset
217 return coord_future_sync(n, instance);
218 }
219 else {
220 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
221 }
222 }
223
224 /**
225 * Determine the date-time in Oozie processing timezone of the future available dataset instances
226 * from start to end offsets from nominal Time but not beyond the instance specified as 'instance'.
227 * <p/>
228 * It depends on:
229 * <p/>
230 * 1. Data set frequency
231 * <p/>
232 * 2. Data set Time unit (day, month, minute)
233 * <p/>
234 * 3. Data set Time zone/DST
235 * <p/>
236 * 4. End Day/Month flag
237 * <p/>
238 * 5. Data set initial instance
239 * <p/>
240 * 6. Action Creation Time
241 * <p/>
242 * 7. Existence of dataset's directory
243 *
244 * @param start : start instance offset
245 * <p/>
246 * domain: start >= 0, start is integer
247 * @param end : end instance offset
248 * <p/>
249 * domain: end >= 0, end is integer
250 * @param instance: How many future instance it should check? value should
251 * be >=0
252 * @return date-time in Oozie processing timezone of the instances from start to end offsets
253 * delimited by comma.
254 * <p/>
255 * @throws Exception
256 */
257 public static String ph3_coord_futureRange(int start, int end, int instance) throws Exception {
258 ParamChecker.checkGEZero(start, "future:n");
259 ParamChecker.checkGEZero(end, "future:n");
260 ParamChecker.checkGTZero(instance, "future:instance");
261 if (isSyncDataSet()) {// For Sync Dataset
262 return coord_futureRange_sync(start, end, instance);
263 }
264 else {
265 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
266 }
267 }
268
269 private static String coord_future_sync(int n, int instance) throws Exception {
270 return coord_futureRange_sync(n, n, instance);
271 }
272
273 private static String coord_futureRange_sync(int startOffset, int endOffset, int instance) throws Exception {
274 final XLog LOG = XLog.getLog(CoordELFunctions.class);
275 final Thread currentThread = Thread.currentThread();
276 ELEvaluator eval = ELEvaluator.getCurrent();
277 String retVal = "";
278 int datasetFrequency = (int) getDSFrequency();// in minutes
279 TimeUnit dsTimeUnit = getDSTimeUnit();
280 int[] instCount = new int[1];
281 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
282 StringBuilder resolvedInstances = new StringBuilder();
283 StringBuilder resolvedURIPaths = new StringBuilder();
284 if (nominalInstanceCal != null) {
285 Calendar initInstance = getInitialInstanceCal();
286 nominalInstanceCal = (Calendar) initInstance.clone();
287 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
288
289 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
290 if (ds == null) {
291 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
292 }
293 String uriTemplate = ds.getUriTemplate();
294 Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
295 if (conf == null) {
296 throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
297 }
298 int available = 0, checkedInstance = 0;
299 boolean resolved = false;
300 String user = ParamChecker
301 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
302 String doneFlag = ds.getDoneFlag();
303 URIHandlerService uriService = Services.get().get(URIHandlerService.class);
304 URIHandler uriHandler = null;
305 Context uriContext = null;
306 try {
307 while (instance >= checkedInstance && !currentThread.isInterrupted()) {
308 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
309 String uriPath = uriEval.evaluate(uriTemplate, String.class);
310 if (uriHandler == null) {
311 URI uri = new URI(uriPath);
312 uriHandler = uriService.getURIHandler(uri);
313 uriContext = uriHandler.getContext(uri, conf, user);
314 }
315 String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
316 if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
317 if (available == endOffset) {
318 LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
319 resolved = true;
320 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
321 resolvedURIPaths.append(uriPath);
322 retVal = resolvedInstances.toString();
323 eval.setVariable("resolved_path", resolvedURIPaths.toString());
324 break;
325 }
326 else if (available >= startOffset) {
327 LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
328 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
329 INSTANCE_SEPARATOR);
330 resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
331 }
332 available++;
333 }
334 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency);
335 nominalInstanceCal = (Calendar) initInstance.clone();
336 instCount[0]++;
337 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
338 checkedInstance++;
339 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
340 }
341 }
342 finally {
343 if (uriContext != null) {
344 uriContext.destroy();
345 }
346 }
347 if (!resolved) {
348 // return unchanged future function with variable 'is_resolved'
349 // to 'false'
350 eval.setVariable("is_resolved", Boolean.FALSE);
351 if (startOffset == endOffset) {
352 retVal = "${coord:future(" + startOffset + ", " + instance + ")}";
353 }
354 else {
355 retVal = "${coord:futureRange(" + startOffset + ", " + endOffset + ", " + instance + ")}";
356 }
357 }
358 else {
359 eval.setVariable("is_resolved", Boolean.TRUE);
360 }
361 }
362 else {// No feasible nominal time
363 eval.setVariable("is_resolved", Boolean.TRUE);
364 retVal = "";
365 }
366 return retVal;
367 }
368
369 /**
370 * Return nominal time or Action Creation Time.
371 * <p/>
372 *
373 * @return coordinator action creation or materialization date time
374 * @throws Exception if unable to format the Date object to String
375 */
376 public static String ph2_coord_nominalTime() throws Exception {
377 ELEvaluator eval = ELEvaluator.getCurrent();
378 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
379 "Coordinator Action");
380 return DateUtils.formatDateOozieTZ(action.getNominalTime());
381 }
382
383 public static String ph3_coord_nominalTime() throws Exception {
384 return ph2_coord_nominalTime();
385 }
386
387 /**
388 * Convert from standard date-time formatting to a desired format.
389 * <p/>
390 * @param dateTimeStr - A timestamp in standard (ISO8601) format.
391 * @param format - A string representing the desired format.
392 * @return coordinator action creation or materialization date time
393 * @throws Exception if unable to format the Date object to String
394 */
395 public static String ph2_coord_formatTime(String dateTimeStr, String format)
396 throws Exception {
397 Date dateTime = DateUtils.parseDateOozieTZ(dateTimeStr);
398 return DateUtils.formatDateCustom(dateTime, format);
399 }
400
401 public static String ph3_coord_formatTime(String dateTimeStr, String format)
402 throws Exception {
403 return ph2_coord_formatTime(dateTimeStr, format);
404 }
405
406 /**
407 * Return Action Id. <p/>
408 *
409 * @return coordinator action Id
410 */
411 public static String ph2_coord_actionId() throws Exception {
412 ELEvaluator eval = ELEvaluator.getCurrent();
413 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
414 "Coordinator Action");
415 return action.getActionId();
416 }
417
418 public static String ph3_coord_actionId() throws Exception {
419 return ph2_coord_actionId();
420 }
421
422 /**
423 * Return Job Name. <p/>
424 *
425 * @return coordinator name
426 */
427 public static String ph2_coord_name() throws Exception {
428 ELEvaluator eval = ELEvaluator.getCurrent();
429 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
430 "Coordinator Action");
431 return action.getName();
432 }
433
434 public static String ph3_coord_name() throws Exception {
435 return ph2_coord_name();
436 }
437
438 /**
439 * Return Action Start time. <p/>
440 *
441 * @return coordinator action start time
442 * @throws Exception if unable to format the Date object to String
443 */
444 public static String ph2_coord_actualTime() throws Exception {
445 ELEvaluator eval = ELEvaluator.getCurrent();
446 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
447 if (coordAction == null) {
448 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
449 }
450 return DateUtils.formatDateOozieTZ(coordAction.getActualTime());
451 }
452
453 public static String ph3_coord_actualTime() throws Exception {
454 return ph2_coord_actualTime();
455 }
456
457 /**
458 * Used to specify a list of URI's that are used as input dir to the workflow job. <p/> Look for two evaluator-level
459 * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
460 * URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
461 * unresolved, this function will echo back the original function <p/> otherwise it sends the uris.
462 *
463 * @param dataInName : Datain name
464 * @return the list of URI's separated by INSTANCE_SEPARATOR <p/> if there are unresolved EL function (i.e. latest)
465 * , echo back <p/> the function without resolving the function.
466 */
467 public static String ph3_coord_dataIn(String dataInName) {
468 String uris = "";
469 ELEvaluator eval = ELEvaluator.getCurrent();
470 uris = (String) eval.getVariable(".datain." + dataInName);
471 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
472 if (unresolved != null && unresolved.booleanValue() == true) {
473 return "${coord:dataIn('" + dataInName + "')}";
474 }
475 return uris;
476 }
477
478 /**
479 * Used to specify a list of URI's that are output dir of the workflow job. <p/> Look for one evaluator-level
480 * variable <p/> dataout.<DATAOUT_NAME> <p/> It defines the current list of URI. <p/> otherwise it sends the uris.
481 *
482 * @param dataOutName : Dataout name
483 * @return the list of URI's separated by INSTANCE_SEPARATOR
484 */
485 public static String ph3_coord_dataOut(String dataOutName) {
486 String uris = "";
487 ELEvaluator eval = ELEvaluator.getCurrent();
488 uris = (String) eval.getVariable(".dataout." + dataOutName);
489 return uris;
490 }
491
492 /**
493 * Determine the date-time in Oozie processing timezone of n-th dataset instance. <p/> It depends on: <p/> 1.
494 * Data set frequency <p/> 2.
495 * Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. Data
496 * set initial instance <p/> 6. Action Creation Time
497 *
498 * @param n instance count domain: n is integer
499 * @return date-time in Oozie processing timezone of the n-th instance returns 'null' means n-th instance is
500 * earlier than Initial-Instance of DS
501 * @throws Exception
502 */
503 public static String ph2_coord_current(int n) throws Exception {
504 if (isSyncDataSet()) { // For Sync Dataset
505 return coord_current_sync(n);
506 }
507 else {
508 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
509 }
510 }
511
512 /**
513 * Determine the date-time in Oozie processing timezone of current dataset instances
514 * from start to end offsets from the nominal time. <p/> It depends
515 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
516 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time
517 *
518 * @param start :start instance offset <p/> domain: start <= 0, start is integer
519 * @param end :end instance offset <p/> domain: end <= 0, end is integer
520 * @return date-time in Oozie processing timezone of the instances from start to end offsets
521 * delimited by comma. <p/> If the current instance time of the dataset based on the Action Creation Time
522 * is earlier than the Initial-Instance of DS an empty string is returned.
523 * If an instance within the range is earlier than Initial-Instance of DS that instance is ignored
524 * @throws Exception
525 */
526 public static String ph2_coord_currentRange(int start, int end) throws Exception {
527 if (isSyncDataSet()) { // For Sync Dataset
528 return coord_currentRange_sync(start, end);
529 }
530 else {
531 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
532 }
533 }
534
535 /**
536 * Determine the date-time in Oozie processing timezone of the given offset from the dataset effective nominal time. <p/> It
537 * depends on: <p> 1. Data set frequency <p/> 2. Data set Time Unit <p/> 3. Data set Time zone/DST
538 * <p/> 4. Data set initial instance <p/> 5. Action Creation Time
539 *
540 * @param n offset amount (integer)
541 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
542 * @return date-time in Oozie processing timezone of the given offset from the dataset effective nominal time
543 * @throws Exception if there was a problem formatting
544 */
545 public static String ph2_coord_offset(int n, String timeUnit) throws Exception {
546 if (isSyncDataSet()) { // For Sync Dataset
547 return coord_offset_sync(n, timeUnit);
548 }
549 else {
550 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
551 }
552 }
553
554 /**
555 * Determine how many hours is on the date of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency
556 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
557 * Data set initial instance <p/> 6. Action Creation Time
558 *
559 * @param n instance count <p/> domain: n is integer
560 * @return number of hours on that day <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
561 * @throws Exception
562 */
563 public static int ph2_coord_hoursInDay(int n) throws Exception {
564 int datasetFrequency = (int) getDSFrequency();
565 // /Calendar nominalInstanceCal =
566 // getCurrentInstance(getActionCreationtime());
567 Calendar nominalInstanceCal = getEffectiveNominalTime();
568 if (nominalInstanceCal == null) {
569 return -1;
570 }
571 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
572 /*
573 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
574 * { return -1; }
575 */
576 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
577 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
578 return DateUtils.hoursInDay(nominalInstanceCal);
579 }
580
581 public static int ph3_coord_hoursInDay(int n) throws Exception {
582 return ph2_coord_hoursInDay(n);
583 }
584
585 /**
586 * Calculate number of days in one month for n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency .
587 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
588 * Data set initial instance <p/> 6. Action Creation Time
589 *
590 * @param n instance count. domain: n is integer
591 * @return number of days in that month <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
592 * @throws Exception
593 */
594 public static int ph2_coord_daysInMonth(int n) throws Exception {
595 int datasetFrequency = (int) getDSFrequency();// in minutes
596 // Calendar nominalInstanceCal =
597 // getCurrentInstance(getActionCreationtime());
598 Calendar nominalInstanceCal = getEffectiveNominalTime();
599 if (nominalInstanceCal == null) {
600 return -1;
601 }
602 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
603 /*
604 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
605 * { return -1; }
606 */
607 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
608 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
609 return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH);
610 }
611
612 public static int ph3_coord_daysInMonth(int n) throws Exception {
613 return ph2_coord_daysInMonth(n);
614 }
615
616 /**
617 * Determine the date-time in Oozie processing timezone of n-th latest available dataset instance. <p/> It depends
618 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
619 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of
620 * dataset's directory
621 *
622 * @param n :instance count <p/> domain: n <= 0, n is integer
623 * @return date-time in Oozie processing timezone of the n-th instance <p/> returns 'null' means n-th instance is
624 * earlier than Initial-Instance of DS
625 * @throws Exception
626 */
627 public static String ph3_coord_latest(int n) throws Exception {
628 ParamChecker.checkLEZero(n, "latest:n");
629 if (isSyncDataSet()) {// For Sync Dataset
630 return coord_latest_sync(n);
631 }
632 else {
633 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
634 }
635 }
636
637 /**
638 * Determine the date-time in Oozie processing timezone of latest available dataset instances
639 * from start to end offsets from the nominal time. <p/> It depends
640 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
641 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of
642 * dataset's directory
643 *
644 * @param start :start instance offset <p/> domain: start <= 0, start is integer
645 * @param end :end instance offset <p/> domain: end <= 0, end is integer
646 * @return date-time in Oozie processing timezone of the instances from start to end offsets
647 * delimited by comma. <p/> returns 'null' means start offset instance is
648 * earlier than Initial-Instance of DS
649 * @throws Exception
650 */
651 public static String ph3_coord_latestRange(int start, int end) throws Exception {
652 ParamChecker.checkLEZero(start, "latest:n");
653 ParamChecker.checkLEZero(end, "latest:n");
654 if (isSyncDataSet()) {// For Sync Dataset
655 return coord_latestRange_sync(start, end);
656 }
657 else {
658 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
659 }
660 }
661
662 /**
663 * Configure an evaluator with data set and application specific information. <p/> Helper method of associating
664 * dataset and application object
665 *
666 * @param evaluator : to set variables
667 * @param ds : Data Set object
668 * @param coordAction : Application instance
669 */
670 public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) {
671 evaluator.setVariable(COORD_ACTION, coordAction);
672 evaluator.setVariable(DATASET, ds);
673 }
674
675 /**
676 * Helper method to wrap around with "${..}". <p/>
677 *
678 *
679 * @param eval :EL evaluator
680 * @param expr : expression to evaluate
681 * @return Resolved expression or echo back the same expression
682 * @throws Exception
683 */
684 public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception {
685 try {
686 eval.setVariable(".wrap", null);
687 String result = eval.evaluate(expr, String.class);
688 if (eval.getVariable(".wrap") != null) {
689 return "${" + result + "}";
690 }
691 else {
692 return result;
693 }
694 }
695 catch (Exception e) {
696 throw new Exception("Unable to evaluate :" + expr + ":\n", e);
697 }
698 }
699
700 // Set of echo functions
701
702 public static String ph1_coord_current_echo(String n) {
703 return echoUnResolved("current", n);
704 }
705
706 public static String ph1_coord_currentRange_echo(String start, String end) {
707 return echoUnResolved("currentRange", start + ", " + end);
708 }
709
710 public static String ph1_coord_offset_echo(String n, String timeUnit) {
711 return echoUnResolved("offset", n + " , " + timeUnit);
712 }
713
714 public static String ph2_coord_current_echo(String n) {
715 return echoUnResolved("current", n);
716 }
717
718 public static String ph2_coord_currentRange_echo(String start, String end) {
719 return echoUnResolved("currentRange", start + ", " + end);
720 }
721
722 public static String ph2_coord_offset_echo(String n, String timeUnit) {
723 return echoUnResolved("offset", n + " , " + timeUnit);
724 }
725
726 public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) {
727 return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit);
728 }
729
730 public static String ph1_coord_formatTime_echo(String dateTime, String format) {
731 // Quote the dateTime value since it would contain a ':'.
732 return echoUnResolved("formatTime", "'"+dateTime+"'" + " , " + format);
733 }
734
735 public static String ph1_coord_latest_echo(String n) {
736 return echoUnResolved("latest", n);
737 }
738
739 public static String ph2_coord_latest_echo(String n) {
740 return ph1_coord_latest_echo(n);
741 }
742
743 public static String ph1_coord_future_echo(String n, String instance) {
744 return echoUnResolved("future", n + ", " + instance + "");
745 }
746
747 public static String ph2_coord_future_echo(String n, String instance) {
748 return ph1_coord_future_echo(n, instance);
749 }
750
751 public static String ph1_coord_latestRange_echo(String start, String end) {
752 return echoUnResolved("latestRange", start + ", " + end);
753 }
754
755 public static String ph2_coord_latestRange_echo(String start, String end) {
756 return ph1_coord_latestRange_echo(start, end);
757 }
758
759 public static String ph1_coord_futureRange_echo(String start, String end, String instance) {
760 return echoUnResolved("futureRange", start + ", " + end + ", " + instance);
761 }
762
763 public static String ph2_coord_futureRange_echo(String start, String end, String instance) {
764 return ph1_coord_futureRange_echo(start, end, instance);
765 }
766
767 public static String ph1_coord_dataIn_echo(String n) {
768 ELEvaluator eval = ELEvaluator.getCurrent();
769 String val = (String) eval.getVariable("oozie.dataname." + n);
770 if (val == null || val.equals("data-in") == false) {
771 XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " is not valid");
772 throw new RuntimeException("data_in_name " + n + " is not valid");
773 }
774 return echoUnResolved("dataIn", "'" + n + "'");
775 }
776
777 public static String ph1_coord_dataOut_echo(String n) {
778 ELEvaluator eval = ELEvaluator.getCurrent();
779 String val = (String) eval.getVariable("oozie.dataname." + n);
780 if (val == null || val.equals("data-out") == false) {
781 XLog.getLog(CoordELFunctions.class).error("data_out_name " + n + " is not valid");
782 throw new RuntimeException("data_out_name " + n + " is not valid");
783 }
784 return echoUnResolved("dataOut", "'" + n + "'");
785 }
786
787 public static String ph1_coord_nominalTime_echo() {
788 return echoUnResolved("nominalTime", "");
789 }
790
791 public static String ph1_coord_nominalTime_echo_wrap() {
792 // return "${coord:nominalTime()}"; // no resolution
793 return echoUnResolved("nominalTime", "");
794 }
795
796 public static String ph1_coord_nominalTime_echo_fixed() {
797 return "2009-03-06T010:00"; // Dummy resolution
798 }
799
800 public static String ph1_coord_actualTime_echo_wrap() {
801 // return "${coord:actualTime()}"; // no resolution
802 return echoUnResolved("actualTime", "");
803 }
804
805 public static String ph1_coord_actionId_echo() {
806 return echoUnResolved("actionId", "");
807 }
808
809 public static String ph1_coord_name_echo() {
810 return echoUnResolved("name", "");
811 }
812
813 // The following echo functions are not used in any phases yet
814 // They are here for future purpose.
815 public static String coord_minutes_echo(String n) {
816 return echoUnResolved("minutes", n);
817 }
818
819 public static String coord_hours_echo(String n) {
820 return echoUnResolved("hours", n);
821 }
822
823 public static String coord_days_echo(String n) {
824 return echoUnResolved("days", n);
825 }
826
827 public static String coord_endOfDay_echo(String n) {
828 return echoUnResolved("endOfDay", n);
829 }
830
831 public static String coord_months_echo(String n) {
832 return echoUnResolved("months", n);
833 }
834
835 public static String coord_endOfMonth_echo(String n) {
836 return echoUnResolved("endOfMonth", n);
837 }
838
839 public static String coord_actualTime_echo() {
840 return echoUnResolved("actualTime", "");
841 }
842
843 // This echo function will always return "24" for validation only.
844 // This evaluation ****should not**** replace the original XML
845 // Create a temporary string and validate the function
846 // This is **required** for evaluating an expression like
847 // coord:HoursInDay(0) + 3
848 // actual evaluation will happen in phase 2 or phase 3.
849 public static String ph1_coord_hoursInDay_echo(String n) {
850 return "24";
851 // return echoUnResolved("hoursInDay", n);
852 }
853
854 // This echo function will always return "30" for validation only.
855 // This evaluation ****should not**** replace the original XML
856 // Create a temporary string and validate the function
857 // This is **required** for evaluating an expression like
858 // coord:daysInMonth(0) + 3
859 // actual evaluation will happen in phase 2 or phase 3.
860 public static String ph1_coord_daysInMonth_echo(String n) {
861 // return echoUnResolved("daysInMonth", n);
862 return "30";
863 }
864
865 // This echo function will always return "3" for validation only.
866 // This evaluation ****should not**** replace the original XML
867 // Create a temporary string and validate the function
868 // This is **required** for evaluating an expression like coord:tzOffset + 2
869 // actual evaluation will happen in phase 2 or phase 3.
870 public static String ph1_coord_tzOffset_echo() {
871 // return echoUnResolved("tzOffset", "");
872 return "3";
873 }
874
875 // Local methods
876 /**
877 * @param n
878 * @return n-th instance Date-Time from current instance for data-set <p/> return empty string ("") if the
879 * Action_Creation_time or the n-th instance <p/> is earlier than the Initial_Instance of dataset.
880 * @throws Exception
881 */
882 private static String coord_current_sync(int n) throws Exception {
883 return coord_currentRange_sync(n, n);
884 }
885
886 private static String coord_currentRange_sync(int start, int end) throws Exception {
887 final XLog LOG = XLog.getLog(CoordELFunctions.class);
888 int datasetFrequency = getDSFrequency();// in minutes
889 TimeUnit dsTimeUnit = getDSTimeUnit();
890 int[] instCount = new int[1];// used as pass by ref
891 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
892 StringBuilder instanceList = new StringBuilder();
893 if (nominalInstanceCal == null) {
894 LOG.warn("If the initial instance of the dataset is later than the nominal time, an empty string is"
895 + " returned. This means that no data is available at the current-instance specified by the user"
896 + " and the user could try modifying his initial-instance to an earlier time.");
897 return "";
898 } else {
899 Calendar initInstance = getInitialInstanceCal();
900 instCount[0] = instCount[0] + end;
901 // Add in the reverse order - newest instance first.
902 for (int i = end; i >= start; i--) {
903 // Tried to avoid the clone. But subtracting datasetFrequency gives different results than multiplying
904 // and Spring DST transition test in TestCoordELfunctions.testCurrent() fails
905 //nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), -datasetFrequency);
906 nominalInstanceCal = (Calendar) initInstance.clone();
907 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
908 instCount[0]--;
909 if (nominalInstanceCal.compareTo(initInstance) < 0) {
910 LOG.warn("If the initial instance of the dataset is later than the current-instance specified,"
911 + " such as coord:current({0}) in this case, an empty string is returned. This means that"
912 + " no data is available at the current-instance specified by the user and the user could"
913 + " try modifying his initial-instance to an earlier time.", start);
914 break;
915 }
916 else {
917 instanceList.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
918 instanceList.append(CoordELFunctions.INSTANCE_SEPARATOR);
919 }
920 }
921 }
922
923 if (instanceList.length() > 0) {
924 instanceList.setLength(instanceList.length() - CoordELFunctions.INSTANCE_SEPARATOR.length());
925 }
926 return instanceList.toString();
927 }
928
929 /**
930 *
931 * @param n offset amount (integer)
932 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
933 * @return the offset time from the effective nominal time <p/> return empty string ("") if the Action_Creation_time or the
934 * offset instance <p/> is earlier than the Initial_Instance of dataset.
935 * @throws Exception
936 */
937 private static String coord_offset_sync(int n, String timeUnit) throws Exception {
938 Calendar rawCal = resolveOffsetRawTime(n, TimeUnit.valueOf(timeUnit), null);
939 if (rawCal == null) {
940 // warning already logged by resolveOffsetRawTime()
941 return "";
942 }
943
944 int freq = getDSFrequency();
945 TimeUnit freqUnit = getDSTimeUnit();
946 int freqCount = 0;
947 // We're going to manually turn back/forward cal by decrements/increments of freq and then check that it gives the same
948 // time as rawCal; this is to check that the offset time resolves to a frequency offset of the effective nominal time
949 // In other words, that there exists an integer x, such that coord:offset(n, timeUnit) == coord:current(x) is true
950 // If not, then we'll "rewind" rawCal to the latest instance earlier than rawCal and use that.
951 Calendar cal = getInitialInstanceCal();
952 if (rawCal.before(cal)) {
953 while (cal.after(rawCal)) {
954 cal.add(freqUnit.getCalendarUnit(), -freq);
955 freqCount--;
956 }
957 }
958 else if (rawCal.after(cal)) {
959 while (cal.before(rawCal)) {
960 cal.add(freqUnit.getCalendarUnit(), freq);
961 freqCount++;
962 }
963 }
964 if (cal.before(rawCal)) {
965 rawCal = cal;
966 }
967 else if (cal.after(rawCal)) {
968 cal.add(freqUnit.getCalendarUnit(), -freq);
969 rawCal = cal;
970 freqCount--;
971 }
972 String rawCalStr = DateUtils.formatDateOozieTZ(rawCal);
973
974 Calendar nominalInstanceCal = getInitialInstanceCal();
975 nominalInstanceCal.add(freqUnit.getCalendarUnit(), freq * freqCount);
976 if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) {
977 XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the offset instance"
978 + " specified, such as coord:offset({0}, {1}) in this case, an empty string is returned. This means that no"
979 + " data is available at the offset instance specified by the user and the user could try modifying his"
980 + " initial-instance to an earlier time.", n, timeUnit);
981 return "";
982 }
983 String nominalCalStr = DateUtils.formatDateOozieTZ(nominalInstanceCal);
984
985 if (!rawCalStr.equals(nominalCalStr)) {
986 throw new RuntimeException("Shouldn't happen");
987 }
988 return rawCalStr;
989 }
990
991 /**
992 * @param offset
993 * @return n-th available latest instance Date-Time for SYNC data-set
994 * @throws Exception
995 */
996 private static String coord_latest_sync(int offset) throws Exception {
997 return coord_latestRange_sync(offset, offset);
998 }
999
1000 private static String coord_latestRange_sync(int startOffset, int endOffset) throws Exception {
1001 final XLog LOG = XLog.getLog(CoordELFunctions.class);
1002 final Thread currentThread = Thread.currentThread();
1003 ELEvaluator eval = ELEvaluator.getCurrent();
1004 String retVal = "";
1005 int datasetFrequency = (int) getDSFrequency();// in minutes
1006 TimeUnit dsTimeUnit = getDSTimeUnit();
1007 int[] instCount = new int[1];
1008 boolean useCurrentTime = Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false);
1009 Calendar nominalInstanceCal;
1010 if (useCurrentTime) {
1011 nominalInstanceCal = getCurrentInstance(new Date(), instCount);
1012 }
1013 else {
1014 nominalInstanceCal = getCurrentInstance(getActualTime(), instCount);
1015 }
1016 StringBuilder resolvedInstances = new StringBuilder();
1017 StringBuilder resolvedURIPaths = new StringBuilder();
1018 if (nominalInstanceCal != null) {
1019 Calendar initInstance = getInitialInstanceCal();
1020 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1021 if (ds == null) {
1022 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1023 }
1024 String uriTemplate = ds.getUriTemplate();
1025 Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
1026 if (conf == null) {
1027 throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
1028 }
1029 int available = 0;
1030 boolean resolved = false;
1031 String user = ParamChecker
1032 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
1033 String doneFlag = ds.getDoneFlag();
1034 URIHandlerService uriService = Services.get().get(URIHandlerService.class);
1035 URIHandler uriHandler = null;
1036 Context uriContext = null;
1037 try {
1038 while (nominalInstanceCal.compareTo(initInstance) >= 0 && !currentThread.isInterrupted()) {
1039 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
1040 String uriPath = uriEval.evaluate(uriTemplate, String.class);
1041 if (uriHandler == null) {
1042 URI uri = new URI(uriPath);
1043 uriHandler = uriService.getURIHandler(uri);
1044 uriContext = uriHandler.getContext(uri, conf, user);
1045 }
1046 String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
1047 if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
1048 XLog.getLog(CoordELFunctions.class)
1049 .debug("Found latest(" + available + "): " + uriWithDoneFlag);
1050 if (available == startOffset) {
1051 LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
1052 resolved = true;
1053 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
1054 resolvedURIPaths.append(uriPath);
1055 retVal = resolvedInstances.toString();
1056 eval.setVariable("resolved_path", resolvedURIPaths.toString());
1057 break;
1058 }
1059 else if (available <= endOffset) {
1060 LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
1061 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
1062 INSTANCE_SEPARATOR);
1063 resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
1064 }
1065
1066 available--;
1067 }
1068 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), -datasetFrequency);
1069 nominalInstanceCal = (Calendar) initInstance.clone();
1070 instCount[0]--;
1071 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
1072 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
1073 }
1074 }
1075 finally {
1076 if (uriContext != null) {
1077 uriContext.destroy();
1078 }
1079 }
1080 if (!resolved) {
1081 // return unchanged latest function with variable 'is_resolved'
1082 // to 'false'
1083 eval.setVariable("is_resolved", Boolean.FALSE);
1084 if (startOffset == endOffset) {
1085 retVal = "${coord:latest(" + startOffset + ")}";
1086 }
1087 else {
1088 retVal = "${coord:latestRange(" + startOffset + "," + endOffset + ")}";
1089 }
1090 }
1091 else {
1092 eval.setVariable("is_resolved", Boolean.TRUE);
1093 }
1094 }
1095 else {// No feasible nominal time
1096 eval.setVariable("is_resolved", Boolean.FALSE);
1097 }
1098 return retVal;
1099 }
1100
1101 /**
1102 * @param tm
1103 * @return a new Evaluator to be used for URI-template evaluation
1104 */
1105 private static ELEvaluator getUriEvaluator(Calendar tm) {
1106 tm.setTimeZone(DateUtils.getOozieProcessingTimeZone());
1107 ELEvaluator retEval = new ELEvaluator();
1108 retEval.setVariable("YEAR", tm.get(Calendar.YEAR));
1109 retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm
1110 .get(Calendar.MONTH) + 1));
1111 retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm
1112 .get(Calendar.DAY_OF_MONTH));
1113 retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm
1114 .get(Calendar.HOUR_OF_DAY));
1115 retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? "0" + tm.get(Calendar.MINUTE) : tm
1116 .get(Calendar.MINUTE));
1117 return retEval;
1118 }
1119
1120 /**
1121 * @return whether a data set is SYNCH or ASYNC
1122 */
1123 private static boolean isSyncDataSet() {
1124 ELEvaluator eval = ELEvaluator.getCurrent();
1125 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1126 if (ds == null) {
1127 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1128 }
1129 return ds.getType().equalsIgnoreCase("SYNC");
1130 }
1131
1132 /**
1133 * Check whether a function should be resolved.
1134 *
1135 * @param functionName
1136 * @param n
1137 * @return null if the functionName needs to be resolved otherwise return the calling function unresolved.
1138 */
1139 private static String checkIfResolved(String functionName, String n) {
1140 ELEvaluator eval = ELEvaluator.getCurrent();
1141 String replace = (String) eval.getVariable("resolve_" + functionName);
1142 if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't
1143 // resolve
1144 // return "${coord:" + functionName + "(" + n +")}"; //Unresolved
1145 eval.setVariable(".wrap", "true");
1146 return "coord:" + functionName + "(" + n + ")"; // Unresolved
1147 }
1148 return null; // Resolved it
1149 }
1150
1151 private static String echoUnResolved(String functionName, String n) {
1152 return echoUnResolvedPre(functionName, n, "coord:");
1153 }
1154
1155 private static String echoUnResolvedPre(String functionName, String n, String prefix) {
1156 ELEvaluator eval = ELEvaluator.getCurrent();
1157 eval.setVariable(".wrap", "true");
1158 return prefix + functionName + "(" + n + ")"; // Unresolved
1159 }
1160
1161 /**
1162 * @return the initial instance of a DataSet in DATE
1163 */
1164 private static Date getInitialInstance() {
1165 ELEvaluator eval = ELEvaluator.getCurrent();
1166 return getInitialInstance(eval);
1167 }
1168
1169 /**
1170 * @return the initial instance of a DataSet in DATE
1171 */
1172 private static Date getInitialInstance(ELEvaluator eval) {
1173 return getInitialInstanceCal(eval).getTime();
1174 // return ds.getInitInstance();
1175 }
1176
1177 /**
1178 * @return the initial instance of a DataSet in Calendar
1179 */
1180 private static Calendar getInitialInstanceCal() {
1181 ELEvaluator eval = ELEvaluator.getCurrent();
1182 return getInitialInstanceCal(eval);
1183 }
1184
1185 /**
1186 * @return the initial instance of a DataSet in Calendar
1187 */
1188 private static Calendar getInitialInstanceCal(ELEvaluator eval) {
1189 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1190 if (ds == null) {
1191 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1192 }
1193 Calendar effInitTS = Calendar.getInstance();
1194 effInitTS.setTime(ds.getInitInstance());
1195 effInitTS.setTimeZone(ds.getTimeZone());
1196 // To adjust EOD/EOM
1197 DateUtils.moveToEnd(effInitTS, getDSEndOfFlag(eval));
1198 return effInitTS;
1199 // return ds.getInitInstance();
1200 }
1201
1202 /**
1203 * @return Nominal or action creation Time when all the dependencies of an application instance are met.
1204 */
1205 private static Date getActionCreationtime() {
1206 ELEvaluator eval = ELEvaluator.getCurrent();
1207 return getActionCreationtime(eval);
1208 }
1209
1210 /**
1211 * @return Nominal or action creation Time when all the dependencies of an application instance are met.
1212 */
1213 private static Date getActionCreationtime(ELEvaluator eval) {
1214 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1215 if (coordAction == null) {
1216 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1217 }
1218 return coordAction.getNominalTime();
1219 }
1220
1221 /**
1222 * @return Actual Time when all the dependencies of an application instance are met.
1223 */
1224 private static Date getActualTime() {
1225 ELEvaluator eval = ELEvaluator.getCurrent();
1226 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1227 if (coordAction == null) {
1228 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1229 }
1230 return coordAction.getActualTime();
1231 }
1232
1233 /**
1234 * @return TimeZone for the application or job.
1235 */
1236 private static TimeZone getJobTZ() {
1237 ELEvaluator eval = ELEvaluator.getCurrent();
1238 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1239 if (coordAction == null) {
1240 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1241 }
1242 return coordAction.getTimeZone();
1243 }
1244
1245 /**
1246 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
1247 *
1248 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
1249 * the dataset.
1250 */
1251 public static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) {
1252 ELEvaluator eval = ELEvaluator.getCurrent();
1253 return getCurrentInstance(effectiveTime, instanceCount, eval);
1254 }
1255
1256 /**
1257 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
1258 *
1259 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
1260 * the dataset.
1261 */
1262 private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[], ELEvaluator eval) {
1263 Date datasetInitialInstance = getInitialInstance(eval);
1264 TimeUnit dsTimeUnit = getDSTimeUnit(eval);
1265 TimeZone dsTZ = getDatasetTZ(eval);
1266 int dsFreq = getDSFrequency(eval);
1267 // Convert Date to Calendar for corresponding TZ
1268 Calendar current = Calendar.getInstance();
1269 current.setTime(datasetInitialInstance);
1270 current.setTimeZone(dsTZ);
1271
1272 Calendar calEffectiveTime = Calendar.getInstance();
1273 calEffectiveTime.setTime(effectiveTime);
1274 calEffectiveTime.setTimeZone(dsTZ);
1275 if (instanceCount == null) { // caller doesn't care about this value
1276 instanceCount = new int[1];
1277 }
1278 instanceCount[0] = 0;
1279 if (current.compareTo(calEffectiveTime) > 0) {
1280 return null;
1281 }
1282 Calendar origCurrent = (Calendar) current.clone();
1283 while (current.compareTo(calEffectiveTime) <= 0) {
1284 current = (Calendar) origCurrent.clone();
1285 instanceCount[0]++;
1286 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
1287 }
1288 instanceCount[0]--;
1289
1290 current = (Calendar) origCurrent.clone();
1291 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
1292 return current;
1293 }
1294
1295 public static Calendar getEffectiveNominalTime() {
1296 Date datasetInitialInstance = getInitialInstance();
1297 TimeZone dsTZ = getDatasetTZ();
1298 // Convert Date to Calendar for corresponding TZ
1299 Calendar current = Calendar.getInstance();
1300 current.setTime(datasetInitialInstance);
1301 current.setTimeZone(dsTZ);
1302
1303 Calendar calEffectiveTime = Calendar.getInstance();
1304 calEffectiveTime.setTime(getActionCreationtime());
1305 calEffectiveTime.setTimeZone(dsTZ);
1306 if (current.compareTo(calEffectiveTime) > 0) {
1307 // Nominal Time < initial Instance
1308 // TODO: getClass() call doesn't work from static method.
1309 // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+
1310 // current.getTime());
1311 return null;
1312 }
1313 return calEffectiveTime;
1314 }
1315
1316 /**
1317 * @return dataset frequency in minutes
1318 */
1319 private static int getDSFrequency() {
1320 ELEvaluator eval = ELEvaluator.getCurrent();
1321 return getDSFrequency(eval);
1322 }
1323
1324 /**
1325 * @return dataset frequency in minutes
1326 */
1327 private static int getDSFrequency(ELEvaluator eval) {
1328 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1329 if (ds == null) {
1330 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1331 }
1332 return ds.getFrequency();
1333 }
1334
1335 /**
1336 * @return dataset TimeUnit
1337 */
1338 private static TimeUnit getDSTimeUnit() {
1339 ELEvaluator eval = ELEvaluator.getCurrent();
1340 return getDSTimeUnit(eval);
1341 }
1342
1343 /**
1344 * @return dataset TimeUnit
1345 */
1346 private static TimeUnit getDSTimeUnit(ELEvaluator eval) {
1347 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1348 if (ds == null) {
1349 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1350 }
1351 return ds.getTimeUnit();
1352 }
1353
1354 /**
1355 * @return dataset TimeZone
1356 */
1357 public static TimeZone getDatasetTZ() {
1358 ELEvaluator eval = ELEvaluator.getCurrent();
1359 return getDatasetTZ(eval);
1360 }
1361
1362 /**
1363 * @return dataset TimeZone
1364 */
1365 private static TimeZone getDatasetTZ(ELEvaluator eval) {
1366 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1367 if (ds == null) {
1368 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1369 }
1370 return ds.getTimeZone();
1371 }
1372
1373 /**
1374 * @return dataset TimeUnit
1375 */
1376 private static TimeUnit getDSEndOfFlag() {
1377 ELEvaluator eval = ELEvaluator.getCurrent();
1378 return getDSEndOfFlag(eval);
1379 }
1380
1381 /**
1382 * @return dataset TimeUnit
1383 */
1384 private static TimeUnit getDSEndOfFlag(ELEvaluator eval) {
1385 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1386 if (ds == null) {
1387 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1388 }
1389 return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration();
1390 }
1391
1392 /**
1393 * Return a job configuration property for the coordinator.
1394 *
1395 * @param property property name.
1396 * @return the value of the property, <code>null</code> if the property is undefined.
1397 */
1398 public static String coord_conf(String property) {
1399 ELEvaluator eval = ELEvaluator.getCurrent();
1400 return (String) eval.getVariable(property);
1401 }
1402
1403 /**
1404 * Return the user that submitted the coordinator job.
1405 *
1406 * @return the user that submitted the coordinator job.
1407 */
1408 public static String coord_user() {
1409 ELEvaluator eval = ELEvaluator.getCurrent();
1410 return (String) eval.getVariable(OozieClient.USER_NAME);
1411 }
1412
1413 /**
1414 * Takes two offset times and returns a list of multiples of the frequency offset from the effective nominal time that occur
1415 * between them. The caller should make sure that startCal is earlier than endCal.
1416 * <p>
1417 * As a simple example, assume its the same day: startCal is 1:00, endCal is 2:00, frequency is 20min, and effective nominal
1418 * time is 1:20 -- then this method would return a list containing: -20, 0, 20, 40, 60
1419 *
1420 * @param startCal The earlier offset time
1421 * @param endCal The later offset time
1422 * @param eval The ELEvaluator to use; cannot be null
1423 * @return A list of multiple of the frequency offset from the effective nominal time that occur between the startCal and endCal
1424 */
1425 public static List<Integer> expandOffsetTimes(Calendar startCal, Calendar endCal, ELEvaluator eval) {
1426 List<Integer> expandedFreqs = new ArrayList<Integer>();
1427 // Use eval because the "current" eval isn't set
1428 int freq = getDSFrequency(eval);
1429 TimeUnit freqUnit = getDSTimeUnit(eval);
1430 Calendar cal = getCurrentInstance(getActionCreationtime(eval), null, eval);
1431 int totalFreq = 0;
1432 if (startCal.before(cal)) {
1433 while (cal.after(startCal)) {
1434 cal.add(freqUnit.getCalendarUnit(), -freq);
1435 totalFreq += -freq;
1436 }
1437 if (cal.before(startCal)) {
1438 cal.add(freqUnit.getCalendarUnit(), freq);
1439 totalFreq += freq;
1440 }
1441 }
1442 else if (startCal.after(cal)) {
1443 while (cal.before(startCal)) {
1444 cal.add(freqUnit.getCalendarUnit(), freq);
1445 totalFreq += freq;
1446 }
1447 }
1448 // At this point, cal is the smallest multiple of the dataset frequency that is >= to the startCal and offset from the
1449 // effective nominal time. Now we can find all of the instances that occur between startCal and endCal, inclusive.
1450 while (cal.before(endCal) || cal.equals(endCal)) {
1451 expandedFreqs.add(totalFreq);
1452 cal.add(freqUnit.getCalendarUnit(), freq);
1453 totalFreq += freq;
1454 }
1455 return expandedFreqs;
1456 }
1457
1458 /**
1459 * Resolve the offset time from the effective nominal time
1460 *
1461 * @param n offset amount (integer)
1462 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
1463 * @param eval The ELEvaluator to use; or null to use the "current" eval
1464 * @return A Calendar of the offset time
1465 */
1466 public static Calendar resolveOffsetRawTime(int n, TimeUnit timeUnit, ELEvaluator eval) {
1467 // Use eval if given (for when the "current" eval isn't set)
1468 Calendar cal;
1469 if (eval == null) {
1470 cal = getCurrentInstance(getActionCreationtime(), null);
1471 }
1472 else {
1473 cal = getCurrentInstance(getActionCreationtime(eval), null, eval);
1474 }
1475 if (cal == null) {
1476 XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the nominal time, an"
1477 + " empty string is returned. This means that no data is available at the offset instance specified by the user"
1478 + " and the user could try modifying his or her initial-instance to an earlier time.");
1479 return null;
1480 }
1481 cal.add(timeUnit.getCalendarUnit(), n);
1482 return cal;
1483 }
1484 }