001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.coord;
020
021import com.google.common.collect.Lists;
022
023import org.apache.commons.lang.StringUtils;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.oozie.ErrorCode;
026import org.apache.oozie.client.OozieClient;
027import org.apache.oozie.command.CommandException;
028import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil;
029import org.apache.oozie.dependency.URIHandler;
030import org.apache.oozie.dependency.URIHandler.Context;
031import org.apache.oozie.service.Services;
032import org.apache.oozie.service.URIHandlerService;
033import org.apache.oozie.util.DateUtils;
034import org.apache.oozie.util.ELEvaluator;
035import org.apache.oozie.util.ParamChecker;
036import org.apache.oozie.util.XLog;
037import org.jdom.JDOMException;
038
039import java.net.URI;
040import java.util.ArrayList;
041import java.util.Calendar;
042import java.util.Date;
043import java.util.GregorianCalendar;
044import java.util.List;
045import java.util.TimeZone;
046
047/**
 * This class implements the EL functions related to the coordinator.
049 */
050
051public class CoordELFunctions {
    /** Evaluator variable key for the dataset bean (read in this class as a {@code SyncCoordDataset}). */
    final public static String DATASET = "oozie.coord.el.dataset.bean";
    /** Evaluator variable key for the coordinator action bean (read in this class as a {@code SyncCoordAction}). */
    final public static String COORD_ACTION = "oozie.coord.el.app.bean";
    /** Evaluator variable key for the Hadoop {@code Configuration} object. */
    final public static String CONFIGURATION = "oozie.coord.el.conf";
    // NOTE(review): looks like a service property name affecting the 'latest' EL function; usage is not
    // visible in this part of the file - confirm.
    final public static String LATEST_EL_USE_CURRENT_TIME = "oozie.service.ELService.latest-el.use-current-time";
    // INSTANCE_SEPARATOR is used to separate multiple directories into one tag.
    final public static String INSTANCE_SEPARATOR = "#";
    // NOTE(review): presumably separates directories inside a single instance; usage not visible here - confirm.
    final public static String DIR_SEPARATOR = ",";
    // TODO: in next release, support flexibility
    private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS";

    /** Number of milliseconds in one minute. */
    public static final long MINUTE_MSEC = 60 * 1000L;
    /** Number of milliseconds in one hour. */
    public static final long HOUR_MSEC = 60 * MINUTE_MSEC;
    /** Number of milliseconds in one 24-hour day. */
    public static final long DAY_MSEC = 24 * HOUR_MSEC;
065    /**
066     * Used in defining the frequency in 'day' unit. <p> domain: <code> val &gt; 0</code> and should be integer.
067     *
068     * @param val frequency in number of days.
069     * @return number of days and also set the frequency timeunit to "day"
070     */
071    public static int ph1_coord_days(int val) {
072        val = ParamChecker.checkGTZero(val, "n");
073        ELEvaluator eval = ELEvaluator.getCurrent();
074        eval.setVariable("timeunit", TimeUnit.DAY);
075        eval.setVariable("endOfDuration", TimeUnit.NONE);
076        return val;
077    }
078
079    /**
080     * Used in defining the frequency in 'month' unit. <p> domain: <code> val &gt; 0</code> and should be integer.
081     *
082     * @param val frequency in number of months.
083     * @return number of months and also set the frequency timeunit to "month"
084     */
085    public static int ph1_coord_months(int val) {
086        val = ParamChecker.checkGTZero(val, "n");
087        ELEvaluator eval = ELEvaluator.getCurrent();
088        eval.setVariable("timeunit", TimeUnit.MONTH);
089        eval.setVariable("endOfDuration", TimeUnit.NONE);
090        return val;
091    }
092
093    /**
094     * Used in defining the frequency in 'hour' unit. <p> parameter value domain: <code> val &gt; 0</code> and should
095     * be integer.
096     *
097     * @param val frequency in number of hours.
098     * @return number of minutes and also set the frequency timeunit to "minute"
099     */
100    public static int ph1_coord_hours(int val) {
101        val = ParamChecker.checkGTZero(val, "n");
102        ELEvaluator eval = ELEvaluator.getCurrent();
103        eval.setVariable("timeunit", TimeUnit.MINUTE);
104        eval.setVariable("endOfDuration", TimeUnit.NONE);
105        return val * 60;
106    }
107
108    /**
109     * Used in defining the frequency in 'minute' unit. <p> domain: <code> val &gt; 0</code> and should be integer.
110     *
111     * @param val frequency in number of minutes.
112     * @return number of minutes and also set the frequency timeunit to "minute"
113     */
114    public static int ph1_coord_minutes(int val) {
115        val = ParamChecker.checkGTZero(val, "n");
116        ELEvaluator eval = ELEvaluator.getCurrent();
117        eval.setVariable("timeunit", TimeUnit.MINUTE);
118        eval.setVariable("endOfDuration", TimeUnit.NONE);
119        return val;
120    }
121
122    /**
123     * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p> Every instance will
124     * start at 00:00 hour of each day. <p> domain: <code> val &gt; 0</code> and should be integer.
125     *
126     * @param val frequency in number of days.
127     * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day"
128     */
129    public static int ph1_coord_endOfDays(int val) {
130        val = ParamChecker.checkGTZero(val, "n");
131        ELEvaluator eval = ELEvaluator.getCurrent();
132        eval.setVariable("timeunit", TimeUnit.DAY);
133        eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY);
134        return val;
135    }
136
137    /**
138     * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p> Every instance will
139     * start at first day of each month at 00:00 hour. <p> domain: <code> val &gt; 0</code> and should be integer.
140     *
141     * @param val: frequency in number of months.
142     * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month"
143     */
144    public static int ph1_coord_endOfMonths(int val) {
145        val = ParamChecker.checkGTZero(val, "n");
146        ELEvaluator eval = ELEvaluator.getCurrent();
147        eval.setVariable("timeunit", TimeUnit.MONTH);
148        eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH);
149        return val;
150    }
151
152    /**
153     * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p> Depends on: <p>
154     * 1. Timezone of both dataset and job <p> 2. Action creation Time
155     *
156     * @return difference in minutes (DataSet TZ Offset - Application TZ offset)
157     */
158    public static int ph2_coord_tzOffset() {
159        long actionCreationTime = getActionCreationtime().getTime();
160        TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ");
161        TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ");
162        return (dsTZ.getOffset(actionCreationTime) - jobTZ.getOffset(actionCreationTime)) / (1000 * 60);
163    }
164
    /**
     * {@code ph3_}-prefixed variant of {@link #ph2_coord_tzOffset()}; it simply delegates to it.
     *
     * @return the value returned by {@link #ph2_coord_tzOffset()}
     */
    public static int ph3_coord_tzOffset() {
        return ph2_coord_tzOffset();
    }
168
169    /**
170     * Returns a date string that is offset from 'strBaseDate' by the amount specified.  The unit can be one of
171     * DAY, MONTH, HOUR, MINUTE, MONTH.
172     *
173     * @param strBaseDate The base date
174     * @param offset any number
175     * @param unit one of DAY, MONTH, HOUR, MINUTE, MONTH
176     * @return the offset date string
177     * @throws Exception
178     */
179    public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
180        Calendar baseCalDate = DateUtils.getCalendar(strBaseDate);
181        StringBuilder buffer = new StringBuilder();
182        baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset);
183        buffer.append(DateUtils.formatDateOozieTZ(baseCalDate));
184        return buffer.toString();
185    }
186
    /**
     * {@code ph3_}-prefixed variant of {@link #ph2_coord_dateOffset(String, int, String)}; it simply delegates to it.
     *
     * @param strBaseDate the base date
     * @param offset amount to add to the base date
     * @param unit unit of {@code offset}, passed through unchanged
     * @return the value returned by {@link #ph2_coord_dateOffset(String, int, String)}
     * @throws Exception propagated from the delegate
     */
    public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
        return ph2_coord_dateOffset(strBaseDate, offset, unit);
    }
190
191    /**
192     * Returns a date string that is offset from 'strBaseDate' by the difference from Oozie processing timezone to the given
193     * timezone. It will account for daylight saving time based on the given 'strBaseDate' and 'timezone'.
194     *
195     * @param strBaseDate The base date
196     * @param timezone
197     * @return the offset date string
198     * @throws Exception
199     */
200    public static String ph2_coord_dateTzOffset(String strBaseDate, String timezone) throws Exception {
201        Calendar baseCalDate = DateUtils.getCalendar(strBaseDate);
202        StringBuilder buffer = new StringBuilder();
203        baseCalDate.setTimeZone(DateUtils.getTimeZone(timezone));
204        buffer.append(DateUtils.formatDate(baseCalDate));
205        return buffer.toString();
206    }
207
    /**
     * {@code ph3_}-prefixed variant of {@link #ph2_coord_dateTzOffset(String, String)}; it simply delegates to it.
     *
     * @param strBaseDate the base date
     * @param timezone target timezone, passed through unchanged
     * @return the value returned by {@link #ph2_coord_dateTzOffset(String, String)}
     * @throws Exception propagated from the delegate
     */
    public static String ph3_coord_dateTzOffset(String strBaseDate, String timezone) throws Exception{
        return ph2_coord_dateTzOffset(strBaseDate, timezone);
    }
211
212    /**
213     * Determine the date-time in Oozie processing timezone of n-th future available dataset instance
214     * from nominal Time but not beyond the instance specified as 'instance.
215     * <p>
216     * It depends on:
217     * <p>
218     * 1. Data set frequency
219     * <p>
220     * 2. Data set Time unit (day, month, minute)
221     * <p>
222     * 3. Data set Time zone/DST
223     * <p>
224     * 4. End Day/Month flag
225     * <p>
226     * 5. Data set initial instance
227     * <p>
228     * 6. Action Creation Time
229     * <p>
230     * 7. Existence of dataset's directory
231     *
232     * @param n :instance count
233     *        <p>
234     *        domain: n &gt;= 0, n is integer
235     * @param instance: How many future instance it should check? value should
236     *        be &gt;=0
237     * @return date-time in Oozie processing timezone of the n-th instance
238     *         <p>
239     * @throws Exception
240     */
241    public static String ph3_coord_future(int n, int instance) throws Exception {
242        ParamChecker.checkGEZero(n, "future:n");
243        ParamChecker.checkGTZero(instance, "future:instance");
244        if (isSyncDataSet()) {// For Sync Dataset
245            return coord_future_sync(n, instance);
246        }
247        else {
248            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
249        }
250    }
251
252    /**
253     * Determine the date-time in Oozie processing timezone of the future available dataset instances
254     * from start to end offsets from nominal Time but not beyond the instance specified as 'instance'.
255     * <p>
256     * It depends on:
257     * <p>
258     * 1. Data set frequency
259     * <p>
260     * 2. Data set Time unit (day, month, minute)
261     * <p>
262     * 3. Data set Time zone/DST
263     * <p>
264     * 4. End Day/Month flag
265     * <p>
266     * 5. Data set initial instance
267     * <p>
268     * 6. Action Creation Time
269     * <p>
270     * 7. Existence of dataset's directory
271     *
272     * @param start : start instance offset
273     *        <p>
274     *        domain: start &gt;= 0, start is integer
275     * @param end : end instance offset
276     *        <p>
277     *        domain: end &gt;= 0, end is integer
278     * @param instance: How many future instance it should check? value should
279     *        be &gt;=0
280     * @return date-time in Oozie processing timezone of the instances from start to end offsets
281     *        delimited by comma.
282     *         <p>
283     * @throws Exception
284     */
285    public static String ph3_coord_futureRange(int start, int end, int instance) throws Exception {
286        ParamChecker.checkGEZero(start, "future:n");
287        ParamChecker.checkGEZero(end, "future:n");
288        ParamChecker.checkGTZero(instance, "future:instance");
289        if (isSyncDataSet()) {// For Sync Dataset
290            return coord_futureRange_sync(start, end, instance);
291        }
292        else {
293            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
294        }
295    }
296
    /**
     * Resolves a single future instance by calling {@code coord_futureRange_sync} with {@code n} as both the
     * start and the end offset.
     *
     * @param n offset of the instance to resolve
     * @param instance bound on how many instances are probed, passed through unchanged
     * @return the value returned by {@code coord_futureRange_sync(n, n, instance)}
     * @throws Exception propagated from the delegate
     */
    private static String coord_future_sync(int n, int instance) throws Exception {
        return coord_futureRange_sync(n, n, instance);
    }
300
    /**
     * Resolves, for a synchronous dataset, the future instances whose availability index lies between
     * {@code startOffset} and {@code endOffset}.
     * <p>
     * Starting from the instance that {@code getCurrentInstance} computes for the action creation time, successive
     * dataset instances are probed through the {@code URIHandler}, using the dataset URI combined with its done
     * flag. Every instance that exists is counted; those counted from {@code startOffset} up to {@code endOffset}
     * are collected.
     * <p>
     * Side effects on the current {@code ELEvaluator}:
     * <ul>
     * <li>{@code CoordELConstants.RESOLVED_PATH} is set to the collected URI paths joined by
     * {@link #INSTANCE_SEPARATOR};</li>
     * <li>{@code CoordELConstants.IS_RESOLVED} is set to {@code TRUE} when the {@code endOffset}-th available
     * instance was found, or when there is no current instance at all, and to {@code FALSE} otherwise.</li>
     * </ul>
     *
     * @param startOffset index, counted over existing instances from 0, of the first instance to include
     * @param endOffset index of the last instance to include; finding it completes the resolution
     * @param instance bound on how far ahead to probe; the loop runs while {@code checkedInstance <= instance}
     * @return the resolved instance dates joined by {@link #INSTANCE_SEPARATOR}; the unchanged
     *         {@code ${coord:future(...)}} or {@code ${coord:futureRange(...)}} expression when resolution did
     *         not complete; or an empty string when {@code getCurrentInstance} returns {@code null}
     * @throws Exception propagated from the evaluator, the URI handling, or the date helpers
     */
    private static String coord_futureRange_sync(int startOffset, int endOffset, int instance) throws Exception {
        final XLog LOG = XLog.getLog(CoordELFunctions.class);
        final Thread currentThread = Thread.currentThread();
        ELEvaluator eval = ELEvaluator.getCurrent();
        String retVal = "";
        // Expressed in units of dsTimeUnit (see the Calendar.add calls below).
        int datasetFrequency = (int) getDSFrequency();// in minutes
        TimeUnit dsTimeUnit = getDSTimeUnit();
        // One-element array used as an out-parameter: getCurrentInstance stores the instance count in it.
        int[] instCount = new int[1];
        Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
        StringBuilder resolvedInstances = new StringBuilder();
        StringBuilder resolvedURIPaths = new StringBuilder();
        if (nominalInstanceCal != null) {
            // Recompute the starting instance as initial-instance + count * frequency instead of using the
            // calendar returned above.
            Calendar initInstance = getInitialInstanceCal();
            nominalInstanceCal = (Calendar) initInstance.clone();
            nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);

            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
            if (ds == null) {
                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
            }
            String uriTemplate = ds.getUriTemplate();
            Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
            if (conf == null) {
                throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
            }
            // available: number of existing instances seen so far; checkedInstance: number of instances probed.
            int available = 0, checkedInstance = 0;
            boolean resolved = false;
            String user = ParamChecker
                    .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
            String doneFlag = ds.getDoneFlag();
            URIHandlerService uriService = Services.get().get(URIHandlerService.class);
            URIHandler uriHandler = null;
            Context uriContext = null;
            try {
                // Probes at most instance + 1 instances, and stops early if the thread is interrupted.
                while (instance >= checkedInstance && !currentThread.isInterrupted()) {
                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
                    // The handler and its context are created from the first URI and reused for later probes.
                    if (uriHandler == null) {
                        URI uri = new URI(uriPath);
                        uriHandler = uriService.getURIHandler(uri);
                        uriContext = uriHandler.getContext(uri, conf, user, true);
                    }
                    String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
                    if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
                        if (available == endOffset) {
                            // Last requested instance found: publish the result and stop probing.
                            LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
                            resolved = true;
                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
                            resolvedURIPaths.append(uriPath);
                            retVal = resolvedInstances.toString();
                            eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
                            break;
                        }
                        else if (available >= startOffset) {
                            // Inside the requested range but not yet at its end: collect and keep going.
                            LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
                                    INSTANCE_SEPARATOR);
                            resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);

                        }
                        available++;
                    }
                    // Advance to the next instance, again computed from the initial instance rather than by
                    // adding to the previous calendar.
                    // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency);
                    nominalInstanceCal = (Calendar) initInstance.clone();
                    instCount[0]++;
                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
                    checkedInstance++;
                    // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
                }
                // Some paths were collected but RESOLVED_PATH holds no value: publish the partial list.
                // Entries appended on the partial path each carry a trailing INSTANCE_SEPARATOR.
                if (!StringUtils.isEmpty(resolvedURIPaths.toString()) && eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) {
                    eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
                }

            }
            finally {
                // Release the handler context whether or not the probing succeeded.
                if (uriContext != null) {
                    uriContext.destroy();
                }
            }
            if (!resolved) {
                // return unchanged future function with variable 'is_resolved'
                // to 'false'
                eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
                if (startOffset == endOffset) {
                    retVal = "${coord:future(" + startOffset + ", " + instance + ")}";
                }
                else {
                    retVal = "${coord:futureRange(" + startOffset + ", " + endOffset + ", " + instance + ")}";
                }
            }
            else {
                eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE);
            }
        }
        else {// No feasible nominal time
            eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE);
            retVal = "";
        }
        return retVal;
    }
401
402    /**
403     * Return nominal time or Action Creation Time.
404     * <p>
405     *
406     * @return coordinator action creation or materialization date time
407     * @throws Exception if unable to format the Date object to String
408     */
409    public static String ph2_coord_nominalTime() throws Exception {
410        ELEvaluator eval = ELEvaluator.getCurrent();
411        SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
412                "Coordinator Action");
413        return DateUtils.formatDateOozieTZ(action.getNominalTime());
414    }
415
    /**
     * {@code ph3_}-prefixed variant of {@link #ph2_coord_nominalTime()}; it simply delegates to it.
     *
     * @return the value returned by {@link #ph2_coord_nominalTime()}
     * @throws Exception propagated from the delegate
     */
    public static String ph3_coord_nominalTime() throws Exception {
        return ph2_coord_nominalTime();
    }
419
420    /**
421     * Convert from standard date-time formatting to a desired format.
422     * <p>
423     * @param dateTimeStr - A timestamp in standard (ISO8601) format.
424     * @param format - A string representing the desired format.
425     * @return coordinator action creation or materialization date time
426     * @throws Exception if unable to format the Date object to String
427     */
428    public static String ph2_coord_formatTime(String dateTimeStr, String format)
429            throws Exception {
430        Date dateTime = DateUtils.parseDateOozieTZ(dateTimeStr);
431        return DateUtils.formatDateCustom(dateTime, format);
432    }
433
    /**
     * {@code ph3_}-prefixed variant of {@link #ph2_coord_formatTime(String, String)}; it simply delegates to it.
     *
     * @param dateTimeStr timestamp to convert, passed through unchanged
     * @param format desired output format, passed through unchanged
     * @return the value returned by {@link #ph2_coord_formatTime(String, String)}
     * @throws Exception propagated from the delegate
     */
    public static String ph3_coord_formatTime(String dateTimeStr, String format)
            throws Exception {
        return ph2_coord_formatTime(dateTimeStr, format);
    }
438
439    /**
440     * Convert from standard date-time formatting to a Unix epoch time.
441     * <p/>
442     * @param dateTimeStr - A timestamp in standard (ISO8601) format.
443     * @param millis - "true" to include millis; otherwise will only include seconds
444     * @return coordinator action creation or materialization date time
445     * @throws Exception if unable to format the Date object to String
446     */
447    public static String ph2_coord_epochTime(String dateTimeStr, String millis)
448            throws Exception {
449        Date dateTime = DateUtils.parseDateOozieTZ(dateTimeStr);
450        return DateUtils.formatDateEpoch(dateTime, Boolean.valueOf(millis));
451    }
452
    /**
     * {@code ph3_}-prefixed variant of {@link #ph2_coord_epochTime(String, String)}; it simply delegates to it.
     *
     * @param dateTimeStr timestamp to convert, passed through unchanged
     * @param millis millisecond switch, passed through unchanged
     * @return the value returned by {@link #ph2_coord_epochTime(String, String)}
     * @throws Exception propagated from the delegate
     */
    public static String ph3_coord_epochTime(String dateTimeStr, String millis)
            throws Exception {
        return  ph2_coord_epochTime(dateTimeStr, millis);
    }
457
458    /**
459     * Return Action Id. <p>
460     *
461     * @return coordinator action Id
462     */
463    public static String ph2_coord_actionId() throws Exception {
464        ELEvaluator eval = ELEvaluator.getCurrent();
465        SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
466                "Coordinator Action");
467        return action.getActionId();
468    }
469
    /**
     * {@code ph3_}-prefixed variant of {@link #ph2_coord_actionId()}; it simply delegates to it.
     *
     * @return the value returned by {@link #ph2_coord_actionId()}
     * @throws Exception propagated from the delegate
     */
    public static String ph3_coord_actionId() throws Exception {
        return ph2_coord_actionId();
    }
473
474    /**
475     * Return Job Name. <p>
476     *
477     * @return coordinator name
478     */
479    public static String ph2_coord_name() throws Exception {
480        ELEvaluator eval = ELEvaluator.getCurrent();
481        SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
482                "Coordinator Action");
483        return action.getName();
484    }
485
    /**
     * {@code ph3_}-prefixed variant of {@link #ph2_coord_name()}; it simply delegates to it.
     *
     * @return the value returned by {@link #ph2_coord_name()}
     * @throws Exception propagated from the delegate
     */
    public static String ph3_coord_name() throws Exception {
        return ph2_coord_name();
    }
489
490    /**
491     * Return Action Start time. <p>
492     *
493     * @return coordinator action start time
494     * @throws Exception if unable to format the Date object to String
495     */
496    public static String ph2_coord_actualTime() throws Exception {
497        ELEvaluator eval = ELEvaluator.getCurrent();
498        SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
499        if (coordAction == null) {
500            throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
501        }
502        return DateUtils.formatDateOozieTZ(coordAction.getActualTime());
503    }
504
    /**
     * {@code ph3_}-prefixed variant of {@link #ph2_coord_actualTime()}; it simply delegates to it.
     *
     * @return the value returned by {@link #ph2_coord_actualTime()}
     * @throws Exception propagated from the delegate
     */
    public static String ph3_coord_actualTime() throws Exception {
        return ph2_coord_actualTime();
    }
508
509    /**
510     * Used to specify a list of URI's that are used as input dir to the workflow job. <p> Look for two evaluator-level
511     * variables <p> A) .datain.&lt;DATAIN_NAME&gt; B) .datain.&lt;DATAIN_NAME&gt;.unresolved <p> A defines the current list of
512     * URI. <p> B defines whether there are any unresolved EL-function (i.e latest) <p> If there are something
513     * unresolved, this function will echo back the original function <p> otherwise it sends the uris.
514     *
515     * @param dataInName : Datain name
516     * @return the list of URI's separated by INSTANCE_SEPARATOR <p> if there are unresolved EL function (i.e. latest)
517     *         , echo back <p> the function without resolving the function.
518     */
519    public static String ph3_coord_dataIn(String dataInName) {
520        String uris = "";
521        ELEvaluator eval = ELEvaluator.getCurrent();
522        if (eval.getVariable(".datain." + dataInName) == null
523                && (eval.getVariable(".actionInputLogic") != null && !StringUtils.isEmpty(eval.getVariable(
524                        ".actionInputLogic").toString()))) {
525            try {
526                return new CoordInputLogicEvaluatorUtil().getInputDependencies(dataInName,
527                        (SyncCoordAction) eval.getVariable(COORD_ACTION));
528            }
529            catch (JDOMException e) {
530                XLog.getLog(CoordELFunctions.class).error(e);
531                throw new RuntimeException(e.getMessage());
532            }
533        }
534
535        uris = (String) eval.getVariable(".datain." + dataInName);
536        Object unResolvedObj = eval.getVariable(".datain." + dataInName + ".unresolved");
537        if (unResolvedObj == null) {
538            return uris;
539        }
540        Boolean unresolved = Boolean.parseBoolean(unResolvedObj.toString());
541        if (unresolved != null && unresolved.booleanValue() == true) {
542            return "${coord:dataIn('" + dataInName + "')}";
543        }
544        return uris;
545    }
546
547    /**
548     * Used to specify a list of URI's that are output dir of the workflow job. <p> Look for one evaluator-level
549     * variable <p> dataout.&lt;DATAOUT_NAME&gt; <p> It defines the current list of URI. <p> otherwise it sends the uris.
550     *
551     * @param dataOutName : Dataout name
552     * @return the list of URI's separated by INSTANCE_SEPARATOR
553     */
554    public static String ph3_coord_dataOut(String dataOutName) {
555        String uris = "";
556        ELEvaluator eval = ELEvaluator.getCurrent();
557        uris = (String) eval.getVariable(".dataout." + dataOutName);
558        return uris;
559    }
560
561    /**
562     * Determine the date-time in Oozie processing timezone of n-th dataset instance. <p> It depends on: <p> 1.
563     * Data set frequency <p> 2.
564     * Data set Time unit (day, month, minute) <p> 3. Data set Time zone/DST <p> 4. End Day/Month flag <p> 5. Data
565     * set initial instance <p> 6. Action Creation Time
566     *
567     * @param n instance count domain: n is integer
568     * @return date-time in Oozie processing timezone of the n-th instance returns 'null' means n-th instance is
569     * earlier than Initial-Instance of DS
570     * @throws Exception
571     */
572    public static String ph2_coord_current(int n) throws Exception {
573        if (isSyncDataSet()) { // For Sync Dataset
574            return coord_current_sync(n);
575        }
576        else {
577            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
578        }
579    }
580
581    /**
582     * Determine the date-time in Oozie processing timezone of current dataset instances
583     * from start to end offsets from the nominal time. <p> It depends
584     * on: <p> 1. Data set frequency <p> 2. Data set Time unit (day, month, minute) <p> 3. Data set Time zone/DST
585     * <p> 4. End Day/Month flag <p> 5. Data set initial instance <p> 6. Action Creation Time
586     *
587     * @param start :start instance offset <p> domain: start &lt;= 0, start is integer
588     * @param end :end instance offset <p> domain: end &lt;= 0, end is integer
589     * @return date-time in Oozie processing timezone of the instances from start to end offsets
590     *        delimited by comma. <p> If the current instance time of the dataset based on the Action Creation Time
591     *        is earlier than the Initial-Instance of DS an empty string is returned.
592     *        If an instance within the range is earlier than Initial-Instance of DS that instance is ignored
593     * @throws Exception
594     */
595    public static String ph2_coord_currentRange(int start, int end) throws Exception {
596        if (isSyncDataSet()) { // For Sync Dataset
597            return coord_currentRange_sync(start, end);
598        }
599        else {
600            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
601        }
602    }
603    /**
604     * Determine the date-time in Oozie processing timezone of the given offset from the dataset effective nominal time. <p> It
605     * depends on: <p> 1. Data set frequency <p> 2. Data set Time Unit <p> 3. Data set Time zone/DST
606     * <p> 4. Data set initial instance <p> 5. Action Creation Time
607     *
608     * @param n offset amount (integer)
609     * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
610     * @return date-time in Oozie processing timezone of the given offset from the dataset effective nominal time
611     * @throws Exception if there was a problem formatting
612     */
613    public static String ph2_coord_offset(int n, String timeUnit) throws Exception {
614        if (isSyncDataSet()) { // For Sync Dataset
615            return coord_offset_sync(n, timeUnit);
616        }
617        else {
618            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
619        }
620    }
621
622    /**
623     * Determine how many hours is on the date of n-th dataset instance. <p> It depends on: <p> 1. Data set frequency
624     * <p> 2. Data set Time unit (day, month, minute) <p> 3. Data set Time zone/DST <p> 4. End Day/Month flag <p> 5.
625     * Data set initial instance <p> 6. Action Creation Time
626     *
627     * @param n instance count <p> domain: n is integer
628     * @return number of hours on that day <p> returns -1 means n-th instance is earlier than Initial-Instance of DS
629     * @throws Exception
630     */
631    public static int ph2_coord_hoursInDay(int n) throws Exception {
632        int datasetFrequency = (int) getDSFrequency();
633        // /Calendar nominalInstanceCal =
634        // getCurrentInstance(getActionCreationtime());
635        Calendar nominalInstanceCal = getEffectiveNominalTime();
636        if (nominalInstanceCal == null) {
637            return -1;
638        }
639        nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
640        /*
641         * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
642         * { return -1; }
643         */
644        nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
645        // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
646        return DateUtils.hoursInDay(nominalInstanceCal);
647    }
648
    /**
     * {@code ph3_}-prefixed variant of {@link #ph2_coord_hoursInDay(int)}; it simply delegates to it.
     *
     * @param n instance count, passed through unchanged
     * @return the value returned by {@link #ph2_coord_hoursInDay(int)}
     * @throws Exception propagated from the delegate
     */
    public static int ph3_coord_hoursInDay(int n) throws Exception {
        return ph2_coord_hoursInDay(n);
    }
652
653    /**
654     * Calculate number of days in one month for n-th dataset instance. <p> It depends on: <p> 1. Data set frequency .
655     * <p> 2. Data set Time unit (day, month, minute) <p> 3. Data set Time zone/DST <p> 4. End Day/Month flag <p> 5.
656     * Data set initial instance <p> 6. Action Creation Time
657     *
658     * @param n instance count. domain: n is integer
659     * @return number of days in that month <p> returns -1 means n-th instance is earlier than Initial-Instance of DS
660     * @throws Exception
661     */
662    public static int ph2_coord_daysInMonth(int n) throws Exception {
663        int datasetFrequency = (int) getDSFrequency();// in minutes
664        // Calendar nominalInstanceCal =
665        // getCurrentInstance(getActionCreationtime());
666        Calendar nominalInstanceCal = getEffectiveNominalTime();
667        if (nominalInstanceCal == null) {
668            return -1;
669        }
670        nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
671        /*
672         * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
673         * { return -1; }
674         */
675        nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
676        // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
677        return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH);
678    }
679
680    public static int ph3_coord_daysInMonth(int n) throws Exception {
681        return ph2_coord_daysInMonth(n);
682    }
683
684    /**
685     * Determine the date-time in Oozie processing timezone of n-th latest available dataset instance. <p> It depends
686     * on: <p> 1. Data set frequency <p> 2. Data set Time unit (day, month, minute) <p> 3. Data set Time zone/DST
687     * <p> 4. End Day/Month flag <p> 5. Data set initial instance <p> 6. Action Creation Time <p> 7. Existence of
688     * dataset's directory
689     *
690     * @param n :instance count <p> domain: n &lt;= 0, n is integer
691     * @return date-time in Oozie processing timezone of the n-th instance <p> returns 'null' means n-th instance is
692     * earlier than Initial-Instance of DS
693     * @throws Exception
694     */
695    public static String ph3_coord_latest(int n) throws Exception {
696        ParamChecker.checkLEZero(n, "latest:n");
697        if (isSyncDataSet()) {// For Sync Dataset
698            return coord_latest_sync(n);
699        }
700        else {
701            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
702        }
703    }
704
705    /**
706     * Determine the date-time in Oozie processing timezone of latest available dataset instances
707     * from start to end offsets from the nominal time. <p> It depends
708     * on: <p> 1. Data set frequency <p> 2. Data set Time unit (day, month, minute) <p> 3. Data set Time zone/DST
709     * <p> 4. End Day/Month flag <p> 5. Data set initial instance <p> 6. Action Creation Time <p> 7. Existence of
710     * dataset's directory
711     *
712     * @param start :start instance offset <p> domain: start &lt;= 0, start is integer
713     * @param end :end instance offset <p> domain: end &lt;= 0, end is integer
714     * @return date-time in Oozie processing timezone of the instances from start to end offsets
715     *        delimited by comma. <p> returns 'null' means start offset instance is
716     *        earlier than Initial-Instance of DS
717     * @throws Exception
718     */
719    public static String ph3_coord_latestRange(int start, int end) throws Exception {
720        ParamChecker.checkLEZero(start, "latest:n");
721        ParamChecker.checkLEZero(end, "latest:n");
722        if (isSyncDataSet()) {// For Sync Dataset
723            return coord_latestRange_sync(start, end);
724        }
725        else {
726            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
727        }
728    }
729
730    /**
731     * Configure an evaluator with data set and application specific information. <p> Helper method of associating
732     * dataset and application object
733     *
734     * @param evaluator : to set variables
735     * @param ds : Data Set object
736     * @param coordAction : Application instance
737     */
738    public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) {
739        evaluator.setVariable(COORD_ACTION, coordAction);
740        evaluator.setVariable(DATASET, ds);
741    }
742
743    /**
744     * Helper method to wrap around with "${..}". <p>
745     *
746     *
747     * @param eval :EL evaluator
748     * @param expr : expression to evaluate
749     * @return Resolved expression or echo back the same expression
750     * @throws Exception
751     */
752    public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception {
753        try {
754            eval.setVariable(".wrap", null);
755            String result = eval.evaluate(expr, String.class);
756            if (eval.getVariable(".wrap") != null) {
757                return "${" + result + "}";
758            }
759            else {
760                return result;
761            }
762        }
763        catch (Exception e) {
764            throw new Exception("Unable to evaluate :" + expr + ":\n", e);
765        }
766    }
767
768    // Set of echo functions
769
770    public static String ph1_coord_current_echo(String n) {
771        return echoUnResolved("current", n);
772    }
773
774    public static String ph1_coord_absolute_echo(String date) {
775        return echoUnResolved("absolute", date);
776    }
777
778    public static String ph1_coord_currentRange_echo(String start, String end) {
779        return echoUnResolved("currentRange", start + ", " + end);
780    }
781
782    public static String ph1_coord_offset_echo(String n, String timeUnit) {
783        return echoUnResolved("offset", n + " , " + timeUnit);
784    }
785
786    public static String ph2_coord_current_echo(String n) {
787        return echoUnResolved("current", n);
788    }
789
790    public static String ph2_coord_currentRange_echo(String start, String end) {
791        return echoUnResolved("currentRange", start + ", " + end);
792    }
793
794    public static String ph2_coord_offset_echo(String n, String timeUnit) {
795        return echoUnResolved("offset", n + " , " + timeUnit);
796    }
797
798    public static String ph2_coord_absolute_echo(String date) {
799        return echoUnResolved("absolute", date);
800    }
801
802    public static String ph2_coord_absolute_range(String startInstance, int end) throws Exception {
803        int[] instanceCount = new int[1];
804
805        // getCurrentInstance() returns null, which means startInstance is less
806        // than initial instance
807        if (getCurrentInstance(DateUtils.getCalendar(startInstance).getTime(), instanceCount) == null) {
808            throw new CommandException(ErrorCode.E1010,
809                    "intial-instance should be equal or earlier than the start-instance. intial-instance is "
810                            + getInitialInstance() + " and start-instance is " + startInstance);
811        }
812        int[] nominalCount = new int[1];
813        if (getCurrentInstance(getActionCreationtime(), nominalCount) == null) {
814            throw new CommandException(ErrorCode.E1010,
815                    "intial-instance should be equal or earlier than the nominal time. intial-instance is "
816                            + getInitialInstance() + " and nominal time is " + getActionCreationtime());
817        }
818        // getCurrentInstance return offset relative to initial instance.
819        // start instance offset - nominal offset = start offset relative to
820        // nominal time-stamp.
821        int start = instanceCount[0] - nominalCount[0];
822        if (start > end) {
823            throw new CommandException(ErrorCode.E1010,
824                    "start-instance should be equal or earlier than the end-instance. startInstance is "
825                            + startInstance + " which is equivalent to current (" + instanceCount[0]
826                            + ") but end is specified as current (" + end + ")");
827        }
828        return ph2_coord_currentRange(start, end);
829    }
830
831    public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) {
832        return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit);
833    }
834
835    public static String ph1_coord_dateTzOffset_echo(String n, String timezone) {
836        return echoUnResolved("dateTzOffset", n + " , " + timezone);
837    }
838
839    public static String ph1_coord_epochTime_echo(String dateTime, String millis) {
840        // Quote the dateTime value since it would contain a ':'.
841        return echoUnResolved("epochTime", "'"+dateTime+"'" + " , " + millis);
842    }
843
844    public static String ph1_coord_formatTime_echo(String dateTime, String format) {
845        // Quote the dateTime value since it would contain a ':'.
846        return echoUnResolved("formatTime", "'"+dateTime+"'" + " , " + format);
847    }
848
849    public static String ph1_coord_latest_echo(String n) {
850        return echoUnResolved("latest", n);
851    }
852
853    public static String ph2_coord_latest_echo(String n) {
854        return ph1_coord_latest_echo(n);
855    }
856
857    public static String ph1_coord_future_echo(String n, String instance) {
858        return echoUnResolved("future", n + ", " + instance + "");
859    }
860
861    public static String ph2_coord_future_echo(String n, String instance) {
862        return ph1_coord_future_echo(n, instance);
863    }
864
865    public static String ph1_coord_latestRange_echo(String start, String end) {
866        return echoUnResolved("latestRange", start + ", " + end);
867    }
868
869    public static String ph2_coord_latestRange_echo(String start, String end) {
870        return ph1_coord_latestRange_echo(start, end);
871    }
872
873    public static String ph1_coord_futureRange_echo(String start, String end, String instance) {
874        return echoUnResolved("futureRange", start + ", " + end + ", " + instance);
875    }
876
877    public static String ph2_coord_futureRange_echo(String start, String end, String instance) {
878        return ph1_coord_futureRange_echo(start, end, instance);
879    }
880
881    public static String ph1_coord_dataIn_echo(String n) {
882        ELEvaluator eval = ELEvaluator.getCurrent();
883        String val = (String) eval.getVariable("oozie.dataname." + n);
884        if ((val == null || val.equals("data-in") == false)) {
885            XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " is not valid");
886            throw new RuntimeException("data_in_name " + n + " is not valid");
887        }
888        return echoUnResolved("dataIn", "'" + n + "'");
889    }
890
891    public static String ph1_coord_dataOut_echo(String n) {
892        ELEvaluator eval = ELEvaluator.getCurrent();
893        String val = (String) eval.getVariable("oozie.dataname." + n);
894        if (val == null || val.equals("data-out") == false) {
895            XLog.getLog(CoordELFunctions.class).error("data_out_name " + n + " is not valid");
896            throw new RuntimeException("data_out_name " + n + " is not valid");
897        }
898        return echoUnResolved("dataOut", "'" + n + "'");
899    }
900
901    public static String ph1_coord_nominalTime_echo() {
902        return echoUnResolved("nominalTime", "");
903    }
904
905    public static String ph1_coord_nominalTime_echo_wrap() {
906        // return "${coord:nominalTime()}"; // no resolution
907        return echoUnResolved("nominalTime", "");
908    }
909
910    public static String ph1_coord_nominalTime_echo_fixed() {
911        return "2009-03-06T010:00"; // Dummy resolution
912    }
913
914    public static String ph1_coord_actualTime_echo_wrap() {
915        // return "${coord:actualTime()}"; // no resolution
916        return echoUnResolved("actualTime", "");
917    }
918
919    public static String ph1_coord_actionId_echo() {
920        return echoUnResolved("actionId", "");
921    }
922
923    public static String ph1_coord_name_echo() {
924        return echoUnResolved("name", "");
925    }
926
927    // The following echo functions are not used in any phases yet
928    // They are here for future purpose.
929    public static String coord_minutes_echo(String n) {
930        return echoUnResolved("minutes", n);
931    }
932
933    public static String coord_hours_echo(String n) {
934        return echoUnResolved("hours", n);
935    }
936
937    public static String coord_days_echo(String n) {
938        return echoUnResolved("days", n);
939    }
940
941    public static String coord_endOfDay_echo(String n) {
942        return echoUnResolved("endOfDay", n);
943    }
944
945    public static String coord_months_echo(String n) {
946        return echoUnResolved("months", n);
947    }
948
949    public static String coord_endOfMonth_echo(String n) {
950        return echoUnResolved("endOfMonth", n);
951    }
952
953    public static String coord_actualTime_echo() {
954        return echoUnResolved("actualTime", "");
955    }
956
957    // This echo function will always return "24" for validation only.
958    // This evaluation ****should not**** replace the original XML
959    // Create a temporary string and validate the function
960    // This is **required** for evaluating an expression like
961    // coord:HoursInDay(0) + 3
962    // actual evaluation will happen in phase 2 or phase 3.
963    public static String ph1_coord_hoursInDay_echo(String n) {
964        return "24";
965        // return echoUnResolved("hoursInDay", n);
966    }
967
968    // This echo function will always return "30" for validation only.
969    // This evaluation ****should not**** replace the original XML
970    // Create a temporary string and validate the function
971    // This is **required** for evaluating an expression like
972    // coord:daysInMonth(0) + 3
973    // actual evaluation will happen in phase 2 or phase 3.
974    public static String ph1_coord_daysInMonth_echo(String n) {
975        // return echoUnResolved("daysInMonth", n);
976        return "30";
977    }
978
979    // This echo function will always return "3" for validation only.
980    // This evaluation ****should not**** replace the original XML
981    // Create a temporary string and validate the function
982    // This is **required** for evaluating an expression like coord:tzOffset + 2
983    // actual evaluation will happen in phase 2 or phase 3.
984    public static String ph1_coord_tzOffset_echo() {
985        // return echoUnResolved("tzOffset", "");
986        return "3";
987    }
988
989    // Local methods
990    /**
991     * @param n
992     * @return n-th instance Date-Time from current instance for data-set <p> return empty string ("") if the
993     *         Action_Creation_time or the n-th instance <p> is earlier than the Initial_Instance of dataset.
994     * @throws Exception
995     */
996    private static String coord_current_sync(int n) throws Exception {
997        return coord_currentRange_sync(n, n);
998    }
999
1000    private static String coord_currentRange_sync(int start, int end) throws Exception {
1001        final XLog LOG = XLog.getLog(CoordELFunctions.class);
1002        int datasetFrequency = getDSFrequency();// in minutes
1003        TimeUnit dsTimeUnit = getDSTimeUnit();
1004        int[] instCount = new int[1];// used as pass by ref
1005        Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
1006        if (nominalInstanceCal == null) {
1007            LOG.warn("If the initial instance of the dataset is later than the nominal time, an empty string is"
1008                    + " returned. This means that no data is available at the current-instance specified by the user"
1009                    + " and the user could try modifying his initial-instance to an earlier time.");
1010            return "";
1011        } else {
1012            Calendar initInstance = getInitialInstanceCal();
1013            // Add in the reverse order - newest instance first.
1014            nominalInstanceCal = (Calendar) initInstance.clone();
1015            nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), (instCount[0] + start) * datasetFrequency);
1016            List<String> instances = new ArrayList<String>();
1017            for (int i = start; i <= end; i++) {
1018                if (nominalInstanceCal.compareTo(initInstance) < 0) {
1019                    LOG.warn("If the initial instance of the dataset is later than the current-instance specified,"
1020                            + " such as coord:current({0}) in this case, an empty string is returned. This means that"
1021                            + " no data is available at the current-instance specified by the user and the user could"
1022                            + " try modifying his initial-instance to an earlier time.", start);
1023                }
1024                else {
1025                    instances.add(DateUtils.formatDateOozieTZ(nominalInstanceCal));
1026                }
1027                nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency);
1028            }
1029            instances = Lists.reverse(instances);
1030            return StringUtils.join(instances, CoordELFunctions.INSTANCE_SEPARATOR);
1031        }
1032    }
1033
1034    /**
1035     *
1036     * @param n offset amount (integer)
1037     * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
1038     * @return the offset time from the effective nominal time <p> return empty string ("") if the Action_Creation_time or the
1039     *         offset instance <p> is earlier than the Initial_Instance of dataset.
1040     * @throws Exception
1041     */
1042    private static String coord_offset_sync(int n, String timeUnit) throws Exception {
1043        Calendar rawCal = resolveOffsetRawTime(n, TimeUnit.valueOf(timeUnit), null);
1044        if (rawCal == null) {
1045            // warning already logged by resolveOffsetRawTime()
1046            return "";
1047        }
1048
1049        int freq = getDSFrequency();
1050        TimeUnit freqUnit = getDSTimeUnit();
1051        int freqCount = 0;
1052        // We're going to manually turn back/forward cal by decrements/increments of freq and then check that it gives the same
1053        // time as rawCal; this is to check that the offset time resolves to a frequency offset of the effective nominal time
1054        // In other words, that there exists an integer x, such that coord:offset(n, timeUnit) == coord:current(x) is true
1055        // If not, then we'll "rewind" rawCal to the latest instance earlier than rawCal and use that.
1056        Calendar cal = getInitialInstanceCal();
1057        if (rawCal.before(cal)) {
1058            while (cal.after(rawCal)) {
1059                cal.add(freqUnit.getCalendarUnit(), -freq);
1060                freqCount--;
1061            }
1062        }
1063        else if (rawCal.after(cal)) {
1064            while (cal.before(rawCal)) {
1065                cal.add(freqUnit.getCalendarUnit(), freq);
1066                freqCount++;
1067            }
1068        }
1069        if (cal.before(rawCal)) {
1070            rawCal = cal;
1071        }
1072        else if (cal.after(rawCal)) {
1073            cal.add(freqUnit.getCalendarUnit(), -freq);
1074            rawCal = cal;
1075            freqCount--;
1076        }
1077        String rawCalStr = DateUtils.formatDateOozieTZ(rawCal);
1078
1079        Calendar nominalInstanceCal = getInitialInstanceCal();
1080        nominalInstanceCal.add(freqUnit.getCalendarUnit(), freq * freqCount);
1081        if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) {
1082            XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the offset instance"
1083                    + " specified, such as coord:offset({0}, {1}) in this case, an empty string is returned. This means that no"
1084                    + " data is available at the offset instance specified by the user and the user could try modifying his"
1085                    + " initial-instance to an earlier time.", n, timeUnit);
1086            return "";
1087        }
1088        String nominalCalStr = DateUtils.formatDateOozieTZ(nominalInstanceCal);
1089
1090        if (!rawCalStr.equals(nominalCalStr)) {
1091            throw new RuntimeException("Shouldn't happen");
1092        }
1093        return rawCalStr;
1094    }
1095
1096    /**
1097     * @param offset
1098     * @return n-th available latest instance Date-Time for SYNC data-set
1099     * @throws Exception
1100     */
1101    private static String coord_latest_sync(int offset) throws Exception {
1102        return coord_latestRange_sync(offset, offset);
1103    }
1104
    /**
     * Resolve {@code coord:latest(n)} / {@code coord:latestRange(start, end)} for a SYNC dataset by probing for
     * existing data. <p> Starting from the dataset instance that is current for either "now" (when the
     * {@code LATEST_EL_USE_CURRENT_TIME} setting is true) or the action's actual time, the method walks backwards one
     * dataset frequency at a time until it passes the initial instance or the thread is interrupted. For each
     * instance it evaluates the dataset URI template and asks the URI handler whether the URI (with done flag)
     * exists. Existing instances are numbered 0, -1, -2, ...; those numbered from {@code endOffset} down to
     * {@code startOffset} are collected, and the lookup counts as resolved once {@code startOffset} is reached.
     * <p> Side effects on the current evaluator: {@code CoordELConstants.IS_RESOLVED} is set to TRUE/FALSE, and
     * {@code CoordELConstants.RESOLVED_PATH} is set to the collected URI paths.
     *
     * @param startOffset offset of the oldest wanted instance (the walk stops when it is found)
     * @param endOffset offset of the newest wanted instance
     * @return the collected instance date-times (Oozie processing timezone, newest first, joined by
     *         {@code INSTANCE_SEPARATOR}) when resolved; the unresolved expression text
     *         <code>${coord:latest(n)}</code> or <code>${coord:latestRange(start,end)}</code> when not resolved;
     *         empty string ("") when there is no current instance to start from
     * @throws RuntimeException if the dataset or the configuration is missing from the evaluator
     * @throws Exception if URI evaluation or an existence check fails
     */
    private static String coord_latestRange_sync(int startOffset, int endOffset) throws Exception {
        final XLog LOG = XLog.getLog(CoordELFunctions.class);
        final Thread currentThread = Thread.currentThread();
        ELEvaluator eval = ELEvaluator.getCurrent();
        String retVal = "";
        int datasetFrequency = (int) getDSFrequency();// in minutes
        TimeUnit dsTimeUnit = getDSTimeUnit();
        // instCount[0] receives the offset of the starting instance from the initial instance.
        int[] instCount = new int[1];
        boolean useCurrentTime = Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false);
        Calendar nominalInstanceCal;
        if (useCurrentTime) {
            nominalInstanceCal = getCurrentInstance(new Date(), instCount);
        }
        else {
            nominalInstanceCal = getCurrentInstance(getActualTime(), instCount);
        }
        StringBuilder resolvedInstances = new StringBuilder();
        StringBuilder resolvedURIPaths = new StringBuilder();
        if (nominalInstanceCal != null) {
            Calendar initInstance = getInitialInstanceCal();
            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
            if (ds == null) {
                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
            }
            String uriTemplate = ds.getUriTemplate();
            Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
            if (conf == null) {
                throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
            }
            // Number of the existing instance currently looked for: 0 for the newest, then -1, -2, ...
            int available = 0;
            boolean resolved = false;
            String user = ParamChecker
                    .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
            String doneFlag = ds.getDoneFlag();
            URIHandlerService uriService = Services.get().get(URIHandlerService.class);
            // Handler and context are created lazily from the first evaluated URI and reused for all later probes.
            URIHandler uriHandler = null;
            Context uriContext = null;
            try {
                while (nominalInstanceCal.compareTo(initInstance) >= 0 && !currentThread.isInterrupted()) {
                    // Note: getUriEvaluator() also switches nominalInstanceCal to the Oozie processing time zone.
                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
                    if (uriHandler == null) {
                        URI uri = new URI(uriPath);
                        uriHandler = uriService.getURIHandler(uri);
                        uriContext = uriHandler.getContext(uri, conf, user, true);
                    }
                    String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
                    if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
                        XLog.getLog(CoordELFunctions.class)
                        .debug("Found latest(" + available + "): " + uriWithDoneFlag);
                        if (available == startOffset) {
                            // Oldest wanted instance found: append it without a trailing separator and stop.
                            LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
                            resolved = true;
                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
                            resolvedURIPaths.append(uriPath);
                            retVal = resolvedInstances.toString();
                            eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());

                            break;
                        }
                        else if (available <= endOffset) {
                            // Inside the wanted range but not yet at its start: collect and keep walking back.
                            LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
                                    INSTANCE_SEPARATOR);
                            resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
                        }

                        available--;
                    }
                    // Move to the previous instance by recomputing it from the initial instance with a single add,
                    // rather than stepping the current calendar back in place.
                    // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), -datasetFrequency);
                    nominalInstanceCal = (Calendar) initInstance.clone();
                    instCount[0]--;
                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
                    // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
                }
                // Partially matched range: publish the paths found so far unless RESOLVED_PATH is already set.
                if (!StringUtils.isEmpty(resolvedURIPaths.toString()) && eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) {
                    eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
                }
            }
            finally {
                // Release the URI context even when evaluation or an existence check throws.
                if (uriContext != null) {
                    uriContext.destroy();
                }
            }
            if (!resolved) {
                // return unchanged latest function with variable 'is_resolved'
                // to 'false'
                eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
                if (startOffset == endOffset) {
                    retVal = "${coord:latest(" + startOffset + ")}";
                }
                else {
                    retVal = "${coord:latestRange(" + startOffset + "," + endOffset + ")}";
                }
            }
            else {
                eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE);
            }
        }
        else {// No feasible nominal time
            eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
        }
        return retVal;
    }
1209
1210    /**
1211     * @param tm
1212     * @return a new Evaluator to be used for URI-template evaluation
1213     */
1214    private static ELEvaluator getUriEvaluator(Calendar tm) {
1215        tm.setTimeZone(DateUtils.getOozieProcessingTimeZone());
1216        ELEvaluator retEval = new ELEvaluator();
1217        retEval.setVariable("YEAR", tm.get(Calendar.YEAR));
1218        retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm
1219                .get(Calendar.MONTH) + 1));
1220        retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm
1221                .get(Calendar.DAY_OF_MONTH));
1222        retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm
1223                .get(Calendar.HOUR_OF_DAY));
1224        retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? "0" + tm.get(Calendar.MINUTE) : tm
1225                .get(Calendar.MINUTE));
1226        return retEval;
1227    }
1228
1229    /**
1230     * @return whether a data set is SYNCH or ASYNC
1231     */
1232    private static boolean isSyncDataSet() {
1233        ELEvaluator eval = ELEvaluator.getCurrent();
1234        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1235        if (ds == null) {
1236            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1237        }
1238        return ds.getType().equalsIgnoreCase("SYNC");
1239    }
1240
1241    /**
1242     * Check whether a function should be resolved.
1243     *
1244     * @param functionName
1245     * @param n
1246     * @return null if the functionName needs to be resolved otherwise return the calling function unresolved.
1247     */
1248    private static String checkIfResolved(String functionName, String n) {
1249        ELEvaluator eval = ELEvaluator.getCurrent();
1250        String replace = (String) eval.getVariable("resolve_" + functionName);
1251        if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't
1252            // resolve
1253            // return "${coord:" + functionName + "(" + n +")}"; //Unresolved
1254            eval.setVariable(".wrap", "true");
1255            return "coord:" + functionName + "(" + n + ")"; // Unresolved
1256        }
1257        return null; // Resolved it
1258    }
1259
1260    private static String echoUnResolved(String functionName, String n) {
1261        return echoUnResolvedPre(functionName, n, "coord:");
1262    }
1263
1264    private static String echoUnResolvedPre(String functionName, String n, String prefix) {
1265        ELEvaluator eval = ELEvaluator.getCurrent();
1266        eval.setVariable(".wrap", "true");
1267        return prefix + functionName + "(" + n + ")"; // Unresolved
1268    }
1269
1270    /**
1271     * @return the initial instance of a DataSet in DATE
1272     */
1273    private static Date getInitialInstance() {
1274        ELEvaluator eval = ELEvaluator.getCurrent();
1275        return getInitialInstance(eval);
1276    }
1277
1278    /**
1279     * @return the initial instance of a DataSet in DATE
1280     */
1281    private static Date getInitialInstance(ELEvaluator eval) {
1282        return getInitialInstanceCal(eval).getTime();
1283        // return ds.getInitInstance();
1284    }
1285
1286    /**
1287     * @return the initial instance of a DataSet in Calendar
1288     */
1289    private static Calendar getInitialInstanceCal() {
1290        ELEvaluator eval = ELEvaluator.getCurrent();
1291        return getInitialInstanceCal(eval);
1292    }
1293
1294    /**
1295     * @return the initial instance of a DataSet in Calendar
1296     */
1297    private static Calendar getInitialInstanceCal(ELEvaluator eval) {
1298        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1299        if (ds == null) {
1300            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1301        }
1302        Calendar effInitTS = new GregorianCalendar(ds.getTimeZone());
1303        effInitTS.setTime(ds.getInitInstance());
1304        // To adjust EOD/EOM
1305        DateUtils.moveToEnd(effInitTS, getDSEndOfFlag(eval));
1306        return effInitTS;
1307        // return ds.getInitInstance();
1308    }
1309
1310    /**
1311     * @return Nominal or action creation Time when all the dependencies of an application instance are met.
1312     */
1313    private static Date getActionCreationtime() {
1314        ELEvaluator eval = ELEvaluator.getCurrent();
1315        return getActionCreationtime(eval);
1316    }
1317
1318    /**
1319     * @return Nominal or action creation Time when all the dependencies of an application instance are met.
1320     */
1321    private static Date getActionCreationtime(ELEvaluator eval) {
1322        SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1323        if (coordAction == null) {
1324            throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1325        }
1326        return coordAction.getNominalTime();
1327    }
1328
1329    /**
1330     * @return Actual Time when all the dependencies of an application instance are met.
1331     */
1332    private static Date getActualTime() {
1333        ELEvaluator eval = ELEvaluator.getCurrent();
1334        SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1335        if (coordAction == null) {
1336            throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1337        }
1338        return coordAction.getActualTime();
1339    }
1340
1341    /**
1342     * @return TimeZone for the application or job.
1343     */
1344    private static TimeZone getJobTZ() {
1345        ELEvaluator eval = ELEvaluator.getCurrent();
1346        SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1347        if (coordAction == null) {
1348            throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1349        }
1350        return coordAction.getTimeZone();
1351    }
1352
1353    /**
1354     * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
1355     *
1356     * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
1357     *         the dataset.
1358     */
1359    public static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) {
1360        ELEvaluator eval = ELEvaluator.getCurrent();
1361        return getCurrentInstance(effectiveTime, instanceCount, eval);
1362    }
1363
1364    /**
1365     * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
1366     *
1367     * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
1368     *         the dataset.
1369     */
1370    private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[], ELEvaluator eval) {
1371        Date datasetInitialInstance = getInitialInstance(eval);
1372        TimeUnit dsTimeUnit = getDSTimeUnit(eval);
1373        TimeZone dsTZ = getDatasetTZ(eval);
1374        int dsFreq = getDSFrequency(eval);
1375        // Convert Date to Calendar for corresponding TZ
1376        Calendar current = Calendar.getInstance(dsTZ);
1377        current.setTime(datasetInitialInstance);
1378
1379        Calendar calEffectiveTime = new GregorianCalendar(dsTZ);
1380        calEffectiveTime.setTime(effectiveTime);
1381        if (instanceCount == null) {    // caller doesn't care about this value
1382            instanceCount = new int[1];
1383        }
1384        instanceCount[0] = 0;
1385        if (current.compareTo(calEffectiveTime) > 0) {
1386            return null;
1387        }
1388
1389        switch(dsTimeUnit) {
1390            case MINUTE:
1391                instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / MINUTE_MSEC);
1392                break;
1393            case HOUR:
1394                instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / HOUR_MSEC);
1395                break;
1396            case DAY:
1397            case END_OF_DAY:
1398                instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / DAY_MSEC);
1399                break;
1400            case MONTH:
1401            case END_OF_MONTH:
1402                int diffYear = calEffectiveTime.get(Calendar.YEAR) - current.get(Calendar.YEAR);
1403                instanceCount[0] = diffYear * 12 + calEffectiveTime.get(Calendar.MONTH) - current.get(Calendar.MONTH);
1404                break;
1405            case YEAR:
1406                instanceCount[0] = calEffectiveTime.get(Calendar.YEAR) - current.get(Calendar.YEAR);
1407                break;
1408            default:
1409                throw new IllegalArgumentException("Unhandled dataset time unit " + dsTimeUnit);
1410        }
1411
1412        if (instanceCount[0] > 2) {
1413            instanceCount[0] = (instanceCount[0] / dsFreq);
1414            current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
1415        } else {
1416            instanceCount[0] = 0;
1417        }
1418        while (!current.getTime().after(effectiveTime)) {
1419            current.add(dsTimeUnit.getCalendarUnit(), dsFreq);
1420            instanceCount[0]++;
1421        }
1422        current.add(dsTimeUnit.getCalendarUnit(), -dsFreq);
1423        instanceCount[0]--;
1424        return current;
1425    }
1426
1427    /**
1428     * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
1429     *
1430     * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
1431     *         the dataset.
1432     */
1433    private static Calendar getCurrentInstance_old(Date effectiveTime, int instanceCount[], ELEvaluator eval) {
1434        Date datasetInitialInstance = getInitialInstance(eval);
1435        TimeUnit dsTimeUnit = getDSTimeUnit(eval);
1436        TimeZone dsTZ = getDatasetTZ(eval);
1437        int dsFreq = getDSFrequency(eval);
1438        // Convert Date to Calendar for corresponding TZ
1439        Calendar current = Calendar.getInstance();
1440        current.setTime(datasetInitialInstance);
1441        current.setTimeZone(dsTZ);
1442
1443        Calendar calEffectiveTime = Calendar.getInstance();
1444        calEffectiveTime.setTime(effectiveTime);
1445        calEffectiveTime.setTimeZone(dsTZ);
1446        if (instanceCount == null) {    // caller doesn't care about this value
1447            instanceCount = new int[1];
1448        }
1449        instanceCount[0] = 0;
1450        if (current.compareTo(calEffectiveTime) > 0) {
1451            return null;
1452        }
1453        Calendar origCurrent = (Calendar) current.clone();
1454        while (current.compareTo(calEffectiveTime) <= 0) {
1455            current = (Calendar) origCurrent.clone();
1456            instanceCount[0]++;
1457            current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
1458        }
1459        instanceCount[0]--;
1460
1461        current = (Calendar) origCurrent.clone();
1462        current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
1463        return current;
1464    }
1465
1466    public static Calendar getEffectiveNominalTime() {
1467        Date datasetInitialInstance = getInitialInstance();
1468        TimeZone dsTZ = getDatasetTZ();
1469        // Convert Date to Calendar for corresponding TZ
1470        Calendar current = Calendar.getInstance();
1471        current.setTime(datasetInitialInstance);
1472        current.setTimeZone(dsTZ);
1473
1474        Calendar calEffectiveTime = Calendar.getInstance();
1475        calEffectiveTime.setTime(getActionCreationtime());
1476        calEffectiveTime.setTimeZone(dsTZ);
1477        if (current.compareTo(calEffectiveTime) > 0) {
1478            // Nominal Time < initial Instance
1479            // TODO: getClass() call doesn't work from static method.
1480            // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+
1481            // current.getTime());
1482            return null;
1483        }
1484        return calEffectiveTime;
1485    }
1486
1487    /**
1488     * @return dataset frequency in minutes
1489     */
1490    private static int getDSFrequency() {
1491        ELEvaluator eval = ELEvaluator.getCurrent();
1492        return getDSFrequency(eval);
1493    }
1494
1495    /**
1496     * @return dataset frequency in minutes
1497     */
1498    private static int getDSFrequency(ELEvaluator eval) {
1499        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1500        if (ds == null) {
1501            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1502        }
1503        return ds.getFrequency();
1504    }
1505
1506    /**
1507     * @return dataset TimeUnit
1508     */
1509    private static TimeUnit getDSTimeUnit() {
1510        ELEvaluator eval = ELEvaluator.getCurrent();
1511        return getDSTimeUnit(eval);
1512    }
1513
1514    /**
1515     * @return dataset TimeUnit
1516     */
1517    public static TimeUnit getDSTimeUnit(ELEvaluator eval) {
1518        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1519        if (ds == null) {
1520            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1521        }
1522        return ds.getTimeUnit();
1523    }
1524
1525    /**
1526     * @return dataset TimeZone
1527     */
1528    public static TimeZone getDatasetTZ() {
1529        ELEvaluator eval = ELEvaluator.getCurrent();
1530        return getDatasetTZ(eval);
1531    }
1532
1533    /**
1534     * @return dataset TimeZone
1535     */
1536    private static TimeZone getDatasetTZ(ELEvaluator eval) {
1537        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1538        if (ds == null) {
1539            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1540        }
1541        return ds.getTimeZone();
1542    }
1543
1544    /**
1545     * @return dataset TimeUnit
1546     */
1547    private static TimeUnit getDSEndOfFlag() {
1548        ELEvaluator eval = ELEvaluator.getCurrent();
1549        return getDSEndOfFlag(eval);
1550    }
1551
1552    /**
1553     * @return dataset TimeUnit
1554     */
1555    private static TimeUnit getDSEndOfFlag(ELEvaluator eval) {
1556        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1557        if (ds == null) {
1558            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1559        }
1560        return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration();
1561    }
1562
1563    /**
1564     * Return a job configuration property for the coordinator.
1565     *
1566     * @param property property name.
1567     * @return the value of the property, <code>null</code> if the property is undefined.
1568     */
1569    public static String coord_conf(String property) {
1570        ELEvaluator eval = ELEvaluator.getCurrent();
1571        return (String) eval.getVariable(property);
1572    }
1573
1574    /**
1575     * Return the user that submitted the coordinator job.
1576     *
1577     * @return the user that submitted the coordinator job.
1578     */
1579    public static String coord_user() {
1580        ELEvaluator eval = ELEvaluator.getCurrent();
1581        return (String) eval.getVariable(OozieClient.USER_NAME);
1582    }
1583
1584    /**
1585     * Takes two offset times and returns a list of multiples of the frequency offset from the effective nominal time that occur
1586     * between them.  The caller should make sure that startCal is earlier than endCal.
1587     * <p>
1588     * As a simple example, assume its the same day: startCal is 1:00, endCal is 2:00, frequency is 20min, and effective nominal
1589     * time is 1:20 -- then this method would return a list containing: -20, 0, 20, 40, 60
1590     *
1591     * @param startCal The earlier offset time
1592     * @param endCal The later offset time
1593     * @param eval The ELEvaluator to use; cannot be null
1594     * @return A list of multiple of the frequency offset from the effective nominal time that occur between the startCal and endCal
1595     */
1596    public static List<Integer> expandOffsetTimes(Calendar startCal, Calendar endCal, ELEvaluator eval) {
1597        List<Integer> expandedFreqs = new ArrayList<Integer>();
1598        // Use eval because the "current" eval isn't set
1599        int freq = getDSFrequency(eval);
1600        TimeUnit freqUnit = getDSTimeUnit(eval);
1601        Calendar cal = getCurrentInstance(getActionCreationtime(eval), null, eval);
1602        int totalFreq = 0;
1603        if (startCal.before(cal)) {
1604            while (cal.after(startCal)) {
1605                cal.add(freqUnit.getCalendarUnit(), -freq);
1606                totalFreq += -freq;
1607            }
1608            if (cal.before(startCal)) {
1609                cal.add(freqUnit.getCalendarUnit(), freq);
1610                totalFreq += freq;
1611            }
1612        }
1613        else if (startCal.after(cal)) {
1614            while (cal.before(startCal)) {
1615                cal.add(freqUnit.getCalendarUnit(), freq);
1616                totalFreq += freq;
1617            }
1618        }
1619        // At this point, cal is the smallest multiple of the dataset frequency that is >= to the startCal and offset from the
1620        // effective nominal time.  Now we can find all of the instances that occur between startCal and endCal, inclusive.
1621        while (cal.before(endCal) || cal.equals(endCal)) {
1622            expandedFreqs.add(totalFreq);
1623            cal.add(freqUnit.getCalendarUnit(), freq);
1624            totalFreq += freq;
1625        }
1626        return expandedFreqs;
1627    }
1628
1629    /**
1630     * Resolve the offset time from the effective nominal time
1631     *
1632     * @param n offset amount (integer)
1633     * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
1634     * @param eval The ELEvaluator to use; or null to use the "current" eval
1635     * @return A Calendar of the offset time
1636     */
1637    public static Calendar resolveOffsetRawTime(int n, TimeUnit timeUnit, ELEvaluator eval) {
1638        // Use eval if given (for when the "current" eval isn't set)
1639        Calendar cal;
1640        if (eval == null) {
1641            cal = getCurrentInstance(getActionCreationtime(), null);
1642        }
1643        else {
1644            cal = getCurrentInstance(getActionCreationtime(eval), null, eval);
1645        }
1646        if (cal == null) {
1647            XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the nominal time, an"
1648                    + " empty string is returned. This means that no data is available at the offset instance specified by the user"
1649                    + " and the user could try modifying his or her initial-instance to an earlier time.");
1650            return null;
1651        }
1652        cal.add(timeUnit.getCalendarUnit(), n);
1653        return cal;
1654    }
1655}