This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.coord;
019
020 import java.io.IOException;
021 import java.util.Calendar;
022 import java.util.Date;
023 import java.util.TimeZone;
024
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.hadoop.fs.Path;
027
028 import org.apache.oozie.client.OozieClient;
029 import org.apache.oozie.util.DateUtils;
030 import org.apache.oozie.util.ELEvaluator;
031 import org.apache.oozie.util.ParamChecker;
032 import org.apache.oozie.util.XLog;
033 import org.apache.oozie.service.HadoopAccessorException;
034 import org.apache.oozie.service.Services;
035 import org.apache.oozie.service.HadoopAccessorService;
036
037 /**
038 * This class implements the EL functions related to the coordinator.
039 */
040
041 public class CoordELFunctions {
042 final private static String DATASET = "oozie.coord.el.dataset.bean";
043 final private static String COORD_ACTION = "oozie.coord.el.app.bean";
044 final public static String CONFIGURATION = "oozie.coord.el.conf";
045 // INSTANCE_SEPARATOR is used to separate multiple directories into one tag.
046 final public static String INSTANCE_SEPARATOR = "#";
047 final public static String DIR_SEPARATOR = ",";
048 // TODO: in next release, support flexibility
049 private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS";
050
051 /**
052 * Used in defining the frequency in 'day' unit. <p/> domain: <code> val > 0</code> and should be integer.
053 *
054 * @param val frequency in number of days.
055 * @return number of days and also set the frequency timeunit to "day"
056 */
057 public static int ph1_coord_days(int val) {
058 val = ParamChecker.checkGTZero(val, "n");
059 ELEvaluator eval = ELEvaluator.getCurrent();
060 eval.setVariable("timeunit", TimeUnit.DAY);
061 eval.setVariable("endOfDuration", TimeUnit.NONE);
062 return val;
063 }
064
065 /**
066 * Used in defining the frequency in 'month' unit. <p/> domain: <code> val > 0</code> and should be integer.
067 *
068 * @param val frequency in number of months.
069 * @return number of months and also set the frequency timeunit to "month"
070 */
071 public static int ph1_coord_months(int val) {
072 val = ParamChecker.checkGTZero(val, "n");
073 ELEvaluator eval = ELEvaluator.getCurrent();
074 eval.setVariable("timeunit", TimeUnit.MONTH);
075 eval.setVariable("endOfDuration", TimeUnit.NONE);
076 return val;
077 }
078
079 /**
080 * Used in defining the frequency in 'hour' unit. <p/> parameter value domain: <code> val > 0</code> and should
081 * be integer.
082 *
083 * @param val frequency in number of hours.
084 * @return number of minutes and also set the frequency timeunit to "minute"
085 */
086 public static int ph1_coord_hours(int val) {
087 val = ParamChecker.checkGTZero(val, "n");
088 ELEvaluator eval = ELEvaluator.getCurrent();
089 eval.setVariable("timeunit", TimeUnit.MINUTE);
090 eval.setVariable("endOfDuration", TimeUnit.NONE);
091 return val * 60;
092 }
093
094 /**
095 * Used in defining the frequency in 'minute' unit. <p/> domain: <code> val > 0</code> and should be integer.
096 *
097 * @param val frequency in number of minutes.
098 * @return number of minutes and also set the frequency timeunit to "minute"
099 */
100 public static int ph1_coord_minutes(int val) {
101 val = ParamChecker.checkGTZero(val, "n");
102 ELEvaluator eval = ELEvaluator.getCurrent();
103 eval.setVariable("timeunit", TimeUnit.MINUTE);
104 eval.setVariable("endOfDuration", TimeUnit.NONE);
105 return val;
106 }
107
108 /**
109 * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p/> Every instance will
110 * start at 00:00 hour of each day. <p/> domain: <code> val > 0</code> and should be integer.
111 *
112 * @param val frequency in number of days.
113 * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day"
114 */
115 public static int ph1_coord_endOfDays(int val) {
116 val = ParamChecker.checkGTZero(val, "n");
117 ELEvaluator eval = ELEvaluator.getCurrent();
118 eval.setVariable("timeunit", TimeUnit.DAY);
119 eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY);
120 return val;
121 }
122
123 /**
124 * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p/> Every instance will
125 * start at first day of each month at 00:00 hour. <p/> domain: <code> val > 0</code> and should be integer.
126 *
127 * @param val: frequency in number of months.
128 * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month"
129 */
130 public static int ph1_coord_endOfMonths(int val) {
131 val = ParamChecker.checkGTZero(val, "n");
132 ELEvaluator eval = ELEvaluator.getCurrent();
133 eval.setVariable("timeunit", TimeUnit.MONTH);
134 eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH);
135 return val;
136 }
137
138 /**
139 * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p/> Depends on: <p/>
140 * 1. Timezone of both dataset and job <p/> 2. Action creation Time
141 *
142 * @return difference in minutes (DataSet TZ Offset - Application TZ offset)
143 */
144 public static int ph2_coord_tzOffset() {
145 Date actionCreationTime = getActionCreationtime();
146 TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ");
147 TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ");
148 // Apply the TZ into Calendar object
149 Calendar dsTime = Calendar.getInstance(dsTZ);
150 dsTime.setTime(actionCreationTime);
151 Calendar jobTime = Calendar.getInstance(jobTZ);
152 jobTime.setTime(actionCreationTime);
153 return (dsTime.get(Calendar.ZONE_OFFSET) - jobTime.get(Calendar.ZONE_OFFSET)) / (1000 * 60);
154 }
155
156 public static int ph3_coord_tzOffset() {
157 return ph2_coord_tzOffset();
158 }
159
160 /**
161 * Returns the a date string while given a base date in 'strBaseDate',
162 * offset and unit (e.g. DAY, MONTH, HOUR, MINUTE, MONTH).
163 *
164 * @param strBaseDate -- base date
165 * @param offset -- any number
166 * @param unit -- DAY, MONTH, HOUR, MINUTE, MONTH
167 * @return date string
168 * @throws Exception
169 */
170 public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
171 Calendar baseCalDate = DateUtils.getCalendar(strBaseDate);
172 StringBuilder buffer = new StringBuilder();
173 baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset);
174 buffer.append(DateUtils.formatDateUTC(baseCalDate));
175 return buffer.toString();
176 }
177
178 public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
179 return ph2_coord_dateOffset(strBaseDate, offset, unit);
180 }
181
182 /**
183 * Determine the date-time in UTC of n-th future available dataset instance
184 * from nominal Time but not beyond the instance specified as 'instance.
185 * <p/>
186 * It depends on:
187 * <p/>
188 * 1. Data set frequency
189 * <p/>
190 * 2. Data set Time unit (day, month, minute)
191 * <p/>
192 * 3. Data set Time zone/DST
193 * <p/>
194 * 4. End Day/Month flag
195 * <p/>
196 * 5. Data set initial instance
197 * <p/>
198 * 6. Action Creation Time
199 * <p/>
200 * 7. Existence of dataset's directory
201 *
202 * @param n :instance count
203 * <p/>
204 * domain: n >= 0, n is integer
205 * @param instance: How many future instance it should check? value should
206 * be >=0
207 * @return date-time in UTC of the n-th instance
208 * <p/>
209 * @throws Exception
210 */
211 public static String ph3_coord_future(int n, int instance) throws Exception {
212 ParamChecker.checkGEZero(n, "future:n");
213 ParamChecker.checkGTZero(instance, "future:instance");
214 if (isSyncDataSet()) {// For Sync Dataset
215 return coord_future_sync(n, instance);
216 }
217 else {
218 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
219 }
220 }
221
/**
 * Resolves the n-th available future instance of a sync dataset by probing the file system.
 * <p/>
 * Starting at the current instance (derived from the action creation time), candidate instances are generated as
 * initial-instance + count * frequency and each candidate's URI (plus the done flag, when non-empty) is checked
 * for existence.
 * <p/>
 * Side effects on the current evaluator: "is_resolved" is always set; "resolved_path" is set to the URI (without
 * the done flag) when the instance is found.
 *
 * @param n zero-based index of the available future instance wanted
 * @param instance bound on the number of candidates checked (the loop runs while instance >= checkedInstance)
 * @return the UTC date string of the resolved instance; the unevaluated "${coord:future(n, instance)}" expression
 *         when nothing was resolved; or an empty string when there is no current instance
 * @throws Exception
 */
private static String coord_future_sync(int n, int instance) throws Exception {
    ELEvaluator eval = ELEvaluator.getCurrent();
    String retVal = "";
    int datasetFrequency = (int) getDSFrequency();// in minutes
    TimeUnit dsTimeUnit = getDSTimeUnit();
    // Single-element array used as an out-parameter: receives the instance count of the current instance.
    int[] instCount = new int[1];
    Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
    if (nominalInstanceCal != null) {
        // Recompute the candidate from the initial instance rather than reusing the returned calendar.
        Calendar initInstance = getInitialInstanceCal();
        nominalInstanceCal = (Calendar) initInstance.clone();
        nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);

        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
        if (ds == null) {
            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
        }
        String uriTemplate = ds.getUriTemplate();
        Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
        if (conf == null) {
            throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
        }
        // available: number of existing instances seen so far; checkedInstance: number of candidates probed.
        int available = 0, checkedInstance = 0;
        boolean resolved = false;
        String user = ParamChecker
                .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
        String group = ParamChecker.notEmpty((String) eval.getVariable(OozieClient.GROUP_NAME),
                OozieClient.GROUP_NAME);
        String doneFlag = ds.getDoneFlag();
        while (instance >= checkedInstance) {
            ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
            String uriPath = uriEval.evaluate(uriTemplate, String.class);
            String pathWithDoneFlag = uriPath;
            if (doneFlag.length() > 0) {
                pathWithDoneFlag += "/" + doneFlag;
            }
            if (isPathAvailable(pathWithDoneFlag, user, group, conf)) {
                XLog.getLog(CoordELFunctions.class).debug("Found future(" + available + "): " + pathWithDoneFlag);
                if (available == n) {
                    XLog.getLog(CoordELFunctions.class).debug("Found future File: " + pathWithDoneFlag);
                    resolved = true;
                    retVal = DateUtils.formatDateUTC(nominalInstanceCal);
                    // The resolved path is recorded without the done flag.
                    eval.setVariable("resolved_path", uriPath);
                    break;
                }
                available++;
            }
            // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
            // -datasetFrequency);
            // Advance to the next candidate, again computed from the initial instance.
            nominalInstanceCal = (Calendar) initInstance.clone();
            instCount[0]++;
            nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
            checkedInstance++;
            // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
        }
        if (!resolved) {
            // return unchanged future function with variable 'is_resolved'
            // to 'false'
            eval.setVariable("is_resolved", Boolean.FALSE);
            retVal = "${coord:future(" + n + ", " + instance + ")}";
        }
        else {
            eval.setVariable("is_resolved", Boolean.TRUE);
        }
    }
    else {// No feasible nominal time
        // Marked as resolved with an empty result.
        eval.setVariable("is_resolved", Boolean.TRUE);
        retVal = "";
    }
    return retVal;
}
292
293 /**
294 * Return nominal time or Action Creation Time.
295 * <p/>
296 *
297 * @return coordinator action creation or materialization date time
298 * @throws Exception if unable to format the Date object to String
299 */
300 public static String ph2_coord_nominalTime() throws Exception {
301 ELEvaluator eval = ELEvaluator.getCurrent();
302 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
303 "Coordinator Action");
304 return DateUtils.formatDateUTC(action.getNominalTime());
305 }
306
307 public static String ph3_coord_nominalTime() throws Exception {
308 return ph2_coord_nominalTime();
309 }
310
311 /**
312 * Convert from standard date-time formatting to a desired format.
313 * <p/>
314 * @param dateTimeStr - A timestamp in standard (ISO8601) format.
315 * @param format - A string representing the desired format.
316 * @return coordinator action creation or materialization date time
317 * @throws Exception if unable to format the Date object to String
318 */
319 public static String ph2_coord_formatTime(String dateTimeStr, String format)
320 throws Exception {
321 Date dateTime = DateUtils.parseDateUTC(dateTimeStr);
322 return DateUtils.formatDateCustom(dateTime, format);
323 }
324
325 public static String ph3_coord_formatTime(String dateTimeStr, String format)
326 throws Exception {
327 return ph2_coord_formatTime(dateTimeStr, format);
328 }
329
330 /**
331 * Return Action Id. <p/>
332 *
333 * @return coordinator action Id
334 */
335 public static String ph2_coord_actionId() throws Exception {
336 ELEvaluator eval = ELEvaluator.getCurrent();
337 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
338 "Coordinator Action");
339 return action.getActionId();
340 }
341
342 public static String ph3_coord_actionId() throws Exception {
343 return ph2_coord_actionId();
344 }
345
346 /**
347 * Return Job Name. <p/>
348 *
349 * @return coordinator name
350 */
351 public static String ph2_coord_name() throws Exception {
352 ELEvaluator eval = ELEvaluator.getCurrent();
353 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
354 "Coordinator Action");
355 return action.getName();
356 }
357
358 public static String ph3_coord_name() throws Exception {
359 return ph2_coord_name();
360 }
361
362 /**
363 * Return Action Start time. <p/>
364 *
365 * @return coordinator action start time
366 * @throws Exception if unable to format the Date object to String
367 */
368 public static String ph2_coord_actualTime() throws Exception {
369 ELEvaluator eval = ELEvaluator.getCurrent();
370 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
371 if (coordAction == null) {
372 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
373 }
374 return DateUtils.formatDateUTC(coordAction.getActualTime());
375 }
376
377 public static String ph3_coord_actualTime() throws Exception {
378 return ph2_coord_actualTime();
379 }
380
381 /**
382 * Used to specify a list of URI's that are used as input dir to the workflow job. <p/> Look for two evaluator-level
383 * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
384 * URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
385 * unresolved, this function will echo back the original function <p/> otherwise it sends the uris.
386 *
387 * @param dataInName : Datain name
388 * @return the list of URI's separated by INSTANCE_SEPARATOR <p/> if there are unresolved EL function (i.e. latest)
389 * , echo back <p/> the function without resolving the function.
390 */
391 public static String ph3_coord_dataIn(String dataInName) {
392 String uris = "";
393 ELEvaluator eval = ELEvaluator.getCurrent();
394 uris = (String) eval.getVariable(".datain." + dataInName);
395 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
396 if (unresolved != null && unresolved.booleanValue() == true) {
397 return "${coord:dataIn('" + dataInName + "')}";
398 }
399 return uris;
400 }
401
402 /**
403 * Used to specify a list of URI's that are output dir of the workflow job. <p/> Look for one evaluator-level
404 * variable <p/> dataout.<DATAOUT_NAME> <p/> It defines the current list of URI. <p/> otherwise it sends the uris.
405 *
406 * @param dataOutName : Dataout name
407 * @return the list of URI's separated by INSTANCE_SEPARATOR
408 */
409 public static String ph3_coord_dataOut(String dataOutName) {
410 String uris = "";
411 ELEvaluator eval = ELEvaluator.getCurrent();
412 uris = (String) eval.getVariable(".dataout." + dataOutName);
413 return uris;
414 }
415
416 /**
417 * Determine the date-time in UTC of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency <p/> 2.
418 * Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. Data
419 * set initial instance <p/> 6. Action Creation Time
420 *
421 * @param n instance count domain: n is integer
422 * @return date-time in UTC of the n-th instance returns 'null' means n-th instance is earlier than Initial-Instance
423 * of DS
424 * @throws Exception
425 */
426 public static String ph2_coord_current(int n) throws Exception {
427 if (isSyncDataSet()) { // For Sync Dataset
428 return coord_current_sync(n);
429 }
430 else {
431 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
432 }
433 }
434
435 /**
436 * Determine how many hours is on the date of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency
437 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
438 * Data set initial instance <p/> 6. Action Creation Time
439 *
440 * @param n instance count <p/> domain: n is integer
441 * @return number of hours on that day <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
442 * @throws Exception
443 */
444 public static int ph2_coord_hoursInDay(int n) throws Exception {
445 int datasetFrequency = (int) getDSFrequency();
446 // /Calendar nominalInstanceCal =
447 // getCurrentInstance(getActionCreationtime());
448 Calendar nominalInstanceCal = getEffectiveNominalTime();
449 if (nominalInstanceCal == null) {
450 return -1;
451 }
452 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
453 /*
454 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
455 * { return -1; }
456 */
457 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
458 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
459 return DateUtils.hoursInDay(nominalInstanceCal);
460 }
461
462 public static int ph3_coord_hoursInDay(int n) throws Exception {
463 return ph2_coord_hoursInDay(n);
464 }
465
466 /**
467 * Calculate number of days in one month for n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency .
468 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
469 * Data set initial instance <p/> 6. Action Creation Time
470 *
471 * @param n instance count. domain: n is integer
472 * @return number of days in that month <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
473 * @throws Exception
474 */
475 public static int ph2_coord_daysInMonth(int n) throws Exception {
476 int datasetFrequency = (int) getDSFrequency();// in minutes
477 // Calendar nominalInstanceCal =
478 // getCurrentInstance(getActionCreationtime());
479 Calendar nominalInstanceCal = getEffectiveNominalTime();
480 if (nominalInstanceCal == null) {
481 return -1;
482 }
483 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
484 /*
485 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
486 * { return -1; }
487 */
488 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
489 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
490 return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH);
491 }
492
493 public static int ph3_coord_daysInMonth(int n) throws Exception {
494 return ph2_coord_daysInMonth(n);
495 }
496
497 /**
498 * Determine the date-time in UTC of n-th latest available dataset instance. <p/> It depends on: <p/> 1. Data set
499 * frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month
500 * flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of dataset's directory
501 *
502 * @param n :instance count <p/> domain: n > 0, n is integer
503 * @return date-time in UTC of the n-th instance <p/> returns 'null' means n-th instance is earlier than
504 * Initial-Instance of DS
505 * @throws Exception
506 */
507 public static String ph3_coord_latest(int n) throws Exception {
508 if (n > 0) {
509 throw new IllegalArgumentException("paramter should be <= 0 but it is " + n);
510 }
511 if (isSyncDataSet()) {// For Sync Dataset
512 return coord_latest_sync(n);
513 }
514 else {
515 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
516 }
517 }
518
519 /**
520 * Configure an evaluator with data set and application specific information. <p/> Helper method of associating
521 * dataset and application object
522 *
523 * @param evaluator : to set variables
524 * @param ds : Data Set object
525 * @param coordAction : Application instance
526 */
527 public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) {
528 evaluator.setVariable(COORD_ACTION, coordAction);
529 evaluator.setVariable(DATASET, ds);
530 }
531
532 /**
533 * Helper method to wrap around with "${..}". <p/>
534 *
535 *
536 * @param eval :EL evaluator
537 * @param expr : expression to evaluate
538 * @return Resolved expression or echo back the same expression
539 * @throws Exception
540 */
541 public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception {
542 try {
543 eval.setVariable(".wrap", null);
544 String result = eval.evaluate(expr, String.class);
545 if (eval.getVariable(".wrap") != null) {
546 return "${" + result + "}";
547 }
548 else {
549 return result;
550 }
551 }
552 catch (Exception e) {
553 throw new Exception("Unable to evaluate :" + expr + ":\n", e);
554 }
555 }
556
557 // Set of echo functions
558
559 public static String ph1_coord_current_echo(String n) {
560 return echoUnResolved("current", n);
561 }
562
563 public static String ph2_coord_current_echo(String n) {
564 return echoUnResolved("current", n);
565 }
566
567 public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) {
568 return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit);
569 }
570
571 public static String ph1_coord_formatTime_echo(String dateTime, String format) {
572 // Quote the dateTime value since it would contain a ':'.
573 return echoUnResolved("formatTime", "'"+dateTime+"'" + " , " + format);
574 }
575
576 public static String ph1_coord_latest_echo(String n) {
577 return echoUnResolved("latest", n);
578 }
579
580 public static String ph2_coord_latest_echo(String n) {
581 return ph1_coord_latest_echo(n);
582 }
583
584 public static String ph1_coord_future_echo(String n, String instance) {
585 return echoUnResolved("future", n + ", " + instance + "");
586 }
587
588 public static String ph2_coord_future_echo(String n, String instance) {
589 return ph1_coord_future_echo(n, instance);
590 }
591
592 public static String ph1_coord_dataIn_echo(String n) {
593 ELEvaluator eval = ELEvaluator.getCurrent();
594 String val = (String) eval.getVariable("oozie.dataname." + n);
595 if (val == null || val.equals("data-in") == false) {
596 XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " is not valid");
597 throw new RuntimeException("data_in_name " + n + " is not valid");
598 }
599 return echoUnResolved("dataIn", "'" + n + "'");
600 }
601
602 public static String ph1_coord_dataOut_echo(String n) {
603 ELEvaluator eval = ELEvaluator.getCurrent();
604 String val = (String) eval.getVariable("oozie.dataname." + n);
605 if (val == null || val.equals("data-out") == false) {
606 XLog.getLog(CoordELFunctions.class).error("data_out_name " + n + " is not valid");
607 throw new RuntimeException("data_out_name " + n + " is not valid");
608 }
609 return echoUnResolved("dataOut", "'" + n + "'");
610 }
611
612 public static String ph1_coord_nominalTime_echo() {
613 return echoUnResolved("nominalTime", "");
614 }
615
616 public static String ph1_coord_nominalTime_echo_wrap() {
617 // return "${coord:nominalTime()}"; // no resolution
618 return echoUnResolved("nominalTime", "");
619 }
620
621 public static String ph1_coord_nominalTime_echo_fixed() {
622 return "2009-03-06T010:00"; // Dummy resolution
623 }
624
625 public static String ph1_coord_actualTime_echo_wrap() {
626 // return "${coord:actualTime()}"; // no resolution
627 return echoUnResolved("actualTime", "");
628 }
629
630 public static String ph1_coord_actionId_echo() {
631 return echoUnResolved("actionId", "");
632 }
633
634 public static String ph1_coord_name_echo() {
635 return echoUnResolved("name", "");
636 }
637
638 // The following echo functions are not used in any phases yet
639 // They are here for future purpose.
640 public static String coord_minutes_echo(String n) {
641 return echoUnResolved("minutes", n);
642 }
643
644 public static String coord_hours_echo(String n) {
645 return echoUnResolved("hours", n);
646 }
647
648 public static String coord_days_echo(String n) {
649 return echoUnResolved("days", n);
650 }
651
652 public static String coord_endOfDay_echo(String n) {
653 return echoUnResolved("endOfDay", n);
654 }
655
656 public static String coord_months_echo(String n) {
657 return echoUnResolved("months", n);
658 }
659
660 public static String coord_endOfMonth_echo(String n) {
661 return echoUnResolved("endOfMonth", n);
662 }
663
664 public static String coord_actualTime_echo() {
665 return echoUnResolved("actualTime", "");
666 }
667
668 // This echo function will always return "24" for validation only.
669 // This evaluation ****should not**** replace the original XML
670 // Create a temporary string and validate the function
671 // This is **required** for evaluating an expression like
672 // coord:HoursInDay(0) + 3
673 // actual evaluation will happen in phase 2 or phase 3.
674 public static String ph1_coord_hoursInDay_echo(String n) {
675 return "24";
676 // return echoUnResolved("hoursInDay", n);
677 }
678
679 // This echo function will always return "30" for validation only.
680 // This evaluation ****should not**** replace the original XML
681 // Create a temporary string and validate the function
682 // This is **required** for evaluating an expression like
683 // coord:daysInMonth(0) + 3
684 // actual evaluation will happen in phase 2 or phase 3.
685 public static String ph1_coord_daysInMonth_echo(String n) {
686 // return echoUnResolved("daysInMonth", n);
687 return "30";
688 }
689
690 // This echo function will always return "3" for validation only.
691 // This evaluation ****should not**** replace the original XML
692 // Create a temporary string and validate the function
693 // This is **required** for evaluating an expression like coord:tzOffset + 2
694 // actual evaluation will happen in phase 2 or phase 3.
695 public static String ph1_coord_tzOffset_echo() {
696 // return echoUnResolved("tzOffset", "");
697 return "3";
698 }
699
700 // Local methods
701 /**
702 * @param n
703 * @return n-th instance Date-Time from current instance for data-set <p/> return empty string ("") if the
704 * Action_Creation_time or the n-th instance <p/> is earlier than the Initial_Instance of dataset.
705 * @throws Exception
706 */
707 private static String coord_current_sync(int n) throws Exception {
708 int datasetFrequency = getDSFrequency();// in minutes
709 TimeUnit dsTimeUnit = getDSTimeUnit();
710 int[] instCount = new int[1];// used as pass by ref
711 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
712 if (nominalInstanceCal == null) {
713 XLog.getLog(CoordELFunctions.class)
714 .warn("If the initial instance of the dataset is later than the nominal time, an empty string is returned. This means that no data is available at the current-instance specified by the user and the user could try modifying his initial-instance to an earlier time.");
715 return "";
716 }
717 nominalInstanceCal = getInitialInstanceCal();
718 int absInstanceCount = instCount[0] + n;
719 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency * absInstanceCount);
720
721 if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) {
722 XLog.getLog(CoordELFunctions.class)
723 .warn("If the initial instance of the dataset is later than the current-instance specified, such as coord:current({0}) in this case, an empty string is returned. This means that no data is available at the current-instance specified by the user and the user could try modifying his initial-instance to an earlier time.", n);
724 return "";
725 }
726 String str = DateUtils.formatDateUTC(nominalInstanceCal);
727 return str;
728 }
729
/**
 * Resolves the n-th latest available instance of a sync dataset by probing the file system.
 * <p/>
 * Starting at the current instance (derived from the actual time), candidates are walked backwards one
 * frequency step at a time until they fall before the initial instance. Each candidate's URI (plus the done
 * flag, when non-empty) is checked for existence.
 * <p/>
 * Side effects on the current evaluator: "is_resolved" is always set; "resolved_path" is set to the URI (without
 * the done flag) when the instance is found.
 *
 * @param offset which available instance is wanted; must be <= 0 (0 is the first one found, -1 the next, ...)
 * @return the UTC date string of the resolved instance; the unevaluated "${coord:latest(offset)}" expression when
 *         nothing was resolved; or an empty string when there is no current instance
 * @throws RuntimeException if offset is positive, or the dataset or configuration is missing from the evaluator
 * @throws Exception
 */
private static String coord_latest_sync(int offset) throws Exception {
    if (offset > 0) {
        throw new RuntimeException("For latest there is no meaning " + "of positive instance. n should be <=0"
                + offset);
    }
    ELEvaluator eval = ELEvaluator.getCurrent();
    String retVal = "";
    int datasetFrequency = (int) getDSFrequency();// in minutes
    TimeUnit dsTimeUnit = getDSTimeUnit();
    // Single-element array used as an out-parameter: receives the instance count of the current instance.
    int[] instCount = new int[1];
    Calendar nominalInstanceCal = getCurrentInstance(getActualTime(), instCount);
    if (nominalInstanceCal != null) {
        Calendar initInstance = getInitialInstanceCal();
        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
        if (ds == null) {
            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
        }
        String uriTemplate = ds.getUriTemplate();
        Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
        if (conf == null) {
            throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
        }
        // Counts down from 0 for every existing instance so it can be compared with the non-positive offset.
        int available = 0;
        boolean resolved = false;
        String user = ParamChecker
                .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
        String group = ParamChecker.notEmpty((String) eval.getVariable(OozieClient.GROUP_NAME),
                OozieClient.GROUP_NAME);
        String doneFlag = ds.getDoneFlag();
        // Stop once the candidate would be earlier than the dataset's initial instance.
        while (nominalInstanceCal.compareTo(initInstance) >= 0) {
            ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
            String uriPath = uriEval.evaluate(uriTemplate, String.class);
            String pathWithDoneFlag = uriPath;
            if (doneFlag.length() > 0) {
                pathWithDoneFlag += "/" + doneFlag;
            }
            if (isPathAvailable(pathWithDoneFlag, user, group, conf)) {
                XLog.getLog(CoordELFunctions.class).debug("Found latest(" + available + "): " + pathWithDoneFlag);
                if (available == offset) {
                    XLog.getLog(CoordELFunctions.class).debug("Found Latest File: " + pathWithDoneFlag);
                    resolved = true;
                    retVal = DateUtils.formatDateUTC(nominalInstanceCal);
                    // The resolved path is recorded without the done flag.
                    eval.setVariable("resolved_path", uriPath);
                    break;
                }

                available--;
            }
            // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
            // -datasetFrequency);
            // Step back one instance, recomputed from the initial instance.
            nominalInstanceCal = (Calendar) initInstance.clone();
            instCount[0]--;
            nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
            // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
        }
        if (!resolved) {
            // return unchanged latest function with variable 'is_resolved'
            // to 'false'
            eval.setVariable("is_resolved", Boolean.FALSE);
            retVal = "${coord:latest(" + offset + ")}";
        }
        else {
            eval.setVariable("is_resolved", Boolean.TRUE);
        }
    }
    else {// No feasible nominal time
        // Marked as unresolved; retVal stays the empty string.
        eval.setVariable("is_resolved", Boolean.FALSE);
    }
    return retVal;
}
805
806 // TODO : Not an efficient way. In a loop environment, we could do something
807 // outside the loop
808 /**
809 * Check whether a URI path exists
810 *
811 * @param sPath
812 * @param conf
813 * @return
814 * @throws IOException
815 */
816
817 private static boolean isPathAvailable(String sPath, String user, String group, Configuration conf)
818 throws IOException, HadoopAccessorException {
819 // sPath += "/" + END_OF_OPERATION_INDICATOR_FILE;
820 Path path = new Path(sPath);
821 return Services.get().get(HadoopAccessorService.class).
822 createFileSystem(user, group, path.toUri(), conf).exists(path);
823 }
824
825 /**
826 * @param tm
827 * @return a new Evaluator to be used for URI-template evaluation
828 */
829 private static ELEvaluator getUriEvaluator(Calendar tm) {
830 ELEvaluator retEval = new ELEvaluator();
831 retEval.setVariable("YEAR", tm.get(Calendar.YEAR));
832 retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm
833 .get(Calendar.MONTH) + 1));
834 retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm
835 .get(Calendar.DAY_OF_MONTH));
836 retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm
837 .get(Calendar.HOUR_OF_DAY));
838 retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? "0" + tm.get(Calendar.MINUTE) : tm
839 .get(Calendar.MINUTE));
840 return retEval;
841 }
842
843 /**
844 * @return whether a data set is SYNCH or ASYNC
845 */
846 private static boolean isSyncDataSet() {
847 ELEvaluator eval = ELEvaluator.getCurrent();
848 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
849 if (ds == null) {
850 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
851 }
852 return ds.getType().equalsIgnoreCase("SYNC");
853 }
854
855 /**
856 * Check whether a function should be resolved.
857 *
858 * @param functionName
859 * @param n
860 * @return null if the functionName needs to be resolved otherwise return the calling function unresolved.
861 */
862 private static String checkIfResolved(String functionName, String n) {
863 ELEvaluator eval = ELEvaluator.getCurrent();
864 String replace = (String) eval.getVariable("resolve_" + functionName);
865 if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't
866 // resolve
867 // return "${coord:" + functionName + "(" + n +")}"; //Unresolved
868 eval.setVariable(".wrap", "true");
869 return "coord:" + functionName + "(" + n + ")"; // Unresolved
870 }
871 return null; // Resolved it
872 }
873
874 private static String echoUnResolved(String functionName, String n) {
875 return echoUnResolvedPre(functionName, n, "coord:");
876 }
877
878 private static String echoUnResolvedPre(String functionName, String n, String prefix) {
879 ELEvaluator eval = ELEvaluator.getCurrent();
880 eval.setVariable(".wrap", "true");
881 return prefix + functionName + "(" + n + ")"; // Unresolved
882 }
883
884 /**
885 * @return the initial instance of a DataSet in DATE
886 */
887 private static Date getInitialInstance() {
888 return getInitialInstanceCal().getTime();
889 // return ds.getInitInstance();
890 }
891
892 /**
893 * @return the initial instance of a DataSet in Calendar
894 */
895 private static Calendar getInitialInstanceCal() {
896 ELEvaluator eval = ELEvaluator.getCurrent();
897 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
898 if (ds == null) {
899 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
900 }
901 Calendar effInitTS = Calendar.getInstance();
902 effInitTS.setTime(ds.getInitInstance());
903 effInitTS.setTimeZone(ds.getTimeZone());
904 // To adjust EOD/EOM
905 DateUtils.moveToEnd(effInitTS, getDSEndOfFlag());
906 return effInitTS;
907 // return ds.getInitInstance();
908 }
909
910 /**
911 * @return Nominal or action creation Time when all the dependencies of an application instance are met.
912 */
913 private static Date getActionCreationtime() {
914 ELEvaluator eval = ELEvaluator.getCurrent();
915 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
916 if (coordAction == null) {
917 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
918 }
919 return coordAction.getNominalTime();
920 }
921
922 /**
923 * @return Actual Time when all the dependencies of an application instance are met.
924 */
925 private static Date getActualTime() {
926 ELEvaluator eval = ELEvaluator.getCurrent();
927 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
928 if (coordAction == null) {
929 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
930 }
931 return coordAction.getActualTime();
932 }
933
934 /**
935 * @return TimeZone for the application or job.
936 */
937 private static TimeZone getJobTZ() {
938 ELEvaluator eval = ELEvaluator.getCurrent();
939 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
940 if (coordAction == null) {
941 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
942 }
943 return coordAction.getTimeZone();
944 }
945
946 /**
947 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
948 *
949 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
950 * the dataset.
951 */
952 private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) {
953 Date datasetInitialInstance = getInitialInstance();
954 TimeUnit dsTimeUnit = getDSTimeUnit();
955 TimeZone dsTZ = getDatasetTZ();
956 // Convert Date to Calendar for corresponding TZ
957 Calendar current = Calendar.getInstance();
958 current.setTime(datasetInitialInstance);
959 current.setTimeZone(dsTZ);
960
961 Calendar calEffectiveTime = Calendar.getInstance();
962 calEffectiveTime.setTime(effectiveTime);
963 calEffectiveTime.setTimeZone(dsTZ);
964 instanceCount[0] = 0;
965 if (current.compareTo(calEffectiveTime) > 0) {
966 // Nominal Time < initial Instance
967 // TODO: getClass() call doesn't work from static method.
968 // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+
969 // current.getTime());
970 return null;
971 }
972 Calendar origCurrent = (Calendar) current.clone();
973 while (current.compareTo(calEffectiveTime) <= 0) {
974 current = (Calendar) origCurrent.clone();
975 instanceCount[0]++;
976 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * getDSFrequency());
977 }
978 instanceCount[0]--;
979
980 current = (Calendar) origCurrent.clone();
981 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * getDSFrequency());
982 return current;
983 }
984
985 private static Calendar getEffectiveNominalTime() {
986 Date datasetInitialInstance = getInitialInstance();
987 TimeZone dsTZ = getDatasetTZ();
988 // Convert Date to Calendar for corresponding TZ
989 Calendar current = Calendar.getInstance();
990 current.setTime(datasetInitialInstance);
991 current.setTimeZone(dsTZ);
992
993 Calendar calEffectiveTime = Calendar.getInstance();
994 calEffectiveTime.setTime(getActionCreationtime());
995 calEffectiveTime.setTimeZone(dsTZ);
996 if (current.compareTo(calEffectiveTime) > 0) {
997 // Nominal Time < initial Instance
998 // TODO: getClass() call doesn't work from static method.
999 // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+
1000 // current.getTime());
1001 return null;
1002 }
1003 return calEffectiveTime;
1004 }
1005
1006 /**
1007 * @return dataset frequency in minutes
1008 */
1009 private static int getDSFrequency() {
1010 ELEvaluator eval = ELEvaluator.getCurrent();
1011 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1012 if (ds == null) {
1013 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1014 }
1015 return ds.getFrequency();
1016 }
1017
1018 /**
1019 * @return dataset TimeUnit
1020 */
1021 private static TimeUnit getDSTimeUnit() {
1022 ELEvaluator eval = ELEvaluator.getCurrent();
1023 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1024 if (ds == null) {
1025 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1026 }
1027 return ds.getTimeUnit();
1028 }
1029
1030 /**
1031 * @return dataset TimeZone
1032 */
1033 private static TimeZone getDatasetTZ() {
1034 ELEvaluator eval = ELEvaluator.getCurrent();
1035 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1036 if (ds == null) {
1037 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1038 }
1039 return ds.getTimeZone();
1040 }
1041
1042 /**
1043 * @return dataset TimeUnit
1044 */
1045 private static TimeUnit getDSEndOfFlag() {
1046 ELEvaluator eval = ELEvaluator.getCurrent();
1047 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1048 if (ds == null) {
1049 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1050 }
1051 return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration();
1052 }
1053
1054 /**
1055 * Return the user that submitted the coordinator job.
1056 *
1057 * @return the user that submitted the coordinator job.
1058 */
1059 public static String coord_user() {
1060 ELEvaluator eval = ELEvaluator.getCurrent();
1061 return (String) eval.getVariable(OozieClient.USER_NAME);
1062 }
1063 }