001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.coord;
019    
020    import java.io.IOException;
021    import java.util.Calendar;
022    import java.util.Date;
023    import java.util.TimeZone;
024    
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.fs.Path;
027    
028    import org.apache.oozie.client.OozieClient;
029    import org.apache.oozie.util.DateUtils;
030    import org.apache.oozie.util.ELEvaluator;
031    import org.apache.oozie.util.ParamChecker;
032    import org.apache.oozie.util.XLog;
033    import org.apache.oozie.service.HadoopAccessorException;
034    import org.apache.oozie.service.Services;
035    import org.apache.oozie.service.HadoopAccessorService;
036    
037    /**
038     * This class implements the EL function related to coordinator
039     */
040    
041    public class CoordELFunctions {
042        final private static String DATASET = "oozie.coord.el.dataset.bean";
043        final private static String COORD_ACTION = "oozie.coord.el.app.bean";
044        final public static String CONFIGURATION = "oozie.coord.el.conf";
045        // INSTANCE_SEPARATOR is used to separate multiple directories into one tag.
046        final public static String INSTANCE_SEPARATOR = "#";
047        final public static String DIR_SEPARATOR = ",";
048        // TODO: in next release, support flexibility
049        private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS";
050    
051        /**
052         * Used in defining the frequency in 'day' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
053         *
054         * @param val frequency in number of days.
055         * @return number of days and also set the frequency timeunit to "day"
056         */
057        public static int ph1_coord_days(int val) {
058            val = ParamChecker.checkGTZero(val, "n");
059            ELEvaluator eval = ELEvaluator.getCurrent();
060            eval.setVariable("timeunit", TimeUnit.DAY);
061            eval.setVariable("endOfDuration", TimeUnit.NONE);
062            return val;
063        }
064    
065        /**
066         * Used in defining the frequency in 'month' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
067         *
068         * @param val frequency in number of months.
069         * @return number of months and also set the frequency timeunit to "month"
070         */
071        public static int ph1_coord_months(int val) {
072            val = ParamChecker.checkGTZero(val, "n");
073            ELEvaluator eval = ELEvaluator.getCurrent();
074            eval.setVariable("timeunit", TimeUnit.MONTH);
075            eval.setVariable("endOfDuration", TimeUnit.NONE);
076            return val;
077        }
078    
079        /**
080         * Used in defining the frequency in 'hour' unit. <p/> parameter value domain: <code> val &gt; 0</code> and should
081         * be integer.
082         *
083         * @param val frequency in number of hours.
084         * @return number of minutes and also set the frequency timeunit to "minute"
085         */
086        public static int ph1_coord_hours(int val) {
087            val = ParamChecker.checkGTZero(val, "n");
088            ELEvaluator eval = ELEvaluator.getCurrent();
089            eval.setVariable("timeunit", TimeUnit.MINUTE);
090            eval.setVariable("endOfDuration", TimeUnit.NONE);
091            return val * 60;
092        }
093    
094        /**
095         * Used in defining the frequency in 'minute' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
096         *
097         * @param val frequency in number of minutes.
098         * @return number of minutes and also set the frequency timeunit to "minute"
099         */
100        public static int ph1_coord_minutes(int val) {
101            val = ParamChecker.checkGTZero(val, "n");
102            ELEvaluator eval = ELEvaluator.getCurrent();
103            eval.setVariable("timeunit", TimeUnit.MINUTE);
104            eval.setVariable("endOfDuration", TimeUnit.NONE);
105            return val;
106        }
107    
108        /**
109         * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p/> Every instance will
110         * start at 00:00 hour of each day. <p/> domain: <code> val &gt; 0</code> and should be integer.
111         *
112         * @param val frequency in number of days.
113         * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day"
114         */
115        public static int ph1_coord_endOfDays(int val) {
116            val = ParamChecker.checkGTZero(val, "n");
117            ELEvaluator eval = ELEvaluator.getCurrent();
118            eval.setVariable("timeunit", TimeUnit.DAY);
119            eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY);
120            return val;
121        }
122    
123        /**
124         * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p/> Every instance will
125         * start at first day of each month at 00:00 hour. <p/> domain: <code> val &gt; 0</code> and should be integer.
126         *
127         * @param val: frequency in number of months.
128         * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month"
129         */
130        public static int ph1_coord_endOfMonths(int val) {
131            val = ParamChecker.checkGTZero(val, "n");
132            ELEvaluator eval = ELEvaluator.getCurrent();
133            eval.setVariable("timeunit", TimeUnit.MONTH);
134            eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH);
135            return val;
136        }
137    
138        /**
139         * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p/> Depends on: <p/>
140         * 1. Timezone of both dataset and job <p/> 2. Action creation Time
141         *
142         * @return difference in minutes (DataSet TZ Offset - Application TZ offset)
143         */
144        public static int ph2_coord_tzOffset() {
145            Date actionCreationTime = getActionCreationtime();
146            TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ");
147            TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ");
148            // Apply the TZ into Calendar object
149            Calendar dsTime = Calendar.getInstance(dsTZ);
150            dsTime.setTime(actionCreationTime);
151            Calendar jobTime = Calendar.getInstance(jobTZ);
152            jobTime.setTime(actionCreationTime);
153            return (dsTime.get(Calendar.ZONE_OFFSET) - jobTime.get(Calendar.ZONE_OFFSET)) / (1000 * 60);
154        }
155    
156        public static int ph3_coord_tzOffset() {
157            return ph2_coord_tzOffset();
158        }
159    
160        /**
161         * Returns the a date string while given a base date in 'strBaseDate',
162         * offset and unit (e.g. DAY, MONTH, HOUR, MINUTE, MONTH).
163         *
164         * @param strBaseDate -- base date
165         * @param offset -- any number
166         * @param unit -- DAY, MONTH, HOUR, MINUTE, MONTH
167         * @return date string
168         * @throws Exception
169         */
170        public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
171            Calendar baseCalDate = DateUtils.getCalendar(strBaseDate);
172            StringBuilder buffer = new StringBuilder();
173            baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset);
174            buffer.append(DateUtils.formatDateOozieTZ(baseCalDate));
175            return buffer.toString();
176        }
177    
178        public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
179            return ph2_coord_dateOffset(strBaseDate, offset, unit);
180        }
181    
182        /**
183         * Determine the date-time in Oozie processing timezone of n-th future available dataset instance
184         * from nominal Time but not beyond the instance specified as 'instance.
185         * <p/>
186         * It depends on:
187         * <p/>
188         * 1. Data set frequency
189         * <p/>
190         * 2. Data set Time unit (day, month, minute)
191         * <p/>
192         * 3. Data set Time zone/DST
193         * <p/>
194         * 4. End Day/Month flag
195         * <p/>
196         * 5. Data set initial instance
197         * <p/>
198         * 6. Action Creation Time
199         * <p/>
200         * 7. Existence of dataset's directory
201         *
202         * @param n :instance count
203         *        <p/>
204         *        domain: n >= 0, n is integer
205         * @param instance: How many future instance it should check? value should
206         *        be >=0
207         * @return date-time in Oozie processing timezone of the n-th instance
208         *         <p/>
209         * @throws Exception
210         */
211        public static String ph3_coord_future(int n, int instance) throws Exception {
212            ParamChecker.checkGEZero(n, "future:n");
213            ParamChecker.checkGTZero(instance, "future:instance");
214            if (isSyncDataSet()) {// For Sync Dataset
215                return coord_future_sync(n, instance);
216            }
217            else {
218                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
219            }
220        }
221    
222        private static String coord_future_sync(int n, int instance) throws Exception {
223            ELEvaluator eval = ELEvaluator.getCurrent();
224            String retVal = "";
225            int datasetFrequency = (int) getDSFrequency();// in minutes
226            TimeUnit dsTimeUnit = getDSTimeUnit();
227            int[] instCount = new int[1];
228            Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
229            if (nominalInstanceCal != null) {
230                Calendar initInstance = getInitialInstanceCal();
231                nominalInstanceCal = (Calendar) initInstance.clone();
232                nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
233    
234                SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
235                if (ds == null) {
236                    throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
237                }
238                String uriTemplate = ds.getUriTemplate();
239                Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
240                if (conf == null) {
241                    throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
242                }
243                int available = 0, checkedInstance = 0;
244                boolean resolved = false;
245                String user = ParamChecker
246                        .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
247                String doneFlag = ds.getDoneFlag();
248                while (instance >= checkedInstance) {
249                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
250                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
251                    String pathWithDoneFlag = uriPath;
252                    if (doneFlag.length() > 0) {
253                        pathWithDoneFlag += "/" + doneFlag;
254                    }
255                    if (isPathAvailable(pathWithDoneFlag, user, null, conf)) {
256                        XLog.getLog(CoordELFunctions.class).debug("Found future(" + available + "): " + pathWithDoneFlag);
257                        if (available == n) {
258                            XLog.getLog(CoordELFunctions.class).debug("Found future File: " + pathWithDoneFlag);
259                            resolved = true;
260                            retVal = DateUtils.formatDateOozieTZ(nominalInstanceCal);
261                            eval.setVariable("resolved_path", uriPath);
262                            break;
263                        }
264                        available++;
265                    }
266                    // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
267                    // -datasetFrequency);
268                    nominalInstanceCal = (Calendar) initInstance.clone();
269                    instCount[0]++;
270                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
271                    checkedInstance++;
272                    // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
273                }
274                if (!resolved) {
275                    // return unchanged future function with variable 'is_resolved'
276                    // to 'false'
277                    eval.setVariable("is_resolved", Boolean.FALSE);
278                    retVal = "${coord:future(" + n + ", " + instance + ")}";
279                }
280                else {
281                    eval.setVariable("is_resolved", Boolean.TRUE);
282                }
283            }
284            else {// No feasible nominal time
285                eval.setVariable("is_resolved", Boolean.TRUE);
286                retVal = "";
287            }
288            return retVal;
289        }
290    
291        /**
292         * Return nominal time or Action Creation Time.
293         * <p/>
294         *
295         * @return coordinator action creation or materialization date time
296         * @throws Exception if unable to format the Date object to String
297         */
298        public static String ph2_coord_nominalTime() throws Exception {
299            ELEvaluator eval = ELEvaluator.getCurrent();
300            SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
301                    "Coordinator Action");
302            return DateUtils.formatDateOozieTZ(action.getNominalTime());
303        }
304    
305        public static String ph3_coord_nominalTime() throws Exception {
306            return ph2_coord_nominalTime();
307        }
308    
309        /**
310         * Convert from standard date-time formatting to a desired format.
311         * <p/>
312         * @param dateTimeStr - A timestamp in standard (ISO8601) format.
313         * @param format - A string representing the desired format.
314         * @return coordinator action creation or materialization date time
315         * @throws Exception if unable to format the Date object to String
316         */
317        public static String ph2_coord_formatTime(String dateTimeStr, String format)
318            throws Exception {
319            Date dateTime = DateUtils.parseDateOozieTZ(dateTimeStr);
320            return DateUtils.formatDateCustom(dateTime, format);
321        }
322    
323        public static String ph3_coord_formatTime(String dateTimeStr, String format)
324            throws Exception {
325            return ph2_coord_formatTime(dateTimeStr, format);
326        }
327    
328        /**
329         * Return Action Id. <p/>
330         *
331         * @return coordinator action Id
332         */
333        public static String ph2_coord_actionId() throws Exception {
334            ELEvaluator eval = ELEvaluator.getCurrent();
335            SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
336                    "Coordinator Action");
337            return action.getActionId();
338        }
339    
340        public static String ph3_coord_actionId() throws Exception {
341            return ph2_coord_actionId();
342        }
343    
344        /**
345         * Return Job Name. <p/>
346         *
347         * @return coordinator name
348         */
349        public static String ph2_coord_name() throws Exception {
350            ELEvaluator eval = ELEvaluator.getCurrent();
351            SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
352                    "Coordinator Action");
353            return action.getName();
354        }
355    
356        public static String ph3_coord_name() throws Exception {
357            return ph2_coord_name();
358        }
359    
360        /**
361         * Return Action Start time. <p/>
362         *
363         * @return coordinator action start time
364         * @throws Exception if unable to format the Date object to String
365         */
366        public static String ph2_coord_actualTime() throws Exception {
367            ELEvaluator eval = ELEvaluator.getCurrent();
368            SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
369            if (coordAction == null) {
370                throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
371            }
372            return DateUtils.formatDateOozieTZ(coordAction.getActualTime());
373        }
374    
375        public static String ph3_coord_actualTime() throws Exception {
376            return ph2_coord_actualTime();
377        }
378    
379        /**
380         * Used to specify a list of URI's that are used as input dir to the workflow job. <p/> Look for two evaluator-level
381         * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
382         * URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
383         * unresolved, this function will echo back the original function <p/> otherwise it sends the uris.
384         *
385         * @param dataInName : Datain name
386         * @return the list of URI's separated by INSTANCE_SEPARATOR <p/> if there are unresolved EL function (i.e. latest)
387         *         , echo back <p/> the function without resolving the function.
388         */
389        public static String ph3_coord_dataIn(String dataInName) {
390            String uris = "";
391            ELEvaluator eval = ELEvaluator.getCurrent();
392            uris = (String) eval.getVariable(".datain." + dataInName);
393            Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
394            if (unresolved != null && unresolved.booleanValue() == true) {
395                return "${coord:dataIn('" + dataInName + "')}";
396            }
397            return uris;
398        }
399    
400        /**
401         * Used to specify a list of URI's that are output dir of the workflow job. <p/> Look for one evaluator-level
402         * variable <p/> dataout.<DATAOUT_NAME> <p/> It defines the current list of URI. <p/> otherwise it sends the uris.
403         *
404         * @param dataOutName : Dataout name
405         * @return the list of URI's separated by INSTANCE_SEPARATOR
406         */
407        public static String ph3_coord_dataOut(String dataOutName) {
408            String uris = "";
409            ELEvaluator eval = ELEvaluator.getCurrent();
410            uris = (String) eval.getVariable(".dataout." + dataOutName);
411            return uris;
412        }
413    
414        /**
415         * Determine the date-time in Oozie processing timezone of n-th dataset instance. <p/> It depends on: <p/> 1.
416         * Data set frequency <p/> 2.
417         * Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. Data
418         * set initial instance <p/> 6. Action Creation Time
419         *
420         * @param n instance count domain: n is integer
421         * @return date-time in Oozie processing timezone of the n-th instance returns 'null' means n-th instance is
422         * earlier than Initial-Instance of DS
423         * @throws Exception
424         */
425        public static String ph2_coord_current(int n) throws Exception {
426            if (isSyncDataSet()) { // For Sync Dataset
427                return coord_current_sync(n);
428            }
429            else {
430                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
431            }
432        }
433    
434        /**
435         * Determine how many hours is on the date of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency
436         * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
437         * Data set initial instance <p/> 6. Action Creation Time
438         *
439         * @param n instance count <p/> domain: n is integer
440         * @return number of hours on that day <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
441         * @throws Exception
442         */
443        public static int ph2_coord_hoursInDay(int n) throws Exception {
444            int datasetFrequency = (int) getDSFrequency();
445            // /Calendar nominalInstanceCal =
446            // getCurrentInstance(getActionCreationtime());
447            Calendar nominalInstanceCal = getEffectiveNominalTime();
448            if (nominalInstanceCal == null) {
449                return -1;
450            }
451            nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
452            /*
453             * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
454             * { return -1; }
455             */
456            nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
457            // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
458            return DateUtils.hoursInDay(nominalInstanceCal);
459        }
460    
461        public static int ph3_coord_hoursInDay(int n) throws Exception {
462            return ph2_coord_hoursInDay(n);
463        }
464    
465        /**
466         * Calculate number of days in one month for n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency .
467         * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
468         * Data set initial instance <p/> 6. Action Creation Time
469         *
470         * @param n instance count. domain: n is integer
471         * @return number of days in that month <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
472         * @throws Exception
473         */
474        public static int ph2_coord_daysInMonth(int n) throws Exception {
475            int datasetFrequency = (int) getDSFrequency();// in minutes
476            // Calendar nominalInstanceCal =
477            // getCurrentInstance(getActionCreationtime());
478            Calendar nominalInstanceCal = getEffectiveNominalTime();
479            if (nominalInstanceCal == null) {
480                return -1;
481            }
482            nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
483            /*
484             * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
485             * { return -1; }
486             */
487            nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
488            // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
489            return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH);
490        }
491    
492        public static int ph3_coord_daysInMonth(int n) throws Exception {
493            return ph2_coord_daysInMonth(n);
494        }
495    
496        /**
497         * Determine the date-time in Oozie processing timezone of n-th latest available dataset instance. <p/> It depends
498         * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
499         * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of
500         * dataset's directory
501         *
502         * @param n :instance count <p/> domain: n > 0, n is integer
503         * @return date-time in Oozie processing timezone of the n-th instance <p/> returns 'null' means n-th instance is
504         * earlier than Initial-Instance of DS
505         * @throws Exception
506         */
507        public static String ph3_coord_latest(int n) throws Exception {
508            if (n > 0) {
509                throw new IllegalArgumentException("paramter should be <= 0 but it is " + n);
510            }
511            if (isSyncDataSet()) {// For Sync Dataset
512                return coord_latest_sync(n);
513            }
514            else {
515                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
516            }
517        }
518    
519        /**
520         * Configure an evaluator with data set and application specific information. <p/> Helper method of associating
521         * dataset and application object
522         *
523         * @param evaluator : to set variables
524         * @param ds : Data Set object
525         * @param coordAction : Application instance
526         */
527        public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) {
528            evaluator.setVariable(COORD_ACTION, coordAction);
529            evaluator.setVariable(DATASET, ds);
530        }
531    
532        /**
533         * Helper method to wrap around with "${..}". <p/>
534         *
535         *
536         * @param eval :EL evaluator
537         * @param expr : expression to evaluate
538         * @return Resolved expression or echo back the same expression
539         * @throws Exception
540         */
541        public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception {
542            try {
543                eval.setVariable(".wrap", null);
544                String result = eval.evaluate(expr, String.class);
545                if (eval.getVariable(".wrap") != null) {
546                    return "${" + result + "}";
547                }
548                else {
549                    return result;
550                }
551            }
552            catch (Exception e) {
553                throw new Exception("Unable to evaluate :" + expr + ":\n", e);
554            }
555        }
556    
557        // Set of echo functions
558    
559        public static String ph1_coord_current_echo(String n) {
560            return echoUnResolved("current", n);
561        }
562    
563        public static String ph2_coord_current_echo(String n) {
564            return echoUnResolved("current", n);
565        }
566    
567        public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) {
568            return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit);
569        }
570    
571        public static String ph1_coord_formatTime_echo(String dateTime, String format) {
572            // Quote the dateTime value since it would contain a ':'.
573            return echoUnResolved("formatTime", "'"+dateTime+"'" + " , " + format);
574        }
575    
576        public static String ph1_coord_latest_echo(String n) {
577            return echoUnResolved("latest", n);
578        }
579    
580        public static String ph2_coord_latest_echo(String n) {
581            return ph1_coord_latest_echo(n);
582        }
583    
584        public static String ph1_coord_future_echo(String n, String instance) {
585            return echoUnResolved("future", n + ", " + instance + "");
586        }
587    
588        public static String ph2_coord_future_echo(String n, String instance) {
589            return ph1_coord_future_echo(n, instance);
590        }
591    
592        public static String ph1_coord_dataIn_echo(String n) {
593            ELEvaluator eval = ELEvaluator.getCurrent();
594            String val = (String) eval.getVariable("oozie.dataname." + n);
595            if (val == null || val.equals("data-in") == false) {
596                XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " is not valid");
597                throw new RuntimeException("data_in_name " + n + " is not valid");
598            }
599            return echoUnResolved("dataIn", "'" + n + "'");
600        }
601    
602        public static String ph1_coord_dataOut_echo(String n) {
603            ELEvaluator eval = ELEvaluator.getCurrent();
604            String val = (String) eval.getVariable("oozie.dataname." + n);
605            if (val == null || val.equals("data-out") == false) {
606                XLog.getLog(CoordELFunctions.class).error("data_out_name " + n + " is not valid");
607                throw new RuntimeException("data_out_name " + n + " is not valid");
608            }
609            return echoUnResolved("dataOut", "'" + n + "'");
610        }
611    
612        public static String ph1_coord_nominalTime_echo() {
613            return echoUnResolved("nominalTime", "");
614        }
615    
616        public static String ph1_coord_nominalTime_echo_wrap() {
617            // return "${coord:nominalTime()}"; // no resolution
618            return echoUnResolved("nominalTime", "");
619        }
620    
621        public static String ph1_coord_nominalTime_echo_fixed() {
622            return "2009-03-06T010:00"; // Dummy resolution
623        }
624    
625        public static String ph1_coord_actualTime_echo_wrap() {
626            // return "${coord:actualTime()}"; // no resolution
627            return echoUnResolved("actualTime", "");
628        }
629    
630        public static String ph1_coord_actionId_echo() {
631            return echoUnResolved("actionId", "");
632        }
633    
634        public static String ph1_coord_name_echo() {
635            return echoUnResolved("name", "");
636        }
637    
638        // The following echo functions are not used in any phases yet
639        // They are here for future purpose.
640        public static String coord_minutes_echo(String n) {
641            return echoUnResolved("minutes", n);
642        }
643    
644        public static String coord_hours_echo(String n) {
645            return echoUnResolved("hours", n);
646        }
647    
648        public static String coord_days_echo(String n) {
649            return echoUnResolved("days", n);
650        }
651    
652        public static String coord_endOfDay_echo(String n) {
653            return echoUnResolved("endOfDay", n);
654        }
655    
656        public static String coord_months_echo(String n) {
657            return echoUnResolved("months", n);
658        }
659    
660        public static String coord_endOfMonth_echo(String n) {
661            return echoUnResolved("endOfMonth", n);
662        }
663    
664        public static String coord_actualTime_echo() {
665            return echoUnResolved("actualTime", "");
666        }
667    
668        // This echo function will always return "24" for validation only.
669        // This evaluation ****should not**** replace the original XML
670        // Create a temporary string and validate the function
671        // This is **required** for evaluating an expression like
672        // coord:HoursInDay(0) + 3
673        // actual evaluation will happen in phase 2 or phase 3.
674        public static String ph1_coord_hoursInDay_echo(String n) {
675            return "24";
676            // return echoUnResolved("hoursInDay", n);
677        }
678    
679        // This echo function will always return "30" for validation only.
680        // This evaluation ****should not**** replace the original XML
681        // Create a temporary string and validate the function
682        // This is **required** for evaluating an expression like
683        // coord:daysInMonth(0) + 3
684        // actual evaluation will happen in phase 2 or phase 3.
685        public static String ph1_coord_daysInMonth_echo(String n) {
686            // return echoUnResolved("daysInMonth", n);
687            return "30";
688        }
689    
690        // This echo function will always return "3" for validation only.
691        // This evaluation ****should not**** replace the original XML
692        // Create a temporary string and validate the function
693        // This is **required** for evaluating an expression like coord:tzOffset + 2
694        // actual evaluation will happen in phase 2 or phase 3.
695        public static String ph1_coord_tzOffset_echo() {
696            // return echoUnResolved("tzOffset", "");
697            return "3";
698        }
699    
700        // Local methods
701        /**
702         * @param n
703         * @return n-th instance Date-Time from current instance for data-set <p/> return empty string ("") if the
704         *         Action_Creation_time or the n-th instance <p/> is earlier than the Initial_Instance of dataset.
705         * @throws Exception
706         */
707        private static String coord_current_sync(int n) throws Exception {
708            int datasetFrequency = getDSFrequency();// in minutes
709            TimeUnit dsTimeUnit = getDSTimeUnit();
710            int[] instCount = new int[1];// used as pass by ref
711            Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
712            if (nominalInstanceCal == null) {
713                XLog.getLog(CoordELFunctions.class)
714                        .warn("If the initial instance of the dataset is later than the nominal time, an empty string is returned. This means that no data is available at the current-instance specified by the user and the user could try modifying his initial-instance to an earlier time.");
715                return "";
716            }
717            nominalInstanceCal = getInitialInstanceCal();
718            int absInstanceCount = instCount[0] + n;
719            nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency * absInstanceCount);
720    
721            if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) {
722                XLog.getLog(CoordELFunctions.class)
723                        .warn("If the initial instance of the dataset is later than the current-instance specified, such as coord:current({0}) in this case, an empty string is returned. This means that no data is available at the current-instance specified by the user and the user could try modifying his initial-instance to an earlier time.", n);
724                return "";
725            }
726            String str = DateUtils.formatDateOozieTZ(nominalInstanceCal);
727            return str;
728        }
729    
730        /**
731         * @param offset
732         * @return n-th available latest instance Date-Time for SYNC data-set
733         * @throws Exception
734         */
735        private static String coord_latest_sync(int offset) throws Exception {
736            if (offset > 0) {
737                throw new RuntimeException("For latest there is no meaning " + "of positive instance. n should be <=0"
738                        + offset);
739            }
740            ELEvaluator eval = ELEvaluator.getCurrent();
741            String retVal = "";
742            int datasetFrequency = (int) getDSFrequency();// in minutes
743            TimeUnit dsTimeUnit = getDSTimeUnit();
744            int[] instCount = new int[1];
745            Calendar nominalInstanceCal = getCurrentInstance(getActualTime(), instCount);
746            if (nominalInstanceCal != null) {
747                Calendar initInstance = getInitialInstanceCal();
748                SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
749                if (ds == null) {
750                    throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
751                }
752                String uriTemplate = ds.getUriTemplate();
753                Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
754                if (conf == null) {
755                    throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
756                }
757                int available = 0;
758                boolean resolved = false;
759                String user = ParamChecker
760                        .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
761                String doneFlag = ds.getDoneFlag();
762                while (nominalInstanceCal.compareTo(initInstance) >= 0) {
763                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
764                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
765                    String pathWithDoneFlag = uriPath;
766                    if (doneFlag.length() > 0) {
767                        pathWithDoneFlag += "/" + doneFlag;
768                    }
769                    if (isPathAvailable(pathWithDoneFlag, user, null, conf)) {
770                        XLog.getLog(CoordELFunctions.class).debug("Found latest(" + available + "): " + pathWithDoneFlag);
771                        if (available == offset) {
772                            XLog.getLog(CoordELFunctions.class).debug("Found Latest File: " + pathWithDoneFlag);
773                            resolved = true;
774                            retVal = DateUtils.formatDateOozieTZ(nominalInstanceCal);
775                            eval.setVariable("resolved_path", uriPath);
776                            break;
777                        }
778    
779                        available--;
780                    }
781                    // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
782                    // -datasetFrequency);
783                    nominalInstanceCal = (Calendar) initInstance.clone();
784                    instCount[0]--;
785                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
786                    // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
787                }
788                if (!resolved) {
789                    // return unchanged latest function with variable 'is_resolved'
790                    // to 'false'
791                    eval.setVariable("is_resolved", Boolean.FALSE);
792                    retVal = "${coord:latest(" + offset + ")}";
793                }
794                else {
795                    eval.setVariable("is_resolved", Boolean.TRUE);
796                }
797            }
798            else {// No feasible nominal time
799                eval.setVariable("is_resolved", Boolean.FALSE);
800            }
801            return retVal;
802        }
803    
804        // TODO : Not an efficient way. In a loop environment, we could do something
805        // outside the loop
806        /**
807         * Check whether a URI path exists
808         *
809         * @param sPath
810         * @param conf
811         * @return
812         * @throws IOException
813         */
814    
815        private static boolean isPathAvailable(String sPath, String user, String group, Configuration conf)
816                throws IOException, HadoopAccessorException {
817            // sPath += "/" + END_OF_OPERATION_INDICATOR_FILE;
818            Path path = new Path(sPath);
819            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
820            Configuration fsConf = has.createJobConf(path.toUri().getAuthority());
821            return has.createFileSystem(user, path.toUri(), fsConf).exists(path);
822        }
823    
824        /**
825         * @param tm
826         * @return a new Evaluator to be used for URI-template evaluation
827         */
828        private static ELEvaluator getUriEvaluator(Calendar tm) {
829            ELEvaluator retEval = new ELEvaluator();
830            retEval.setVariable("YEAR", tm.get(Calendar.YEAR));
831            retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm
832                    .get(Calendar.MONTH) + 1));
833            retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm
834                    .get(Calendar.DAY_OF_MONTH));
835            retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm
836                    .get(Calendar.HOUR_OF_DAY));
837            retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? "0" + tm.get(Calendar.MINUTE) : tm
838                    .get(Calendar.MINUTE));
839            return retEval;
840        }
841    
842        /**
843         * @return whether a data set is SYNCH or ASYNC
844         */
845        private static boolean isSyncDataSet() {
846            ELEvaluator eval = ELEvaluator.getCurrent();
847            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
848            if (ds == null) {
849                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
850            }
851            return ds.getType().equalsIgnoreCase("SYNC");
852        }
853    
854        /**
855         * Check whether a function should be resolved.
856         *
857         * @param functionName
858         * @param n
859         * @return null if the functionName needs to be resolved otherwise return the calling function unresolved.
860         */
861        private static String checkIfResolved(String functionName, String n) {
862            ELEvaluator eval = ELEvaluator.getCurrent();
863            String replace = (String) eval.getVariable("resolve_" + functionName);
864            if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't
865                // resolve
866                // return "${coord:" + functionName + "(" + n +")}"; //Unresolved
867                eval.setVariable(".wrap", "true");
868                return "coord:" + functionName + "(" + n + ")"; // Unresolved
869            }
870            return null; // Resolved it
871        }
872    
873        private static String echoUnResolved(String functionName, String n) {
874            return echoUnResolvedPre(functionName, n, "coord:");
875        }
876    
877        private static String echoUnResolvedPre(String functionName, String n, String prefix) {
878            ELEvaluator eval = ELEvaluator.getCurrent();
879            eval.setVariable(".wrap", "true");
880            return prefix + functionName + "(" + n + ")"; // Unresolved
881        }
882    
883        /**
884         * @return the initial instance of a DataSet in DATE
885         */
886        private static Date getInitialInstance() {
887            return getInitialInstanceCal().getTime();
888            // return ds.getInitInstance();
889        }
890    
891        /**
892         * @return the initial instance of a DataSet in Calendar
893         */
894        private static Calendar getInitialInstanceCal() {
895            ELEvaluator eval = ELEvaluator.getCurrent();
896            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
897            if (ds == null) {
898                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
899            }
900            Calendar effInitTS = Calendar.getInstance();
901            effInitTS.setTime(ds.getInitInstance());
902            effInitTS.setTimeZone(ds.getTimeZone());
903            // To adjust EOD/EOM
904            DateUtils.moveToEnd(effInitTS, getDSEndOfFlag());
905            return effInitTS;
906            // return ds.getInitInstance();
907        }
908    
909        /**
910         * @return Nominal or action creation Time when all the dependencies of an application instance are met.
911         */
912        private static Date getActionCreationtime() {
913            ELEvaluator eval = ELEvaluator.getCurrent();
914            SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
915            if (coordAction == null) {
916                throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
917            }
918            return coordAction.getNominalTime();
919        }
920    
921        /**
922         * @return Actual Time when all the dependencies of an application instance are met.
923         */
924        private static Date getActualTime() {
925            ELEvaluator eval = ELEvaluator.getCurrent();
926            SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
927            if (coordAction == null) {
928                throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
929            }
930            return coordAction.getActualTime();
931        }
932    
933        /**
934         * @return TimeZone for the application or job.
935         */
936        private static TimeZone getJobTZ() {
937            ELEvaluator eval = ELEvaluator.getCurrent();
938            SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
939            if (coordAction == null) {
940                throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
941            }
942            return coordAction.getTimeZone();
943        }
944    
945        /**
946         * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
947         *
948         * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
949         *         the dataset.
950         */
951        private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) {
952            Date datasetInitialInstance = getInitialInstance();
953            TimeUnit dsTimeUnit = getDSTimeUnit();
954            TimeZone dsTZ = getDatasetTZ();
955            // Convert Date to Calendar for corresponding TZ
956            Calendar current = Calendar.getInstance();
957            current.setTime(datasetInitialInstance);
958            current.setTimeZone(dsTZ);
959    
960            Calendar calEffectiveTime = Calendar.getInstance();
961            calEffectiveTime.setTime(effectiveTime);
962            calEffectiveTime.setTimeZone(dsTZ);
963            instanceCount[0] = 0;
964            if (current.compareTo(calEffectiveTime) > 0) {
965                // Nominal Time < initial Instance
966                // TODO: getClass() call doesn't work from static method.
967                // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+
968                // current.getTime());
969                return null;
970            }
971            Calendar origCurrent = (Calendar) current.clone();
972            while (current.compareTo(calEffectiveTime) <= 0) {
973                current = (Calendar) origCurrent.clone();
974                instanceCount[0]++;
975                current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * getDSFrequency());
976            }
977            instanceCount[0]--;
978    
979            current = (Calendar) origCurrent.clone();
980            current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * getDSFrequency());
981            return current;
982        }
983    
984        private static Calendar getEffectiveNominalTime() {
985            Date datasetInitialInstance = getInitialInstance();
986            TimeZone dsTZ = getDatasetTZ();
987            // Convert Date to Calendar for corresponding TZ
988            Calendar current = Calendar.getInstance();
989            current.setTime(datasetInitialInstance);
990            current.setTimeZone(dsTZ);
991    
992            Calendar calEffectiveTime = Calendar.getInstance();
993            calEffectiveTime.setTime(getActionCreationtime());
994            calEffectiveTime.setTimeZone(dsTZ);
995            if (current.compareTo(calEffectiveTime) > 0) {
996                // Nominal Time < initial Instance
997                // TODO: getClass() call doesn't work from static method.
998                // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+
999                // current.getTime());
1000                return null;
1001            }
1002            return calEffectiveTime;
1003        }
1004    
1005        /**
1006         * @return dataset frequency in minutes
1007         */
1008        private static int getDSFrequency() {
1009            ELEvaluator eval = ELEvaluator.getCurrent();
1010            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1011            if (ds == null) {
1012                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1013            }
1014            return ds.getFrequency();
1015        }
1016    
1017        /**
1018         * @return dataset TimeUnit
1019         */
1020        private static TimeUnit getDSTimeUnit() {
1021            ELEvaluator eval = ELEvaluator.getCurrent();
1022            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1023            if (ds == null) {
1024                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1025            }
1026            return ds.getTimeUnit();
1027        }
1028    
1029        /**
1030         * @return dataset TimeZone
1031         */
1032        private static TimeZone getDatasetTZ() {
1033            ELEvaluator eval = ELEvaluator.getCurrent();
1034            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1035            if (ds == null) {
1036                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1037            }
1038            return ds.getTimeZone();
1039        }
1040    
1041        /**
1042         * @return dataset TimeUnit
1043         */
1044        private static TimeUnit getDSEndOfFlag() {
1045            ELEvaluator eval = ELEvaluator.getCurrent();
1046            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1047            if (ds == null) {
1048                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1049            }
1050            return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration();
1051        }
1052    
1053        /**
1054         * Return a job configuration property for the coordinator.
1055         *
1056         * @param property property name.
1057         * @return the value of the property, <code>null</code> if the property is undefined.
1058         */
1059        public static String coord_conf(String property) {
1060            ELEvaluator eval = ELEvaluator.getCurrent();
1061            return (String) eval.getVariable(property);
1062        }
1063    
1064        /**
1065         * Return the user that submitted the coordinator job.
1066         *
1067         * @return the user that submitted the coordinator job.
1068         */
1069        public static String coord_user() {
1070            ELEvaluator eval = ELEvaluator.getCurrent();
1071            return (String) eval.getVariable(OozieClient.USER_NAME);
1072        }
1073    }