001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.coord;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.net.URI;
024import java.net.URISyntaxException;
025import java.text.ParseException;
026import java.util.ArrayList;
027import java.util.TimeZone;
028import java.util.Map;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Date;
032import java.util.Calendar;
033
034import org.apache.hadoop.conf.Configuration;
035import org.apache.oozie.CoordinatorActionBean;
036import org.apache.oozie.ErrorCode;
037import org.apache.oozie.client.CoordinatorAction;
038import org.apache.oozie.client.OozieClient;
039import org.apache.oozie.command.CommandException;
040import org.apache.oozie.coord.CoordELEvaluator;
041import org.apache.oozie.coord.CoordELFunctions;
042import org.apache.oozie.coord.CoordUtils;
043import org.apache.oozie.coord.CoordinatorJobException;
044import org.apache.oozie.coord.SyncCoordAction;
045import org.apache.oozie.coord.TimeUnit;
046import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil;
047import org.apache.oozie.coord.input.dependency.CoordInputDependency;
048import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator;
049import org.apache.oozie.coord.input.dependency.CoordInputDependencyFactory;
050import org.apache.oozie.coord.input.dependency.CoordInputInstance;
051import org.apache.oozie.dependency.ActionDependency;
052import org.apache.oozie.dependency.DependencyChecker;
053import org.apache.oozie.dependency.URIHandler;
054import org.apache.oozie.dependency.URIHandler.DependencyType;
055import org.apache.oozie.dependency.URIHandlerException;
056import org.apache.oozie.service.Services;
057import org.apache.oozie.service.URIHandlerService;
058import org.apache.oozie.service.UUIDService;
059import org.apache.oozie.util.DateUtils;
060import org.apache.oozie.util.ELEvaluator;
061import org.apache.oozie.util.ParamChecker;
062import org.apache.oozie.util.XConfiguration;
063import org.apache.oozie.util.XmlUtils;
064import org.jdom.Attribute;
065import org.jdom.Element;
066import org.jdom.JDOMException;
067import org.quartz.CronExpression;
068import org.apache.commons.lang.StringUtils;
069import org.apache.oozie.CoordinatorJobBean;
070
071public class CoordCommandUtils {
072    public static int CURRENT = 0;
073    public static int LATEST = 1;
074    public static int FUTURE = 2;
075    public static int OFFSET = 3;
076    public static int ABSOLUTE = 4;
077    public static int UNEXPECTED = -1;
078
079    public static final String RESOLVED_UNRESOLVED_SEPARATOR = "!!";
080    public static final String UNRESOLVED_INSTANCES_TAG = "unresolved-instances";
081
082    /**
083     * parse a function like coord:latest(n)/future() and return the 'n'.
084     * <p>
085     *
086     * @param function
087     * @param restArg
088     * @return int instanceNumber
089     * @throws Exception
090     */
091    public static int getInstanceNumber(String function, StringBuilder restArg) throws Exception {
092        int funcType = getFuncType(function);
093        if (funcType == ABSOLUTE) {
094            return ABSOLUTE;
095        }
096        if (funcType == CURRENT || funcType == LATEST) {
097            return parseOneArg(function);
098        }
099        else {
100            return parseMoreArgs(function, restArg);
101        }
102    }
103
104    /**
105     * Evaluates function for coord-action-create-inst tag
106     * @param event
107     * @param appInst
108     * @param conf
109     * @param function
110     * @return evaluation result
111     * @throws Exception
112     */
113    private static String evaluateInstanceFunction(Element event, SyncCoordAction appInst, Configuration conf,
114            String function) throws Exception {
115        ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator("coord-action-create-inst", event, appInst, conf);
116        return CoordELFunctions.evalAndWrap(eval, function);
117    }
118
119    public static int parseOneArg(String funcName) throws Exception {
120        int firstPos = funcName.indexOf("(");
121        int lastPos = funcName.lastIndexOf(")");
122        if (firstPos >= 0 && lastPos > firstPos) {
123            String tmp = funcName.substring(firstPos + 1, lastPos).trim();
124            if (tmp.length() > 0) {
125                return (int) Double.parseDouble(tmp);
126            }
127        }
128        throw new RuntimeException("Unformatted function :" + funcName);
129    }
130
131    public static String parseOneStringArg(String funcName) throws Exception {
132        int firstPos = funcName.indexOf("(");
133        int lastPos = funcName.lastIndexOf(")");
134        if (firstPos >= 0 && lastPos > firstPos) {
135            return funcName.substring(firstPos + 1, lastPos).trim();
136        }
137        throw new RuntimeException("Unformatted function :" + funcName);
138    }
139
140    private static int parseMoreArgs(String funcName, StringBuilder restArg) throws Exception {
141        int firstPos = funcName.indexOf("(");
142        int secondPos = funcName.lastIndexOf(",");
143        int lastPos = funcName.lastIndexOf(")");
144        if (firstPos >= 0 && secondPos > firstPos) {
145            String tmp = funcName.substring(firstPos + 1, secondPos).trim();
146            if (tmp.length() > 0) {
147                restArg.append(funcName.substring(secondPos + 1, lastPos).trim());
148                return (int) Double.parseDouble(tmp);
149            }
150        }
151        throw new RuntimeException("Unformatted function :" + funcName);
152    }
153
154    /**
155     * @param function EL function name
156     * @return type of EL function
157     */
158    public static int getFuncType(String function) {
159        if (function.indexOf("current") >= 0) {
160            return CURRENT;
161        }
162        else if (function.indexOf("latest") >= 0) {
163            return LATEST;
164        }
165        else if (function.indexOf("future") >= 0) {
166            return FUTURE;
167        }
168        else if (function.indexOf("offset") >= 0) {
169            return OFFSET;
170        }
171        else if (function.indexOf("absolute") >= 0) {
172            return ABSOLUTE;
173        }
174        return UNEXPECTED;
175        // throw new RuntimeException("Unexpected instance name "+ function);
176    }
177
178    /**
179     * @param startInst: EL function name
180     * @param endInst: EL function name
181     * @throws CommandException if both are not the same function
182     */
183    public static void checkIfBothSameType(String startInst, String endInst) throws CommandException {
184        if (getFuncType(startInst) != getFuncType(endInst)) {
185            if (getFuncType(startInst) == ABSOLUTE) {
186                if (getFuncType(endInst) != CURRENT) {
187                    throw new CommandException(ErrorCode.E1010,
188                            "Only start-instance as absolute and end-instance as current is supported." + " start = "
189                                    + startInst + "  end = " + endInst);
190                }
191            }
192            else {
193                throw new CommandException(ErrorCode.E1010,
194                        " start-instance and end-instance both should be either latest or current or future or offset\n"
195                                + " start " + startInst + " and end " + endInst);
196            }
197        }
198    }
199
200
201    /**
202     * Resolve list of &lt;instance&gt; &lt;/instance&gt; tags.
203     *
204     * @param event
205     * @param instances
206     * @param actionInst
207     * @param conf
208     * @param eval: ELEvalautor
209     * @throws Exception
210     */
211    public static void resolveInstances(Element event, StringBuilder instances, SyncCoordAction actionInst,
212            Configuration conf, ELEvaluator eval) throws Exception {
213        for (Element eInstance : (List<Element>) event.getChildren("instance", event.getNamespace())) {
214
215            if (instances.length() > 0) {
216                instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
217            }
218            instances.append(materializeInstance(event, eInstance.getTextTrim(), actionInst, conf, eval));
219        }
220        event.removeChildren("instance", event.getNamespace());
221    }
222
223    /**
224     * Resolve &lt;start-instance&gt; &lt;end-insatnce&gt; tag. Don't resolve any
225     * latest()/future()
226     *
227     * @param event
228     * @param instances
229     * @param appInst
230     * @param conf
231     * @param eval: ELEvalautor
232     * @throws Exception
233     */
234    public static void resolveInstanceRange(Element event, StringBuilder instances, SyncCoordAction appInst,
235            Configuration conf, ELEvaluator eval) throws Exception {
236        Element eStartInst = event.getChild("start-instance", event.getNamespace());
237        Element eEndInst = event.getChild("end-instance", event.getNamespace());
238        if (eStartInst != null && eEndInst != null) {
239            String strStart = evaluateInstanceFunction(event, appInst, conf, eStartInst.getTextTrim());
240            String strEnd = evaluateInstanceFunction(event, appInst, conf, eEndInst.getTextTrim());
241            checkIfBothSameType(strStart, strEnd);
242            StringBuilder restArg = new StringBuilder(); // To store rest
243                                                         // arguments for
244                                                         // future
245                                                         // function
246            int startIndex = getInstanceNumber(strStart, restArg);
247            String startRestArg = restArg.toString();
248            restArg.delete(0, restArg.length());
249            int endIndex = getInstanceNumber(strEnd, restArg);
250            String endRestArg = restArg.toString();
251            int funcType = getFuncType(strStart);
252
253            if (funcType == ABSOLUTE) {
254                StringBuffer bf = new StringBuffer();
255                bf.append("${coord:absoluteRange(\"").append(parseOneStringArg(strStart))
256                        .append("\",").append(endIndex).append(")}");
257                String matInstance = materializeInstance(event, bf.toString(), appInst, conf, eval);
258                if (matInstance != null && !matInstance.isEmpty()) {
259                    if (instances.length() > 0) {
260                        instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
261                    }
262                    instances.append(matInstance);
263                }
264            }
265            else {
266                if (funcType == OFFSET) {
267                    TimeUnit startU = TimeUnit.valueOf(startRestArg);
268                    TimeUnit endU = TimeUnit.valueOf(endRestArg);
269                    if (startU.getCalendarUnit() * startIndex > endU.getCalendarUnit() * endIndex) {
270                        throw new CommandException(ErrorCode.E1010,
271                                " start-instance should be equal or earlier than the end-instance \n"
272                                        + XmlUtils.prettyPrint(event));
273                    }
274                    Calendar startCal = CoordELFunctions.resolveOffsetRawTime(startIndex, startU, eval);
275                    Calendar endCal = CoordELFunctions.resolveOffsetRawTime(endIndex, endU, eval);
276                    if (startCal != null && endCal != null) {
277                        List<Integer> expandedFreqs = CoordELFunctions.expandOffsetTimes(startCal, endCal, eval);
278                        for (int i = expandedFreqs.size() - 1; i >= 0; i--) {
279                            //we need to use DS timeout, bcz expandOffsetTimes will expand offset in Freqs in DS timeunit
280                            String matInstance = materializeInstance(event, "${coord:offset(" + expandedFreqs.get(i)
281                                    + ", \"" + CoordELFunctions.getDSTimeUnit(eval) + "\")}", appInst, conf, eval);
282                            if (matInstance == null || matInstance.length() == 0) {
283                                // Earlier than dataset's initial instance
284                                break;
285                            }
286                            if (instances.length() > 0) {
287                                instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
288                            }
289                            instances.append(matInstance);
290                        }
291                    }
292                }
293                else {
294                    if (startIndex > endIndex) {
295                        throw new CommandException(ErrorCode.E1010,
296                                " start-instance should be equal or earlier than the end-instance \n"
297                                        + XmlUtils.prettyPrint(event));
298                    }
299                    if (funcType == CURRENT) {
300                        // Everything could be resolved NOW. no latest() ELs
301                        String matInstance = materializeInstance(event, "${coord:currentRange(" + startIndex + ","
302                                + endIndex + ")}", appInst, conf, eval);
303                        if (matInstance != null && !matInstance.isEmpty()) {
304                            if (instances.length() > 0) {
305                                instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
306                            }
307                            instances.append(matInstance);
308                        }
309                    }
310
311                    else { // latest(n)/future() EL is present
312                        if (funcType == LATEST) {
313                            instances.append("${coord:latestRange(").append(startIndex).append(",").append(endIndex)
314                            .append(")}");
315                        }
316                        else if (funcType == FUTURE) {
317                            instances.append("${coord:futureRange(").append(startIndex).append(",").append(endIndex)
318                            .append(",'").append(endRestArg).append("')}");
319                        }
320                    }
321                }
322            }
323            // Remove start-instance and end-instances
324            event.removeChild("start-instance", event.getNamespace());
325            event.removeChild("end-instance", event.getNamespace());
326        }
327    }
328
329    /**
330     * Materialize one instance like current(-2)
331     *
332     * @param event : &lt;data-in&gt;
333     * @param expr : instance like current(-1)
334     * @param appInst : application specific info
335     * @param conf
336     * @param evalInst :ELEvaluator
337     * @return materialized date string
338     * @throws Exception
339     */
340    public static String materializeInstance(Element event, String expr, SyncCoordAction appInst, Configuration conf,
341            ELEvaluator evalInst) throws Exception {
342        if (event == null) {
343            return null;
344        }
345        // ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event,
346        // appInst, conf);
347        return CoordELFunctions.evalAndWrap(evalInst, expr);
348    }
349
350    /**
351     * Create two new tags with &lt;uris&gt; and &lt;unresolved-instances&gt;.
352     *
353     * @param event
354     * @param instances
355     * @throws Exception
356     */
357    private static String separateResolvedAndUnresolved(Element event, StringBuilder instances)
358            throws Exception {
359        StringBuilder unresolvedInstances = new StringBuilder();
360        StringBuilder urisWithDoneFlag = new StringBuilder();
361        StringBuilder depList = new StringBuilder();
362        String uris = createEarlyURIs(event, instances.toString(), unresolvedInstances, urisWithDoneFlag);
363        if (uris.length() > 0) {
364            Element uriInstance = new Element("uris", event.getNamespace());
365            uriInstance.addContent(uris);
366            event.getContent().add(1, uriInstance);
367            if (depList.length() > 0) {
368                depList.append(CoordELFunctions.INSTANCE_SEPARATOR);
369            }
370            depList.append(urisWithDoneFlag);
371        }
372        if (unresolvedInstances.length() > 0) {
373            Element elemInstance = new Element(UNRESOLVED_INSTANCES_TAG, event.getNamespace());
374            elemInstance.addContent(unresolvedInstances.toString());
375            event.getContent().add(1, elemInstance);
376        }
377        return depList.toString();
378    }
379
380    /**
381     * The function create a list of URIs separated by "," using the instances
382     * time stamp and URI-template
383     *
384     * @param event : &lt;data-in&gt; event
385     * @param instances : List of time stamp separated by ","
386     * @param unresolvedInstances : list of instance with latest function
387     * @param urisWithDoneFlag : list of URIs with the done flag appended
388     * @return : list of URIs separated by ";" as a string.
389     * @throws Exception
390     */
391    public static String createEarlyURIs(Element event, String instances, StringBuilder unresolvedInstances,
392            StringBuilder urisWithDoneFlag) throws Exception {
393        if (instances == null || instances.length() == 0) {
394            return "";
395        }
396        String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
397        StringBuilder uris = new StringBuilder();
398
399        Element doneFlagElement = event.getChild("dataset", event.getNamespace()).getChild("done-flag",
400                event.getNamespace());
401        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
402
403        for (int i = 0; i < instanceList.length; i++) {
404            if (instanceList[i].trim().length() == 0) {
405                continue;
406            }
407            int funcType = getFuncType(instanceList[i]);
408            if (funcType == LATEST || funcType == FUTURE) {
409                if (unresolvedInstances.length() > 0) {
410                    unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
411                }
412                unresolvedInstances.append(instanceList[i]);
413                continue;
414            }
415            ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
416            if (uris.length() > 0) {
417                uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
418                urisWithDoneFlag.append(CoordELFunctions.INSTANCE_SEPARATOR);
419            }
420
421            String uriPath = CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace())
422                    .getChild("uri-template", event.getNamespace()).getTextTrim());
423            URIHandler uriHandler = uriService.getURIHandler(uriPath);
424            uriHandler.validate(uriPath);
425            uris.append(uriPath);
426            urisWithDoneFlag.append(uriHandler.getURIWithDoneFlag(uriPath, CoordUtils.getDoneFlag(doneFlagElement)));
427        }
428        return uris.toString();
429    }
430
431    /**
432     * @param eAction
433     * @param coordAction
434     * @param conf
435     * @return boolean to determine whether the SLA element is present or not
436     * @throws CoordinatorJobException
437     */
438    public static boolean materializeSLA(Element eAction, CoordinatorActionBean coordAction, Configuration conf)
439            throws CoordinatorJobException {
440        Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
441        if (eSla == null) {
442            // eAppXml.getNamespace("sla"));
443            return false;
444        }
445        try {
446            ELEvaluator evalSla = CoordELEvaluator.createSLAEvaluator(eAction, coordAction, conf);
447            List<Element> elemList = eSla.getChildren();
448            for (Element elem : elemList) {
449                String updated;
450                try {
451                    updated = CoordELFunctions.evalAndWrap(evalSla, elem.getText().trim());
452                }
453                catch (Exception e) {
454                    throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
455                }
456                elem.removeContent();
457                elem.addContent(updated);
458            }
459        }
460        catch (Exception e) {
461            throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
462        }
463        return true;
464    }
465
466    /**
467     * Materialize one instance for specific nominal time. It includes: 1.
468     * Materialize data events (i.e. &lt;data-in&gt; and &lt;data-out&gt;) 2. Materialize
469     * data properties (i.e dataIn(&lt;DS&gt;) and dataOut(&lt;DS&gt;) 3. remove 'start' and
470     * 'end' tag 4. Add 'instance_number' and 'nominal-time' tag
471     *
472     * @param jobId coordinator job id
473     * @param dryrun true if it is dryrun
474     * @param eAction frequency unexploded-job
475     * @param nominalTime materialization time
476     * @param actualTime action actual time
477     * @param instanceCount instance numbers
478     * @param conf job configuration
479     * @param actionBean CoordinatorActionBean to materialize
480     * @return one materialized action for specific nominal time
481     * @throws Exception
482     */
483    @SuppressWarnings("unchecked")
484    public static String materializeOneInstance(String jobId, boolean dryrun, Element eAction, Date nominalTime,
485            Date actualTime, int instanceCount, Configuration conf, CoordinatorActionBean actionBean) throws Exception {
486        String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, instanceCount + "");
487        SyncCoordAction appInst = new SyncCoordAction();
488        appInst.setActionId(actionId);
489        appInst.setName(eAction.getAttributeValue("name"));
490        appInst.setNominalTime(nominalTime);
491        appInst.setActualTime(actualTime);
492        String frequency = eAction.getAttributeValue("frequency");
493        appInst.setFrequency(frequency);
494        appInst.setTimeUnit(TimeUnit.valueOf(eAction.getAttributeValue("freq_timeunit")));
495        appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone")));
496        appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration")));
497
498        boolean isInputLogicSpecified = CoordUtils.isInputLogicSpecified(eAction);
499
500        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
501        List<Element> dataInList = null;
502        if (inputList != null) {
503            dataInList = inputList.getChildren("data-in", eAction.getNamespace());
504            materializeInputDataEvents(dataInList, appInst, conf, actionBean, isInputLogicSpecified);
505        }
506
507        if(isInputLogicSpecified){
508            evaluateInputCheck(eAction.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eAction.getNamespace()),
509                    CoordELEvaluator.createDataEvaluator(eAction, conf, actionId));
510        }
511        Element outputList = eAction.getChild("output-events", eAction.getNamespace());
512        List<Element> dataOutList = null;
513        if (outputList != null) {
514            dataOutList = outputList.getChildren("data-out", eAction.getNamespace());
515            materializeOutputDataEvents(dataOutList, appInst, conf);
516        }
517
518        eAction.removeAttribute("start");
519        eAction.removeAttribute("end");
520        eAction.setAttribute("instance-number", Integer.toString(instanceCount));
521        eAction.setAttribute("action-nominal-time", DateUtils.formatDateOozieTZ(nominalTime));
522        eAction.setAttribute("action-actual-time", DateUtils.formatDateOozieTZ(actualTime));
523
524        // Setting up action bean
525        actionBean.setCreatedConf(XmlUtils.prettyPrint(conf).toString());
526        actionBean.setRunConf(XmlUtils.prettyPrint(conf).toString());
527        actionBean.setCreatedTime(actualTime);
528        actionBean.setJobId(jobId);
529        actionBean.setId(actionId);
530        actionBean.setLastModifiedTime(new Date());
531        actionBean.setStatus(CoordinatorAction.Status.WAITING);
532        actionBean.setActionNumber(instanceCount);
533        actionBean.setNominalTime(nominalTime);
534        boolean isSla = CoordCommandUtils.materializeSLA(eAction, actionBean, conf);
535        if (isSla == true) {
536            actionBean.setSlaXml(XmlUtils.prettyPrint(
537                    eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")))
538                    .toString());
539        }
540
541        // actionBean.setTrackerUri(trackerUri);//TOOD:
542        // actionBean.setConsoleUrl(consoleUrl); //TODO:
543        // actionBean.setType(type);//TODO:
544        // actionBean.setErrorInfo(errorCode, errorMessage); //TODO:
545        // actionBean.setExternalStatus(externalStatus);//TODO
546        if (!dryrun) {
547            return XmlUtils.prettyPrint(eAction).toString();
548        }
549        else {
550            return dryRunCoord(eAction, actionBean);
551        }
552    }
553
554
555    /**
556     * @param eAction the actionXml related element
557     * @param actionBean the coordinator action bean
558     * @return
559     * @throws Exception
560     */
561    static String dryRunCoord(Element eAction, CoordinatorActionBean actionBean) throws Exception {
562        String action = XmlUtils.prettyPrint(eAction).toString();
563        StringBuilder actionXml = new StringBuilder(action);
564        Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
565        actionBean.setActionXml(action);
566
567        if (CoordUtils.isInputLogicSpecified(eAction)) {
568            new CoordInputLogicEvaluatorUtil(actionBean).validateInputLogic();
569        }
570
571        boolean isPushDepAvailable = true;
572        String pushMissingDependencies = actionBean.getPushInputDependencies().getMissingDependencies();
573        if (pushMissingDependencies != null) {
574            ActionDependency actionDependencies = DependencyChecker.checkForAvailability(pushMissingDependencies,
575                    actionConf, true);
576            if (actionDependencies.getMissingDependencies().size() != 0) {
577                isPushDepAvailable = false;
578            }
579
580        }
581        boolean isPullDepAvailable = true;
582        CoordActionInputCheckXCommand coordActionInput = new CoordActionInputCheckXCommand(actionBean.getId(),
583                actionBean.getJobId());
584        if (actionBean.getMissingDependencies() != null) {
585            StringBuilder existList = new StringBuilder();
586            StringBuilder nonExistList = new StringBuilder();
587            StringBuilder nonResolvedList = new StringBuilder();
588            getResolvedList(actionBean.getPullInputDependencies().getMissingDependencies(), nonExistList, nonResolvedList);
589            isPullDepAvailable = actionBean.getPullInputDependencies().checkPullMissingDependencies(actionBean,
590                    existList, nonExistList);
591
592        }
593
594        if (isPullDepAvailable && isPushDepAvailable) {
595            // Check for latest/future
596            boolean isLatestFutureDepAvailable = coordActionInput.checkUnResolvedInput(actionBean, actionXml,
597                    actionConf);
598            if (isLatestFutureDepAvailable) {
599                String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
600                        actionBean.getId());
601                actionXml.replace(0, actionXml.length(), newActionXml);
602            }
603        }
604
605        return actionXml.toString();
606    }
607
608    /**
609     * Materialize all &lt;input-events&gt;/&lt;data-in&gt; or &lt;output-events&gt;/&lt;data-out&gt;
610     * tags Create uris for resolved instances. Create unresolved instance for
611     * latest()/future().
612     *
613     * @param events
614     * @param appInst
615     * @param conf
616     * @throws Exception
617     */
618    private static void materializeOutputDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf)
619            throws Exception {
620
621        if (events == null) {
622            return;
623        }
624
625        for (Element event : events) {
626            StringBuilder instances = new StringBuilder();
627            ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
628            // Handle list of instance tag
629            resolveInstances(event, instances, appInst, conf, eval);
630            // Handle start-instance and end-instance
631            resolveInstanceRange(event, instances, appInst, conf, eval);
632            // Separate out the unresolved instances
633            separateResolvedAndUnresolved(event, instances);
634
635        }
636    }
637
638    private static void evaluateInputCheck(Element root, ELEvaluator evalInputLogic) throws Exception {
639        for (Object event : root.getChildren()) {
640            Element inputElement = (Element) event;
641
642            resolveAttribute("dataset", inputElement, evalInputLogic);
643            resolveAttribute("name", inputElement, evalInputLogic);
644            resolveAttribute("min", inputElement, evalInputLogic);
645            resolveAttribute("wait", inputElement, evalInputLogic);
646            if (!inputElement.getChildren().isEmpty()) {
647                evaluateInputCheck(inputElement, evalInputLogic);
648            }
649        }
650    }
651
652    private static String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
653        Attribute attr = elem.getAttribute(attrName);
654        String val = null;
655        if (attr != null) {
656            try {
657                val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
658            }
659            catch (Exception e) {
660                throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
661            }
662            attr.setValue(val);
663        }
664        return val;
665    }
666
667    public static void materializeInputDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf,
668            CoordinatorActionBean actionBean, boolean isInputLogicSpecified) throws Exception {
669
670        if (events == null) {
671            return;
672        }
673        CoordInputDependency coordPullInputDependency = CoordInputDependencyFactory
674                .createPullInputDependencies(isInputLogicSpecified);
675        CoordInputDependency coordPushInputDependency = CoordInputDependencyFactory
676                .createPushInputDependencies(isInputLogicSpecified);
677        Map<String, String> unresolvedList = new HashMap<String, String>();
678
679        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
680
681        for (Element event : events) {
682            StringBuilder instances = new StringBuilder();
683            ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
684            // Handle list of instance tag
685            resolveInstances(event, instances, appInst, conf, eval);
686            // Handle start-instance and end-instance
687            resolveInstanceRange(event, instances, appInst, conf, eval);
688            // Separate out the unresolved instances
689            String resolvedList = separateResolvedAndUnresolved(event, instances);
690            String name = event.getAttribute("name").getValue();
691
692            if (!resolvedList.isEmpty()) {
693                Element uri = event.getChild("dataset", event.getNamespace()).getChild("uri-template",
694                        event.getNamespace());
695
696                String uriTemplate = uri.getText();
697                URI baseURI = uriService.getAuthorityWithScheme(uriTemplate);
698                URIHandler handler = uriService.getURIHandler(baseURI);
699                List<CoordInputInstance> inputInstanceList = new ArrayList<CoordInputInstance>();
700
701                for (String inputInstance : resolvedList.split("#")) {
702                    inputInstanceList.add(new CoordInputInstance(inputInstance, false));
703                }
704
705                if (handler.getDependencyType(baseURI).equals(DependencyType.PULL)) {
706                    coordPullInputDependency.addInputInstanceList(name, inputInstanceList);
707                }
708                else {
709                    coordPushInputDependency.addInputInstanceList(name, inputInstanceList);
710
711                }
712            }
713
714            String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INSTANCES_TAG, event.getNamespace());
715            if (tmpUnresolved != null) {
716                unresolvedList.put(name, tmpUnresolved);
717            }
718        }
719        for(String unresolvedDatasetName:unresolvedList.keySet()){
720            coordPullInputDependency.addUnResolvedList(unresolvedDatasetName, unresolvedList.get(unresolvedDatasetName));
721        }
722        actionBean.setPullInputDependencies(coordPullInputDependency);
723        actionBean.setPushInputDependencies(coordPushInputDependency);
724        actionBean.setMissingDependencies(coordPullInputDependency.serialize());
725        actionBean.setPushMissingDependencies(coordPushInputDependency.serialize());
726
727    }
728    /**
729     * Get resolved string from missDepList
730     *
731     * @param missDepList
732     * @param resolved
733     * @param unresolved
734     * @return resolved string
735     */
736    public static String getResolvedList(String missDepList, StringBuilder resolved, StringBuilder unresolved) {
737        if (missDepList != null) {
738            int index = missDepList.indexOf(RESOLVED_UNRESOLVED_SEPARATOR);
739            if (index < 0) {
740                resolved.append(missDepList);
741            }
742            else {
743                resolved.append(missDepList.substring(0, index));
744                unresolved.append(missDepList.substring(index + RESOLVED_UNRESOLVED_SEPARATOR.length()));
745            }
746        }
747        return resolved.toString();
748    }
749
750    /**
751     * Get the next action time after a given time
752     *
753     * @param targetDate
754     * @param coordJob
755     * @return the next valid action time
756     */
757    public static Date getNextValidActionTimeForCronFrequency(Date targetDate, CoordinatorJobBean coordJob) throws ParseException {
758
759        String freq = coordJob.getFrequency();
760        TimeZone tz = DateUtils.getOozieProcessingTimeZone();
761        String[] cronArray = freq.split(" ");
762        Date nextTime = null;
763
764        // Current CronExpression doesn't support operations
765        // where both date of months and day of weeks are specified.
766        // As a result, we need to split this scenario into two cases
767        // and return the earlier time
768        if (!cronArray[2].trim().equals("?") && !cronArray[4].trim().equals("?")) {
769
770            // When any one of day of month or day of week fields is a wildcard
771            // we need to replace the wildcard with "?"
772            if (cronArray[2].trim().equals("*") || cronArray[4].trim().equals("*")) {
773                if (cronArray[2].trim().equals("*")) {
774                    cronArray[2] = "?";
775                }
776                else {
777                    cronArray[4] = "?";
778                }
779                freq= StringUtils.join(cronArray, " ");
780
781                // The cronExpression class takes second
782                // as the first field where oozie is operating on
783                // minute basis
784                CronExpression expr = new CronExpression("0 " + freq);
785                expr.setTimeZone(tz);
786                nextTime = expr.getNextValidTimeAfter(targetDate);
787            }
788            // If both fields are specified by non-wildcards,
789            // we need to split it into two expressions
790            else {
791                String[] cronArray1 = freq.split(" ");
792                String[] cronArray2 = freq.split(" ");
793
794                cronArray1[2] = "?";
795                cronArray2[4] = "?";
796
797                String freq1 = StringUtils.join(cronArray1, " ");
798                String freq2 = StringUtils.join(cronArray2, " ");
799
800                // The cronExpression class takes second
801                // as the first field where oozie is operating on
802                // minute basis
803                CronExpression expr1 = new CronExpression("0 " + freq1);
804                expr1.setTimeZone(tz);
805                CronExpression expr2 = new CronExpression("0 " + freq2);
806                expr2.setTimeZone(tz);
807                nextTime = expr1.getNextValidTimeAfter(targetDate);
808                Date nextTime2 = expr2.getNextValidTimeAfter(targetDate);
809                nextTime = nextTime.compareTo(nextTime2) < 0 ? nextTime: nextTime2;
810            }
811        }
812        else {
813            // The cronExpression class takes second
814            // as the first field where oozie is operating on
815            // minute basis
816            CronExpression expr  = new CronExpression("0 " + freq);
817            expr.setTimeZone(tz);
818            nextTime = expr.getNextValidTimeAfter(targetDate);
819        }
820
821        return nextTime;
822    }
823
824    /**
825     * Computes the nominal time of the next action.
826     * Based on CoordMaterializeTransitionXCommand#materializeActions
827     *
828     * The Coordinator Job needs to have the frequency, time unit, time zone, start time, end time, and job xml.
829     * The Coordinator Action needs to have the nominal time and action number.
830     *
831     * @param coordJob The Coordinator Job
832     * @param coordAction The Coordinator Action
833     * @return the nominal time of the next action
834     * @throws ParseException
835     * @throws JDOMException
836     */
837    public static Date computeNextNominalTime(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction)
838            throws ParseException, JDOMException {
839        Date nextNominalTime;
840        boolean isCronFrequency = false;
841        int freq = -1;
842        try {
843            freq = Integer.parseInt(coordJob.getFrequency());
844        } catch (NumberFormatException e) {
845            isCronFrequency = true;
846        }
847
848        if (isCronFrequency) {
849            nextNominalTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(coordAction.getNominalTime(), coordJob);
850        } else {
851            TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
852            Calendar nextNominalTimeCal = Calendar.getInstance(appTz);
853            nextNominalTimeCal.setTime(coordJob.getStartTimestamp());
854            TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr());
855            // Action Number is indexed by 1, so no need to +1 here
856            nextNominalTimeCal.add(freqTU.getCalendarUnit(), coordAction.getActionNumber() * freq);
857            String jobXml = coordJob.getJobXml();
858            Element eJob = XmlUtils.parseXml(jobXml);
859            TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
860            // Move to the End of duration, if needed.
861            DateUtils.moveToEnd(nextNominalTimeCal, endOfFlag);
862            nextNominalTime = nextNominalTimeCal.getTime();
863        }
864
865        // If the next nominal time is after the job's end time, then this is the last action, so return null
866        if (nextNominalTime.after(coordJob.getEndTime())) {
867            nextNominalTime = null;
868        }
869        return nextNominalTime;
870    }
871
872    public static boolean pathExists(String sPath, Configuration actionConf, String user) throws IOException,
873            URISyntaxException, URIHandlerException {
874        URI uri = new URI(sPath);
875        URIHandlerService service = Services.get().get(URIHandlerService.class);
876        URIHandler handler = service.getURIHandler(uri);
877        return handler.exists(uri, actionConf, user);
878    }
879
880    public static boolean pathExists(String sPath, Configuration actionConf) throws IOException, URISyntaxException,
881            URIHandlerException {
882        String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
883        return pathExists(sPath, actionConf, user);
884    }
885
886}