001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.coord;
019
020import com.google.common.collect.Lists;
021import org.apache.commons.lang.StringUtils;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.oozie.ErrorCode;
024import org.apache.oozie.client.OozieClient;
025import org.apache.oozie.command.CommandException;
026import org.apache.oozie.dependency.URIHandler;
027import org.apache.oozie.dependency.URIHandler.Context;
028import org.apache.oozie.service.Services;
029import org.apache.oozie.service.URIHandlerService;
030import org.apache.oozie.util.DateUtils;
031import org.apache.oozie.util.ELEvaluator;
032import org.apache.oozie.util.ParamChecker;
033import org.apache.oozie.util.XLog;
034
035import java.net.URI;
036import java.util.ArrayList;
037import java.util.Calendar;
038import java.util.Date;
039import java.util.GregorianCalendar;
040import java.util.List;
041import java.util.TimeZone;
042
/**
 * This class implements the EL functions related to coordinators.
 */
046
047public class CoordELFunctions {
048    final public static String DATASET = "oozie.coord.el.dataset.bean";
049    final public static String COORD_ACTION = "oozie.coord.el.app.bean";
050    final public static String CONFIGURATION = "oozie.coord.el.conf";
051    final public static String LATEST_EL_USE_CURRENT_TIME = "oozie.service.ELService.latest-el.use-current-time";
052    // INSTANCE_SEPARATOR is used to separate multiple directories into one tag.
053    final public static String INSTANCE_SEPARATOR = "#";
054    final public static String DIR_SEPARATOR = ",";
055    // TODO: in next release, support flexibility
056    private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS";
057
058    public static final long MINUTE_MSEC = 60 * 1000L;
059    public static final long HOUR_MSEC = 60 * MINUTE_MSEC;
060    public static final long DAY_MSEC = 24 * HOUR_MSEC;
061    public static final long MONTH_MSEC = 30 * DAY_MSEC;
062    public static final long YEAR_MSEC = 365 * DAY_MSEC;
063
064    /**
065     * Used in defining the frequency in 'day' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
066     *
067     * @param val frequency in number of days.
068     * @return number of days and also set the frequency timeunit to "day"
069     */
070    public static int ph1_coord_days(int val) {
071        val = ParamChecker.checkGTZero(val, "n");
072        ELEvaluator eval = ELEvaluator.getCurrent();
073        eval.setVariable("timeunit", TimeUnit.DAY);
074        eval.setVariable("endOfDuration", TimeUnit.NONE);
075        return val;
076    }
077
078    /**
079     * Used in defining the frequency in 'month' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
080     *
081     * @param val frequency in number of months.
082     * @return number of months and also set the frequency timeunit to "month"
083     */
084    public static int ph1_coord_months(int val) {
085        val = ParamChecker.checkGTZero(val, "n");
086        ELEvaluator eval = ELEvaluator.getCurrent();
087        eval.setVariable("timeunit", TimeUnit.MONTH);
088        eval.setVariable("endOfDuration", TimeUnit.NONE);
089        return val;
090    }
091
092    /**
093     * Used in defining the frequency in 'hour' unit. <p/> parameter value domain: <code> val &gt; 0</code> and should
094     * be integer.
095     *
096     * @param val frequency in number of hours.
097     * @return number of minutes and also set the frequency timeunit to "minute"
098     */
099    public static int ph1_coord_hours(int val) {
100        val = ParamChecker.checkGTZero(val, "n");
101        ELEvaluator eval = ELEvaluator.getCurrent();
102        eval.setVariable("timeunit", TimeUnit.MINUTE);
103        eval.setVariable("endOfDuration", TimeUnit.NONE);
104        return val * 60;
105    }
106
107    /**
108     * Used in defining the frequency in 'minute' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
109     *
110     * @param val frequency in number of minutes.
111     * @return number of minutes and also set the frequency timeunit to "minute"
112     */
113    public static int ph1_coord_minutes(int val) {
114        val = ParamChecker.checkGTZero(val, "n");
115        ELEvaluator eval = ELEvaluator.getCurrent();
116        eval.setVariable("timeunit", TimeUnit.MINUTE);
117        eval.setVariable("endOfDuration", TimeUnit.NONE);
118        return val;
119    }
120
121    /**
122     * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p/> Every instance will
123     * start at 00:00 hour of each day. <p/> domain: <code> val &gt; 0</code> and should be integer.
124     *
125     * @param val frequency in number of days.
126     * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day"
127     */
128    public static int ph1_coord_endOfDays(int val) {
129        val = ParamChecker.checkGTZero(val, "n");
130        ELEvaluator eval = ELEvaluator.getCurrent();
131        eval.setVariable("timeunit", TimeUnit.DAY);
132        eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY);
133        return val;
134    }
135
136    /**
137     * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p/> Every instance will
138     * start at first day of each month at 00:00 hour. <p/> domain: <code> val &gt; 0</code> and should be integer.
139     *
140     * @param val: frequency in number of months.
141     * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month"
142     */
143    public static int ph1_coord_endOfMonths(int val) {
144        val = ParamChecker.checkGTZero(val, "n");
145        ELEvaluator eval = ELEvaluator.getCurrent();
146        eval.setVariable("timeunit", TimeUnit.MONTH);
147        eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH);
148        return val;
149    }
150
151    /**
152     * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p/> Depends on: <p/>
153     * 1. Timezone of both dataset and job <p/> 2. Action creation Time
154     *
155     * @return difference in minutes (DataSet TZ Offset - Application TZ offset)
156     */
157    public static int ph2_coord_tzOffset() {
158        long actionCreationTime = getActionCreationtime().getTime();
159        TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ");
160        TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ");
161        return (dsTZ.getOffset(actionCreationTime) - jobTZ.getOffset(actionCreationTime)) / (1000 * 60);
162    }
163
164    public static int ph3_coord_tzOffset() {
165        return ph2_coord_tzOffset();
166    }
167
168    /**
169     * Returns the a date string while given a base date in 'strBaseDate',
170     * offset and unit (e.g. DAY, MONTH, HOUR, MINUTE, MONTH).
171     *
172     * @param strBaseDate -- base date
173     * @param offset -- any number
174     * @param unit -- DAY, MONTH, HOUR, MINUTE, MONTH
175     * @return date string
176     * @throws Exception
177     */
178    public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
179        Calendar baseCalDate = DateUtils.getCalendar(strBaseDate);
180        StringBuilder buffer = new StringBuilder();
181        baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset);
182        buffer.append(DateUtils.formatDateOozieTZ(baseCalDate));
183        return buffer.toString();
184    }
185
186    public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
187        return ph2_coord_dateOffset(strBaseDate, offset, unit);
188    }
189
190    /**
191     * Determine the date-time in Oozie processing timezone of n-th future available dataset instance
192     * from nominal Time but not beyond the instance specified as 'instance.
193     * <p/>
194     * It depends on:
195     * <p/>
196     * 1. Data set frequency
197     * <p/>
198     * 2. Data set Time unit (day, month, minute)
199     * <p/>
200     * 3. Data set Time zone/DST
201     * <p/>
202     * 4. End Day/Month flag
203     * <p/>
204     * 5. Data set initial instance
205     * <p/>
206     * 6. Action Creation Time
207     * <p/>
208     * 7. Existence of dataset's directory
209     *
210     * @param n :instance count
211     *        <p/>
212     *        domain: n >= 0, n is integer
213     * @param instance: How many future instance it should check? value should
214     *        be >=0
215     * @return date-time in Oozie processing timezone of the n-th instance
216     *         <p/>
217     * @throws Exception
218     */
219    public static String ph3_coord_future(int n, int instance) throws Exception {
220        ParamChecker.checkGEZero(n, "future:n");
221        ParamChecker.checkGTZero(instance, "future:instance");
222        if (isSyncDataSet()) {// For Sync Dataset
223            return coord_future_sync(n, instance);
224        }
225        else {
226            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
227        }
228    }
229
230    /**
231     * Determine the date-time in Oozie processing timezone of the future available dataset instances
232     * from start to end offsets from nominal Time but not beyond the instance specified as 'instance'.
233     * <p/>
234     * It depends on:
235     * <p/>
236     * 1. Data set frequency
237     * <p/>
238     * 2. Data set Time unit (day, month, minute)
239     * <p/>
240     * 3. Data set Time zone/DST
241     * <p/>
242     * 4. End Day/Month flag
243     * <p/>
244     * 5. Data set initial instance
245     * <p/>
246     * 6. Action Creation Time
247     * <p/>
248     * 7. Existence of dataset's directory
249     *
250     * @param start : start instance offset
251     *        <p/>
252     *        domain: start >= 0, start is integer
253     * @param end : end instance offset
254     *        <p/>
255     *        domain: end >= 0, end is integer
256     * @param instance: How many future instance it should check? value should
257     *        be >=0
258     * @return date-time in Oozie processing timezone of the instances from start to end offsets
259     *        delimited by comma.
260     *         <p/>
261     * @throws Exception
262     */
263    public static String ph3_coord_futureRange(int start, int end, int instance) throws Exception {
264        ParamChecker.checkGEZero(start, "future:n");
265        ParamChecker.checkGEZero(end, "future:n");
266        ParamChecker.checkGTZero(instance, "future:instance");
267        if (isSyncDataSet()) {// For Sync Dataset
268            return coord_futureRange_sync(start, end, instance);
269        }
270        else {
271            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
272        }
273    }
274
275    private static String coord_future_sync(int n, int instance) throws Exception {
276        return coord_futureRange_sync(n, n, instance);
277    }
278
279    private static String coord_futureRange_sync(int startOffset, int endOffset, int instance) throws Exception {
280        final XLog LOG = XLog.getLog(CoordELFunctions.class);
281        final Thread currentThread = Thread.currentThread();
282        ELEvaluator eval = ELEvaluator.getCurrent();
283        String retVal = "";
284        int datasetFrequency = (int) getDSFrequency();// in minutes
285        TimeUnit dsTimeUnit = getDSTimeUnit();
286        int[] instCount = new int[1];
287        Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
288        StringBuilder resolvedInstances = new StringBuilder();
289        StringBuilder resolvedURIPaths = new StringBuilder();
290        if (nominalInstanceCal != null) {
291            Calendar initInstance = getInitialInstanceCal();
292            nominalInstanceCal = (Calendar) initInstance.clone();
293            nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
294
295            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
296            if (ds == null) {
297                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
298            }
299            String uriTemplate = ds.getUriTemplate();
300            Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
301            if (conf == null) {
302                throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
303            }
304            int available = 0, checkedInstance = 0;
305            boolean resolved = false;
306            String user = ParamChecker
307                    .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
308            String doneFlag = ds.getDoneFlag();
309            URIHandlerService uriService = Services.get().get(URIHandlerService.class);
310            URIHandler uriHandler = null;
311            Context uriContext = null;
312            try {
313                while (instance >= checkedInstance && !currentThread.isInterrupted()) {
314                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
315                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
316                    if (uriHandler == null) {
317                        URI uri = new URI(uriPath);
318                        uriHandler = uriService.getURIHandler(uri);
319                        uriContext = uriHandler.getContext(uri, conf, user);
320                    }
321                    String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
322                    if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
323                        if (available == endOffset) {
324                            LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
325                            resolved = true;
326                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
327                            resolvedURIPaths.append(uriPath);
328                            retVal = resolvedInstances.toString();
329                            eval.setVariable("resolved_path", resolvedURIPaths.toString());
330                            break;
331                        }
332                        else if (available >= startOffset) {
333                            LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
334                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
335                                    INSTANCE_SEPARATOR);
336                            resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
337                        }
338                        available++;
339                    }
340                    // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency);
341                    nominalInstanceCal = (Calendar) initInstance.clone();
342                    instCount[0]++;
343                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
344                    checkedInstance++;
345                    // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
346                }
347            }
348            finally {
349                if (uriContext != null) {
350                    uriContext.destroy();
351                }
352            }
353            if (!resolved) {
354                // return unchanged future function with variable 'is_resolved'
355                // to 'false'
356                eval.setVariable("is_resolved", Boolean.FALSE);
357                if (startOffset == endOffset) {
358                    retVal = "${coord:future(" + startOffset + ", " + instance + ")}";
359                }
360                else {
361                    retVal = "${coord:futureRange(" + startOffset + ", " + endOffset + ", " + instance + ")}";
362                }
363            }
364            else {
365                eval.setVariable("is_resolved", Boolean.TRUE);
366            }
367        }
368        else {// No feasible nominal time
369            eval.setVariable("is_resolved", Boolean.TRUE);
370            retVal = "";
371        }
372        return retVal;
373    }
374
375    /**
376     * Return nominal time or Action Creation Time.
377     * <p/>
378     *
379     * @return coordinator action creation or materialization date time
380     * @throws Exception if unable to format the Date object to String
381     */
382    public static String ph2_coord_nominalTime() throws Exception {
383        ELEvaluator eval = ELEvaluator.getCurrent();
384        SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
385                "Coordinator Action");
386        return DateUtils.formatDateOozieTZ(action.getNominalTime());
387    }
388
389    public static String ph3_coord_nominalTime() throws Exception {
390        return ph2_coord_nominalTime();
391    }
392
393    /**
394     * Convert from standard date-time formatting to a desired format.
395     * <p/>
396     * @param dateTimeStr - A timestamp in standard (ISO8601) format.
397     * @param format - A string representing the desired format.
398     * @return coordinator action creation or materialization date time
399     * @throws Exception if unable to format the Date object to String
400     */
401    public static String ph2_coord_formatTime(String dateTimeStr, String format)
402            throws Exception {
403        Date dateTime = DateUtils.parseDateOozieTZ(dateTimeStr);
404        return DateUtils.formatDateCustom(dateTime, format);
405    }
406
407    public static String ph3_coord_formatTime(String dateTimeStr, String format)
408            throws Exception {
409        return ph2_coord_formatTime(dateTimeStr, format);
410    }
411
412    /**
413     * Return Action Id. <p/>
414     *
415     * @return coordinator action Id
416     */
417    public static String ph2_coord_actionId() throws Exception {
418        ELEvaluator eval = ELEvaluator.getCurrent();
419        SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
420                "Coordinator Action");
421        return action.getActionId();
422    }
423
424    public static String ph3_coord_actionId() throws Exception {
425        return ph2_coord_actionId();
426    }
427
428    /**
429     * Return Job Name. <p/>
430     *
431     * @return coordinator name
432     */
433    public static String ph2_coord_name() throws Exception {
434        ELEvaluator eval = ELEvaluator.getCurrent();
435        SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
436                "Coordinator Action");
437        return action.getName();
438    }
439
440    public static String ph3_coord_name() throws Exception {
441        return ph2_coord_name();
442    }
443
444    /**
445     * Return Action Start time. <p/>
446     *
447     * @return coordinator action start time
448     * @throws Exception if unable to format the Date object to String
449     */
450    public static String ph2_coord_actualTime() throws Exception {
451        ELEvaluator eval = ELEvaluator.getCurrent();
452        SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
453        if (coordAction == null) {
454            throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
455        }
456        return DateUtils.formatDateOozieTZ(coordAction.getActualTime());
457    }
458
459    public static String ph3_coord_actualTime() throws Exception {
460        return ph2_coord_actualTime();
461    }
462
463    /**
464     * Used to specify a list of URI's that are used as input dir to the workflow job. <p/> Look for two evaluator-level
465     * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
466     * URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
467     * unresolved, this function will echo back the original function <p/> otherwise it sends the uris.
468     *
469     * @param dataInName : Datain name
470     * @return the list of URI's separated by INSTANCE_SEPARATOR <p/> if there are unresolved EL function (i.e. latest)
471     *         , echo back <p/> the function without resolving the function.
472     */
473    public static String ph3_coord_dataIn(String dataInName) {
474        String uris = "";
475        ELEvaluator eval = ELEvaluator.getCurrent();
476        uris = (String) eval.getVariable(".datain." + dataInName);
477        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
478        if (unresolved != null && unresolved.booleanValue() == true) {
479            return "${coord:dataIn('" + dataInName + "')}";
480        }
481        return uris;
482    }
483
484    /**
485     * Used to specify a list of URI's that are output dir of the workflow job. <p/> Look for one evaluator-level
486     * variable <p/> dataout.<DATAOUT_NAME> <p/> It defines the current list of URI. <p/> otherwise it sends the uris.
487     *
488     * @param dataOutName : Dataout name
489     * @return the list of URI's separated by INSTANCE_SEPARATOR
490     */
491    public static String ph3_coord_dataOut(String dataOutName) {
492        String uris = "";
493        ELEvaluator eval = ELEvaluator.getCurrent();
494        uris = (String) eval.getVariable(".dataout." + dataOutName);
495        return uris;
496    }
497
498    /**
499     * Determine the date-time in Oozie processing timezone of n-th dataset instance. <p/> It depends on: <p/> 1.
500     * Data set frequency <p/> 2.
501     * Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. Data
502     * set initial instance <p/> 6. Action Creation Time
503     *
504     * @param n instance count domain: n is integer
505     * @return date-time in Oozie processing timezone of the n-th instance returns 'null' means n-th instance is
506     * earlier than Initial-Instance of DS
507     * @throws Exception
508     */
509    public static String ph2_coord_current(int n) throws Exception {
510        if (isSyncDataSet()) { // For Sync Dataset
511            return coord_current_sync(n);
512        }
513        else {
514            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
515        }
516    }
517
518    /**
519     * Determine the date-time in Oozie processing timezone of current dataset instances
520     * from start to end offsets from the nominal time. <p/> It depends
521     * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
522     * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time
523     *
524     * @param start :start instance offset <p/> domain: start <= 0, start is integer
525     * @param end :end instance offset <p/> domain: end <= 0, end is integer
526     * @return date-time in Oozie processing timezone of the instances from start to end offsets
527     *        delimited by comma. <p/> If the current instance time of the dataset based on the Action Creation Time
528     *        is earlier than the Initial-Instance of DS an empty string is returned.
529     *        If an instance within the range is earlier than Initial-Instance of DS that instance is ignored
530     * @throws Exception
531     */
532    public static String ph2_coord_currentRange(int start, int end) throws Exception {
533        if (isSyncDataSet()) { // For Sync Dataset
534            return coord_currentRange_sync(start, end);
535        }
536        else {
537            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
538        }
539    }
540    /**
541     * Determine the date-time in Oozie processing timezone of the given offset from the dataset effective nominal time. <p/> It
542     * depends on: <p> 1. Data set frequency <p/> 2. Data set Time Unit <p/> 3. Data set Time zone/DST
543     * <p/> 4. Data set initial instance <p/> 5. Action Creation Time
544     *
545     * @param n offset amount (integer)
546     * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
547     * @return date-time in Oozie processing timezone of the given offset from the dataset effective nominal time
548     * @throws Exception if there was a problem formatting
549     */
550    public static String ph2_coord_offset(int n, String timeUnit) throws Exception {
551        if (isSyncDataSet()) { // For Sync Dataset
552            return coord_offset_sync(n, timeUnit);
553        }
554        else {
555            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
556        }
557    }
558
559    /**
560     * Determine how many hours is on the date of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency
561     * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
562     * Data set initial instance <p/> 6. Action Creation Time
563     *
564     * @param n instance count <p/> domain: n is integer
565     * @return number of hours on that day <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
566     * @throws Exception
567     */
568    public static int ph2_coord_hoursInDay(int n) throws Exception {
569        int datasetFrequency = (int) getDSFrequency();
570        // /Calendar nominalInstanceCal =
571        // getCurrentInstance(getActionCreationtime());
572        Calendar nominalInstanceCal = getEffectiveNominalTime();
573        if (nominalInstanceCal == null) {
574            return -1;
575        }
576        nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
577        /*
578         * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
579         * { return -1; }
580         */
581        nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
582        // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
583        return DateUtils.hoursInDay(nominalInstanceCal);
584    }
585
586    public static int ph3_coord_hoursInDay(int n) throws Exception {
587        return ph2_coord_hoursInDay(n);
588    }
589
590    /**
591     * Calculate number of days in one month for n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency .
592     * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
593     * Data set initial instance <p/> 6. Action Creation Time
594     *
595     * @param n instance count. domain: n is integer
596     * @return number of days in that month <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
597     * @throws Exception
598     */
599    public static int ph2_coord_daysInMonth(int n) throws Exception {
600        int datasetFrequency = (int) getDSFrequency();// in minutes
601        // Calendar nominalInstanceCal =
602        // getCurrentInstance(getActionCreationtime());
603        Calendar nominalInstanceCal = getEffectiveNominalTime();
604        if (nominalInstanceCal == null) {
605            return -1;
606        }
607        nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
608        /*
609         * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
610         * { return -1; }
611         */
612        nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
613        // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
614        return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH);
615    }
616
617    public static int ph3_coord_daysInMonth(int n) throws Exception {
618        return ph2_coord_daysInMonth(n);
619    }
620
621    /**
622     * Determine the date-time in Oozie processing timezone of n-th latest available dataset instance. <p/> It depends
623     * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
624     * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of
625     * dataset's directory
626     *
627     * @param n :instance count <p/> domain: n <= 0, n is integer
628     * @return date-time in Oozie processing timezone of the n-th instance <p/> returns 'null' means n-th instance is
629     * earlier than Initial-Instance of DS
630     * @throws Exception
631     */
632    public static String ph3_coord_latest(int n) throws Exception {
633        ParamChecker.checkLEZero(n, "latest:n");
634        if (isSyncDataSet()) {// For Sync Dataset
635            return coord_latest_sync(n);
636        }
637        else {
638            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
639        }
640    }
641
642    /**
643     * Determine the date-time in Oozie processing timezone of latest available dataset instances
644     * from start to end offsets from the nominal time. <p/> It depends
645     * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
646     * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of
647     * dataset's directory
648     *
649     * @param start :start instance offset <p/> domain: start <= 0, start is integer
650     * @param end :end instance offset <p/> domain: end <= 0, end is integer
651     * @return date-time in Oozie processing timezone of the instances from start to end offsets
652     *        delimited by comma. <p/> returns 'null' means start offset instance is
653     *        earlier than Initial-Instance of DS
654     * @throws Exception
655     */
656    public static String ph3_coord_latestRange(int start, int end) throws Exception {
657        ParamChecker.checkLEZero(start, "latest:n");
658        ParamChecker.checkLEZero(end, "latest:n");
659        if (isSyncDataSet()) {// For Sync Dataset
660            return coord_latestRange_sync(start, end);
661        }
662        else {
663            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
664        }
665    }
666
667    /**
668     * Configure an evaluator with data set and application specific information. <p/> Helper method of associating
669     * dataset and application object
670     *
671     * @param evaluator : to set variables
672     * @param ds : Data Set object
673     * @param coordAction : Application instance
674     */
675    public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) {
676        evaluator.setVariable(COORD_ACTION, coordAction);
677        evaluator.setVariable(DATASET, ds);
678    }
679
680    /**
681     * Helper method to wrap around with "${..}". <p/>
682     *
683     *
684     * @param eval :EL evaluator
685     * @param expr : expression to evaluate
686     * @return Resolved expression or echo back the same expression
687     * @throws Exception
688     */
689    public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception {
690        try {
691            eval.setVariable(".wrap", null);
692            String result = eval.evaluate(expr, String.class);
693            if (eval.getVariable(".wrap") != null) {
694                return "${" + result + "}";
695            }
696            else {
697                return result;
698            }
699        }
700        catch (Exception e) {
701            throw new Exception("Unable to evaluate :" + expr + ":\n", e);
702        }
703    }
704
705    // Set of echo functions
706
707    public static String ph1_coord_current_echo(String n) {
708        return echoUnResolved("current", n);
709    }
710
711    public static String ph1_coord_absolute_echo(String date) {
712        return echoUnResolved("absolute", date);
713    }
714
715    public static String ph1_coord_currentRange_echo(String start, String end) {
716        return echoUnResolved("currentRange", start + ", " + end);
717    }
718
719    public static String ph1_coord_offset_echo(String n, String timeUnit) {
720        return echoUnResolved("offset", n + " , " + timeUnit);
721    }
722
723    public static String ph2_coord_current_echo(String n) {
724        return echoUnResolved("current", n);
725    }
726
727    public static String ph2_coord_currentRange_echo(String start, String end) {
728        return echoUnResolved("currentRange", start + ", " + end);
729    }
730
731    public static String ph2_coord_offset_echo(String n, String timeUnit) {
732        return echoUnResolved("offset", n + " , " + timeUnit);
733    }
734
735    public static String ph2_coord_absolute_echo(String date) {
736        return echoUnResolved("absolute", date);
737    }
738
739    public static String ph2_coord_absolute_range(String startInstance, int end) throws Exception {
740        int[] instanceCount = new int[1];
741
742        // getCurrentInstance() returns null, which means startInstance is less
743        // than initial instance
744        if (getCurrentInstance(DateUtils.getCalendar(startInstance).getTime(), instanceCount) == null) {
745            throw new CommandException(ErrorCode.E1010,
746                    "intial-instance should be equal or earlier than the start-instance. intial-instance is "
747                            + getInitialInstance() + " and start-instance is " + startInstance);
748        }
749        int[] nominalCount = new int[1];
750        if (getCurrentInstance(getActionCreationtime(), nominalCount) == null) {
751            throw new CommandException(ErrorCode.E1010,
752                    "intial-instance should be equal or earlier than the nominal time. intial-instance is "
753                            + getInitialInstance() + " and nominal time is " + getActionCreationtime());
754        }
755        // getCurrentInstance return offset relative to initial instance.
756        // start instance offset - nominal offset = start offset relative to
757        // nominal time-stamp.
758        int start = instanceCount[0] - nominalCount[0];
759        if (start > end) {
760            throw new CommandException(ErrorCode.E1010,
761                    "start-instance should be equal or earlier than the end-instance. startInstance is "
762                            + startInstance + " which is equivalent to current (" + instanceCount[0]
763                            + ") but end is specified as current (" + end + ")");
764        }
765        return ph2_coord_currentRange(start, end);
766    }
767
768    public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) {
769        return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit);
770    }
771
772    public static String ph1_coord_formatTime_echo(String dateTime, String format) {
773        // Quote the dateTime value since it would contain a ':'.
774        return echoUnResolved("formatTime", "'"+dateTime+"'" + " , " + format);
775    }
776
777    public static String ph1_coord_latest_echo(String n) {
778        return echoUnResolved("latest", n);
779    }
780
781    public static String ph2_coord_latest_echo(String n) {
782        return ph1_coord_latest_echo(n);
783    }
784
785    public static String ph1_coord_future_echo(String n, String instance) {
786        return echoUnResolved("future", n + ", " + instance + "");
787    }
788
789    public static String ph2_coord_future_echo(String n, String instance) {
790        return ph1_coord_future_echo(n, instance);
791    }
792
793    public static String ph1_coord_latestRange_echo(String start, String end) {
794        return echoUnResolved("latestRange", start + ", " + end);
795    }
796
797    public static String ph2_coord_latestRange_echo(String start, String end) {
798        return ph1_coord_latestRange_echo(start, end);
799    }
800
801    public static String ph1_coord_futureRange_echo(String start, String end, String instance) {
802        return echoUnResolved("futureRange", start + ", " + end + ", " + instance);
803    }
804
805    public static String ph2_coord_futureRange_echo(String start, String end, String instance) {
806        return ph1_coord_futureRange_echo(start, end, instance);
807    }
808
809    public static String ph1_coord_dataIn_echo(String n) {
810        ELEvaluator eval = ELEvaluator.getCurrent();
811        String val = (String) eval.getVariable("oozie.dataname." + n);
812        if (val == null || val.equals("data-in") == false) {
813            XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " is not valid");
814            throw new RuntimeException("data_in_name " + n + " is not valid");
815        }
816        return echoUnResolved("dataIn", "'" + n + "'");
817    }
818
819    public static String ph1_coord_dataOut_echo(String n) {
820        ELEvaluator eval = ELEvaluator.getCurrent();
821        String val = (String) eval.getVariable("oozie.dataname." + n);
822        if (val == null || val.equals("data-out") == false) {
823            XLog.getLog(CoordELFunctions.class).error("data_out_name " + n + " is not valid");
824            throw new RuntimeException("data_out_name " + n + " is not valid");
825        }
826        return echoUnResolved("dataOut", "'" + n + "'");
827    }
828
829    public static String ph1_coord_nominalTime_echo() {
830        return echoUnResolved("nominalTime", "");
831    }
832
833    public static String ph1_coord_nominalTime_echo_wrap() {
834        // return "${coord:nominalTime()}"; // no resolution
835        return echoUnResolved("nominalTime", "");
836    }
837
838    public static String ph1_coord_nominalTime_echo_fixed() {
839        return "2009-03-06T010:00"; // Dummy resolution
840    }
841
842    public static String ph1_coord_actualTime_echo_wrap() {
843        // return "${coord:actualTime()}"; // no resolution
844        return echoUnResolved("actualTime", "");
845    }
846
847    public static String ph1_coord_actionId_echo() {
848        return echoUnResolved("actionId", "");
849    }
850
851    public static String ph1_coord_name_echo() {
852        return echoUnResolved("name", "");
853    }
854
    // The following echo functions are not used in any phase yet.
    // They are kept here for future use.
857    public static String coord_minutes_echo(String n) {
858        return echoUnResolved("minutes", n);
859    }
860
861    public static String coord_hours_echo(String n) {
862        return echoUnResolved("hours", n);
863    }
864
865    public static String coord_days_echo(String n) {
866        return echoUnResolved("days", n);
867    }
868
869    public static String coord_endOfDay_echo(String n) {
870        return echoUnResolved("endOfDay", n);
871    }
872
873    public static String coord_months_echo(String n) {
874        return echoUnResolved("months", n);
875    }
876
877    public static String coord_endOfMonth_echo(String n) {
878        return echoUnResolved("endOfMonth", n);
879    }
880
881    public static String coord_actualTime_echo() {
882        return echoUnResolved("actualTime", "");
883    }
884
885    // This echo function will always return "24" for validation only.
886    // This evaluation ****should not**** replace the original XML
887    // Create a temporary string and validate the function
888    // This is **required** for evaluating an expression like
889    // coord:HoursInDay(0) + 3
890    // actual evaluation will happen in phase 2 or phase 3.
891    public static String ph1_coord_hoursInDay_echo(String n) {
892        return "24";
893        // return echoUnResolved("hoursInDay", n);
894    }
895
896    // This echo function will always return "30" for validation only.
897    // This evaluation ****should not**** replace the original XML
898    // Create a temporary string and validate the function
899    // This is **required** for evaluating an expression like
900    // coord:daysInMonth(0) + 3
901    // actual evaluation will happen in phase 2 or phase 3.
902    public static String ph1_coord_daysInMonth_echo(String n) {
903        // return echoUnResolved("daysInMonth", n);
904        return "30";
905    }
906
907    // This echo function will always return "3" for validation only.
908    // This evaluation ****should not**** replace the original XML
909    // Create a temporary string and validate the function
910    // This is **required** for evaluating an expression like coord:tzOffset + 2
911    // actual evaluation will happen in phase 2 or phase 3.
912    public static String ph1_coord_tzOffset_echo() {
913        // return echoUnResolved("tzOffset", "");
914        return "3";
915    }
916
917    // Local methods
918    /**
919     * @param n
920     * @return n-th instance Date-Time from current instance for data-set <p/> return empty string ("") if the
921     *         Action_Creation_time or the n-th instance <p/> is earlier than the Initial_Instance of dataset.
922     * @throws Exception
923     */
924    private static String coord_current_sync(int n) throws Exception {
925        return coord_currentRange_sync(n, n);
926    }
927
928    private static String coord_currentRange_sync(int start, int end) throws Exception {
929        final XLog LOG = XLog.getLog(CoordELFunctions.class);
930        int datasetFrequency = getDSFrequency();// in minutes
931        TimeUnit dsTimeUnit = getDSTimeUnit();
932        int[] instCount = new int[1];// used as pass by ref
933        Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
934        if (nominalInstanceCal == null) {
935            LOG.warn("If the initial instance of the dataset is later than the nominal time, an empty string is"
936                    + " returned. This means that no data is available at the current-instance specified by the user"
937                    + " and the user could try modifying his initial-instance to an earlier time.");
938            return "";
939        } else {
940            Calendar initInstance = getInitialInstanceCal();
941            // Add in the reverse order - newest instance first.
942            nominalInstanceCal = (Calendar) initInstance.clone();
943            nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), (instCount[0] + start) * datasetFrequency);
944            List<String> instances = new ArrayList<String>();
945            for (int i = start; i <= end; i++) {
946                if (nominalInstanceCal.compareTo(initInstance) < 0) {
947                    LOG.warn("If the initial instance of the dataset is later than the current-instance specified,"
948                            + " such as coord:current({0}) in this case, an empty string is returned. This means that"
949                            + " no data is available at the current-instance specified by the user and the user could"
950                            + " try modifying his initial-instance to an earlier time.", start);
951                }
952                else {
953                    instances.add(DateUtils.formatDateOozieTZ(nominalInstanceCal));
954                }
955                nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency);
956            }
957            instances = Lists.reverse(instances);
958            return StringUtils.join(instances, CoordELFunctions.INSTANCE_SEPARATOR);
959        }
960    }
961
    /**
     * Resolve coord:offset(n, timeUnit) for a synchronous dataset. <p/> The raw offset time obtained
     * from resolveOffsetRawTime() is aligned to the dataset frequency grid that starts at the initial
     * instance: if it does not fall exactly on an instance, the latest instance earlier than it is used.
     *
     * @param n offset amount (integer)
     * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
     * @return the offset time from the effective nominal time <p/> return empty string ("") if the Action_Creation_time or the
     *         offset instance <p/> is earlier than the Initial_Instance of dataset.
     * @throws RuntimeException if the step-by-step result and the result computed in a single step
     *         format to different strings
     * @throws Exception
     */
    private static String coord_offset_sync(int n, String timeUnit) throws Exception {
        Calendar rawCal = resolveOffsetRawTime(n, TimeUnit.valueOf(timeUnit), null);
        if (rawCal == null) {
            // warning already logged by resolveOffsetRawTime()
            return "";
        }

        int freq = getDSFrequency();
        TimeUnit freqUnit = getDSTimeUnit();
        // Number of frequency steps taken from the initial instance (negative when stepping backwards)
        int freqCount = 0;
        // We're going to manually turn back/forward cal, starting at the initial instance, by decrements/increments of freq
        // and then check whether it lands exactly on rawCal; this is to check that the offset time falls on the dataset's
        // frequency grid. In other words, that there exists an integer x, such that
        // coord:offset(n, timeUnit) == coord:current(x) is true
        // If not, then we'll "rewind" rawCal to the latest instance earlier than rawCal and use that.
        Calendar cal = getInitialInstanceCal();
        if (rawCal.before(cal)) {
            // Step backwards until cal is no longer after rawCal
            while (cal.after(rawCal)) {
                cal.add(freqUnit.getCalendarUnit(), -freq);
                freqCount--;
            }
        }
        else if (rawCal.after(cal)) {
            // Step forwards until cal is no longer before rawCal
            while (cal.before(rawCal)) {
                cal.add(freqUnit.getCalendarUnit(), freq);
                freqCount++;
            }
        }
        if (cal.before(rawCal)) {
            // Backward stepping went past rawCal: cal is already the latest instance earlier than rawCal
            rawCal = cal;
        }
        else if (cal.after(rawCal)) {
            // Forward stepping went past rawCal: go back one step to the latest instance earlier than rawCal
            cal.add(freqUnit.getCalendarUnit(), -freq);
            rawCal = cal;
            freqCount--;
        }
        String rawCalStr = DateUtils.formatDateOozieTZ(rawCal);

        // Recompute the same instance in one step from the initial instance, as a cross-check
        Calendar nominalInstanceCal = getInitialInstanceCal();
        nominalInstanceCal.add(freqUnit.getCalendarUnit(), freq * freqCount);
        if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) {
            XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the offset instance"
                    + " specified, such as coord:offset({0}, {1}) in this case, an empty string is returned. This means that no"
                    + " data is available at the offset instance specified by the user and the user could try modifying his"
                    + " initial-instance to an earlier time.", n, timeUnit);
            return "";
        }
        String nominalCalStr = DateUtils.formatDateOozieTZ(nominalInstanceCal);

        // NOTE(review): repeated Calendar.add() calls and a single add() of freq * freqCount can yield
        // different dates when a field is clamped (e.g. day-of-month for month units); presumably that
        // is the only way to reach this exception - confirm.
        if (!rawCalStr.equals(nominalCalStr)) {
            throw new RuntimeException("Shouldn't happen");
        }
        return rawCalStr;
    }
1023
1024    /**
1025     * @param offset
1026     * @return n-th available latest instance Date-Time for SYNC data-set
1027     * @throws Exception
1028     */
1029    private static String coord_latest_sync(int offset) throws Exception {
1030        return coord_latestRange_sync(offset, offset);
1031    }
1032
    /**
     * Resolve the latest available instances of a synchronous dataset for the offsets between
     * startOffset and endOffset. <p/> Starting at the current instance of the reference time (the
     * wall-clock time when LATEST_EL_USE_CURRENT_TIME is enabled in the services configuration,
     * otherwise the action's actual time), instances are visited backwards down to the initial
     * instance. For each instance the URI template is evaluated and the URI (with done flag) is
     * checked for existence; existing instances are numbered 0, -1, -2, ... from the newest. <p/>
     * Side effects on the current evaluator: "is_resolved" is set to TRUE or FALSE, and
     * "resolved_path" receives the matching URI paths when the range is resolved.
     *
     * @param startOffset : offset of the oldest instance wanted; the search stops when it is found
     * @param endOffset : offset of the newest instance wanted
     * @return the resolved instances, newest first, joined with the instance separator <p/> the
     *         unresolved "${coord:latest(n)}" or "${coord:latestRange(start,end)}" expression if
     *         startOffset could not be reached <p/> empty string ("") if the reference time is
     *         earlier than the initial instance
     * @throws RuntimeException if the dataset or the configuration is not set on the evaluator
     * @throws Exception
     */
    private static String coord_latestRange_sync(int startOffset, int endOffset) throws Exception {
        final XLog LOG = XLog.getLog(CoordELFunctions.class);
        final Thread currentThread = Thread.currentThread();
        ELEvaluator eval = ELEvaluator.getCurrent();
        String retVal = "";
        int datasetFrequency = (int) getDSFrequency();// applied below in units of dsTimeUnit
        TimeUnit dsTimeUnit = getDSTimeUnit();
        int[] instCount = new int[1];
        boolean useCurrentTime = Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false);
        // Newest candidate instance; instCount[0] receives its index relative to the initial instance
        Calendar nominalInstanceCal;
        if (useCurrentTime) {
            nominalInstanceCal = getCurrentInstance(new Date(), instCount);
        }
        else {
            nominalInstanceCal = getCurrentInstance(getActualTime(), instCount);
        }
        StringBuilder resolvedInstances = new StringBuilder();
        StringBuilder resolvedURIPaths = new StringBuilder();
        if (nominalInstanceCal != null) {
            Calendar initInstance = getInitialInstanceCal();
            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
            if (ds == null) {
                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
            }
            String uriTemplate = ds.getUriTemplate();
            Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
            if (conf == null) {
                throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
            }
            // Offset of the next existing instance: 0 for the newest one, then -1, -2, ...
            int available = 0;
            boolean resolved = false;
            String user = ParamChecker
                    .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
            String doneFlag = ds.getDoneFlag();
            URIHandlerService uriService = Services.get().get(URIHandlerService.class);
            URIHandler uriHandler = null;
            Context uriContext = null;
            try {
                // Walk backwards until the initial instance is passed or the thread is interrupted
                while (nominalInstanceCal.compareTo(initInstance) >= 0 && !currentThread.isInterrupted()) {
                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
                    // The handler and its context are created once, from the first URI, and then reused
                    if (uriHandler == null) {
                        URI uri = new URI(uriPath);
                        uriHandler = uriService.getURIHandler(uri);
                        uriContext = uriHandler.getContext(uri, conf, user);
                    }
                    String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
                    if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
                        XLog.getLog(CoordELFunctions.class)
                        .debug("Found latest(" + available + "): " + uriWithDoneFlag);
                        if (available == startOffset) {
                            // Oldest wanted instance reached: finish the lists and publish the result
                            LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
                            resolved = true;
                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
                            resolvedURIPaths.append(uriPath);
                            retVal = resolvedInstances.toString();
                            eval.setVariable("resolved_path", resolvedURIPaths.toString());
                            break;
                        }
                        else if (available <= endOffset) {
                            // Inside the wanted range but not yet at its oldest end: keep collecting
                            LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
                                    INSTANCE_SEPARATOR);
                            resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
                        }

                        available--;
                    }
                    // Move to the previous instance by recomputing it from the initial instance
                    // rather than stepping the current calendar back by one frequency.
                    nominalInstanceCal = (Calendar) initInstance.clone();
                    instCount[0]--;
                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
                }
            }
            finally {
                // Release the handler context whether or not the search succeeded
                if (uriContext != null) {
                    uriContext.destroy();
                }
            }
            if (!resolved) {
                // return unchanged latest function with variable 'is_resolved'
                // to 'false'
                eval.setVariable("is_resolved", Boolean.FALSE);
                if (startOffset == endOffset) {
                    retVal = "${coord:latest(" + startOffset + ")}";
                }
                else {
                    retVal = "${coord:latestRange(" + startOffset + "," + endOffset + ")}";
                }
            }
            else {
                eval.setVariable("is_resolved", Boolean.TRUE);
            }
        }
        else {// No feasible nominal time
            eval.setVariable("is_resolved", Boolean.FALSE);
        }
        return retVal;
    }
1133
1134    /**
1135     * @param tm
1136     * @return a new Evaluator to be used for URI-template evaluation
1137     */
1138    private static ELEvaluator getUriEvaluator(Calendar tm) {
1139        tm.setTimeZone(DateUtils.getOozieProcessingTimeZone());
1140        ELEvaluator retEval = new ELEvaluator();
1141        retEval.setVariable("YEAR", tm.get(Calendar.YEAR));
1142        retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm
1143                .get(Calendar.MONTH) + 1));
1144        retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm
1145                .get(Calendar.DAY_OF_MONTH));
1146        retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm
1147                .get(Calendar.HOUR_OF_DAY));
1148        retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? "0" + tm.get(Calendar.MINUTE) : tm
1149                .get(Calendar.MINUTE));
1150        return retEval;
1151    }
1152
1153    /**
1154     * @return whether a data set is SYNCH or ASYNC
1155     */
1156    private static boolean isSyncDataSet() {
1157        ELEvaluator eval = ELEvaluator.getCurrent();
1158        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1159        if (ds == null) {
1160            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1161        }
1162        return ds.getType().equalsIgnoreCase("SYNC");
1163    }
1164
1165    /**
1166     * Check whether a function should be resolved.
1167     *
1168     * @param functionName
1169     * @param n
1170     * @return null if the functionName needs to be resolved otherwise return the calling function unresolved.
1171     */
1172    private static String checkIfResolved(String functionName, String n) {
1173        ELEvaluator eval = ELEvaluator.getCurrent();
1174        String replace = (String) eval.getVariable("resolve_" + functionName);
1175        if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't
1176            // resolve
1177            // return "${coord:" + functionName + "(" + n +")}"; //Unresolved
1178            eval.setVariable(".wrap", "true");
1179            return "coord:" + functionName + "(" + n + ")"; // Unresolved
1180        }
1181        return null; // Resolved it
1182    }
1183
1184    private static String echoUnResolved(String functionName, String n) {
1185        return echoUnResolvedPre(functionName, n, "coord:");
1186    }
1187
1188    private static String echoUnResolvedPre(String functionName, String n, String prefix) {
1189        ELEvaluator eval = ELEvaluator.getCurrent();
1190        eval.setVariable(".wrap", "true");
1191        return prefix + functionName + "(" + n + ")"; // Unresolved
1192    }
1193
1194    /**
1195     * @return the initial instance of a DataSet in DATE
1196     */
1197    private static Date getInitialInstance() {
1198        ELEvaluator eval = ELEvaluator.getCurrent();
1199        return getInitialInstance(eval);
1200    }
1201
1202    /**
1203     * @return the initial instance of a DataSet in DATE
1204     */
1205    private static Date getInitialInstance(ELEvaluator eval) {
1206        return getInitialInstanceCal(eval).getTime();
1207        // return ds.getInitInstance();
1208    }
1209
1210    /**
1211     * @return the initial instance of a DataSet in Calendar
1212     */
1213    private static Calendar getInitialInstanceCal() {
1214        ELEvaluator eval = ELEvaluator.getCurrent();
1215        return getInitialInstanceCal(eval);
1216    }
1217
1218    /**
1219     * @return the initial instance of a DataSet in Calendar
1220     */
1221    private static Calendar getInitialInstanceCal(ELEvaluator eval) {
1222        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1223        if (ds == null) {
1224            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1225        }
1226        Calendar effInitTS = new GregorianCalendar(ds.getTimeZone());
1227        effInitTS.setTime(ds.getInitInstance());
1228        // To adjust EOD/EOM
1229        DateUtils.moveToEnd(effInitTS, getDSEndOfFlag(eval));
1230        return effInitTS;
1231        // return ds.getInitInstance();
1232    }
1233
1234    /**
1235     * @return Nominal or action creation Time when all the dependencies of an application instance are met.
1236     */
1237    private static Date getActionCreationtime() {
1238        ELEvaluator eval = ELEvaluator.getCurrent();
1239        return getActionCreationtime(eval);
1240    }
1241
1242    /**
1243     * @return Nominal or action creation Time when all the dependencies of an application instance are met.
1244     */
1245    private static Date getActionCreationtime(ELEvaluator eval) {
1246        SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1247        if (coordAction == null) {
1248            throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1249        }
1250        return coordAction.getNominalTime();
1251    }
1252
1253    /**
1254     * @return Actual Time when all the dependencies of an application instance are met.
1255     */
1256    private static Date getActualTime() {
1257        ELEvaluator eval = ELEvaluator.getCurrent();
1258        SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1259        if (coordAction == null) {
1260            throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1261        }
1262        return coordAction.getActualTime();
1263    }
1264
1265    /**
1266     * @return TimeZone for the application or job.
1267     */
1268    private static TimeZone getJobTZ() {
1269        ELEvaluator eval = ELEvaluator.getCurrent();
1270        SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1271        if (coordAction == null) {
1272            throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1273        }
1274        return coordAction.getTimeZone();
1275    }
1276
1277    /**
1278     * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
1279     *
1280     * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
1281     *         the dataset.
1282     */
1283    public static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) {
1284        ELEvaluator eval = ELEvaluator.getCurrent();
1285        return getCurrentInstance(effectiveTime, instanceCount, eval);
1286    }
1287
    /**
     * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
     * <p/> The instance index is first estimated from the elapsed milliseconds between the initial
     * instance and effectiveTime, the calendar is moved there in one step, and the result is then
     * corrected by stepping forward one frequency at a time.
     *
     * @param effectiveTime : time for which the current instance is looked up
     * @param instanceCount : one-element array receiving the index of the returned instance relative
     *        to the initial instance (0 when null is returned); may be null if the caller does not
     *        need the index
     * @param eval : evaluator holding the dataset
     * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
     *         the dataset.
     * @throws IllegalArgumentException if the dataset time unit is not handled
     */
    private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[], ELEvaluator eval) {
        Date datasetInitialInstance = getInitialInstance(eval);
        TimeUnit dsTimeUnit = getDSTimeUnit(eval);
        TimeZone dsTZ = getDatasetTZ(eval);
        int dsFreq = getDSFrequency(eval);
        // Convert Date to Calendar for corresponding TZ
        Calendar current = Calendar.getInstance(dsTZ);
        current.setTime(datasetInitialInstance);

        Calendar calEffectiveTime = new GregorianCalendar(dsTZ);
        calEffectiveTime.setTime(effectiveTime);
        if (instanceCount == null) {    // caller doesn't care about this value
            instanceCount = new int[1];
        }
        instanceCount[0] = 0;
        // Effective time earlier than the initial instance: there is no current instance
        if (current.compareTo(calEffectiveTime) > 0) {
            return null;
        }

        // Estimate how many whole time units have elapsed since the initial instance, using a
        // fixed millisecond length for each unit.
        switch(dsTimeUnit) {
            case MINUTE:
                instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / MINUTE_MSEC);
                break;
            case HOUR:
                instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / HOUR_MSEC);
                break;
            case DAY:
            case END_OF_DAY:
                instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / DAY_MSEC);
                break;
            case MONTH:
            case END_OF_MONTH:
                instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / MONTH_MSEC);
                break;
            case YEAR:
                instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / YEAR_MSEC);
                break;
            default:
                throw new IllegalArgumentException("Unhandled dataset time unit " + dsTimeUnit);
        }

        // With more than two elapsed units, convert the estimate into a number of instances and jump
        // there in a single step; otherwise start stepping from the initial instance.
        // NOTE(review): if a *_MSEC constant is shorter than the real calendar unit (months and years
        // vary in length), the estimate could land more than one step past effectiveTime, which the
        // single step back below would not correct - confirm against the constant definitions.
        if (instanceCount[0] > 2) {
            instanceCount[0] = (instanceCount[0] / dsFreq);
            current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
        } else {
            instanceCount[0] = 0;
        }
        // Step forward to the first instance strictly after effectiveTime ...
        while (!current.getTime().after(effectiveTime)) {
            current.add(dsTimeUnit.getCalendarUnit(), dsFreq);
            instanceCount[0]++;
        }
        // ... then go back one step to the instance preceding it
        current.add(dsTimeUnit.getCalendarUnit(), -dsFreq);
        instanceCount[0]--;
        return current;
    }
1349
1350    /**
1351     * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
1352     *
1353     * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
1354     *         the dataset.
1355     */
1356    private static Calendar getCurrentInstance_old(Date effectiveTime, int instanceCount[], ELEvaluator eval) {
1357        Date datasetInitialInstance = getInitialInstance(eval);
1358        TimeUnit dsTimeUnit = getDSTimeUnit(eval);
1359        TimeZone dsTZ = getDatasetTZ(eval);
1360        int dsFreq = getDSFrequency(eval);
1361        // Convert Date to Calendar for corresponding TZ
1362        Calendar current = Calendar.getInstance();
1363        current.setTime(datasetInitialInstance);
1364        current.setTimeZone(dsTZ);
1365
1366        Calendar calEffectiveTime = Calendar.getInstance();
1367        calEffectiveTime.setTime(effectiveTime);
1368        calEffectiveTime.setTimeZone(dsTZ);
1369        if (instanceCount == null) {    // caller doesn't care about this value
1370            instanceCount = new int[1];
1371        }
1372        instanceCount[0] = 0;
1373        if (current.compareTo(calEffectiveTime) > 0) {
1374            return null;
1375        }
1376        Calendar origCurrent = (Calendar) current.clone();
1377        while (current.compareTo(calEffectiveTime) <= 0) {
1378            current = (Calendar) origCurrent.clone();
1379            instanceCount[0]++;
1380            current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
1381        }
1382        instanceCount[0]--;
1383
1384        current = (Calendar) origCurrent.clone();
1385        current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
1386        return current;
1387    }
1388
1389    public static Calendar getEffectiveNominalTime() {
1390        Date datasetInitialInstance = getInitialInstance();
1391        TimeZone dsTZ = getDatasetTZ();
1392        // Convert Date to Calendar for corresponding TZ
1393        Calendar current = Calendar.getInstance();
1394        current.setTime(datasetInitialInstance);
1395        current.setTimeZone(dsTZ);
1396
1397        Calendar calEffectiveTime = Calendar.getInstance();
1398        calEffectiveTime.setTime(getActionCreationtime());
1399        calEffectiveTime.setTimeZone(dsTZ);
1400        if (current.compareTo(calEffectiveTime) > 0) {
1401            // Nominal Time < initial Instance
1402            // TODO: getClass() call doesn't work from static method.
1403            // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+
1404            // current.getTime());
1405            return null;
1406        }
1407        return calEffectiveTime;
1408    }
1409
1410    /**
1411     * @return dataset frequency in minutes
1412     */
1413    private static int getDSFrequency() {
1414        ELEvaluator eval = ELEvaluator.getCurrent();
1415        return getDSFrequency(eval);
1416    }
1417
1418    /**
1419     * @return dataset frequency in minutes
1420     */
1421    private static int getDSFrequency(ELEvaluator eval) {
1422        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1423        if (ds == null) {
1424            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1425        }
1426        return ds.getFrequency();
1427    }
1428
1429    /**
1430     * @return dataset TimeUnit
1431     */
1432    private static TimeUnit getDSTimeUnit() {
1433        ELEvaluator eval = ELEvaluator.getCurrent();
1434        return getDSTimeUnit(eval);
1435    }
1436
1437    /**
1438     * @return dataset TimeUnit
1439     */
1440    private static TimeUnit getDSTimeUnit(ELEvaluator eval) {
1441        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1442        if (ds == null) {
1443            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1444        }
1445        return ds.getTimeUnit();
1446    }
1447
1448    /**
1449     * @return dataset TimeZone
1450     */
1451    public static TimeZone getDatasetTZ() {
1452        ELEvaluator eval = ELEvaluator.getCurrent();
1453        return getDatasetTZ(eval);
1454    }
1455
1456    /**
1457     * @return dataset TimeZone
1458     */
1459    private static TimeZone getDatasetTZ(ELEvaluator eval) {
1460        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1461        if (ds == null) {
1462            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1463        }
1464        return ds.getTimeZone();
1465    }
1466
1467    /**
1468     * @return dataset TimeUnit
1469     */
1470    private static TimeUnit getDSEndOfFlag() {
1471        ELEvaluator eval = ELEvaluator.getCurrent();
1472        return getDSEndOfFlag(eval);
1473    }
1474
1475    /**
1476     * @return dataset TimeUnit
1477     */
1478    private static TimeUnit getDSEndOfFlag(ELEvaluator eval) {
1479        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1480        if (ds == null) {
1481            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1482        }
1483        return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration();
1484    }
1485
1486    /**
1487     * Return a job configuration property for the coordinator.
1488     *
1489     * @param property property name.
1490     * @return the value of the property, <code>null</code> if the property is undefined.
1491     */
1492    public static String coord_conf(String property) {
1493        ELEvaluator eval = ELEvaluator.getCurrent();
1494        return (String) eval.getVariable(property);
1495    }
1496
1497    /**
1498     * Return the user that submitted the coordinator job.
1499     *
1500     * @return the user that submitted the coordinator job.
1501     */
1502    public static String coord_user() {
1503        ELEvaluator eval = ELEvaluator.getCurrent();
1504        return (String) eval.getVariable(OozieClient.USER_NAME);
1505    }
1506
1507    /**
1508     * Takes two offset times and returns a list of multiples of the frequency offset from the effective nominal time that occur
1509     * between them.  The caller should make sure that startCal is earlier than endCal.
1510     * <p>
1511     * As a simple example, assume its the same day: startCal is 1:00, endCal is 2:00, frequency is 20min, and effective nominal
1512     * time is 1:20 -- then this method would return a list containing: -20, 0, 20, 40, 60
1513     *
1514     * @param startCal The earlier offset time
1515     * @param endCal The later offset time
1516     * @param eval The ELEvaluator to use; cannot be null
1517     * @return A list of multiple of the frequency offset from the effective nominal time that occur between the startCal and endCal
1518     */
1519    public static List<Integer> expandOffsetTimes(Calendar startCal, Calendar endCal, ELEvaluator eval) {
1520        List<Integer> expandedFreqs = new ArrayList<Integer>();
1521        // Use eval because the "current" eval isn't set
1522        int freq = getDSFrequency(eval);
1523        TimeUnit freqUnit = getDSTimeUnit(eval);
1524        Calendar cal = getCurrentInstance(getActionCreationtime(eval), null, eval);
1525        int totalFreq = 0;
1526        if (startCal.before(cal)) {
1527            while (cal.after(startCal)) {
1528                cal.add(freqUnit.getCalendarUnit(), -freq);
1529                totalFreq += -freq;
1530            }
1531            if (cal.before(startCal)) {
1532                cal.add(freqUnit.getCalendarUnit(), freq);
1533                totalFreq += freq;
1534            }
1535        }
1536        else if (startCal.after(cal)) {
1537            while (cal.before(startCal)) {
1538                cal.add(freqUnit.getCalendarUnit(), freq);
1539                totalFreq += freq;
1540            }
1541        }
1542        // At this point, cal is the smallest multiple of the dataset frequency that is >= to the startCal and offset from the
1543        // effective nominal time.  Now we can find all of the instances that occur between startCal and endCal, inclusive.
1544        while (cal.before(endCal) || cal.equals(endCal)) {
1545            expandedFreqs.add(totalFreq);
1546            cal.add(freqUnit.getCalendarUnit(), freq);
1547            totalFreq += freq;
1548        }
1549        return expandedFreqs;
1550    }
1551
1552    /**
1553     * Resolve the offset time from the effective nominal time
1554     *
1555     * @param n offset amount (integer)
1556     * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
1557     * @param eval The ELEvaluator to use; or null to use the "current" eval
1558     * @return A Calendar of the offset time
1559     */
1560    public static Calendar resolveOffsetRawTime(int n, TimeUnit timeUnit, ELEvaluator eval) {
1561        // Use eval if given (for when the "current" eval isn't set)
1562        Calendar cal;
1563        if (eval == null) {
1564            cal = getCurrentInstance(getActionCreationtime(), null);
1565        }
1566        else {
1567            cal = getCurrentInstance(getActionCreationtime(eval), null, eval);
1568        }
1569        if (cal == null) {
1570            XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the nominal time, an"
1571                    + " empty string is returned. This means that no data is available at the offset instance specified by the user"
1572                    + " and the user could try modifying his or her initial-instance to an earlier time.");
1573            return null;
1574        }
1575        cal.add(timeUnit.getCalendarUnit(), n);
1576        return cal;
1577    }
1578}