001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.coord;
019    
020    import java.net.URI;
021    import java.util.ArrayList;
022    import java.util.Calendar;
023    import java.util.Date;
024    import java.util.List;
025    import java.util.TimeZone;
026    
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.oozie.client.OozieClient;
029    import org.apache.oozie.dependency.URIHandler.Context;
030    import org.apache.oozie.dependency.URIHandler;
031    import org.apache.oozie.util.DateUtils;
032    import org.apache.oozie.util.ELEvaluator;
033    import org.apache.oozie.util.ParamChecker;
034    import org.apache.oozie.util.XLog;
035    import org.apache.oozie.service.Services;
036    import org.apache.oozie.service.URIHandlerService;
037    
/**
 * This class implements the EL functions related to coordinators.
 */
041    
042    public class CoordELFunctions {
043        final public static String DATASET = "oozie.coord.el.dataset.bean";
044        final public static String COORD_ACTION = "oozie.coord.el.app.bean";
045        final public static String CONFIGURATION = "oozie.coord.el.conf";
046        final public static String LATEST_EL_USE_CURRENT_TIME = "oozie.service.ELService.latest-el.use-current-time";
047        // INSTANCE_SEPARATOR is used to separate multiple directories into one tag.
048        final public static String INSTANCE_SEPARATOR = "#";
049        final public static String DIR_SEPARATOR = ",";
050        // TODO: in next release, support flexibility
051        private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS";
052    
053        /**
054         * Used in defining the frequency in 'day' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
055         *
056         * @param val frequency in number of days.
057         * @return number of days and also set the frequency timeunit to "day"
058         */
059        public static int ph1_coord_days(int val) {
060            val = ParamChecker.checkGTZero(val, "n");
061            ELEvaluator eval = ELEvaluator.getCurrent();
062            eval.setVariable("timeunit", TimeUnit.DAY);
063            eval.setVariable("endOfDuration", TimeUnit.NONE);
064            return val;
065        }
066    
067        /**
068         * Used in defining the frequency in 'month' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
069         *
070         * @param val frequency in number of months.
071         * @return number of months and also set the frequency timeunit to "month"
072         */
073        public static int ph1_coord_months(int val) {
074            val = ParamChecker.checkGTZero(val, "n");
075            ELEvaluator eval = ELEvaluator.getCurrent();
076            eval.setVariable("timeunit", TimeUnit.MONTH);
077            eval.setVariable("endOfDuration", TimeUnit.NONE);
078            return val;
079        }
080    
081        /**
082         * Used in defining the frequency in 'hour' unit. <p/> parameter value domain: <code> val &gt; 0</code> and should
083         * be integer.
084         *
085         * @param val frequency in number of hours.
086         * @return number of minutes and also set the frequency timeunit to "minute"
087         */
088        public static int ph1_coord_hours(int val) {
089            val = ParamChecker.checkGTZero(val, "n");
090            ELEvaluator eval = ELEvaluator.getCurrent();
091            eval.setVariable("timeunit", TimeUnit.MINUTE);
092            eval.setVariable("endOfDuration", TimeUnit.NONE);
093            return val * 60;
094        }
095    
096        /**
097         * Used in defining the frequency in 'minute' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
098         *
099         * @param val frequency in number of minutes.
100         * @return number of minutes and also set the frequency timeunit to "minute"
101         */
102        public static int ph1_coord_minutes(int val) {
103            val = ParamChecker.checkGTZero(val, "n");
104            ELEvaluator eval = ELEvaluator.getCurrent();
105            eval.setVariable("timeunit", TimeUnit.MINUTE);
106            eval.setVariable("endOfDuration", TimeUnit.NONE);
107            return val;
108        }
109    
110        /**
111         * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p/> Every instance will
112         * start at 00:00 hour of each day. <p/> domain: <code> val &gt; 0</code> and should be integer.
113         *
114         * @param val frequency in number of days.
115         * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day"
116         */
117        public static int ph1_coord_endOfDays(int val) {
118            val = ParamChecker.checkGTZero(val, "n");
119            ELEvaluator eval = ELEvaluator.getCurrent();
120            eval.setVariable("timeunit", TimeUnit.DAY);
121            eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY);
122            return val;
123        }
124    
125        /**
126         * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p/> Every instance will
127         * start at first day of each month at 00:00 hour. <p/> domain: <code> val &gt; 0</code> and should be integer.
128         *
129         * @param val: frequency in number of months.
130         * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month"
131         */
132        public static int ph1_coord_endOfMonths(int val) {
133            val = ParamChecker.checkGTZero(val, "n");
134            ELEvaluator eval = ELEvaluator.getCurrent();
135            eval.setVariable("timeunit", TimeUnit.MONTH);
136            eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH);
137            return val;
138        }
139    
140        /**
141         * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p/> Depends on: <p/>
142         * 1. Timezone of both dataset and job <p/> 2. Action creation Time
143         *
144         * @return difference in minutes (DataSet TZ Offset - Application TZ offset)
145         */
146        public static int ph2_coord_tzOffset() {
147            Date actionCreationTime = getActionCreationtime();
148            TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ");
149            TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ");
150            // Apply the TZ into Calendar object
151            Calendar dsTime = Calendar.getInstance(dsTZ);
152            dsTime.setTime(actionCreationTime);
153            Calendar jobTime = Calendar.getInstance(jobTZ);
154            jobTime.setTime(actionCreationTime);
155            return (dsTime.get(Calendar.ZONE_OFFSET) - jobTime.get(Calendar.ZONE_OFFSET)) / (1000 * 60);
156        }
157    
158        public static int ph3_coord_tzOffset() {
159            return ph2_coord_tzOffset();
160        }
161    
162        /**
163         * Returns the a date string while given a base date in 'strBaseDate',
164         * offset and unit (e.g. DAY, MONTH, HOUR, MINUTE, MONTH).
165         *
166         * @param strBaseDate -- base date
167         * @param offset -- any number
168         * @param unit -- DAY, MONTH, HOUR, MINUTE, MONTH
169         * @return date string
170         * @throws Exception
171         */
172        public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
173            Calendar baseCalDate = DateUtils.getCalendar(strBaseDate);
174            StringBuilder buffer = new StringBuilder();
175            baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset);
176            buffer.append(DateUtils.formatDateOozieTZ(baseCalDate));
177            return buffer.toString();
178        }
179    
180        public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
181            return ph2_coord_dateOffset(strBaseDate, offset, unit);
182        }
183    
184        /**
185         * Determine the date-time in Oozie processing timezone of n-th future available dataset instance
186         * from nominal Time but not beyond the instance specified as 'instance.
187         * <p/>
188         * It depends on:
189         * <p/>
190         * 1. Data set frequency
191         * <p/>
192         * 2. Data set Time unit (day, month, minute)
193         * <p/>
194         * 3. Data set Time zone/DST
195         * <p/>
196         * 4. End Day/Month flag
197         * <p/>
198         * 5. Data set initial instance
199         * <p/>
200         * 6. Action Creation Time
201         * <p/>
202         * 7. Existence of dataset's directory
203         *
204         * @param n :instance count
205         *        <p/>
206         *        domain: n >= 0, n is integer
207         * @param instance: How many future instance it should check? value should
208         *        be >=0
209         * @return date-time in Oozie processing timezone of the n-th instance
210         *         <p/>
211         * @throws Exception
212         */
213        public static String ph3_coord_future(int n, int instance) throws Exception {
214            ParamChecker.checkGEZero(n, "future:n");
215            ParamChecker.checkGTZero(instance, "future:instance");
216            if (isSyncDataSet()) {// For Sync Dataset
217                return coord_future_sync(n, instance);
218            }
219            else {
220                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
221            }
222        }
223    
224        /**
225         * Determine the date-time in Oozie processing timezone of the future available dataset instances
226         * from start to end offsets from nominal Time but not beyond the instance specified as 'instance'.
227         * <p/>
228         * It depends on:
229         * <p/>
230         * 1. Data set frequency
231         * <p/>
232         * 2. Data set Time unit (day, month, minute)
233         * <p/>
234         * 3. Data set Time zone/DST
235         * <p/>
236         * 4. End Day/Month flag
237         * <p/>
238         * 5. Data set initial instance
239         * <p/>
240         * 6. Action Creation Time
241         * <p/>
242         * 7. Existence of dataset's directory
243         *
244         * @param start : start instance offset
245         *        <p/>
246         *        domain: start >= 0, start is integer
247         * @param end : end instance offset
248         *        <p/>
249         *        domain: end >= 0, end is integer
250         * @param instance: How many future instance it should check? value should
251         *        be >=0
252         * @return date-time in Oozie processing timezone of the instances from start to end offsets
253         *        delimited by comma.
254         *         <p/>
255         * @throws Exception
256         */
257        public static String ph3_coord_futureRange(int start, int end, int instance) throws Exception {
258            ParamChecker.checkGEZero(start, "future:n");
259            ParamChecker.checkGEZero(end, "future:n");
260            ParamChecker.checkGTZero(instance, "future:instance");
261            if (isSyncDataSet()) {// For Sync Dataset
262                return coord_futureRange_sync(start, end, instance);
263            }
264            else {
265                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
266            }
267        }
268    
269        private static String coord_future_sync(int n, int instance) throws Exception {
270            return coord_futureRange_sync(n, n, instance);
271        }
272    
273        private static String coord_futureRange_sync(int startOffset, int endOffset, int instance) throws Exception {
274            final XLog LOG = XLog.getLog(CoordELFunctions.class);
275            final Thread currentThread = Thread.currentThread();
276            ELEvaluator eval = ELEvaluator.getCurrent();
277            String retVal = "";
278            int datasetFrequency = (int) getDSFrequency();// in minutes
279            TimeUnit dsTimeUnit = getDSTimeUnit();
280            int[] instCount = new int[1];
281            Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
282            StringBuilder resolvedInstances = new StringBuilder();
283            StringBuilder resolvedURIPaths = new StringBuilder();
284            if (nominalInstanceCal != null) {
285                Calendar initInstance = getInitialInstanceCal();
286                nominalInstanceCal = (Calendar) initInstance.clone();
287                nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
288    
289                SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
290                if (ds == null) {
291                    throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
292                }
293                String uriTemplate = ds.getUriTemplate();
294                Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
295                if (conf == null) {
296                    throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
297                }
298                int available = 0, checkedInstance = 0;
299                boolean resolved = false;
300                String user = ParamChecker
301                        .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
302                String doneFlag = ds.getDoneFlag();
303                URIHandlerService uriService = Services.get().get(URIHandlerService.class);
304                URIHandler uriHandler = null;
305                Context uriContext = null;
306                try {
307                    while (instance >= checkedInstance && !currentThread.isInterrupted()) {
308                        ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
309                        String uriPath = uriEval.evaluate(uriTemplate, String.class);
310                        if (uriHandler == null) {
311                            URI uri = new URI(uriPath);
312                            uriHandler = uriService.getURIHandler(uri);
313                            uriContext = uriHandler.getContext(uri, conf, user);
314                        }
315                        String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
316                        if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
317                            if (available == endOffset) {
318                                LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
319                                resolved = true;
320                                resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
321                                resolvedURIPaths.append(uriPath);
322                                retVal = resolvedInstances.toString();
323                                eval.setVariable("resolved_path", resolvedURIPaths.toString());
324                                break;
325                            }
326                            else if (available >= startOffset) {
327                                LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
328                                resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
329                                        INSTANCE_SEPARATOR);
330                                resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
331                            }
332                            available++;
333                        }
334                        // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency);
335                        nominalInstanceCal = (Calendar) initInstance.clone();
336                        instCount[0]++;
337                        nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
338                        checkedInstance++;
339                        // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
340                    }
341                }
342                finally {
343                    if (uriContext != null) {
344                        uriContext.destroy();
345                    }
346                }
347                if (!resolved) {
348                    // return unchanged future function with variable 'is_resolved'
349                    // to 'false'
350                    eval.setVariable("is_resolved", Boolean.FALSE);
351                    if (startOffset == endOffset) {
352                        retVal = "${coord:future(" + startOffset + ", " + instance + ")}";
353                    }
354                    else {
355                        retVal = "${coord:futureRange(" + startOffset + ", " + endOffset + ", " + instance + ")}";
356                    }
357                }
358                else {
359                    eval.setVariable("is_resolved", Boolean.TRUE);
360                }
361            }
362            else {// No feasible nominal time
363                eval.setVariable("is_resolved", Boolean.TRUE);
364                retVal = "";
365            }
366            return retVal;
367        }
368    
369        /**
370         * Return nominal time or Action Creation Time.
371         * <p/>
372         *
373         * @return coordinator action creation or materialization date time
374         * @throws Exception if unable to format the Date object to String
375         */
376        public static String ph2_coord_nominalTime() throws Exception {
377            ELEvaluator eval = ELEvaluator.getCurrent();
378            SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
379                    "Coordinator Action");
380            return DateUtils.formatDateOozieTZ(action.getNominalTime());
381        }
382    
383        public static String ph3_coord_nominalTime() throws Exception {
384            return ph2_coord_nominalTime();
385        }
386    
387        /**
388         * Convert from standard date-time formatting to a desired format.
389         * <p/>
390         * @param dateTimeStr - A timestamp in standard (ISO8601) format.
391         * @param format - A string representing the desired format.
392         * @return coordinator action creation or materialization date time
393         * @throws Exception if unable to format the Date object to String
394         */
395        public static String ph2_coord_formatTime(String dateTimeStr, String format)
396            throws Exception {
397            Date dateTime = DateUtils.parseDateOozieTZ(dateTimeStr);
398            return DateUtils.formatDateCustom(dateTime, format);
399        }
400    
401        public static String ph3_coord_formatTime(String dateTimeStr, String format)
402            throws Exception {
403            return ph2_coord_formatTime(dateTimeStr, format);
404        }
405    
406        /**
407         * Return Action Id. <p/>
408         *
409         * @return coordinator action Id
410         */
411        public static String ph2_coord_actionId() throws Exception {
412            ELEvaluator eval = ELEvaluator.getCurrent();
413            SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
414                    "Coordinator Action");
415            return action.getActionId();
416        }
417    
418        public static String ph3_coord_actionId() throws Exception {
419            return ph2_coord_actionId();
420        }
421    
422        /**
423         * Return Job Name. <p/>
424         *
425         * @return coordinator name
426         */
427        public static String ph2_coord_name() throws Exception {
428            ELEvaluator eval = ELEvaluator.getCurrent();
429            SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
430                    "Coordinator Action");
431            return action.getName();
432        }
433    
434        public static String ph3_coord_name() throws Exception {
435            return ph2_coord_name();
436        }
437    
438        /**
439         * Return Action Start time. <p/>
440         *
441         * @return coordinator action start time
442         * @throws Exception if unable to format the Date object to String
443         */
444        public static String ph2_coord_actualTime() throws Exception {
445            ELEvaluator eval = ELEvaluator.getCurrent();
446            SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
447            if (coordAction == null) {
448                throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
449            }
450            return DateUtils.formatDateOozieTZ(coordAction.getActualTime());
451        }
452    
453        public static String ph3_coord_actualTime() throws Exception {
454            return ph2_coord_actualTime();
455        }
456    
457        /**
458         * Used to specify a list of URI's that are used as input dir to the workflow job. <p/> Look for two evaluator-level
459         * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
460         * URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
461         * unresolved, this function will echo back the original function <p/> otherwise it sends the uris.
462         *
463         * @param dataInName : Datain name
464         * @return the list of URI's separated by INSTANCE_SEPARATOR <p/> if there are unresolved EL function (i.e. latest)
465         *         , echo back <p/> the function without resolving the function.
466         */
467        public static String ph3_coord_dataIn(String dataInName) {
468            String uris = "";
469            ELEvaluator eval = ELEvaluator.getCurrent();
470            uris = (String) eval.getVariable(".datain." + dataInName);
471            Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
472            if (unresolved != null && unresolved.booleanValue() == true) {
473                return "${coord:dataIn('" + dataInName + "')}";
474            }
475            return uris;
476        }
477    
478        /**
479         * Used to specify a list of URI's that are output dir of the workflow job. <p/> Look for one evaluator-level
480         * variable <p/> dataout.<DATAOUT_NAME> <p/> It defines the current list of URI. <p/> otherwise it sends the uris.
481         *
482         * @param dataOutName : Dataout name
483         * @return the list of URI's separated by INSTANCE_SEPARATOR
484         */
485        public static String ph3_coord_dataOut(String dataOutName) {
486            String uris = "";
487            ELEvaluator eval = ELEvaluator.getCurrent();
488            uris = (String) eval.getVariable(".dataout." + dataOutName);
489            return uris;
490        }
491    
492        /**
493         * Determine the date-time in Oozie processing timezone of n-th dataset instance. <p/> It depends on: <p/> 1.
494         * Data set frequency <p/> 2.
495         * Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. Data
496         * set initial instance <p/> 6. Action Creation Time
497         *
498         * @param n instance count domain: n is integer
499         * @return date-time in Oozie processing timezone of the n-th instance returns 'null' means n-th instance is
500         * earlier than Initial-Instance of DS
501         * @throws Exception
502         */
503        public static String ph2_coord_current(int n) throws Exception {
504            if (isSyncDataSet()) { // For Sync Dataset
505                return coord_current_sync(n);
506            }
507            else {
508                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
509            }
510        }
511    
512        /**
513         * Determine the date-time in Oozie processing timezone of current dataset instances
514         * from start to end offsets from the nominal time. <p/> It depends
515         * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
516         * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time
517         *
518         * @param start :start instance offset <p/> domain: start <= 0, start is integer
519         * @param end :end instance offset <p/> domain: end <= 0, end is integer
520         * @return date-time in Oozie processing timezone of the instances from start to end offsets
521         *        delimited by comma. <p/> If the current instance time of the dataset based on the Action Creation Time
522         *        is earlier than the Initial-Instance of DS an empty string is returned.
523         *        If an instance within the range is earlier than Initial-Instance of DS that instance is ignored
524         * @throws Exception
525         */
526        public static String ph2_coord_currentRange(int start, int end) throws Exception {
527            if (isSyncDataSet()) { // For Sync Dataset
528                return coord_currentRange_sync(start, end);
529            }
530            else {
531                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
532            }
533        }
534    
535        /**
536         * Determine the date-time in Oozie processing timezone of the given offset from the dataset effective nominal time. <p/> It
537         * depends on: <p> 1. Data set frequency <p/> 2. Data set Time Unit <p/> 3. Data set Time zone/DST
538         * <p/> 4. Data set initial instance <p/> 5. Action Creation Time
539         *
540         * @param n offset amount (integer)
541         * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
542         * @return date-time in Oozie processing timezone of the given offset from the dataset effective nominal time
543         * @throws Exception if there was a problem formatting
544         */
545        public static String ph2_coord_offset(int n, String timeUnit) throws Exception {
546            if (isSyncDataSet()) { // For Sync Dataset
547                return coord_offset_sync(n, timeUnit);
548            }
549            else {
550                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
551            }
552        }
553    
554        /**
555         * Determine how many hours is on the date of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency
556         * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
557         * Data set initial instance <p/> 6. Action Creation Time
558         *
559         * @param n instance count <p/> domain: n is integer
560         * @return number of hours on that day <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
561         * @throws Exception
562         */
563        public static int ph2_coord_hoursInDay(int n) throws Exception {
564            int datasetFrequency = (int) getDSFrequency();
565            // /Calendar nominalInstanceCal =
566            // getCurrentInstance(getActionCreationtime());
567            Calendar nominalInstanceCal = getEffectiveNominalTime();
568            if (nominalInstanceCal == null) {
569                return -1;
570            }
571            nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
572            /*
573             * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
574             * { return -1; }
575             */
576            nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
577            // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
578            return DateUtils.hoursInDay(nominalInstanceCal);
579        }
580    
581        public static int ph3_coord_hoursInDay(int n) throws Exception {
582            return ph2_coord_hoursInDay(n);
583        }
584    
585        /**
586         * Calculate number of days in one month for n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency .
587         * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
588         * Data set initial instance <p/> 6. Action Creation Time
589         *
590         * @param n instance count. domain: n is integer
591         * @return number of days in that month <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
592         * @throws Exception
593         */
594        public static int ph2_coord_daysInMonth(int n) throws Exception {
595            int datasetFrequency = (int) getDSFrequency();// in minutes
596            // Calendar nominalInstanceCal =
597            // getCurrentInstance(getActionCreationtime());
598            Calendar nominalInstanceCal = getEffectiveNominalTime();
599            if (nominalInstanceCal == null) {
600                return -1;
601            }
602            nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
603            /*
604             * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
605             * { return -1; }
606             */
607            nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
608            // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
609            return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH);
610        }
611    
612        public static int ph3_coord_daysInMonth(int n) throws Exception {
613            return ph2_coord_daysInMonth(n);
614        }
615    
616        /**
617         * Determine the date-time in Oozie processing timezone of n-th latest available dataset instance. <p/> It depends
618         * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
619         * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of
620         * dataset's directory
621         *
622         * @param n :instance count <p/> domain: n <= 0, n is integer
623         * @return date-time in Oozie processing timezone of the n-th instance <p/> returns 'null' means n-th instance is
624         * earlier than Initial-Instance of DS
625         * @throws Exception
626         */
627        public static String ph3_coord_latest(int n) throws Exception {
628            ParamChecker.checkLEZero(n, "latest:n");
629            if (isSyncDataSet()) {// For Sync Dataset
630                return coord_latest_sync(n);
631            }
632            else {
633                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
634            }
635        }
636    
637        /**
638         * Determine the date-time in Oozie processing timezone of latest available dataset instances
639         * from start to end offsets from the nominal time. <p/> It depends
640         * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
641         * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of
642         * dataset's directory
643         *
644         * @param start :start instance offset <p/> domain: start <= 0, start is integer
645         * @param end :end instance offset <p/> domain: end <= 0, end is integer
646         * @return date-time in Oozie processing timezone of the instances from start to end offsets
647         *        delimited by comma. <p/> returns 'null' means start offset instance is
648         *        earlier than Initial-Instance of DS
649         * @throws Exception
650         */
651        public static String ph3_coord_latestRange(int start, int end) throws Exception {
652            ParamChecker.checkLEZero(start, "latest:n");
653            ParamChecker.checkLEZero(end, "latest:n");
654            if (isSyncDataSet()) {// For Sync Dataset
655                return coord_latestRange_sync(start, end);
656            }
657            else {
658                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
659            }
660        }
661    
662        /**
663         * Configure an evaluator with data set and application specific information. <p/> Helper method of associating
664         * dataset and application object
665         *
666         * @param evaluator : to set variables
667         * @param ds : Data Set object
668         * @param coordAction : Application instance
669         */
670        public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) {
671            evaluator.setVariable(COORD_ACTION, coordAction);
672            evaluator.setVariable(DATASET, ds);
673        }
674    
675        /**
676         * Helper method to wrap around with "${..}". <p/>
677         *
678         *
679         * @param eval :EL evaluator
680         * @param expr : expression to evaluate
681         * @return Resolved expression or echo back the same expression
682         * @throws Exception
683         */
684        public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception {
685            try {
686                eval.setVariable(".wrap", null);
687                String result = eval.evaluate(expr, String.class);
688                if (eval.getVariable(".wrap") != null) {
689                    return "${" + result + "}";
690                }
691                else {
692                    return result;
693                }
694            }
695            catch (Exception e) {
696                throw new Exception("Unable to evaluate :" + expr + ":\n", e);
697            }
698        }
699    
700        // Set of echo functions
701    
702        public static String ph1_coord_current_echo(String n) {
703            return echoUnResolved("current", n);
704        }
705    
706        public static String ph1_coord_currentRange_echo(String start, String end) {
707            return echoUnResolved("currentRange", start + ", " + end);
708        }
709    
710        public static String ph1_coord_offset_echo(String n, String timeUnit) {
711            return echoUnResolved("offset", n + " , " + timeUnit);
712        }
713    
714        public static String ph2_coord_current_echo(String n) {
715            return echoUnResolved("current", n);
716        }
717    
718        public static String ph2_coord_currentRange_echo(String start, String end) {
719            return echoUnResolved("currentRange", start + ", " + end);
720        }
721    
722        public static String ph2_coord_offset_echo(String n, String timeUnit) {
723            return echoUnResolved("offset", n + " , " + timeUnit);
724        }
725    
726        public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) {
727            return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit);
728        }
729    
730        public static String ph1_coord_formatTime_echo(String dateTime, String format) {
731            // Quote the dateTime value since it would contain a ':'.
732            return echoUnResolved("formatTime", "'"+dateTime+"'" + " , " + format);
733        }
734    
735        public static String ph1_coord_latest_echo(String n) {
736            return echoUnResolved("latest", n);
737        }
738    
739        public static String ph2_coord_latest_echo(String n) {
740            return ph1_coord_latest_echo(n);
741        }
742    
743        public static String ph1_coord_future_echo(String n, String instance) {
744            return echoUnResolved("future", n + ", " + instance + "");
745        }
746    
747        public static String ph2_coord_future_echo(String n, String instance) {
748            return ph1_coord_future_echo(n, instance);
749        }
750    
751        public static String ph1_coord_latestRange_echo(String start, String end) {
752            return echoUnResolved("latestRange", start + ", " + end);
753        }
754    
755        public static String ph2_coord_latestRange_echo(String start, String end) {
756            return ph1_coord_latestRange_echo(start, end);
757        }
758    
759        public static String ph1_coord_futureRange_echo(String start, String end, String instance) {
760            return echoUnResolved("futureRange", start + ", " + end + ", " + instance);
761        }
762    
763        public static String ph2_coord_futureRange_echo(String start, String end, String instance) {
764            return ph1_coord_futureRange_echo(start, end, instance);
765        }
766    
767        public static String ph1_coord_dataIn_echo(String n) {
768            ELEvaluator eval = ELEvaluator.getCurrent();
769            String val = (String) eval.getVariable("oozie.dataname." + n);
770            if (val == null || val.equals("data-in") == false) {
771                XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " is not valid");
772                throw new RuntimeException("data_in_name " + n + " is not valid");
773            }
774            return echoUnResolved("dataIn", "'" + n + "'");
775        }
776    
777        public static String ph1_coord_dataOut_echo(String n) {
778            ELEvaluator eval = ELEvaluator.getCurrent();
779            String val = (String) eval.getVariable("oozie.dataname." + n);
780            if (val == null || val.equals("data-out") == false) {
781                XLog.getLog(CoordELFunctions.class).error("data_out_name " + n + " is not valid");
782                throw new RuntimeException("data_out_name " + n + " is not valid");
783            }
784            return echoUnResolved("dataOut", "'" + n + "'");
785        }
786    
787        public static String ph1_coord_nominalTime_echo() {
788            return echoUnResolved("nominalTime", "");
789        }
790    
791        public static String ph1_coord_nominalTime_echo_wrap() {
792            // return "${coord:nominalTime()}"; // no resolution
793            return echoUnResolved("nominalTime", "");
794        }
795    
796        public static String ph1_coord_nominalTime_echo_fixed() {
797            return "2009-03-06T010:00"; // Dummy resolution
798        }
799    
800        public static String ph1_coord_actualTime_echo_wrap() {
801            // return "${coord:actualTime()}"; // no resolution
802            return echoUnResolved("actualTime", "");
803        }
804    
805        public static String ph1_coord_actionId_echo() {
806            return echoUnResolved("actionId", "");
807        }
808    
809        public static String ph1_coord_name_echo() {
810            return echoUnResolved("name", "");
811        }
812    
813        // The following echo functions are not used in any phases yet
814        // They are here for future purpose.
815        public static String coord_minutes_echo(String n) {
816            return echoUnResolved("minutes", n);
817        }
818    
819        public static String coord_hours_echo(String n) {
820            return echoUnResolved("hours", n);
821        }
822    
823        public static String coord_days_echo(String n) {
824            return echoUnResolved("days", n);
825        }
826    
827        public static String coord_endOfDay_echo(String n) {
828            return echoUnResolved("endOfDay", n);
829        }
830    
831        public static String coord_months_echo(String n) {
832            return echoUnResolved("months", n);
833        }
834    
835        public static String coord_endOfMonth_echo(String n) {
836            return echoUnResolved("endOfMonth", n);
837        }
838    
839        public static String coord_actualTime_echo() {
840            return echoUnResolved("actualTime", "");
841        }
842    
843        // This echo function will always return "24" for validation only.
844        // This evaluation ****should not**** replace the original XML
845        // Create a temporary string and validate the function
846        // This is **required** for evaluating an expression like
    // coord:hoursInDay(0) + 3
848        // actual evaluation will happen in phase 2 or phase 3.
849        public static String ph1_coord_hoursInDay_echo(String n) {
850            return "24";
851            // return echoUnResolved("hoursInDay", n);
852        }
853    
854        // This echo function will always return "30" for validation only.
855        // This evaluation ****should not**** replace the original XML
856        // Create a temporary string and validate the function
857        // This is **required** for evaluating an expression like
858        // coord:daysInMonth(0) + 3
859        // actual evaluation will happen in phase 2 or phase 3.
860        public static String ph1_coord_daysInMonth_echo(String n) {
861            // return echoUnResolved("daysInMonth", n);
862            return "30";
863        }
864    
865        // This echo function will always return "3" for validation only.
866        // This evaluation ****should not**** replace the original XML
867        // Create a temporary string and validate the function
868        // This is **required** for evaluating an expression like coord:tzOffset + 2
869        // actual evaluation will happen in phase 2 or phase 3.
870        public static String ph1_coord_tzOffset_echo() {
871            // return echoUnResolved("tzOffset", "");
872            return "3";
873        }
874    
875        // Local methods
876        /**
877         * @param n
878         * @return n-th instance Date-Time from current instance for data-set <p/> return empty string ("") if the
879         *         Action_Creation_time or the n-th instance <p/> is earlier than the Initial_Instance of dataset.
880         * @throws Exception
881         */
882        private static String coord_current_sync(int n) throws Exception {
883            return coord_currentRange_sync(n, n);
884        }
885    
886        private static String coord_currentRange_sync(int start, int end) throws Exception {
887            final XLog LOG = XLog.getLog(CoordELFunctions.class);
888            int datasetFrequency = getDSFrequency();// in minutes
889            TimeUnit dsTimeUnit = getDSTimeUnit();
890            int[] instCount = new int[1];// used as pass by ref
891            Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
892            StringBuilder instanceList = new StringBuilder();
893            if (nominalInstanceCal == null) {
894                LOG.warn("If the initial instance of the dataset is later than the nominal time, an empty string is"
895                        + " returned. This means that no data is available at the current-instance specified by the user"
896                        + " and the user could try modifying his initial-instance to an earlier time.");
897                return "";
898            } else {
899                Calendar initInstance = getInitialInstanceCal();
900                instCount[0] = instCount[0] + end;
901                // Add in the reverse order - newest instance first.
902                for (int i = end; i >= start; i--) {
903                    // Tried to avoid the clone. But subtracting datasetFrequency gives different results than multiplying
904                    // and Spring DST transition test in TestCoordELfunctions.testCurrent() fails
905                    //nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), -datasetFrequency);
906                    nominalInstanceCal = (Calendar) initInstance.clone();
907                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
908                    instCount[0]--;
909                    if (nominalInstanceCal.compareTo(initInstance) < 0) {
910                        LOG.warn("If the initial instance of the dataset is later than the current-instance specified,"
911                                + " such as coord:current({0}) in this case, an empty string is returned. This means that"
912                                + " no data is available at the current-instance specified by the user and the user could"
913                                + " try modifying his initial-instance to an earlier time.", start);
914                        break;
915                    }
916                    else {
917                        instanceList.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
918                        instanceList.append(CoordELFunctions.INSTANCE_SEPARATOR);
919                    }
920                }
921            }
922    
923            if (instanceList.length() > 0) {
924                instanceList.setLength(instanceList.length() - CoordELFunctions.INSTANCE_SEPARATOR.length());
925            }
926            return instanceList.toString();
927        }
928    
929        /**
930         *
931         * @param n offset amount (integer)
932         * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
933         * @return the offset time from the effective nominal time <p/> return empty string ("") if the Action_Creation_time or the
934         *         offset instance <p/> is earlier than the Initial_Instance of dataset.
935         * @throws Exception
936         */
    private static String coord_offset_sync(int n, String timeUnit) throws Exception {
        // TimeUnit.valueOf() throws if timeUnit does not name a TimeUnit constant.
        // NOTE(review): rawCal is presumably the un-aligned offset time; confirm against resolveOffsetRawTime().
        Calendar rawCal = resolveOffsetRawTime(n, TimeUnit.valueOf(timeUnit), null);
        if (rawCal == null) {
            // warning already logged by resolveOffsetRawTime()
            return "";
        }

        int freq = getDSFrequency();
        TimeUnit freqUnit = getDSTimeUnit();
        // Net number of frequency steps taken away from the dataset's initial instance (negative = earlier).
        int freqCount = 0;
        // We're going to manually turn back/forward cal by decrements/increments of freq and then check that it gives the same
        // time as rawCal; this is to check that the offset time resolves to a frequency offset of the effective nominal time
        // In other words, that there exists an integer x, such that coord:offset(n, timeUnit) == coord:current(x) is true
        // If not, then we'll "rewind" rawCal to the latest instance earlier than rawCal and use that.
        // getInitialInstanceCal() builds a new Calendar on every call, so stepping cal does not disturb later calls.
        Calendar cal = getInitialInstanceCal();
        if (rawCal.before(cal)) {
            // Step backwards until cal is no longer after rawCal.
            while (cal.after(rawCal)) {
                cal.add(freqUnit.getCalendarUnit(), -freq);
                freqCount--;
            }
        }
        else if (rawCal.after(cal)) {
            // Step forwards until cal is no longer before rawCal (this may overshoot by one step).
            while (cal.before(rawCal)) {
                cal.add(freqUnit.getCalendarUnit(), freq);
                freqCount++;
            }
        }
        // Snap rawCal onto the latest frequency step that is not after it.
        if (cal.before(rawCal)) {
            rawCal = cal;
        }
        else if (cal.after(rawCal)) {
            // The forward walk overshot: go back one step.
            cal.add(freqUnit.getCalendarUnit(), -freq);
            rawCal = cal;
            freqCount--;
        }
        String rawCalStr = DateUtils.formatDateOozieTZ(rawCal);

        // Recompute the same instance in one jump (freq * freqCount) from the initial instance.
        Calendar nominalInstanceCal = getInitialInstanceCal();
        nominalInstanceCal.add(freqUnit.getCalendarUnit(), freq * freqCount);
        if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) {
            XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the offset instance"
                    + " specified, such as coord:offset({0}, {1}) in this case, an empty string is returned. This means that no"
                    + " data is available at the offset instance specified by the user and the user could try modifying his"
                    + " initial-instance to an earlier time.", n, timeUnit);
            return "";
        }
        String nominalCalStr = DateUtils.formatDateOozieTZ(nominalInstanceCal);

        // Sanity check: the stepwise result and the one-jump result must format to the same text.
        if (!rawCalStr.equals(nominalCalStr)) {
            throw new RuntimeException("Shouldn't happen");
        }
        return rawCalStr;
    }
990    
991        /**
992         * @param offset
993         * @return n-th available latest instance Date-Time for SYNC data-set
994         * @throws Exception
995         */
996        private static String coord_latest_sync(int offset) throws Exception {
997            return coord_latestRange_sync(offset, offset);
998        }
999    
    /**
     * Resolves {@code latest(endOffset)} down to {@code latest(startOffset)} for a SYNC dataset by walking
     * backwards from the current instance to the initial instance and testing, for each instance, whether its
     * URI (with the dataset's done flag applied) exists.
     * <p/>
     * Side effects on the current evaluator: {@code is_resolved} is always set; {@code resolved_path} is set when
     * the range is fully resolved.
     *
     * @param startOffset offset of the oldest requested instance; resolution completes when it is found
     * @param endOffset offset of the newest requested instance
     * @return the resolved instances, newest first, joined by {@code INSTANCE_SEPARATOR}; the unresolved
     *         expression ({@code ${coord:latest(n)}} or {@code ${coord:latestRange(start,end)}}) if the walk ends
     *         before {@code startOffset} is found; an empty string if there is no current instance at all
     * @throws Exception
     */
    private static String coord_latestRange_sync(int startOffset, int endOffset) throws Exception {
        final XLog LOG = XLog.getLog(CoordELFunctions.class);
        final Thread currentThread = Thread.currentThread();
        ELEvaluator eval = ELEvaluator.getCurrent();
        String retVal = "";
        int datasetFrequency = (int) getDSFrequency();// in minutes
        TimeUnit dsTimeUnit = getDSTimeUnit();
        // Filled in by getCurrentInstance() with the index of the current instance, counted from the initial one.
        int[] instCount = new int[1];
        // Configuration switch: start the search from "now" instead of the action's actual time (default false).
        boolean useCurrentTime = Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false);
        Calendar nominalInstanceCal;
        if (useCurrentTime) {
            nominalInstanceCal = getCurrentInstance(new Date(), instCount);
        }
        else {
            nominalInstanceCal = getCurrentInstance(getActualTime(), instCount);
        }
        StringBuilder resolvedInstances = new StringBuilder();
        StringBuilder resolvedURIPaths = new StringBuilder();
        if (nominalInstanceCal != null) {
            Calendar initInstance = getInitialInstanceCal();
            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
            if (ds == null) {
                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
            }
            String uriTemplate = ds.getUriTemplate();
            Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
            if (conf == null) {
                throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
            }
            // Counts existing instances found so far as 0, -1, -2, ... so it can be compared with the offsets.
            int available = 0;
            boolean resolved = false;
            String user = ParamChecker
                    .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
            String doneFlag = ds.getDoneFlag();
            URIHandlerService uriService = Services.get().get(URIHandlerService.class);
            // The handler and its context are created lazily from the first evaluated URI and then reused.
            URIHandler uriHandler = null;
            Context uriContext = null;
            try {
                // Stop at the initial instance, or early if this thread has been interrupted.
                while (nominalInstanceCal.compareTo(initInstance) >= 0 && !currentThread.isInterrupted()) {
                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
                    if (uriHandler == null) {
                        URI uri = new URI(uriPath);
                        uriHandler = uriService.getURIHandler(uri);
                        uriContext = uriHandler.getContext(uri, conf, user);
                    }
                    String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
                    if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
                        XLog.getLog(CoordELFunctions.class)
                                .debug("Found latest(" + available + "): " + uriWithDoneFlag);
                        if (available == startOffset) {
                            // Oldest requested instance found: the range is complete.
                            LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
                            resolved = true;
                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
                            resolvedURIPaths.append(uriPath);
                            retVal = resolvedInstances.toString();
                            eval.setVariable("resolved_path", resolvedURIPaths.toString());
                            break;
                        }
                        else if (available <= endOffset) {
                            // Inside the requested range but not yet at its start: collect and keep searching.
                            LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
                                    INSTANCE_SEPARATOR);
                            resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
                        }

                        available--;
                    }
                    // Move to the previous instance, recomputed from the initial instance rather than by
                    // subtracting the frequency from the current calendar.
                    // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), -datasetFrequency);
                    nominalInstanceCal = (Calendar) initInstance.clone();
                    instCount[0]--;
                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
                    // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
                }
            }
            finally {
                // Release the handler context whether or not the search succeeded.
                if (uriContext != null) {
                    uriContext.destroy();
                }
            }
            if (!resolved) {
                // return unchanged latest function with variable 'is_resolved'
                // to 'false'
                eval.setVariable("is_resolved", Boolean.FALSE);
                if (startOffset == endOffset) {
                    retVal = "${coord:latest(" + startOffset + ")}";
                }
                else {
                    retVal = "${coord:latestRange(" + startOffset + "," + endOffset + ")}";
                }
            }
            else {
                eval.setVariable("is_resolved", Boolean.TRUE);
            }
        }
        else {// No feasible nominal time
            eval.setVariable("is_resolved", Boolean.FALSE);
        }
        return retVal;
    }
1100    
1101        /**
1102         * @param tm
1103         * @return a new Evaluator to be used for URI-template evaluation
1104         */
1105        private static ELEvaluator getUriEvaluator(Calendar tm) {
1106            tm.setTimeZone(DateUtils.getOozieProcessingTimeZone());
1107            ELEvaluator retEval = new ELEvaluator();
1108            retEval.setVariable("YEAR", tm.get(Calendar.YEAR));
1109            retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm
1110                    .get(Calendar.MONTH) + 1));
1111            retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm
1112                    .get(Calendar.DAY_OF_MONTH));
1113            retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm
1114                    .get(Calendar.HOUR_OF_DAY));
1115            retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? "0" + tm.get(Calendar.MINUTE) : tm
1116                    .get(Calendar.MINUTE));
1117            return retEval;
1118        }
1119    
1120        /**
     * @return true if the type of the current data set is SYNC (compared ignoring case), false otherwise
1122         */
1123        private static boolean isSyncDataSet() {
1124            ELEvaluator eval = ELEvaluator.getCurrent();
1125            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1126            if (ds == null) {
1127                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1128            }
1129            return ds.getType().equalsIgnoreCase("SYNC");
1130        }
1131    
1132        /**
1133         * Check whether a function should be resolved.
1134         *
1135         * @param functionName
1136         * @param n
1137         * @return null if the functionName needs to be resolved otherwise return the calling function unresolved.
1138         */
1139        private static String checkIfResolved(String functionName, String n) {
1140            ELEvaluator eval = ELEvaluator.getCurrent();
1141            String replace = (String) eval.getVariable("resolve_" + functionName);
1142            if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't
1143                // resolve
1144                // return "${coord:" + functionName + "(" + n +")}"; //Unresolved
1145                eval.setVariable(".wrap", "true");
1146                return "coord:" + functionName + "(" + n + ")"; // Unresolved
1147            }
1148            return null; // Resolved it
1149        }
1150    
1151        private static String echoUnResolved(String functionName, String n) {
1152            return echoUnResolvedPre(functionName, n, "coord:");
1153        }
1154    
1155        private static String echoUnResolvedPre(String functionName, String n, String prefix) {
1156            ELEvaluator eval = ELEvaluator.getCurrent();
1157            eval.setVariable(".wrap", "true");
1158            return prefix + functionName + "(" + n + ")"; // Unresolved
1159        }
1160    
1161        /**
1162         * @return the initial instance of a DataSet in DATE
1163         */
1164        private static Date getInitialInstance() {
1165            ELEvaluator eval = ELEvaluator.getCurrent();
1166            return getInitialInstance(eval);
1167        }
1168    
1169        /**
1170         * @return the initial instance of a DataSet in DATE
1171         */
1172        private static Date getInitialInstance(ELEvaluator eval) {
1173            return getInitialInstanceCal(eval).getTime();
1174            // return ds.getInitInstance();
1175        }
1176    
1177        /**
1178         * @return the initial instance of a DataSet in Calendar
1179         */
1180        private static Calendar getInitialInstanceCal() {
1181            ELEvaluator eval = ELEvaluator.getCurrent();
1182            return getInitialInstanceCal(eval);
1183        }
1184    
1185        /**
1186         * @return the initial instance of a DataSet in Calendar
1187         */
1188        private static Calendar getInitialInstanceCal(ELEvaluator eval) {
1189            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1190            if (ds == null) {
1191                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1192            }
1193            Calendar effInitTS = Calendar.getInstance();
1194            effInitTS.setTime(ds.getInitInstance());
1195            effInitTS.setTimeZone(ds.getTimeZone());
1196            // To adjust EOD/EOM
1197            DateUtils.moveToEnd(effInitTS, getDSEndOfFlag(eval));
1198            return effInitTS;
1199            // return ds.getInitInstance();
1200        }
1201    
1202        /**
1203         * @return Nominal or action creation Time when all the dependencies of an application instance are met.
1204         */
1205        private static Date getActionCreationtime() {
1206            ELEvaluator eval = ELEvaluator.getCurrent();
1207            return getActionCreationtime(eval);
1208        }
1209    
1210        /**
1211         * @return Nominal or action creation Time when all the dependencies of an application instance are met.
1212         */
1213        private static Date getActionCreationtime(ELEvaluator eval) {
1214            SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1215            if (coordAction == null) {
1216                throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1217            }
1218            return coordAction.getNominalTime();
1219        }
1220    
1221        /**
1222         * @return Actual Time when all the dependencies of an application instance are met.
1223         */
1224        private static Date getActualTime() {
1225            ELEvaluator eval = ELEvaluator.getCurrent();
1226            SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1227            if (coordAction == null) {
1228                throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1229            }
1230            return coordAction.getActualTime();
1231        }
1232    
1233        /**
1234         * @return TimeZone for the application or job.
1235         */
1236        private static TimeZone getJobTZ() {
1237            ELEvaluator eval = ELEvaluator.getCurrent();
1238            SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1239            if (coordAction == null) {
1240                throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1241            }
1242            return coordAction.getTimeZone();
1243        }
1244    
1245        /**
1246         * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
1247         *
1248         * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
1249         *         the dataset.
1250         */
1251        public static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) {
1252            ELEvaluator eval = ELEvaluator.getCurrent();
1253            return getCurrentInstance(effectiveTime, instanceCount, eval);
1254        }
1255    
1256        /**
1257         * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
1258         *
1259         * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
1260         *         the dataset.
1261         */
1262        private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[], ELEvaluator eval) {
1263            Date datasetInitialInstance = getInitialInstance(eval);
1264            TimeUnit dsTimeUnit = getDSTimeUnit(eval);
1265            TimeZone dsTZ = getDatasetTZ(eval);
1266            int dsFreq = getDSFrequency(eval);
1267            // Convert Date to Calendar for corresponding TZ
1268            Calendar current = Calendar.getInstance();
1269            current.setTime(datasetInitialInstance);
1270            current.setTimeZone(dsTZ);
1271    
1272            Calendar calEffectiveTime = Calendar.getInstance();
1273            calEffectiveTime.setTime(effectiveTime);
1274            calEffectiveTime.setTimeZone(dsTZ);
1275            if (instanceCount == null) {    // caller doesn't care about this value
1276                instanceCount = new int[1];
1277            }
1278            instanceCount[0] = 0;
1279            if (current.compareTo(calEffectiveTime) > 0) {
1280                return null;
1281            }
1282            Calendar origCurrent = (Calendar) current.clone();
1283            while (current.compareTo(calEffectiveTime) <= 0) {
1284                current = (Calendar) origCurrent.clone();
1285                instanceCount[0]++;
1286                current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
1287            }
1288            instanceCount[0]--;
1289    
1290            current = (Calendar) origCurrent.clone();
1291            current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
1292            return current;
1293        }
1294    
1295        public static Calendar getEffectiveNominalTime() {
1296            Date datasetInitialInstance = getInitialInstance();
1297            TimeZone dsTZ = getDatasetTZ();
1298            // Convert Date to Calendar for corresponding TZ
1299            Calendar current = Calendar.getInstance();
1300            current.setTime(datasetInitialInstance);
1301            current.setTimeZone(dsTZ);
1302    
1303            Calendar calEffectiveTime = Calendar.getInstance();
1304            calEffectiveTime.setTime(getActionCreationtime());
1305            calEffectiveTime.setTimeZone(dsTZ);
1306            if (current.compareTo(calEffectiveTime) > 0) {
1307                // Nominal Time < initial Instance
1308                // TODO: getClass() call doesn't work from static method.
1309                // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+
1310                // current.getTime());
1311                return null;
1312            }
1313            return calEffectiveTime;
1314        }
1315    
1316        /**
1317         * @return dataset frequency in minutes
1318         */
1319        private static int getDSFrequency() {
1320            ELEvaluator eval = ELEvaluator.getCurrent();
1321            return getDSFrequency(eval);
1322        }
1323    
1324        /**
1325         * @return dataset frequency in minutes
1326         */
1327        private static int getDSFrequency(ELEvaluator eval) {
1328            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1329            if (ds == null) {
1330                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1331            }
1332            return ds.getFrequency();
1333        }
1334    
1335        /**
1336         * @return dataset TimeUnit
1337         */
1338        private static TimeUnit getDSTimeUnit() {
1339            ELEvaluator eval = ELEvaluator.getCurrent();
1340            return getDSTimeUnit(eval);
1341        }
1342    
1343        /**
1344         * @return dataset TimeUnit
1345         */
1346        private static TimeUnit getDSTimeUnit(ELEvaluator eval) {
1347            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1348            if (ds == null) {
1349                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1350            }
1351            return ds.getTimeUnit();
1352        }
1353    
1354        /**
1355         * @return dataset TimeZone
1356         */
1357        public static TimeZone getDatasetTZ() {
1358            ELEvaluator eval = ELEvaluator.getCurrent();
1359            return getDatasetTZ(eval);
1360        }
1361    
1362        /**
1363         * @return dataset TimeZone
1364         */
1365        private static TimeZone getDatasetTZ(ELEvaluator eval) {
1366            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1367            if (ds == null) {
1368                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1369            }
1370            return ds.getTimeZone();
1371        }
1372    
1373        /**
1374         * @return dataset TimeUnit
1375         */
1376        private static TimeUnit getDSEndOfFlag() {
1377            ELEvaluator eval = ELEvaluator.getCurrent();
1378            return getDSEndOfFlag(eval);
1379        }
1380    
1381        /**
1382         * @return dataset TimeUnit
1383         */
1384        private static TimeUnit getDSEndOfFlag(ELEvaluator eval) {
1385            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1386            if (ds == null) {
1387                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1388            }
1389            return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration();
1390        }
1391    
1392        /**
1393         * Return a job configuration property for the coordinator.
1394         *
1395         * @param property property name.
1396         * @return the value of the property, <code>null</code> if the property is undefined.
1397         */
1398        public static String coord_conf(String property) {
1399            ELEvaluator eval = ELEvaluator.getCurrent();
1400            return (String) eval.getVariable(property);
1401        }
1402    
1403        /**
1404         * Return the user that submitted the coordinator job.
1405         *
1406         * @return the user that submitted the coordinator job.
1407         */
1408        public static String coord_user() {
1409            ELEvaluator eval = ELEvaluator.getCurrent();
1410            return (String) eval.getVariable(OozieClient.USER_NAME);
1411        }
1412    
1413        /**
1414         * Takes two offset times and returns a list of multiples of the frequency offset from the effective nominal time that occur
1415         * between them.  The caller should make sure that startCal is earlier than endCal.
1416         * <p>
1417         * As a simple example, assume its the same day: startCal is 1:00, endCal is 2:00, frequency is 20min, and effective nominal
1418         * time is 1:20 -- then this method would return a list containing: -20, 0, 20, 40, 60
1419         *
1420         * @param startCal The earlier offset time
1421         * @param endCal The later offset time
1422         * @param eval The ELEvaluator to use; cannot be null
1423         * @return A list of multiple of the frequency offset from the effective nominal time that occur between the startCal and endCal
1424         */
1425        public static List<Integer> expandOffsetTimes(Calendar startCal, Calendar endCal, ELEvaluator eval) {
1426            List<Integer> expandedFreqs = new ArrayList<Integer>();
1427            // Use eval because the "current" eval isn't set
1428            int freq = getDSFrequency(eval);
1429            TimeUnit freqUnit = getDSTimeUnit(eval);
1430            Calendar cal = getCurrentInstance(getActionCreationtime(eval), null, eval);
1431            int totalFreq = 0;
1432            if (startCal.before(cal)) {
1433                while (cal.after(startCal)) {
1434                    cal.add(freqUnit.getCalendarUnit(), -freq);
1435                    totalFreq += -freq;
1436                }
1437                if (cal.before(startCal)) {
1438                    cal.add(freqUnit.getCalendarUnit(), freq);
1439                    totalFreq += freq;
1440                }
1441            }
1442            else if (startCal.after(cal)) {
1443                while (cal.before(startCal)) {
1444                    cal.add(freqUnit.getCalendarUnit(), freq);
1445                    totalFreq += freq;
1446                }
1447            }
1448            // At this point, cal is the smallest multiple of the dataset frequency that is >= to the startCal and offset from the
1449            // effective nominal time.  Now we can find all of the instances that occur between startCal and endCal, inclusive.
1450            while (cal.before(endCal) || cal.equals(endCal)) {
1451                expandedFreqs.add(totalFreq);
1452                cal.add(freqUnit.getCalendarUnit(), freq);
1453                totalFreq += freq;
1454            }
1455            return expandedFreqs;
1456        }
1457    
1458        /**
1459         * Resolve the offset time from the effective nominal time
1460         *
1461         * @param n offset amount (integer)
1462         * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
1463         * @param eval The ELEvaluator to use; or null to use the "current" eval
1464         * @return A Calendar of the offset time
1465         */
1466        public static Calendar resolveOffsetRawTime(int n, TimeUnit timeUnit, ELEvaluator eval) {
1467            // Use eval if given (for when the "current" eval isn't set)
1468            Calendar cal;
1469            if (eval == null) {
1470                cal = getCurrentInstance(getActionCreationtime(), null);
1471            }
1472            else {
1473                cal = getCurrentInstance(getActionCreationtime(eval), null, eval);
1474            }
1475            if (cal == null) {
1476                XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the nominal time, an"
1477                        + " empty string is returned. This means that no data is available at the offset instance specified by the user"
1478                        + " and the user could try modifying his or her initial-instance to an earlier time.");
1479                return null;
1480            }
1481            cal.add(timeUnit.getCalendarUnit(), n);
1482            return cal;
1483        }
1484    }