001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.coord;
019    
020    import java.io.IOException;
021    import java.util.Calendar;
022    import java.util.Date;
023    import java.util.TimeZone;
024    
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.fs.Path;
027    
028    import org.apache.oozie.client.OozieClient;
029    import org.apache.oozie.util.DateUtils;
030    import org.apache.oozie.util.ELEvaluator;
031    import org.apache.oozie.util.ParamChecker;
032    import org.apache.oozie.util.XLog;
033    import org.apache.oozie.service.HadoopAccessorException;
034    import org.apache.oozie.service.Services;
035    import org.apache.oozie.service.HadoopAccessorService;
036    
037    /**
 * This class implements the EL functions related to the coordinator.
039     */
040    
041    public class CoordELFunctions {
042        final private static String DATASET = "oozie.coord.el.dataset.bean";
043        final private static String COORD_ACTION = "oozie.coord.el.app.bean";
044        final public static String CONFIGURATION = "oozie.coord.el.conf";
045        // INSTANCE_SEPARATOR is used to separate multiple directories into one tag.
046        final public static String INSTANCE_SEPARATOR = "#";
047        final public static String DIR_SEPARATOR = ",";
048        // TODO: in next release, support flexibility
049        private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS";
050    
051        /**
052         * Used in defining the frequency in 'day' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
053         *
054         * @param val frequency in number of days.
055         * @return number of days and also set the frequency timeunit to "day"
056         */
057        public static int ph1_coord_days(int val) {
058            val = ParamChecker.checkGTZero(val, "n");
059            ELEvaluator eval = ELEvaluator.getCurrent();
060            eval.setVariable("timeunit", TimeUnit.DAY);
061            eval.setVariable("endOfDuration", TimeUnit.NONE);
062            return val;
063        }
064    
065        /**
066         * Used in defining the frequency in 'month' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
067         *
068         * @param val frequency in number of months.
069         * @return number of months and also set the frequency timeunit to "month"
070         */
071        public static int ph1_coord_months(int val) {
072            val = ParamChecker.checkGTZero(val, "n");
073            ELEvaluator eval = ELEvaluator.getCurrent();
074            eval.setVariable("timeunit", TimeUnit.MONTH);
075            eval.setVariable("endOfDuration", TimeUnit.NONE);
076            return val;
077        }
078    
079        /**
080         * Used in defining the frequency in 'hour' unit. <p/> parameter value domain: <code> val &gt; 0</code> and should
081         * be integer.
082         *
083         * @param val frequency in number of hours.
084         * @return number of minutes and also set the frequency timeunit to "minute"
085         */
086        public static int ph1_coord_hours(int val) {
087            val = ParamChecker.checkGTZero(val, "n");
088            ELEvaluator eval = ELEvaluator.getCurrent();
089            eval.setVariable("timeunit", TimeUnit.MINUTE);
090            eval.setVariable("endOfDuration", TimeUnit.NONE);
091            return val * 60;
092        }
093    
094        /**
095         * Used in defining the frequency in 'minute' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
096         *
097         * @param val frequency in number of minutes.
098         * @return number of minutes and also set the frequency timeunit to "minute"
099         */
100        public static int ph1_coord_minutes(int val) {
101            val = ParamChecker.checkGTZero(val, "n");
102            ELEvaluator eval = ELEvaluator.getCurrent();
103            eval.setVariable("timeunit", TimeUnit.MINUTE);
104            eval.setVariable("endOfDuration", TimeUnit.NONE);
105            return val;
106        }
107    
108        /**
109         * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p/> Every instance will
110         * start at 00:00 hour of each day. <p/> domain: <code> val &gt; 0</code> and should be integer.
111         *
112         * @param val frequency in number of days.
113         * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day"
114         */
115        public static int ph1_coord_endOfDays(int val) {
116            val = ParamChecker.checkGTZero(val, "n");
117            ELEvaluator eval = ELEvaluator.getCurrent();
118            eval.setVariable("timeunit", TimeUnit.DAY);
119            eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY);
120            return val;
121        }
122    
123        /**
124         * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p/> Every instance will
125         * start at first day of each month at 00:00 hour. <p/> domain: <code> val &gt; 0</code> and should be integer.
126         *
127         * @param val: frequency in number of months.
128         * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month"
129         */
130        public static int ph1_coord_endOfMonths(int val) {
131            val = ParamChecker.checkGTZero(val, "n");
132            ELEvaluator eval = ELEvaluator.getCurrent();
133            eval.setVariable("timeunit", TimeUnit.MONTH);
134            eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH);
135            return val;
136        }
137    
138        /**
139         * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p/> Depends on: <p/>
140         * 1. Timezone of both dataset and job <p/> 2. Action creation Time
141         *
142         * @return difference in minutes (DataSet TZ Offset - Application TZ offset)
143         */
144        public static int ph2_coord_tzOffset() {
145            Date actionCreationTime = getActionCreationtime();
146            TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ");
147            TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ");
148            // Apply the TZ into Calendar object
149            Calendar dsTime = Calendar.getInstance(dsTZ);
150            dsTime.setTime(actionCreationTime);
151            Calendar jobTime = Calendar.getInstance(jobTZ);
152            jobTime.setTime(actionCreationTime);
153            return (dsTime.get(Calendar.ZONE_OFFSET) - jobTime.get(Calendar.ZONE_OFFSET)) / (1000 * 60);
154        }
155    
156        public static int ph3_coord_tzOffset() {
157            return ph2_coord_tzOffset();
158        }
159    
160        /**
161         * Returns the a date string while given a base date in 'strBaseDate',
162         * offset and unit (e.g. DAY, MONTH, HOUR, MINUTE, MONTH).
163         *
164         * @param strBaseDate -- base date
165         * @param offset -- any number
166         * @param unit -- DAY, MONTH, HOUR, MINUTE, MONTH
167         * @return date string
168         * @throws Exception
169         */
170        public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
171            Calendar baseCalDate = DateUtils.getCalendar(strBaseDate);
172            StringBuilder buffer = new StringBuilder();
173            baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset);
174            buffer.append(DateUtils.formatDateUTC(baseCalDate));
175            return buffer.toString();
176        }
177    
178        public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
179            return ph2_coord_dateOffset(strBaseDate, offset, unit);
180        }
181    
182        /**
183         * Determine the date-time in UTC of n-th future available dataset instance
184         * from nominal Time but not beyond the instance specified as 'instance.
185         * <p/>
186         * It depends on:
187         * <p/>
188         * 1. Data set frequency
189         * <p/>
190         * 2. Data set Time unit (day, month, minute)
191         * <p/>
192         * 3. Data set Time zone/DST
193         * <p/>
194         * 4. End Day/Month flag
195         * <p/>
196         * 5. Data set initial instance
197         * <p/>
198         * 6. Action Creation Time
199         * <p/>
200         * 7. Existence of dataset's directory
201         *
202         * @param n :instance count
203         *        <p/>
204         *        domain: n >= 0, n is integer
205         * @param instance: How many future instance it should check? value should
206         *        be >=0
207         * @return date-time in UTC of the n-th instance
208         *         <p/>
209         * @throws Exception
210         */
211        public static String ph3_coord_future(int n, int instance) throws Exception {
212            ParamChecker.checkGEZero(n, "future:n");
213            ParamChecker.checkGTZero(instance, "future:instance");
214            if (isSyncDataSet()) {// For Sync Dataset
215                return coord_future_sync(n, instance);
216            }
217            else {
218                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
219            }
220        }
221    
222        private static String coord_future_sync(int n, int instance) throws Exception {
223            ELEvaluator eval = ELEvaluator.getCurrent();
224            String retVal = "";
225            int datasetFrequency = (int) getDSFrequency();// in minutes
226            TimeUnit dsTimeUnit = getDSTimeUnit();
227            int[] instCount = new int[1];
228            Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
229            if (nominalInstanceCal != null) {
230                Calendar initInstance = getInitialInstanceCal();
231                nominalInstanceCal = (Calendar) initInstance.clone();
232                nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
233    
234                SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
235                if (ds == null) {
236                    throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
237                }
238                String uriTemplate = ds.getUriTemplate();
239                Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
240                if (conf == null) {
241                    throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
242                }
243                int available = 0, checkedInstance = 0;
244                boolean resolved = false;
245                String user = ParamChecker
246                        .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
247                String doneFlag = ds.getDoneFlag();
248                while (instance >= checkedInstance) {
249                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
250                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
251                    String pathWithDoneFlag = uriPath;
252                    if (doneFlag.length() > 0) {
253                        pathWithDoneFlag += "/" + doneFlag;
254                    }
255                    if (isPathAvailable(pathWithDoneFlag, user, null, conf)) {
256                        XLog.getLog(CoordELFunctions.class).debug("Found future(" + available + "): " + pathWithDoneFlag);
257                        if (available == n) {
258                            XLog.getLog(CoordELFunctions.class).debug("Found future File: " + pathWithDoneFlag);
259                            resolved = true;
260                            retVal = DateUtils.formatDateUTC(nominalInstanceCal);
261                            eval.setVariable("resolved_path", uriPath);
262                            break;
263                        }
264                        available++;
265                    }
266                    // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
267                    // -datasetFrequency);
268                    nominalInstanceCal = (Calendar) initInstance.clone();
269                    instCount[0]++;
270                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
271                    checkedInstance++;
272                    // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
273                }
274                if (!resolved) {
275                    // return unchanged future function with variable 'is_resolved'
276                    // to 'false'
277                    eval.setVariable("is_resolved", Boolean.FALSE);
278                    retVal = "${coord:future(" + n + ", " + instance + ")}";
279                }
280                else {
281                    eval.setVariable("is_resolved", Boolean.TRUE);
282                }
283            }
284            else {// No feasible nominal time
285                eval.setVariable("is_resolved", Boolean.TRUE);
286                retVal = "";
287            }
288            return retVal;
289        }
290    
291        /**
292         * Return nominal time or Action Creation Time.
293         * <p/>
294         *
295         * @return coordinator action creation or materialization date time
296         * @throws Exception if unable to format the Date object to String
297         */
298        public static String ph2_coord_nominalTime() throws Exception {
299            ELEvaluator eval = ELEvaluator.getCurrent();
300            SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
301                    "Coordinator Action");
302            return DateUtils.formatDateUTC(action.getNominalTime());
303        }
304    
305        public static String ph3_coord_nominalTime() throws Exception {
306            return ph2_coord_nominalTime();
307        }
308    
309        /**
310         * Convert from standard date-time formatting to a desired format.
311         * <p/>
312         * @param dateTimeStr - A timestamp in standard (ISO8601) format.
313         * @param format - A string representing the desired format.
314         * @return coordinator action creation or materialization date time
315         * @throws Exception if unable to format the Date object to String
316         */
317        public static String ph2_coord_formatTime(String dateTimeStr, String format)
318            throws Exception {
319            Date dateTime = DateUtils.parseDateUTC(dateTimeStr);
320            return DateUtils.formatDateCustom(dateTime, format);
321        }
322    
323        public static String ph3_coord_formatTime(String dateTimeStr, String format)
324            throws Exception {
325            return ph2_coord_formatTime(dateTimeStr, format);
326        }
327    
328        /**
329         * Return Action Id. <p/>
330         *
331         * @return coordinator action Id
332         */
333        public static String ph2_coord_actionId() throws Exception {
334            ELEvaluator eval = ELEvaluator.getCurrent();
335            SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
336                    "Coordinator Action");
337            return action.getActionId();
338        }
339    
340        public static String ph3_coord_actionId() throws Exception {
341            return ph2_coord_actionId();
342        }
343    
344        /**
345         * Return Job Name. <p/>
346         *
347         * @return coordinator name
348         */
349        public static String ph2_coord_name() throws Exception {
350            ELEvaluator eval = ELEvaluator.getCurrent();
351            SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
352                    "Coordinator Action");
353            return action.getName();
354        }
355    
356        public static String ph3_coord_name() throws Exception {
357            return ph2_coord_name();
358        }
359    
360        /**
361         * Return Action Start time. <p/>
362         *
363         * @return coordinator action start time
364         * @throws Exception if unable to format the Date object to String
365         */
366        public static String ph2_coord_actualTime() throws Exception {
367            ELEvaluator eval = ELEvaluator.getCurrent();
368            SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
369            if (coordAction == null) {
370                throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
371            }
372            return DateUtils.formatDateUTC(coordAction.getActualTime());
373        }
374    
375        public static String ph3_coord_actualTime() throws Exception {
376            return ph2_coord_actualTime();
377        }
378    
379        /**
380         * Used to specify a list of URI's that are used as input dir to the workflow job. <p/> Look for two evaluator-level
381         * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
382         * URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
383         * unresolved, this function will echo back the original function <p/> otherwise it sends the uris.
384         *
385         * @param dataInName : Datain name
386         * @return the list of URI's separated by INSTANCE_SEPARATOR <p/> if there are unresolved EL function (i.e. latest)
387         *         , echo back <p/> the function without resolving the function.
388         */
389        public static String ph3_coord_dataIn(String dataInName) {
390            String uris = "";
391            ELEvaluator eval = ELEvaluator.getCurrent();
392            uris = (String) eval.getVariable(".datain." + dataInName);
393            Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
394            if (unresolved != null && unresolved.booleanValue() == true) {
395                return "${coord:dataIn('" + dataInName + "')}";
396            }
397            return uris;
398        }
399    
400        /**
401         * Used to specify a list of URI's that are output dir of the workflow job. <p/> Look for one evaluator-level
402         * variable <p/> dataout.<DATAOUT_NAME> <p/> It defines the current list of URI. <p/> otherwise it sends the uris.
403         *
404         * @param dataOutName : Dataout name
405         * @return the list of URI's separated by INSTANCE_SEPARATOR
406         */
407        public static String ph3_coord_dataOut(String dataOutName) {
408            String uris = "";
409            ELEvaluator eval = ELEvaluator.getCurrent();
410            uris = (String) eval.getVariable(".dataout." + dataOutName);
411            return uris;
412        }
413    
414        /**
415         * Determine the date-time in UTC of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency <p/> 2.
416         * Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. Data
417         * set initial instance <p/> 6. Action Creation Time
418         *
419         * @param n instance count domain: n is integer
420         * @return date-time in UTC of the n-th instance returns 'null' means n-th instance is earlier than Initial-Instance
421         *         of DS
422         * @throws Exception
423         */
424        public static String ph2_coord_current(int n) throws Exception {
425            if (isSyncDataSet()) { // For Sync Dataset
426                return coord_current_sync(n);
427            }
428            else {
429                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
430            }
431        }
432    
433        /**
434         * Determine how many hours is on the date of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency
435         * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
436         * Data set initial instance <p/> 6. Action Creation Time
437         *
438         * @param n instance count <p/> domain: n is integer
439         * @return number of hours on that day <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
440         * @throws Exception
441         */
442        public static int ph2_coord_hoursInDay(int n) throws Exception {
443            int datasetFrequency = (int) getDSFrequency();
444            // /Calendar nominalInstanceCal =
445            // getCurrentInstance(getActionCreationtime());
446            Calendar nominalInstanceCal = getEffectiveNominalTime();
447            if (nominalInstanceCal == null) {
448                return -1;
449            }
450            nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
451            /*
452             * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
453             * { return -1; }
454             */
455            nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
456            // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
457            return DateUtils.hoursInDay(nominalInstanceCal);
458        }
459    
460        public static int ph3_coord_hoursInDay(int n) throws Exception {
461            return ph2_coord_hoursInDay(n);
462        }
463    
464        /**
465         * Calculate number of days in one month for n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency .
466         * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
467         * Data set initial instance <p/> 6. Action Creation Time
468         *
469         * @param n instance count. domain: n is integer
470         * @return number of days in that month <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
471         * @throws Exception
472         */
473        public static int ph2_coord_daysInMonth(int n) throws Exception {
474            int datasetFrequency = (int) getDSFrequency();// in minutes
475            // Calendar nominalInstanceCal =
476            // getCurrentInstance(getActionCreationtime());
477            Calendar nominalInstanceCal = getEffectiveNominalTime();
478            if (nominalInstanceCal == null) {
479                return -1;
480            }
481            nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
482            /*
483             * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
484             * { return -1; }
485             */
486            nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
487            // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
488            return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH);
489        }
490    
491        public static int ph3_coord_daysInMonth(int n) throws Exception {
492            return ph2_coord_daysInMonth(n);
493        }
494    
495        /**
496         * Determine the date-time in UTC of n-th latest available dataset instance. <p/> It depends on: <p/> 1. Data set
497         * frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month
498         * flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of dataset's directory
499         *
500         * @param n :instance count <p/> domain: n > 0, n is integer
501         * @return date-time in UTC of the n-th instance <p/> returns 'null' means n-th instance is earlier than
502         *         Initial-Instance of DS
503         * @throws Exception
504         */
505        public static String ph3_coord_latest(int n) throws Exception {
506            if (n > 0) {
507                throw new IllegalArgumentException("paramter should be <= 0 but it is " + n);
508            }
509            if (isSyncDataSet()) {// For Sync Dataset
510                return coord_latest_sync(n);
511            }
512            else {
513                throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
514            }
515        }
516    
517        /**
518         * Configure an evaluator with data set and application specific information. <p/> Helper method of associating
519         * dataset and application object
520         *
521         * @param evaluator : to set variables
522         * @param ds : Data Set object
523         * @param coordAction : Application instance
524         */
525        public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) {
526            evaluator.setVariable(COORD_ACTION, coordAction);
527            evaluator.setVariable(DATASET, ds);
528        }
529    
530        /**
531         * Helper method to wrap around with "${..}". <p/>
532         *
533         *
534         * @param eval :EL evaluator
535         * @param expr : expression to evaluate
536         * @return Resolved expression or echo back the same expression
537         * @throws Exception
538         */
539        public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception {
540            try {
541                eval.setVariable(".wrap", null);
542                String result = eval.evaluate(expr, String.class);
543                if (eval.getVariable(".wrap") != null) {
544                    return "${" + result + "}";
545                }
546                else {
547                    return result;
548                }
549            }
550            catch (Exception e) {
551                throw new Exception("Unable to evaluate :" + expr + ":\n", e);
552            }
553        }
554    
555        // Set of echo functions
556    
557        public static String ph1_coord_current_echo(String n) {
558            return echoUnResolved("current", n);
559        }
560    
561        public static String ph2_coord_current_echo(String n) {
562            return echoUnResolved("current", n);
563        }
564    
565        public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) {
566            return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit);
567        }
568    
569        public static String ph1_coord_formatTime_echo(String dateTime, String format) {
570            // Quote the dateTime value since it would contain a ':'.
571            return echoUnResolved("formatTime", "'"+dateTime+"'" + " , " + format);
572        }
573    
574        public static String ph1_coord_latest_echo(String n) {
575            return echoUnResolved("latest", n);
576        }
577    
578        public static String ph2_coord_latest_echo(String n) {
579            return ph1_coord_latest_echo(n);
580        }
581    
582        public static String ph1_coord_future_echo(String n, String instance) {
583            return echoUnResolved("future", n + ", " + instance + "");
584        }
585    
586        public static String ph2_coord_future_echo(String n, String instance) {
587            return ph1_coord_future_echo(n, instance);
588        }
589    
590        public static String ph1_coord_dataIn_echo(String n) {
591            ELEvaluator eval = ELEvaluator.getCurrent();
592            String val = (String) eval.getVariable("oozie.dataname." + n);
593            if (val == null || val.equals("data-in") == false) {
594                XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " is not valid");
595                throw new RuntimeException("data_in_name " + n + " is not valid");
596            }
597            return echoUnResolved("dataIn", "'" + n + "'");
598        }
599    
600        public static String ph1_coord_dataOut_echo(String n) {
601            ELEvaluator eval = ELEvaluator.getCurrent();
602            String val = (String) eval.getVariable("oozie.dataname." + n);
603            if (val == null || val.equals("data-out") == false) {
604                XLog.getLog(CoordELFunctions.class).error("data_out_name " + n + " is not valid");
605                throw new RuntimeException("data_out_name " + n + " is not valid");
606            }
607            return echoUnResolved("dataOut", "'" + n + "'");
608        }
609    
610        public static String ph1_coord_nominalTime_echo() {
611            return echoUnResolved("nominalTime", "");
612        }
613    
614        public static String ph1_coord_nominalTime_echo_wrap() {
615            // return "${coord:nominalTime()}"; // no resolution
616            return echoUnResolved("nominalTime", "");
617        }
618    
619        public static String ph1_coord_nominalTime_echo_fixed() {
620            return "2009-03-06T010:00"; // Dummy resolution
621        }
622    
623        public static String ph1_coord_actualTime_echo_wrap() {
624            // return "${coord:actualTime()}"; // no resolution
625            return echoUnResolved("actualTime", "");
626        }
627    
628        public static String ph1_coord_actionId_echo() {
629            return echoUnResolved("actionId", "");
630        }
631    
632        public static String ph1_coord_name_echo() {
633            return echoUnResolved("name", "");
634        }
635    
636        // The following echo functions are not used in any phases yet
637        // They are here for future purpose.
638        public static String coord_minutes_echo(String n) {
639            return echoUnResolved("minutes", n);
640        }
641    
642        public static String coord_hours_echo(String n) {
643            return echoUnResolved("hours", n);
644        }
645    
646        public static String coord_days_echo(String n) {
647            return echoUnResolved("days", n);
648        }
649    
650        public static String coord_endOfDay_echo(String n) {
651            return echoUnResolved("endOfDay", n);
652        }
653    
654        public static String coord_months_echo(String n) {
655            return echoUnResolved("months", n);
656        }
657    
658        public static String coord_endOfMonth_echo(String n) {
659            return echoUnResolved("endOfMonth", n);
660        }
661    
662        public static String coord_actualTime_echo() {
663            return echoUnResolved("actualTime", "");
664        }
665    
666        // This echo function will always return "24" for validation only.
667        // This evaluation ****should not**** replace the original XML
668        // Create a temporary string and validate the function
669        // This is **required** for evaluating an expression like
670        // coord:HoursInDay(0) + 3
671        // actual evaluation will happen in phase 2 or phase 3.
672        public static String ph1_coord_hoursInDay_echo(String n) {
673            return "24";
674            // return echoUnResolved("hoursInDay", n);
675        }
676    
677        // This echo function will always return "30" for validation only.
678        // This evaluation ****should not**** replace the original XML
679        // Create a temporary string and validate the function
680        // This is **required** for evaluating an expression like
681        // coord:daysInMonth(0) + 3
682        // actual evaluation will happen in phase 2 or phase 3.
683        public static String ph1_coord_daysInMonth_echo(String n) {
684            // return echoUnResolved("daysInMonth", n);
685            return "30";
686        }
687    
688        // This echo function will always return "3" for validation only.
689        // This evaluation ****should not**** replace the original XML
690        // Create a temporary string and validate the function
691        // This is **required** for evaluating an expression like coord:tzOffset + 2
692        // actual evaluation will happen in phase 2 or phase 3.
693        public static String ph1_coord_tzOffset_echo() {
694            // return echoUnResolved("tzOffset", "");
695            return "3";
696        }
697    
698        // Local methods
699        /**
700         * @param n
701         * @return n-th instance Date-Time from current instance for data-set <p/> return empty string ("") if the
702         *         Action_Creation_time or the n-th instance <p/> is earlier than the Initial_Instance of dataset.
703         * @throws Exception
704         */
705        private static String coord_current_sync(int n) throws Exception {
706            int datasetFrequency = getDSFrequency();// in minutes
707            TimeUnit dsTimeUnit = getDSTimeUnit();
708            int[] instCount = new int[1];// used as pass by ref
709            Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
710            if (nominalInstanceCal == null) {
711                XLog.getLog(CoordELFunctions.class)
712                        .warn("If the initial instance of the dataset is later than the nominal time, an empty string is returned. This means that no data is available at the current-instance specified by the user and the user could try modifying his initial-instance to an earlier time.");
713                return "";
714            }
715            nominalInstanceCal = getInitialInstanceCal();
716            int absInstanceCount = instCount[0] + n;
717            nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency * absInstanceCount);
718    
719            if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) {
720                XLog.getLog(CoordELFunctions.class)
721                        .warn("If the initial instance of the dataset is later than the current-instance specified, such as coord:current({0}) in this case, an empty string is returned. This means that no data is available at the current-instance specified by the user and the user could try modifying his initial-instance to an earlier time.", n);
722                return "";
723            }
724            String str = DateUtils.formatDateUTC(nominalInstanceCal);
725            return str;
726        }
727    
728        /**
729         * @param offset
730         * @return n-th available latest instance Date-Time for SYNC data-set
731         * @throws Exception
732         */
733        private static String coord_latest_sync(int offset) throws Exception {
734            if (offset > 0) {
735                throw new RuntimeException("For latest there is no meaning " + "of positive instance. n should be <=0"
736                        + offset);
737            }
738            ELEvaluator eval = ELEvaluator.getCurrent();
739            String retVal = "";
740            int datasetFrequency = (int) getDSFrequency();// in minutes
741            TimeUnit dsTimeUnit = getDSTimeUnit();
742            int[] instCount = new int[1];
743            Calendar nominalInstanceCal = getCurrentInstance(getActualTime(), instCount);
744            if (nominalInstanceCal != null) {
745                Calendar initInstance = getInitialInstanceCal();
746                SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
747                if (ds == null) {
748                    throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
749                }
750                String uriTemplate = ds.getUriTemplate();
751                Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
752                if (conf == null) {
753                    throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
754                }
755                int available = 0;
756                boolean resolved = false;
757                String user = ParamChecker
758                        .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
759                String doneFlag = ds.getDoneFlag();
760                while (nominalInstanceCal.compareTo(initInstance) >= 0) {
761                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
762                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
763                    String pathWithDoneFlag = uriPath;
764                    if (doneFlag.length() > 0) {
765                        pathWithDoneFlag += "/" + doneFlag;
766                    }
767                    if (isPathAvailable(pathWithDoneFlag, user, null, conf)) {
768                        XLog.getLog(CoordELFunctions.class).debug("Found latest(" + available + "): " + pathWithDoneFlag);
769                        if (available == offset) {
770                            XLog.getLog(CoordELFunctions.class).debug("Found Latest File: " + pathWithDoneFlag);
771                            resolved = true;
772                            retVal = DateUtils.formatDateUTC(nominalInstanceCal);
773                            eval.setVariable("resolved_path", uriPath);
774                            break;
775                        }
776    
777                        available--;
778                    }
779                    // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
780                    // -datasetFrequency);
781                    nominalInstanceCal = (Calendar) initInstance.clone();
782                    instCount[0]--;
783                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
784                    // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
785                }
786                if (!resolved) {
787                    // return unchanged latest function with variable 'is_resolved'
788                    // to 'false'
789                    eval.setVariable("is_resolved", Boolean.FALSE);
790                    retVal = "${coord:latest(" + offset + ")}";
791                }
792                else {
793                    eval.setVariable("is_resolved", Boolean.TRUE);
794                }
795            }
796            else {// No feasible nominal time
797                eval.setVariable("is_resolved", Boolean.FALSE);
798            }
799            return retVal;
800        }
801    
802        // TODO : Not an efficient way. In a loop environment, we could do something
803        // outside the loop
804        /**
805         * Check whether a URI path exists
806         *
807         * @param sPath
808         * @param conf
809         * @return
810         * @throws IOException
811         */
812    
813        private static boolean isPathAvailable(String sPath, String user, String group, Configuration conf)
814                throws IOException, HadoopAccessorException {
815            // sPath += "/" + END_OF_OPERATION_INDICATOR_FILE;
816            Path path = new Path(sPath);
817            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
818            Configuration fsConf = has.createJobConf(path.toUri().getAuthority());
819            return has.createFileSystem(user, path.toUri(), fsConf).exists(path);
820        }
821    
822        /**
823         * @param tm
824         * @return a new Evaluator to be used for URI-template evaluation
825         */
826        private static ELEvaluator getUriEvaluator(Calendar tm) {
827            ELEvaluator retEval = new ELEvaluator();
828            retEval.setVariable("YEAR", tm.get(Calendar.YEAR));
829            retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm
830                    .get(Calendar.MONTH) + 1));
831            retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm
832                    .get(Calendar.DAY_OF_MONTH));
833            retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm
834                    .get(Calendar.HOUR_OF_DAY));
835            retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? "0" + tm.get(Calendar.MINUTE) : tm
836                    .get(Calendar.MINUTE));
837            return retEval;
838        }
839    
840        /**
841         * @return whether a data set is SYNCH or ASYNC
842         */
843        private static boolean isSyncDataSet() {
844            ELEvaluator eval = ELEvaluator.getCurrent();
845            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
846            if (ds == null) {
847                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
848            }
849            return ds.getType().equalsIgnoreCase("SYNC");
850        }
851    
852        /**
853         * Check whether a function should be resolved.
854         *
855         * @param functionName
856         * @param n
857         * @return null if the functionName needs to be resolved otherwise return the calling function unresolved.
858         */
859        private static String checkIfResolved(String functionName, String n) {
860            ELEvaluator eval = ELEvaluator.getCurrent();
861            String replace = (String) eval.getVariable("resolve_" + functionName);
862            if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't
863                // resolve
864                // return "${coord:" + functionName + "(" + n +")}"; //Unresolved
865                eval.setVariable(".wrap", "true");
866                return "coord:" + functionName + "(" + n + ")"; // Unresolved
867            }
868            return null; // Resolved it
869        }
870    
871        private static String echoUnResolved(String functionName, String n) {
872            return echoUnResolvedPre(functionName, n, "coord:");
873        }
874    
875        private static String echoUnResolvedPre(String functionName, String n, String prefix) {
876            ELEvaluator eval = ELEvaluator.getCurrent();
877            eval.setVariable(".wrap", "true");
878            return prefix + functionName + "(" + n + ")"; // Unresolved
879        }
880    
881        /**
882         * @return the initial instance of a DataSet in DATE
883         */
884        private static Date getInitialInstance() {
885            return getInitialInstanceCal().getTime();
886            // return ds.getInitInstance();
887        }
888    
889        /**
890         * @return the initial instance of a DataSet in Calendar
891         */
892        private static Calendar getInitialInstanceCal() {
893            ELEvaluator eval = ELEvaluator.getCurrent();
894            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
895            if (ds == null) {
896                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
897            }
898            Calendar effInitTS = Calendar.getInstance();
899            effInitTS.setTime(ds.getInitInstance());
900            effInitTS.setTimeZone(ds.getTimeZone());
901            // To adjust EOD/EOM
902            DateUtils.moveToEnd(effInitTS, getDSEndOfFlag());
903            return effInitTS;
904            // return ds.getInitInstance();
905        }
906    
907        /**
908         * @return Nominal or action creation Time when all the dependencies of an application instance are met.
909         */
910        private static Date getActionCreationtime() {
911            ELEvaluator eval = ELEvaluator.getCurrent();
912            SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
913            if (coordAction == null) {
914                throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
915            }
916            return coordAction.getNominalTime();
917        }
918    
919        /**
920         * @return Actual Time when all the dependencies of an application instance are met.
921         */
922        private static Date getActualTime() {
923            ELEvaluator eval = ELEvaluator.getCurrent();
924            SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
925            if (coordAction == null) {
926                throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
927            }
928            return coordAction.getActualTime();
929        }
930    
931        /**
932         * @return TimeZone for the application or job.
933         */
934        private static TimeZone getJobTZ() {
935            ELEvaluator eval = ELEvaluator.getCurrent();
936            SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
937            if (coordAction == null) {
938                throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
939            }
940            return coordAction.getTimeZone();
941        }
942    
943        /**
944         * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
945         *
946         * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
947         *         the dataset.
948         */
949        private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) {
950            Date datasetInitialInstance = getInitialInstance();
951            TimeUnit dsTimeUnit = getDSTimeUnit();
952            TimeZone dsTZ = getDatasetTZ();
953            // Convert Date to Calendar for corresponding TZ
954            Calendar current = Calendar.getInstance();
955            current.setTime(datasetInitialInstance);
956            current.setTimeZone(dsTZ);
957    
958            Calendar calEffectiveTime = Calendar.getInstance();
959            calEffectiveTime.setTime(effectiveTime);
960            calEffectiveTime.setTimeZone(dsTZ);
961            instanceCount[0] = 0;
962            if (current.compareTo(calEffectiveTime) > 0) {
963                // Nominal Time < initial Instance
964                // TODO: getClass() call doesn't work from static method.
965                // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+
966                // current.getTime());
967                return null;
968            }
969            Calendar origCurrent = (Calendar) current.clone();
970            while (current.compareTo(calEffectiveTime) <= 0) {
971                current = (Calendar) origCurrent.clone();
972                instanceCount[0]++;
973                current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * getDSFrequency());
974            }
975            instanceCount[0]--;
976    
977            current = (Calendar) origCurrent.clone();
978            current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * getDSFrequency());
979            return current;
980        }
981    
982        private static Calendar getEffectiveNominalTime() {
983            Date datasetInitialInstance = getInitialInstance();
984            TimeZone dsTZ = getDatasetTZ();
985            // Convert Date to Calendar for corresponding TZ
986            Calendar current = Calendar.getInstance();
987            current.setTime(datasetInitialInstance);
988            current.setTimeZone(dsTZ);
989    
990            Calendar calEffectiveTime = Calendar.getInstance();
991            calEffectiveTime.setTime(getActionCreationtime());
992            calEffectiveTime.setTimeZone(dsTZ);
993            if (current.compareTo(calEffectiveTime) > 0) {
994                // Nominal Time < initial Instance
995                // TODO: getClass() call doesn't work from static method.
996                // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+
997                // current.getTime());
998                return null;
999            }
1000            return calEffectiveTime;
1001        }
1002    
1003        /**
1004         * @return dataset frequency in minutes
1005         */
1006        private static int getDSFrequency() {
1007            ELEvaluator eval = ELEvaluator.getCurrent();
1008            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1009            if (ds == null) {
1010                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1011            }
1012            return ds.getFrequency();
1013        }
1014    
1015        /**
1016         * @return dataset TimeUnit
1017         */
1018        private static TimeUnit getDSTimeUnit() {
1019            ELEvaluator eval = ELEvaluator.getCurrent();
1020            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1021            if (ds == null) {
1022                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1023            }
1024            return ds.getTimeUnit();
1025        }
1026    
1027        /**
1028         * @return dataset TimeZone
1029         */
1030        private static TimeZone getDatasetTZ() {
1031            ELEvaluator eval = ELEvaluator.getCurrent();
1032            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1033            if (ds == null) {
1034                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1035            }
1036            return ds.getTimeZone();
1037        }
1038    
1039        /**
1040         * @return dataset TimeUnit
1041         */
1042        private static TimeUnit getDSEndOfFlag() {
1043            ELEvaluator eval = ELEvaluator.getCurrent();
1044            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1045            if (ds == null) {
1046                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1047            }
1048            return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration();
1049        }
1050    
1051        /**
1052         * Return a job configuration property for the coordinator.
1053         *
1054         * @param property property name.
1055         * @return the value of the property, <code>null</code> if the property is undefined.
1056         */
1057        public static String coord_conf(String property) {
1058            ELEvaluator eval = ELEvaluator.getCurrent();
1059            return (String) eval.getVariable(property);
1060        }
1061    
1062        /**
1063         * Return the user that submitted the coordinator job.
1064         *
1065         * @return the user that submitted the coordinator job.
1066         */
1067        public static String coord_user() {
1068            ELEvaluator eval = ELEvaluator.getCurrent();
1069            return (String) eval.getVariable(OozieClient.USER_NAME);
1070        }
1071    }