001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.coord;
019    
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.Calendar;
023    import java.util.Date;
024    import java.util.List;
025    import java.util.TimeZone;
026    
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.hadoop.fs.Path;
029    
030    import org.apache.oozie.client.OozieClient;
031    import org.apache.oozie.util.DateUtils;
032    import org.apache.oozie.util.ELEvaluator;
033    import org.apache.oozie.util.ParamChecker;
034    import org.apache.oozie.util.XLog;
035    import org.apache.oozie.service.HadoopAccessorException;
036    import org.apache.oozie.service.Services;
037    import org.apache.oozie.service.HadoopAccessorService;
038    
039    /**
040     * This class implements the EL function related to coordinator
041     */
042    
043    public class CoordELFunctions {
044        final private static XLog LOG = XLog.getLog(CoordELFunctions.class);
045        final private static String DATASET = "oozie.coord.el.dataset.bean";
046        final private static String COORD_ACTION = "oozie.coord.el.app.bean";
047        final public static String CONFIGURATION = "oozie.coord.el.conf";
048        final public static String LATEST_EL_USE_CURRENT_TIME = "oozie.service.ELService.latest-el.use-current-time";
049        // INSTANCE_SEPARATOR is used to separate multiple directories into one tag.
050        final public static String INSTANCE_SEPARATOR = "#";
051        final public static String DIR_SEPARATOR = ",";
052        // TODO: in next release, support flexibility
053        private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS";
054    
055        /**
056         * Used in defining the frequency in 'day' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
057         *
058         * @param val frequency in number of days.
059         * @return number of days and also set the frequency timeunit to "day"
060         */
061        public static int ph1_coord_days(int val) {
062            val = ParamChecker.checkGTZero(val, "n");
063            ELEvaluator eval = ELEvaluator.getCurrent();
064            eval.setVariable("timeunit", TimeUnit.DAY);
065            eval.setVariable("endOfDuration", TimeUnit.NONE);
066            return val;
067        }
068    
069        /**
070         * Used in defining the frequency in 'month' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
071         *
072         * @param val frequency in number of months.
073         * @return number of months and also set the frequency timeunit to "month"
074         */
075        public static int ph1_coord_months(int val) {
076            val = ParamChecker.checkGTZero(val, "n");
077            ELEvaluator eval = ELEvaluator.getCurrent();
078            eval.setVariable("timeunit", TimeUnit.MONTH);
079            eval.setVariable("endOfDuration", TimeUnit.NONE);
080            return val;
081        }
082    
083        /**
084         * Used in defining the frequency in 'hour' unit. <p/> parameter value domain: <code> val &gt; 0</code> and should
085         * be integer.
086         *
087         * @param val frequency in number of hours.
088         * @return number of minutes and also set the frequency timeunit to "minute"
089         */
090        public static int ph1_coord_hours(int val) {
091            val = ParamChecker.checkGTZero(val, "n");
092            ELEvaluator eval = ELEvaluator.getCurrent();
093            eval.setVariable("timeunit", TimeUnit.MINUTE);
094            eval.setVariable("endOfDuration", TimeUnit.NONE);
095            return val * 60;
096        }
097    
098        /**
099         * Used in defining the frequency in 'minute' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
100         *
101         * @param val frequency in number of minutes.
102         * @return number of minutes and also set the frequency timeunit to "minute"
103         */
104        public static int ph1_coord_minutes(int val) {
105            val = ParamChecker.checkGTZero(val, "n");
106            ELEvaluator eval = ELEvaluator.getCurrent();
107            eval.setVariable("timeunit", TimeUnit.MINUTE);
108            eval.setVariable("endOfDuration", TimeUnit.NONE);
109            return val;
110        }
111    
112        /**
113         * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p/> Every instance will
114         * start at 00:00 hour of each day. <p/> domain: <code> val &gt; 0</code> and should be integer.
115         *
116         * @param val frequency in number of days.
117         * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day"
118         */
119        public static int ph1_coord_endOfDays(int val) {
120            val = ParamChecker.checkGTZero(val, "n");
121            ELEvaluator eval = ELEvaluator.getCurrent();
122            eval.setVariable("timeunit", TimeUnit.DAY);
123            eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY);
124            return val;
125        }
126    
127        /**
128         * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p/> Every instance will
129         * start at first day of each month at 00:00 hour. <p/> domain: <code> val &gt; 0</code> and should be integer.
130         *
131         * @param val: frequency in number of months.
132         * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month"
133         */
134        public static int ph1_coord_endOfMonths(int val) {
135            val = ParamChecker.checkGTZero(val, "n");
136            ELEvaluator eval = ELEvaluator.getCurrent();
137            eval.setVariable("timeunit", TimeUnit.MONTH);
138            eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH);
139            return val;
140        }
141    
142        /**
143         * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p/> Depends on: <p/>
144         * 1. Timezone of both dataset and job <p/> 2. Action creation Time
145         *
146         * @return difference in minutes (DataSet TZ Offset - Application TZ offset)
147         */
148        public static int ph2_coord_tzOffset() {
149            Date actionCreationTime = getActionCreationtime();
150            TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ");
151            TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ");
152            // Apply the TZ into Calendar object
153            Calendar dsTime = Calendar.getInstance(dsTZ);
154            dsTime.setTime(actionCreationTime);
155            Calendar jobTime = Calendar.getInstance(jobTZ);
156            jobTime.setTime(actionCreationTime);
157            return (dsTime.get(Calendar.ZONE_OFFSET) - jobTime.get(Calendar.ZONE_OFFSET)) / (1000 * 60);
158        }
159    
160        public static int ph3_coord_tzOffset() {
161            return ph2_coord_tzOffset();
162        }
163    
164        /**
165         * Returns the a date string while given a base date in 'strBaseDate',
166         * offset and unit (e.g. DAY, MONTH, HOUR, MINUTE, MONTH).
167         *
168         * @param strBaseDate -- base date
169         * @param offset -- any number
170         * @param unit -- DAY, MONTH, HOUR, MINUTE, MONTH
171         * @return date string
172         * @throws Exception
173         */
174        public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
175            Calendar baseCalDate = DateUtils.getCalendar(strBaseDate);
176            StringBuilder buffer = new StringBuilder();
177            baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset);
178            buffer.append(DateUtils.formatDateOozieTZ(baseCalDate));
179            return buffer.toString();
180        }
181    
182        public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
183            return ph2_coord_dateOffset(strBaseDate, offset, unit);
184        }
185    
186        /**
187         * Determine the date-time in Oozie processing timezone of n-th future available dataset instance
188         * from nominal Time but not beyond the instance specified as 'instance.
189         * <p/>
190         * It depends on:
191         * <p/>
192         * 1. Data set frequency
193         * <p/>
194         * 2. Data set Time unit (day, month, minute)
195         * <p/>
196         * 3. Data set Time zone/DST
197         * <p/>
198         * 4. End Day/Month flag
199         * <p/>
200         * 5. Data set initial instance
201         * <p/>
202         * 6. Action Creation Time
203         * <p/>
204         * 7. Existence of dataset's directory
205         *
206         * @param n :instance count
207         *        <p/>
208         *        domain: n >= 0, n is integer
209         * @param instance: How many future instance it should check? value should
210         *        be >=0
211         * @return date-time in Oozie processing timezone of the n-th instance
212         *         <p/>
213         * @throws Exception
214         */
215        public static String ph3_coord_future(int n, int instance) throws Exception {
216            ParamChecker.checkGEZero(n, "future:n");
217            ParamChecker.checkGTZero(instance, "future:instance");
218            if (isSyncDataSet()) {// For Sync Dataset
219                return coord_future_sync(n, instance);
220            }
221            else {
222                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
223            }
224        }
225    
226        /**
227         * Determine the date-time in Oozie processing timezone of the future available dataset instances
228         * from start to end offsets from nominal Time but not beyond the instance specified as 'instance'.
229         * <p/>
230         * It depends on:
231         * <p/>
232         * 1. Data set frequency
233         * <p/>
234         * 2. Data set Time unit (day, month, minute)
235         * <p/>
236         * 3. Data set Time zone/DST
237         * <p/>
238         * 4. End Day/Month flag
239         * <p/>
240         * 5. Data set initial instance
241         * <p/>
242         * 6. Action Creation Time
243         * <p/>
244         * 7. Existence of dataset's directory
245         *
246         * @param start : start instance offset
247         *        <p/>
248         *        domain: start >= 0, start is integer
249         * @param end : end instance offset
250         *        <p/>
251         *        domain: end >= 0, end is integer
252         * @param instance: How many future instance it should check? value should
253         *        be >=0
254         * @return date-time in Oozie processing timezone of the instances from start to end offsets
255         *        delimited by comma.
256         *         <p/>
257         * @throws Exception
258         */
259        public static String ph3_coord_futureRange(int start, int end, int instance) throws Exception {
260            ParamChecker.checkGEZero(start, "future:n");
261            ParamChecker.checkGEZero(end, "future:n");
262            ParamChecker.checkGTZero(instance, "future:instance");
263            if (isSyncDataSet()) {// For Sync Dataset
264                return coord_futureRange_sync(start, end, instance);
265            }
266            else {
267                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
268            }
269        }
270    
271        private static String coord_future_sync(int n, int instance) throws Exception {
272            return coord_futureRange_sync(n, n, instance);
273        }
274    
275        private static String coord_futureRange_sync(int startOffset, int endOffset, int instance) throws Exception {
276            ELEvaluator eval = ELEvaluator.getCurrent();
277            String retVal = "";
278            int datasetFrequency = (int) getDSFrequency();// in minutes
279            TimeUnit dsTimeUnit = getDSTimeUnit();
280            int[] instCount = new int[1];
281            Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
282            StringBuilder resolvedInstances = new StringBuilder();
283            StringBuilder resolvedURIPaths = new StringBuilder();
284            if (nominalInstanceCal != null) {
285                Calendar initInstance = getInitialInstanceCal();
286                nominalInstanceCal = (Calendar) initInstance.clone();
287                nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
288    
289                SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
290                if (ds == null) {
291                    throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
292                }
293                String uriTemplate = ds.getUriTemplate();
294                Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
295                if (conf == null) {
296                    throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
297                }
298                int available = 0, checkedInstance = 0;
299                boolean resolved = false;
300                String user = ParamChecker
301                        .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
302                String doneFlag = ds.getDoneFlag();
303                while (instance >= checkedInstance) {
304                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
305                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
306                    String pathWithDoneFlag = uriPath;
307                    if (doneFlag.length() > 0) {
308                        pathWithDoneFlag += "/" + doneFlag;
309                    }
310                    if (isPathAvailable(pathWithDoneFlag, user, null, conf)) {
311                        LOG.debug("Found future(" + available + "): " + pathWithDoneFlag);
312                        if (available == endOffset) {
313                            LOG.debug("Matched future(" + available + "): " + pathWithDoneFlag);
314                            resolved = true;
315                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
316                            resolvedURIPaths.append(uriPath);
317                            retVal = resolvedInstances.toString();
318                            eval.setVariable("resolved_path", resolvedURIPaths.toString());
319                            break;
320                        } else if (available >= startOffset) {
321                            LOG.debug("Matched future(" + available + "): " + pathWithDoneFlag);
322                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(INSTANCE_SEPARATOR);
323                            resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
324                        }
325                        available++;
326                    }
327                    // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
328                    // -datasetFrequency);
329                    nominalInstanceCal = (Calendar) initInstance.clone();
330                    instCount[0]++;
331                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
332                    checkedInstance++;
333                    // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
334                }
335                if (!resolved) {
336                    // return unchanged future function with variable 'is_resolved'
337                    // to 'false'
338                    eval.setVariable("is_resolved", Boolean.FALSE);
339                    if (startOffset == endOffset) {
340                        retVal = "${coord:future(" + startOffset + ", " + instance + ")}";
341                    }
342                    else {
343                        retVal = "${coord:futureRange(" + startOffset + ", " + endOffset + ", " + instance + ")}";
344                    }
345                }
346                else {
347                    eval.setVariable("is_resolved", Boolean.TRUE);
348                }
349            }
350            else {// No feasible nominal time
351                eval.setVariable("is_resolved", Boolean.TRUE);
352                retVal = "";
353            }
354            return retVal;
355        }
356    
357        /**
358         * Return nominal time or Action Creation Time.
359         * <p/>
360         *
361         * @return coordinator action creation or materialization date time
362         * @throws Exception if unable to format the Date object to String
363         */
364        public static String ph2_coord_nominalTime() throws Exception {
365            ELEvaluator eval = ELEvaluator.getCurrent();
366            SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
367                    "Coordinator Action");
368            return DateUtils.formatDateOozieTZ(action.getNominalTime());
369        }
370    
371        public static String ph3_coord_nominalTime() throws Exception {
372            return ph2_coord_nominalTime();
373        }
374    
375        /**
376         * Convert from standard date-time formatting to a desired format.
377         * <p/>
378         * @param dateTimeStr - A timestamp in standard (ISO8601) format.
379         * @param format - A string representing the desired format.
380         * @return coordinator action creation or materialization date time
381         * @throws Exception if unable to format the Date object to String
382         */
383        public static String ph2_coord_formatTime(String dateTimeStr, String format)
384            throws Exception {
385            Date dateTime = DateUtils.parseDateOozieTZ(dateTimeStr);
386            return DateUtils.formatDateCustom(dateTime, format);
387        }
388    
389        public static String ph3_coord_formatTime(String dateTimeStr, String format)
390            throws Exception {
391            return ph2_coord_formatTime(dateTimeStr, format);
392        }
393    
394        /**
395         * Return Action Id. <p/>
396         *
397         * @return coordinator action Id
398         */
399        public static String ph2_coord_actionId() throws Exception {
400            ELEvaluator eval = ELEvaluator.getCurrent();
401            SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
402                    "Coordinator Action");
403            return action.getActionId();
404        }
405    
406        public static String ph3_coord_actionId() throws Exception {
407            return ph2_coord_actionId();
408        }
409    
410        /**
411         * Return Job Name. <p/>
412         *
413         * @return coordinator name
414         */
415        public static String ph2_coord_name() throws Exception {
416            ELEvaluator eval = ELEvaluator.getCurrent();
417            SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
418                    "Coordinator Action");
419            return action.getName();
420        }
421    
422        public static String ph3_coord_name() throws Exception {
423            return ph2_coord_name();
424        }
425    
426        /**
427         * Return Action Start time. <p/>
428         *
429         * @return coordinator action start time
430         * @throws Exception if unable to format the Date object to String
431         */
432        public static String ph2_coord_actualTime() throws Exception {
433            ELEvaluator eval = ELEvaluator.getCurrent();
434            SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
435            if (coordAction == null) {
436                throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
437            }
438            return DateUtils.formatDateOozieTZ(coordAction.getActualTime());
439        }
440    
441        public static String ph3_coord_actualTime() throws Exception {
442            return ph2_coord_actualTime();
443        }
444    
445        /**
446         * Used to specify a list of URI's that are used as input dir to the workflow job. <p/> Look for two evaluator-level
447         * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
448         * URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
449         * unresolved, this function will echo back the original function <p/> otherwise it sends the uris.
450         *
451         * @param dataInName : Datain name
452         * @return the list of URI's separated by INSTANCE_SEPARATOR <p/> if there are unresolved EL function (i.e. latest)
453         *         , echo back <p/> the function without resolving the function.
454         */
455        public static String ph3_coord_dataIn(String dataInName) {
456            String uris = "";
457            ELEvaluator eval = ELEvaluator.getCurrent();
458            uris = (String) eval.getVariable(".datain." + dataInName);
459            Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
460            if (unresolved != null && unresolved.booleanValue() == true) {
461                return "${coord:dataIn('" + dataInName + "')}";
462            }
463            return uris;
464        }
465    
466        /**
467         * Used to specify a list of URI's that are output dir of the workflow job. <p/> Look for one evaluator-level
468         * variable <p/> dataout.<DATAOUT_NAME> <p/> It defines the current list of URI. <p/> otherwise it sends the uris.
469         *
470         * @param dataOutName : Dataout name
471         * @return the list of URI's separated by INSTANCE_SEPARATOR
472         */
473        public static String ph3_coord_dataOut(String dataOutName) {
474            String uris = "";
475            ELEvaluator eval = ELEvaluator.getCurrent();
476            uris = (String) eval.getVariable(".dataout." + dataOutName);
477            return uris;
478        }
479    
480        /**
481         * Determine the date-time in Oozie processing timezone of n-th dataset instance. <p/> It depends on: <p/> 1.
482         * Data set frequency <p/> 2.
483         * Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. Data
484         * set initial instance <p/> 6. Action Creation Time
485         *
486         * @param n instance count domain: n is integer
487         * @return date-time in Oozie processing timezone of the n-th instance returns 'null' means n-th instance is
488         * earlier than Initial-Instance of DS
489         * @throws Exception
490         */
491        public static String ph2_coord_current(int n) throws Exception {
492            if (isSyncDataSet()) { // For Sync Dataset
493                return coord_current_sync(n);
494            }
495            else {
496                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
497            }
498        }
499    
500        /**
501         * Determine the date-time in Oozie processing timezone of the given offset from the dataset effective nominal time. <p/> It
502         * depends on: <p> 1. Data set frequency <p/> 2. Data set Time Unit <p/> 3. Data set Time zone/DST
503         * <p/> 4. Data set initial instance <p/> 5. Action Creation Time
504         *
505         * @param n offset amount (integer)
506         * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
507         * @return date-time in Oozie processing timezone of the given offset from the dataset effective nominal time
508         * @throws Exception if there was a problem formatting
509         */
510        public static String ph2_coord_offset(int n, String timeUnit) throws Exception {
511            if (isSyncDataSet()) { // For Sync Dataset
512                return coord_offset_sync(n, timeUnit);
513            }
514            else {
515                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
516            }
517        }
518    
519        /**
520         * Determine how many hours is on the date of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency
521         * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
522         * Data set initial instance <p/> 6. Action Creation Time
523         *
524         * @param n instance count <p/> domain: n is integer
525         * @return number of hours on that day <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
526         * @throws Exception
527         */
528        public static int ph2_coord_hoursInDay(int n) throws Exception {
529            int datasetFrequency = (int) getDSFrequency();
530            // /Calendar nominalInstanceCal =
531            // getCurrentInstance(getActionCreationtime());
532            Calendar nominalInstanceCal = getEffectiveNominalTime();
533            if (nominalInstanceCal == null) {
534                return -1;
535            }
536            nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
537            /*
538             * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
539             * { return -1; }
540             */
541            nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
542            // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
543            return DateUtils.hoursInDay(nominalInstanceCal);
544        }
545    
546        public static int ph3_coord_hoursInDay(int n) throws Exception {
547            return ph2_coord_hoursInDay(n);
548        }
549    
550        /**
551         * Calculate number of days in one month for n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency .
552         * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
553         * Data set initial instance <p/> 6. Action Creation Time
554         *
555         * @param n instance count. domain: n is integer
556         * @return number of days in that month <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
557         * @throws Exception
558         */
559        public static int ph2_coord_daysInMonth(int n) throws Exception {
560            int datasetFrequency = (int) getDSFrequency();// in minutes
561            // Calendar nominalInstanceCal =
562            // getCurrentInstance(getActionCreationtime());
563            Calendar nominalInstanceCal = getEffectiveNominalTime();
564            if (nominalInstanceCal == null) {
565                return -1;
566            }
567            nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
568            /*
569             * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
570             * { return -1; }
571             */
572            nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
573            // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
574            return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH);
575        }
576    
577        public static int ph3_coord_daysInMonth(int n) throws Exception {
578            return ph2_coord_daysInMonth(n);
579        }
580    
581        /**
582         * Determine the date-time in Oozie processing timezone of n-th latest available dataset instance. <p/> It depends
583         * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
584         * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of
585         * dataset's directory
586         *
587         * @param n :instance count <p/> domain: n > 0, n is integer
588         * @return date-time in Oozie processing timezone of the n-th instance <p/> returns 'null' means n-th instance is
589         * earlier than Initial-Instance of DS
590         * @throws Exception
591         */
592        public static String ph3_coord_latest(int n) throws Exception {
593            if (n > 0) {
594                throw new IllegalArgumentException("paramter should be <= 0 but it is " + n);
595            }
596            if (isSyncDataSet()) {// For Sync Dataset
597                return coord_latest_sync(n);
598            }
599            else {
600                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
601            }
602        }
603    
604        /**
605         * Determine the date-time in Oozie processing timezone of latest available dataset instances
606         * from start to end offsets from the nominal time. <p/> It depends
607         * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
608         * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of
609         * dataset's directory
610         *
611         * @param start :start instance offset <p/> domain: start > 0, start is integer
612         * @param end :end instance offset <p/> domain: end > 0, end is integer
613         * @return date-time in Oozie processing timezone of the instances from start to end offsets
614         *        delimited by comma. <p/> returns 'null' means start offset instance is
615         *        earlier than Initial-Instance of DS
616         * @throws Exception
617         */
618        public static String ph3_coord_latestRange(int start, int end) throws Exception {
619            if (isSyncDataSet()) {// For Sync Dataset
620                return coord_latestRange_sync(start, end);
621            }
622            else {
623                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
624            }
625        }
626    
627        /**
628         * Configure an evaluator with data set and application specific information. <p/> Helper method of associating
629         * dataset and application object
630         *
631         * @param evaluator : to set variables
632         * @param ds : Data Set object
633         * @param coordAction : Application instance
634         */
635        public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) {
636            evaluator.setVariable(COORD_ACTION, coordAction);
637            evaluator.setVariable(DATASET, ds);
638        }
639    
640        /**
641         * Helper method to wrap around with "${..}". <p/>
642         *
643         *
644         * @param eval :EL evaluator
645         * @param expr : expression to evaluate
646         * @return Resolved expression or echo back the same expression
647         * @throws Exception
648         */
649        public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception {
650            try {
651                eval.setVariable(".wrap", null);
652                String result = eval.evaluate(expr, String.class);
653                if (eval.getVariable(".wrap") != null) {
654                    return "${" + result + "}";
655                }
656                else {
657                    return result;
658                }
659            }
660            catch (Exception e) {
661                throw new Exception("Unable to evaluate :" + expr + ":\n", e);
662            }
663        }
664    
665        // Set of echo functions
666    
667        public static String ph1_coord_current_echo(String n) {
668            return echoUnResolved("current", n);
669        }
670    
671        public static String ph1_coord_offset_echo(String n, String timeUnit) {
672            return echoUnResolved("offset", n + " , " + timeUnit);
673        }
674    
675        public static String ph2_coord_current_echo(String n) {
676            return echoUnResolved("current", n);
677        }
678    
679        public static String ph2_coord_offset_echo(String n, String timeUnit) {
680            return echoUnResolved("offset", n + " , " + timeUnit);
681        }
682    
683        public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) {
684            return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit);
685        }
686    
687        public static String ph1_coord_formatTime_echo(String dateTime, String format) {
688            // Quote the dateTime value since it would contain a ':'.
689            return echoUnResolved("formatTime", "'"+dateTime+"'" + " , " + format);
690        }
691    
692        public static String ph1_coord_latest_echo(String n) {
693            return echoUnResolved("latest", n);
694        }
695    
696        public static String ph2_coord_latest_echo(String n) {
697            return ph1_coord_latest_echo(n);
698        }
699    
700        public static String ph1_coord_future_echo(String n, String instance) {
701            return echoUnResolved("future", n + ", " + instance + "");
702        }
703    
704        public static String ph2_coord_future_echo(String n, String instance) {
705            return ph1_coord_future_echo(n, instance);
706        }
707    
708        public static String ph1_coord_latestRange_echo(String start, String end) {
709            return echoUnResolved("latestRange", start + ", " + end);
710        }
711    
712        public static String ph2_coord_latestRange_echo(String start, String end) {
713            return ph1_coord_latestRange_echo(start, end);
714        }
715    
716        public static String ph1_coord_futureRange_echo(String start, String end, String instance) {
717            return echoUnResolved("futureRange", start + ", " + end + ", " + instance);
718        }
719    
720        public static String ph2_coord_futureRange_echo(String start, String end, String instance) {
721            return ph1_coord_futureRange_echo(start, end, instance);
722        }
723    
724        public static String ph1_coord_dataIn_echo(String n) {
725            ELEvaluator eval = ELEvaluator.getCurrent();
726            String val = (String) eval.getVariable("oozie.dataname." + n);
727            if (val == null || val.equals("data-in") == false) {
728                LOG.error("data_in_name " + n + " is not valid");
729                throw new RuntimeException("data_in_name " + n + " is not valid");
730            }
731            return echoUnResolved("dataIn", "'" + n + "'");
732        }
733    
734        public static String ph1_coord_dataOut_echo(String n) {
735            ELEvaluator eval = ELEvaluator.getCurrent();
736            String val = (String) eval.getVariable("oozie.dataname." + n);
737            if (val == null || val.equals("data-out") == false) {
738                LOG.error("data_out_name " + n + " is not valid");
739                throw new RuntimeException("data_out_name " + n + " is not valid");
740            }
741            return echoUnResolved("dataOut", "'" + n + "'");
742        }
743    
744        public static String ph1_coord_nominalTime_echo() {
745            return echoUnResolved("nominalTime", "");
746        }
747    
748        public static String ph1_coord_nominalTime_echo_wrap() {
749            // return "${coord:nominalTime()}"; // no resolution
750            return echoUnResolved("nominalTime", "");
751        }
752    
753        public static String ph1_coord_nominalTime_echo_fixed() {
754            return "2009-03-06T010:00"; // Dummy resolution
755        }
756    
757        public static String ph1_coord_actualTime_echo_wrap() {
758            // return "${coord:actualTime()}"; // no resolution
759            return echoUnResolved("actualTime", "");
760        }
761    
762        public static String ph1_coord_actionId_echo() {
763            return echoUnResolved("actionId", "");
764        }
765    
766        public static String ph1_coord_name_echo() {
767            return echoUnResolved("name", "");
768        }
769    
770        // The following echo functions are not used in any phases yet
771        // They are here for future purpose.
772        public static String coord_minutes_echo(String n) {
773            return echoUnResolved("minutes", n);
774        }
775    
776        public static String coord_hours_echo(String n) {
777            return echoUnResolved("hours", n);
778        }
779    
780        public static String coord_days_echo(String n) {
781            return echoUnResolved("days", n);
782        }
783    
784        public static String coord_endOfDay_echo(String n) {
785            return echoUnResolved("endOfDay", n);
786        }
787    
788        public static String coord_months_echo(String n) {
789            return echoUnResolved("months", n);
790        }
791    
792        public static String coord_endOfMonth_echo(String n) {
793            return echoUnResolved("endOfMonth", n);
794        }
795    
796        public static String coord_actualTime_echo() {
797            return echoUnResolved("actualTime", "");
798        }
799    
800        // This echo function will always return "24" for validation only.
801        // This evaluation ****should not**** replace the original XML
802        // Create a temporary string and validate the function
803        // This is **required** for evaluating an expression like
804        // coord:HoursInDay(0) + 3
805        // actual evaluation will happen in phase 2 or phase 3.
806        public static String ph1_coord_hoursInDay_echo(String n) {
807            return "24";
808            // return echoUnResolved("hoursInDay", n);
809        }
810    
811        // This echo function will always return "30" for validation only.
812        // This evaluation ****should not**** replace the original XML
813        // Create a temporary string and validate the function
814        // This is **required** for evaluating an expression like
815        // coord:daysInMonth(0) + 3
816        // actual evaluation will happen in phase 2 or phase 3.
817        public static String ph1_coord_daysInMonth_echo(String n) {
818            // return echoUnResolved("daysInMonth", n);
819            return "30";
820        }
821    
822        // This echo function will always return "3" for validation only.
823        // This evaluation ****should not**** replace the original XML
824        // Create a temporary string and validate the function
825        // This is **required** for evaluating an expression like coord:tzOffset + 2
826        // actual evaluation will happen in phase 2 or phase 3.
827        public static String ph1_coord_tzOffset_echo() {
828            // return echoUnResolved("tzOffset", "");
829            return "3";
830        }
831    
832        // Local methods
833        /**
834         * @param n
835         * @return n-th instance Date-Time from current instance for data-set <p/> return empty string ("") if the
836         *         Action_Creation_time or the n-th instance <p/> is earlier than the Initial_Instance of dataset.
837         * @throws Exception
838         */
839        private static String coord_current_sync(int n) throws Exception {
840            int datasetFrequency = getDSFrequency();// in minutes
841            TimeUnit dsTimeUnit = getDSTimeUnit();
842            int[] instCount = new int[1];// used as pass by ref
843            Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
844            if (nominalInstanceCal == null) {
845                LOG.warn("If the initial instance of the dataset is later than the nominal time, an empty string is"
846                        + " returned. This means that no data is available at the current-instance specified by the user"
847                        + " and the user could try modifying his initial-instance to an earlier time.");
848                return "";
849            }
850            nominalInstanceCal = getInitialInstanceCal();
851            int absInstanceCount = instCount[0] + n;
852            nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency * absInstanceCount);
853    
854            if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) {
855                LOG.warn("If the initial instance of the dataset is later than the current-instance specified, such as"
856                        + " coord:current({0}) in this case, an empty string is returned. This means that no data is"
857                        + " available at the current-instance specified by the user and the user could try modifying his"
858                        + " initial-instance to an earlier time.", n);
859                return "";
860            }
861            String str = DateUtils.formatDateOozieTZ(nominalInstanceCal);
862            return str;
863        }
864    
865        /**
866         *
867         * @param n offset amount (integer)
868         * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
869         * @return the offset time from the effective nominal time <p/> return empty string ("") if the Action_Creation_time or the
870         *         offset instance <p/> is earlier than the Initial_Instance of dataset.
871         * @throws Exception
872         */
873        private static String coord_offset_sync(int n, String timeUnit) throws Exception {
874            Calendar rawCal = resolveOffsetRawTime(n, TimeUnit.valueOf(timeUnit), null);
875            if (rawCal == null) {
876                // warning already logged by resolveOffsetRawTime()
877                return "";
878            }
879    
880            int freq = getDSFrequency();
881            TimeUnit freqUnit = getDSTimeUnit();
882            int freqCount = 0;
883            // We're going to manually turn back/forward cal by decrements/increments of freq and then check that it gives the same
884            // time as rawCal; this is to check that the offset time resolves to a frequency offset of the effective nominal time
885            // In other words, that there exists an integer x, such that coord:offset(n, timeUnit) == coord:current(x) is true
886            // If not, then we'll "rewind" rawCal to the latest instance earlier than rawCal and use that.
887            Calendar cal = getInitialInstanceCal();
888            if (rawCal.before(cal)) {
889                while (cal.after(rawCal)) {
890                    cal.add(freqUnit.getCalendarUnit(), -freq);
891                    freqCount--;
892                }
893            }
894            else if (rawCal.after(cal)) {
895                while (cal.before(rawCal)) {
896                    cal.add(freqUnit.getCalendarUnit(), freq);
897                    freqCount++;
898                }
899            }
900            if (cal.before(rawCal)) {
901                rawCal = cal;
902            }
903            else if (cal.after(rawCal)) {
904                cal.add(freqUnit.getCalendarUnit(), -freq);
905                rawCal = cal;
906                freqCount--;
907            }
908            String rawCalStr = DateUtils.formatDateOozieTZ(rawCal);
909    
910            Calendar nominalInstanceCal = getInitialInstanceCal();
911            nominalInstanceCal.add(freqUnit.getCalendarUnit(), freq * freqCount);
912            if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) {
913                LOG.warn("If the initial instance of the dataset is later than the offset instance"
914                        + " specified, such as coord:offset({0}, {1}) in this case, an empty string is returned. This means that no"
915                        + " data is available at the offset instance specified by the user and the user could try modifying his"
916                        + " initial-instance to an earlier time.", n, timeUnit);
917                return "";
918            }
919            String nominalCalStr = DateUtils.formatDateOozieTZ(nominalInstanceCal);
920    
921            if (!rawCalStr.equals(nominalCalStr)) {
922                throw new RuntimeException("Shouldn't happen");
923            }
924            return rawCalStr;
925        }
926    
927        /**
928         * @param offset
929         * @return n-th available latest instance Date-Time for SYNC data-set
930         * @throws Exception
931         */
932        private static String coord_latest_sync(int offset) throws Exception {
933            return coord_latestRange_sync(offset, offset);
934        }
935    
936        private static String coord_latestRange_sync(int startOffset, int endOffset) throws Exception {
937            if (startOffset > 0) {
938                throw new RuntimeException("For latest there is no meaning " + "of positive instance. n should be <=0"
939                        + startOffset);
940            }
941            if (endOffset > 0) {
942                throw new RuntimeException("For latest there is no meaning " + "of positive instance. n should be <=0"
943                        + endOffset);
944            }
945            ELEvaluator eval = ELEvaluator.getCurrent();
946            String retVal = "";
947            int datasetFrequency = (int) getDSFrequency();// in minutes
948            TimeUnit dsTimeUnit = getDSTimeUnit();
949            int[] instCount = new int[1];
950            boolean useCurrentTime = Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false);
951            Calendar nominalInstanceCal;
952            if (useCurrentTime) {
953                nominalInstanceCal = getCurrentInstance(new Date(), instCount);
954            }
955            else {
956                nominalInstanceCal = getCurrentInstance(getActualTime(), instCount);
957            }
958            StringBuilder resolvedInstances = new StringBuilder();
959            StringBuilder resolvedURIPaths = new StringBuilder();
960            if (nominalInstanceCal != null) {
961                Calendar initInstance = getInitialInstanceCal();
962                SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
963                if (ds == null) {
964                    throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
965                }
966                String uriTemplate = ds.getUriTemplate();
967                Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
968                if (conf == null) {
969                    throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
970                }
971                int available = 0;
972                boolean resolved = false;
973                String user = ParamChecker
974                        .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
975                String doneFlag = ds.getDoneFlag();
976                while (nominalInstanceCal.compareTo(initInstance) >= 0) {
977                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
978                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
979                    String pathWithDoneFlag = uriPath;
980                    if (doneFlag.length() > 0) {
981                        pathWithDoneFlag += "/" + doneFlag;
982                    }
983                    if (isPathAvailable(pathWithDoneFlag, user, null, conf)) {
984                        LOG.debug("Found latest(" + available + "): " + pathWithDoneFlag);
985                        if (available == startOffset) {
986                            LOG.debug("Matched latest(" + available + "): " + pathWithDoneFlag);
987                            resolved = true;
988                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
989                            resolvedURIPaths.append(uriPath);
990                            retVal = resolvedInstances.toString();
991                            eval.setVariable("resolved_path", resolvedURIPaths.toString());
992                            break;
993                        } else if (available <= endOffset) {
994                            LOG.debug("Matched latest(" + available + "): " + pathWithDoneFlag);
995                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(INSTANCE_SEPARATOR);
996                            resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
997                        }
998    
999                        available--;
1000                    }
1001                    // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
1002                    // -datasetFrequency);
1003                    nominalInstanceCal = (Calendar) initInstance.clone();
1004                    instCount[0]--;
1005                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
1006                    // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
1007                }
1008                if (!resolved) {
1009                    // return unchanged latest function with variable 'is_resolved'
1010                    // to 'false'
1011                    eval.setVariable("is_resolved", Boolean.FALSE);
1012                    if (startOffset == endOffset) {
1013                        retVal = "${coord:latest(" + startOffset + ")}";
1014                    }
1015                    else {
1016                        retVal = "${coord:latestRange(" + startOffset + "," + endOffset + ")}";
1017                    }
1018                }
1019                else {
1020                    eval.setVariable("is_resolved", Boolean.TRUE);
1021                }
1022            }
1023            else {// No feasible nominal time
1024                eval.setVariable("is_resolved", Boolean.FALSE);
1025            }
1026            return retVal;
1027        }
1028    
1029        // TODO : Not an efficient way. In a loop environment, we could do something
1030        // outside the loop
1031        /**
1032         * Check whether a URI path exists
1033         *
1034         * @param sPath
1035         * @param conf
1036         * @return
1037         * @throws IOException
1038         */
1039    
1040        private static boolean isPathAvailable(String sPath, String user, String group, Configuration conf)
1041                throws IOException, HadoopAccessorException {
1042            // sPath += "/" + END_OF_OPERATION_INDICATOR_FILE;
1043            Path path = new Path(sPath);
1044            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
1045            Configuration fsConf = has.createJobConf(path.toUri().getAuthority());
1046            return has.createFileSystem(user, path.toUri(), fsConf).exists(path);
1047        }
1048    
1049        /**
1050         * @param tm
1051         * @return a new Evaluator to be used for URI-template evaluation
1052         */
1053        private static ELEvaluator getUriEvaluator(Calendar tm) {
1054            tm.setTimeZone(DateUtils.getOozieProcessingTimeZone());
1055            ELEvaluator retEval = new ELEvaluator();
1056            retEval.setVariable("YEAR", tm.get(Calendar.YEAR));
1057            retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm
1058                    .get(Calendar.MONTH) + 1));
1059            retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm
1060                    .get(Calendar.DAY_OF_MONTH));
1061            retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm
1062                    .get(Calendar.HOUR_OF_DAY));
1063            retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? "0" + tm.get(Calendar.MINUTE) : tm
1064                    .get(Calendar.MINUTE));
1065            return retEval;
1066        }
1067    
1068        /**
1069         * @return whether a data set is SYNCH or ASYNC
1070         */
1071        private static boolean isSyncDataSet() {
1072            ELEvaluator eval = ELEvaluator.getCurrent();
1073            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1074            if (ds == null) {
1075                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1076            }
1077            return ds.getType().equalsIgnoreCase("SYNC");
1078        }
1079    
1080        /**
1081         * Check whether a function should be resolved.
1082         *
1083         * @param functionName
1084         * @param n
1085         * @return null if the functionName needs to be resolved otherwise return the calling function unresolved.
1086         */
1087        private static String checkIfResolved(String functionName, String n) {
1088            ELEvaluator eval = ELEvaluator.getCurrent();
1089            String replace = (String) eval.getVariable("resolve_" + functionName);
1090            if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't
1091                // resolve
1092                // return "${coord:" + functionName + "(" + n +")}"; //Unresolved
1093                eval.setVariable(".wrap", "true");
1094                return "coord:" + functionName + "(" + n + ")"; // Unresolved
1095            }
1096            return null; // Resolved it
1097        }
1098    
1099        private static String echoUnResolved(String functionName, String n) {
1100            return echoUnResolvedPre(functionName, n, "coord:");
1101        }
1102    
1103        private static String echoUnResolvedPre(String functionName, String n, String prefix) {
1104            ELEvaluator eval = ELEvaluator.getCurrent();
1105            eval.setVariable(".wrap", "true");
1106            return prefix + functionName + "(" + n + ")"; // Unresolved
1107        }
1108    
1109        /**
1110         * @return the initial instance of a DataSet in DATE
1111         */
1112        private static Date getInitialInstance() {
1113            ELEvaluator eval = ELEvaluator.getCurrent();
1114            return getInitialInstance(eval);
1115        }
1116    
1117        /**
1118         * @return the initial instance of a DataSet in DATE
1119         */
1120        private static Date getInitialInstance(ELEvaluator eval) {
1121            return getInitialInstanceCal(eval).getTime();
1122            // return ds.getInitInstance();
1123        }
1124    
1125        /**
1126         * @return the initial instance of a DataSet in Calendar
1127         */
1128        private static Calendar getInitialInstanceCal() {
1129            ELEvaluator eval = ELEvaluator.getCurrent();
1130            return getInitialInstanceCal(eval);
1131        }
1132    
1133        /**
1134         * @return the initial instance of a DataSet in Calendar
1135         */
1136        private static Calendar getInitialInstanceCal(ELEvaluator eval) {
1137            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1138            if (ds == null) {
1139                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1140            }
1141            Calendar effInitTS = Calendar.getInstance();
1142            effInitTS.setTime(ds.getInitInstance());
1143            effInitTS.setTimeZone(ds.getTimeZone());
1144            // To adjust EOD/EOM
1145            DateUtils.moveToEnd(effInitTS, getDSEndOfFlag(eval));
1146            return effInitTS;
1147            // return ds.getInitInstance();
1148        }
1149    
1150        /**
1151         * @return Nominal or action creation Time when all the dependencies of an application instance are met.
1152         */
1153        private static Date getActionCreationtime() {
1154            ELEvaluator eval = ELEvaluator.getCurrent();
1155            return getActionCreationtime(eval);
1156        }
1157    
1158        /**
1159         * @return Nominal or action creation Time when all the dependencies of an application instance are met.
1160         */
1161        private static Date getActionCreationtime(ELEvaluator eval) {
1162            SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1163            if (coordAction == null) {
1164                throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1165            }
1166            return coordAction.getNominalTime();
1167        }
1168    
1169        /**
1170         * @return Actual Time when all the dependencies of an application instance are met.
1171         */
1172        private static Date getActualTime() {
1173            ELEvaluator eval = ELEvaluator.getCurrent();
1174            SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1175            if (coordAction == null) {
1176                throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1177            }
1178            return coordAction.getActualTime();
1179        }
1180    
1181        /**
1182         * @return TimeZone for the application or job.
1183         */
1184        private static TimeZone getJobTZ() {
1185            ELEvaluator eval = ELEvaluator.getCurrent();
1186            SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1187            if (coordAction == null) {
1188                throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1189            }
1190            return coordAction.getTimeZone();
1191        }
1192    
1193        /**
1194         * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
1195         *
1196         * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
1197         *         the dataset.
1198         */
1199        private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) {
1200            ELEvaluator eval = ELEvaluator.getCurrent();
1201            return getCurrentInstance(effectiveTime, instanceCount, eval);
1202        }
1203    
1204        /**
1205         * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
1206         *
1207         * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
1208         *         the dataset.
1209         */
1210        private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[], ELEvaluator eval) {
1211            Date datasetInitialInstance = getInitialInstance(eval);
1212            TimeUnit dsTimeUnit = getDSTimeUnit(eval);
1213            TimeZone dsTZ = getDatasetTZ(eval);
1214            int dsFreq = getDSFrequency(eval);
1215            // Convert Date to Calendar for corresponding TZ
1216            Calendar current = Calendar.getInstance();
1217            current.setTime(datasetInitialInstance);
1218            current.setTimeZone(dsTZ);
1219    
1220            Calendar calEffectiveTime = Calendar.getInstance();
1221            calEffectiveTime.setTime(effectiveTime);
1222            calEffectiveTime.setTimeZone(dsTZ);
1223            if (instanceCount == null) {    // caller doesn't care about this value
1224                instanceCount = new int[1];
1225            }
1226            instanceCount[0] = 0;
1227            if (current.compareTo(calEffectiveTime) > 0) {
1228                return null;
1229            }
1230            Calendar origCurrent = (Calendar) current.clone();
1231            while (current.compareTo(calEffectiveTime) <= 0) {
1232                current = (Calendar) origCurrent.clone();
1233                instanceCount[0]++;
1234                current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
1235            }
1236            instanceCount[0]--;
1237    
1238            current = (Calendar) origCurrent.clone();
1239            current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
1240            return current;
1241        }
1242    
1243        private static Calendar getEffectiveNominalTime() {
1244            Date datasetInitialInstance = getInitialInstance();
1245            TimeZone dsTZ = getDatasetTZ();
1246            // Convert Date to Calendar for corresponding TZ
1247            Calendar current = Calendar.getInstance();
1248            current.setTime(datasetInitialInstance);
1249            current.setTimeZone(dsTZ);
1250    
1251            Calendar calEffectiveTime = Calendar.getInstance();
1252            calEffectiveTime.setTime(getActionCreationtime());
1253            calEffectiveTime.setTimeZone(dsTZ);
1254            if (current.compareTo(calEffectiveTime) > 0) {
1255                // Nominal Time < initial Instance
1256                // TODO: getClass() call doesn't work from static method.
1257                // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+
1258                // current.getTime());
1259                return null;
1260            }
1261            return calEffectiveTime;
1262        }
1263    
1264        /**
1265         * @return dataset frequency in minutes
1266         */
1267        private static int getDSFrequency() {
1268            ELEvaluator eval = ELEvaluator.getCurrent();
1269            return getDSFrequency(eval);
1270        }
1271    
1272        /**
1273         * @return dataset frequency in minutes
1274         */
1275        private static int getDSFrequency(ELEvaluator eval) {
1276            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1277            if (ds == null) {
1278                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1279            }
1280            return ds.getFrequency();
1281        }
1282    
1283        /**
1284         * @return dataset TimeUnit
1285         */
1286        private static TimeUnit getDSTimeUnit() {
1287            ELEvaluator eval = ELEvaluator.getCurrent();
1288            return getDSTimeUnit(eval);
1289        }
1290    
1291        /**
1292         * @return dataset TimeUnit
1293         */
1294        private static TimeUnit getDSTimeUnit(ELEvaluator eval) {
1295            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1296            if (ds == null) {
1297                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1298            }
1299            return ds.getTimeUnit();
1300        }
1301    
1302        /**
1303         * @return dataset TimeZone
1304         */
1305        private static TimeZone getDatasetTZ() {
1306            ELEvaluator eval = ELEvaluator.getCurrent();
1307            return getDatasetTZ(eval);
1308        }
1309    
1310        /**
1311         * @return dataset TimeZone
1312         */
1313        private static TimeZone getDatasetTZ(ELEvaluator eval) {
1314            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1315            if (ds == null) {
1316                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1317            }
1318            return ds.getTimeZone();
1319        }
1320    
1321        /**
1322         * @return dataset TimeUnit
1323         */
1324        private static TimeUnit getDSEndOfFlag() {
1325            ELEvaluator eval = ELEvaluator.getCurrent();
1326            return getDSEndOfFlag(eval);
1327        }
1328    
1329        /**
1330         * @return dataset TimeUnit
1331         */
1332        private static TimeUnit getDSEndOfFlag(ELEvaluator eval) {
1333            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1334            if (ds == null) {
1335                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1336            }
1337            return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration();
1338        }
1339    
1340        /**
1341         * Return a job configuration property for the coordinator.
1342         *
1343         * @param property property name.
1344         * @return the value of the property, <code>null</code> if the property is undefined.
1345         */
1346        public static String coord_conf(String property) {
1347            ELEvaluator eval = ELEvaluator.getCurrent();
1348            return (String) eval.getVariable(property);
1349        }
1350    
1351        /**
1352         * Return the user that submitted the coordinator job.
1353         *
1354         * @return the user that submitted the coordinator job.
1355         */
1356        public static String coord_user() {
1357            ELEvaluator eval = ELEvaluator.getCurrent();
1358            return (String) eval.getVariable(OozieClient.USER_NAME);
1359        }
1360    
1361        /**
1362         * Takes two offset times and returns a list of multiples of the frequency offset from the effective nominal time that occur
1363         * between them.  The caller should make sure that startCal is earlier than endCal.
1364         * <p>
1365         * As a simple example, assume its the same day: startCal is 1:00, endCal is 2:00, frequency is 20min, and effective nominal
1366         * time is 1:20 -- then this method would return a list containing: -20, 0, 20, 40, 60
1367         *
1368         * @param startCal The earlier offset time
1369         * @param endCal The later offset time
1370         * @param eval The ELEvaluator to use; cannot be null
1371         * @return A list of multiple of the frequency offset from the effective nominal time that occur between the startCal and endCal
1372         */
1373        public static List<Integer> expandOffsetTimes(Calendar startCal, Calendar endCal, ELEvaluator eval) {
1374            List<Integer> expandedFreqs = new ArrayList<Integer>();
1375            // Use eval because the "current" eval isn't set
1376            int freq = getDSFrequency(eval);
1377            TimeUnit freqUnit = getDSTimeUnit(eval);
1378            Calendar cal = getCurrentInstance(getActionCreationtime(eval), null, eval);
1379            int totalFreq = 0;
1380            if (startCal.before(cal)) {
1381                while (cal.after(startCal)) {
1382                    cal.add(freqUnit.getCalendarUnit(), -freq);
1383                    totalFreq += -freq;
1384                }
1385                if (cal.before(startCal)) {
1386                    cal.add(freqUnit.getCalendarUnit(), freq);
1387                    totalFreq += freq;
1388                }
1389            }
1390            else if (startCal.after(cal)) {
1391                while (cal.before(startCal)) {
1392                    cal.add(freqUnit.getCalendarUnit(), freq);
1393                    totalFreq += freq;
1394                }
1395            }
1396            // At this point, cal is the smallest multiple of the dataset frequency that is >= to the startCal and offset from the
1397            // effective nominal time.  Now we can find all of the instances that occur between startCal and endCal, inclusive.
1398            while (cal.before(endCal) || cal.equals(endCal)) {
1399                expandedFreqs.add(totalFreq);
1400                cal.add(freqUnit.getCalendarUnit(), freq);
1401                totalFreq += freq;
1402            }
1403            return expandedFreqs;
1404        }
1405    
1406        /**
1407         * Resolve the offset time from the effective nominal time
1408         *
1409         * @param n offset amount (integer)
1410         * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
1411         * @param eval The ELEvaluator to use; or null to use the "current" eval
1412         * @return A Calendar of the offset time
1413         */
1414        public static Calendar resolveOffsetRawTime(int n, TimeUnit timeUnit, ELEvaluator eval) {
1415            // Use eval if given (for when the "current" eval isn't set)
1416            Calendar cal;
1417            if (eval == null) {
1418                cal = getCurrentInstance(getActionCreationtime(), null);
1419            }
1420            else {
1421                cal = getCurrentInstance(getActionCreationtime(eval), null, eval);
1422            }
1423            if (cal == null) {
1424                LOG.warn("If the initial instance of the dataset is later than the nominal time, an"
1425                        + " empty string is returned. This means that no data is available at the offset instance specified by the user"
1426                        + " and the user could try modifying his or her initial-instance to an earlier time.");
1427                return null;
1428            }
1429            cal.add(timeUnit.getCalendarUnit(), n);
1430            return cal;
1431        }
1432    }