001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.io.StringReader;
021import java.net.URI;
022import java.text.ParseException;
023import java.util.TimeZone;
024import java.util.Map;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Date;
028import java.util.Calendar;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.oozie.CoordinatorActionBean;
032import org.apache.oozie.ErrorCode;
033import org.apache.oozie.client.CoordinatorAction;
034import org.apache.oozie.command.CommandException;
035import org.apache.oozie.coord.CoordELEvaluator;
036import org.apache.oozie.coord.CoordELFunctions;
037import org.apache.oozie.coord.CoordUtils;
038import org.apache.oozie.coord.CoordinatorJobException;
039import org.apache.oozie.coord.SyncCoordAction;
040import org.apache.oozie.coord.TimeUnit;
041import org.apache.oozie.dependency.ActionDependency;
042import org.apache.oozie.dependency.DependencyChecker;
043import org.apache.oozie.dependency.URIHandler;
044import org.apache.oozie.dependency.URIHandler.DependencyType;
045import org.apache.oozie.service.Services;
046import org.apache.oozie.service.URIHandlerService;
047import org.apache.oozie.service.UUIDService;
048import org.apache.oozie.util.DateUtils;
049import org.apache.oozie.util.ELEvaluator;
050import org.apache.oozie.util.XConfiguration;
051import org.apache.oozie.util.XmlUtils;
052import org.jdom.Element;
053import org.quartz.CronExpression;
054import org.apache.commons.lang.StringUtils;
055import org.apache.oozie.CoordinatorJobBean;
056
057public class CoordCommandUtils {
058    public static int CURRENT = 0;
059    public static int LATEST = 1;
060    public static int FUTURE = 2;
061    public static int OFFSET = 3;
062    public static int ABSOLUTE = 4;
063    public static int UNEXPECTED = -1;
064    public static final String RESOLVED_UNRESOLVED_SEPARATOR = "!!";
065    public static final String UNRESOLVED_INST_TAG = "unresolved-instances";
066
067    /**
068     * parse a function like coord:latest(n)/future() and return the 'n'.
069     * <p/>
070     *
071     * @param function
072     * @param restArg
073     * @return int instanceNumber
074     * @throws Exception
075     */
076    public static int getInstanceNumber(String function, StringBuilder restArg) throws Exception {
077        int funcType = getFuncType(function);
078        if (funcType == ABSOLUTE) {
079            return ABSOLUTE;
080        }
081        if (funcType == CURRENT || funcType == LATEST) {
082            return parseOneArg(function);
083        }
084        else {
085            return parseMoreArgs(function, restArg);
086        }
087    }
088
089    /**
090     * Evaluates function for coord-action-create-inst tag
091     * @param event
092     * @param appInst
093     * @param conf
094     * @param function
095     * @return evaluation result
096     * @throws Exception
097     */
098    private static String evaluateInstanceFunction(Element event, SyncCoordAction appInst, Configuration conf,
099            String function) throws Exception {
100        ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator("coord-action-create-inst", event, appInst, conf);
101        return CoordELFunctions.evalAndWrap(eval, function);
102    }
103
104    public static int parseOneArg(String funcName) throws Exception {
105        int firstPos = funcName.indexOf("(");
106        int lastPos = funcName.lastIndexOf(")");
107        if (firstPos >= 0 && lastPos > firstPos) {
108            String tmp = funcName.substring(firstPos + 1, lastPos).trim();
109            if (tmp.length() > 0) {
110                return (int) Double.parseDouble(tmp);
111            }
112        }
113        throw new RuntimeException("Unformatted function :" + funcName);
114    }
115
116    public static String parseOneStringArg(String funcName) throws Exception {
117        int firstPos = funcName.indexOf("(");
118        int lastPos = funcName.lastIndexOf(")");
119        if (firstPos >= 0 && lastPos > firstPos) {
120            return funcName.substring(firstPos + 1, lastPos).trim();
121        }
122        throw new RuntimeException("Unformatted function :" + funcName);
123    }
124
125    private static int parseMoreArgs(String funcName, StringBuilder restArg) throws Exception {
126        int firstPos = funcName.indexOf("(");
127        int secondPos = funcName.lastIndexOf(",");
128        int lastPos = funcName.lastIndexOf(")");
129        if (firstPos >= 0 && secondPos > firstPos) {
130            String tmp = funcName.substring(firstPos + 1, secondPos).trim();
131            if (tmp.length() > 0) {
132                restArg.append(funcName.substring(secondPos + 1, lastPos).trim());
133                return (int) Double.parseDouble(tmp);
134            }
135        }
136        throw new RuntimeException("Unformatted function :" + funcName);
137    }
138
139    /**
140     * @param EL function name
141     * @return type of EL function
142     */
143    public static int getFuncType(String function) {
144        if (function.indexOf("current") >= 0) {
145            return CURRENT;
146        }
147        else if (function.indexOf("latest") >= 0) {
148            return LATEST;
149        }
150        else if (function.indexOf("future") >= 0) {
151            return FUTURE;
152        }
153        else if (function.indexOf("offset") >= 0) {
154            return OFFSET;
155        }
156        else if (function.indexOf("absolute") >= 0) {
157            return ABSOLUTE;
158        }
159        return UNEXPECTED;
160        // throw new RuntimeException("Unexpected instance name "+ function);
161    }
162
163    /**
164     * @param startInst: EL function name
165     * @param endInst: EL function name
166     * @throws CommandException if both are not the same function
167     */
168    public static void checkIfBothSameType(String startInst, String endInst) throws CommandException {
169        if (getFuncType(startInst) != getFuncType(endInst)) {
170            if (getFuncType(startInst) == ABSOLUTE) {
171                if (getFuncType(endInst) != CURRENT) {
172                    throw new CommandException(ErrorCode.E1010,
173                            "Only start-instance as absolute and end-instance as current is supported." + " start = "
174                                    + startInst + "  end = " + endInst);
175                }
176            }
177            else {
178                throw new CommandException(ErrorCode.E1010,
179                        " start-instance and end-instance both should be either latest or current or future or offset\n"
180                                + " start " + startInst + " and end " + endInst);
181            }
182        }
183    }
184
185
186    /**
187     * Resolve list of <instance> </instance> tags.
188     *
189     * @param event
190     * @param instances
191     * @param actionInst
192     * @param conf
193     * @param eval: ELEvalautor
194     * @throws Exception
195     */
196    public static void resolveInstances(Element event, StringBuilder instances, SyncCoordAction actionInst,
197            Configuration conf, ELEvaluator eval) throws Exception {
198        for (Element eInstance : (List<Element>) event.getChildren("instance", event.getNamespace())) {
199
200            if (instances.length() > 0) {
201                instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
202            }
203            instances.append(materializeInstance(event, eInstance.getTextTrim(), actionInst, conf, eval));
204        }
205        event.removeChildren("instance", event.getNamespace());
206    }
207
208    /**
209     * Resolve <start-instance> <end-insatnce> tag. Don't resolve any
210     * latest()/future()
211     *
212     * @param event
213     * @param instances
214     * @param appInst
215     * @param conf
216     * @param eval: ELEvalautor
217     * @throws Exception
218     */
219    public static void resolveInstanceRange(Element event, StringBuilder instances, SyncCoordAction appInst,
220            Configuration conf, ELEvaluator eval) throws Exception {
221        Element eStartInst = event.getChild("start-instance", event.getNamespace());
222        Element eEndInst = event.getChild("end-instance", event.getNamespace());
223        if (eStartInst != null && eEndInst != null) {
224            String strStart = evaluateInstanceFunction(event, appInst, conf, eStartInst.getTextTrim());
225            String strEnd = evaluateInstanceFunction(event, appInst, conf, eEndInst.getTextTrim());
226            checkIfBothSameType(strStart, strEnd);
227            StringBuilder restArg = new StringBuilder(); // To store rest
228                                                         // arguments for
229                                                         // future
230                                                         // function
231            int startIndex = getInstanceNumber(strStart, restArg);
232            String startRestArg = restArg.toString();
233            restArg.delete(0, restArg.length());
234            int endIndex = getInstanceNumber(strEnd, restArg);
235            String endRestArg = restArg.toString();
236            int funcType = getFuncType(strStart);
237
238            if (funcType == ABSOLUTE) {
239                StringBuffer bf = new StringBuffer();
240                bf.append("${coord:absoluteRange(\"").append(parseOneStringArg(strStart))
241                        .append("\",").append(endIndex).append(")}");
242                String matInstance = materializeInstance(event, bf.toString(), appInst, conf, eval);
243                if (matInstance != null && !matInstance.isEmpty()) {
244                    if (instances.length() > 0) {
245                        instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
246                    }
247                    instances.append(matInstance);
248                }
249            }
250            else {
251                if (funcType == OFFSET) {
252                    TimeUnit startU = TimeUnit.valueOf(startRestArg);
253                    TimeUnit endU = TimeUnit.valueOf(endRestArg);
254                    if (startU.getCalendarUnit() * startIndex > endU.getCalendarUnit() * endIndex) {
255                        throw new CommandException(ErrorCode.E1010,
256                                " start-instance should be equal or earlier than the end-instance \n"
257                                        + XmlUtils.prettyPrint(event));
258                    }
259                    Calendar startCal = CoordELFunctions.resolveOffsetRawTime(startIndex, startU, eval);
260                    Calendar endCal = CoordELFunctions.resolveOffsetRawTime(endIndex, endU, eval);
261                    if (startCal != null && endCal != null) {
262                        List<Integer> expandedFreqs = CoordELFunctions.expandOffsetTimes(startCal, endCal, eval);
263                        for (int i = expandedFreqs.size() - 1; i >= 0; i--) {
264                            String matInstance = materializeInstance(event, "${coord:offset(" + expandedFreqs.get(i)
265                                    + ", \"MINUTE\")}", appInst, conf, eval);
266                            if (matInstance == null || matInstance.length() == 0) {
267                                // Earlier than dataset's initial instance
268                                break;
269                            }
270                            if (instances.length() > 0) {
271                                instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
272                            }
273                            instances.append(matInstance);
274                        }
275                    }
276                }
277                else {
278                    if (startIndex > endIndex) {
279                        throw new CommandException(ErrorCode.E1010,
280                                " start-instance should be equal or earlier than the end-instance \n"
281                                        + XmlUtils.prettyPrint(event));
282                    }
283                    if (funcType == CURRENT) {
284                        // Everything could be resolved NOW. no latest() ELs
285                        String matInstance = materializeInstance(event, "${coord:currentRange(" + startIndex + ","
286                                + endIndex + ")}", appInst, conf, eval);
287                        if (matInstance != null && !matInstance.isEmpty()) {
288                            if (instances.length() > 0) {
289                                instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
290                            }
291                            instances.append(matInstance);
292                        }
293                    }
294
295                    else { // latest(n)/future() EL is present
296                        if (funcType == LATEST) {
297                            instances.append("${coord:latestRange(").append(startIndex).append(",").append(endIndex)
298                            .append(")}");
299                        }
300                        else if (funcType == FUTURE) {
301                            instances.append("${coord:futureRange(").append(startIndex).append(",").append(endIndex)
302                            .append(",'").append(endRestArg).append("')}");
303                        }
304                    }
305                }
306            }
307            // Remove start-instance and end-instances
308            event.removeChild("start-instance", event.getNamespace());
309            event.removeChild("end-instance", event.getNamespace());
310        }
311    }
312
313    /**
314     * Materialize one instance like current(-2)
315     *
316     * @param event : <data-in>
317     * @param expr : instance like current(-1)
318     * @param appInst : application specific info
319     * @param conf
320     * @param evalInst :ELEvaluator
321     * @return materialized date string
322     * @throws Exception
323     */
324    public static String materializeInstance(Element event, String expr, SyncCoordAction appInst, Configuration conf,
325            ELEvaluator evalInst) throws Exception {
326        if (event == null) {
327            return null;
328        }
329        // ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event,
330        // appInst, conf);
331        return CoordELFunctions.evalAndWrap(evalInst, expr);
332    }
333
334    /**
335     * Create two new tags with <uris> and <unresolved-instances>.
336     *
337     * @param event
338     * @param instances
339     * @throws Exception
340     */
341    private static String separateResolvedAndUnresolved(Element event, StringBuilder instances)
342            throws Exception {
343        StringBuilder unresolvedInstances = new StringBuilder();
344        StringBuilder urisWithDoneFlag = new StringBuilder();
345        StringBuilder depList = new StringBuilder();
346        String uris = createEarlyURIs(event, instances.toString(), unresolvedInstances, urisWithDoneFlag);
347        if (uris.length() > 0) {
348            Element uriInstance = new Element("uris", event.getNamespace());
349            uriInstance.addContent(uris);
350            event.getContent().add(1, uriInstance);
351            if (depList.length() > 0) {
352                depList.append(CoordELFunctions.INSTANCE_SEPARATOR);
353            }
354            depList.append(urisWithDoneFlag);
355        }
356        if (unresolvedInstances.length() > 0) {
357            Element elemInstance = new Element(UNRESOLVED_INST_TAG, event.getNamespace());
358            elemInstance.addContent(unresolvedInstances.toString());
359            event.getContent().add(1, elemInstance);
360        }
361        return depList.toString();
362    }
363
364    /**
365     * The function create a list of URIs separated by "," using the instances
366     * time stamp and URI-template
367     *
368     * @param event : <data-in> event
369     * @param instances : List of time stamp separated by ","
370     * @param unresolvedInstances : list of instance with latest function
371     * @param urisWithDoneFlag : list of URIs with the done flag appended
372     * @return : list of URIs separated by ";" as a string.
373     * @throws Exception
374     */
375    public static String createEarlyURIs(Element event, String instances, StringBuilder unresolvedInstances,
376            StringBuilder urisWithDoneFlag) throws Exception {
377        if (instances == null || instances.length() == 0) {
378            return "";
379        }
380        String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
381        StringBuilder uris = new StringBuilder();
382
383        Element doneFlagElement = event.getChild("dataset", event.getNamespace()).getChild("done-flag",
384                event.getNamespace());
385        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
386
387        for (int i = 0; i < instanceList.length; i++) {
388            if (instanceList[i].trim().length() == 0) {
389                continue;
390            }
391            int funcType = getFuncType(instanceList[i]);
392            if (funcType == LATEST || funcType == FUTURE) {
393                if (unresolvedInstances.length() > 0) {
394                    unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
395                }
396                unresolvedInstances.append(instanceList[i]);
397                continue;
398            }
399            ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
400            if (uris.length() > 0) {
401                uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
402                urisWithDoneFlag.append(CoordELFunctions.INSTANCE_SEPARATOR);
403            }
404
405            String uriPath = CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace())
406                    .getChild("uri-template", event.getNamespace()).getTextTrim());
407            URIHandler uriHandler = uriService.getURIHandler(uriPath);
408            uriHandler.validate(uriPath);
409            uris.append(uriPath);
410            urisWithDoneFlag.append(uriHandler.getURIWithDoneFlag(uriPath, CoordUtils.getDoneFlag(doneFlagElement)));
411        }
412        return uris.toString();
413    }
414
415    /**
416     * @param eAction
417     * @param coordAction
418     * @param conf
419     * @return boolean to determine whether the SLA element is present or not
420     * @throws CoordinatorJobException
421     */
422    public static boolean materializeSLA(Element eAction, CoordinatorActionBean coordAction, Configuration conf)
423            throws CoordinatorJobException {
424        Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
425        if (eSla == null) {
426            // eAppXml.getNamespace("sla"));
427            return false;
428        }
429        try {
430            ELEvaluator evalSla = CoordELEvaluator.createSLAEvaluator(eAction, coordAction, conf);
431            List<Element> elemList = eSla.getChildren();
432            for (Element elem : elemList) {
433                String updated;
434                try {
435                    updated = CoordELFunctions.evalAndWrap(evalSla, elem.getText().trim());
436                }
437                catch (Exception e) {
438                    throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
439                }
440                elem.removeContent();
441                elem.addContent(updated);
442            }
443        }
444        catch (Exception e) {
445            throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
446        }
447        return true;
448    }
449
450    /**
451     * Materialize one instance for specific nominal time. It includes: 1.
452     * Materialize data events (i.e. <data-in> and <data-out>) 2. Materialize
453     * data properties (i.e dataIn(<DS>) and dataOut(<DS>) 3. remove 'start' and
454     * 'end' tag 4. Add 'instance_number' and 'nominal-time' tag
455     *
456     * @param jobId coordinator job id
457     * @param dryrun true if it is dryrun
458     * @param eAction frequency unexploded-job
459     * @param nominalTime materialization time
460     * @param actualTime action actual time
461     * @param instanceCount instance numbers
462     * @param conf job configuration
463     * @param actionBean CoordinatorActionBean to materialize
464     * @return one materialized action for specific nominal time
465     * @throws Exception
466     */
467    @SuppressWarnings("unchecked")
468    public static String materializeOneInstance(String jobId, boolean dryrun, Element eAction, Date nominalTime,
469            Date actualTime, int instanceCount, Configuration conf, CoordinatorActionBean actionBean) throws Exception {
470        String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, instanceCount + "");
471        SyncCoordAction appInst = new SyncCoordAction();
472        appInst.setActionId(actionId);
473        appInst.setName(eAction.getAttributeValue("name"));
474        appInst.setNominalTime(nominalTime);
475        appInst.setActualTime(actualTime);
476        String frequency = eAction.getAttributeValue("frequency");
477        appInst.setFrequency(frequency);
478        appInst.setTimeUnit(TimeUnit.valueOf(eAction.getAttributeValue("freq_timeunit")));
479        appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone")));
480        appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration")));
481
482        Map<String, StringBuilder> dependencyMap = null;
483
484        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
485        List<Element> dataInList = null;
486        if (inputList != null) {
487            dataInList = inputList.getChildren("data-in", eAction.getNamespace());
488            dependencyMap = materializeDataEvents(dataInList, appInst, conf);
489        }
490
491        Element outputList = eAction.getChild("output-events", eAction.getNamespace());
492        List<Element> dataOutList = null;
493        if (outputList != null) {
494            dataOutList = outputList.getChildren("data-out", eAction.getNamespace());
495            materializeDataEvents(dataOutList, appInst, conf);
496        }
497
498        eAction.removeAttribute("start");
499        eAction.removeAttribute("end");
500        eAction.setAttribute("instance-number", Integer.toString(instanceCount));
501        eAction.setAttribute("action-nominal-time", DateUtils.formatDateOozieTZ(nominalTime));
502        eAction.setAttribute("action-actual-time", DateUtils.formatDateOozieTZ(actualTime));
503
504        // Setting up action bean
505        actionBean.setCreatedConf(XmlUtils.prettyPrint(conf).toString());
506        actionBean.setRunConf(XmlUtils.prettyPrint(conf).toString());
507        actionBean.setCreatedTime(actualTime);
508        actionBean.setJobId(jobId);
509        actionBean.setId(actionId);
510        actionBean.setLastModifiedTime(new Date());
511        actionBean.setStatus(CoordinatorAction.Status.WAITING);
512        actionBean.setActionNumber(instanceCount);
513        if (dependencyMap != null) {
514            StringBuilder sbPull = dependencyMap.get(DependencyType.PULL.name());
515            if (sbPull != null) {
516                actionBean.setMissingDependencies(sbPull.toString());
517            }
518            StringBuilder sbPush = dependencyMap.get(DependencyType.PUSH.name());
519            if (sbPush != null) {
520                actionBean.setPushMissingDependencies(sbPush.toString());
521            }
522        }
523        actionBean.setNominalTime(nominalTime);
524        boolean isSla = CoordCommandUtils.materializeSLA(eAction, actionBean, conf);
525        if (isSla == true) {
526            actionBean.setSlaXml(XmlUtils.prettyPrint(
527                    eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")))
528                    .toString());
529        }
530
531        // actionBean.setTrackerUri(trackerUri);//TOOD:
532        // actionBean.setConsoleUrl(consoleUrl); //TODO:
533        // actionBean.setType(type);//TODO:
534        // actionBean.setErrorInfo(errorCode, errorMessage); //TODO:
535        // actionBean.setExternalStatus(externalStatus);//TODO
536        if (!dryrun) {
537            return XmlUtils.prettyPrint(eAction).toString();
538        }
539        else {
540            return dryRunCoord(eAction, actionBean);
541        }
542    }
543
544    /**
545     * @param eAction the actionXml related element
546     * @param actionBean the coordinator action bean
547     * @return
548     * @throws Exception
549     */
550    static String dryRunCoord(Element eAction, CoordinatorActionBean actionBean) throws Exception {
551        String action = XmlUtils.prettyPrint(eAction).toString();
552        StringBuilder actionXml = new StringBuilder(action);
553        Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
554
555        boolean isPushDepAvailable = true;
556        if (actionBean.getPushMissingDependencies() != null) {
557            ActionDependency actionDep = DependencyChecker.checkForAvailability(
558                    actionBean.getPushMissingDependencies(), actionConf, true);
559            if (actionDep.getMissingDependencies().size() != 0) {
560                isPushDepAvailable = false;
561            }
562
563        }
564        boolean isPullDepAvailable = true;
565        CoordActionInputCheckXCommand coordActionInput = new CoordActionInputCheckXCommand(actionBean.getId(),
566                actionBean.getJobId());
567        if (actionBean.getMissingDependencies() != null) {
568            StringBuilder existList = new StringBuilder();
569            StringBuilder nonExistList = new StringBuilder();
570            StringBuilder nonResolvedList = new StringBuilder();
571            getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList);
572            isPullDepAvailable = coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf);
573        }
574
575        if (isPullDepAvailable && isPushDepAvailable) {
576            // Check for latest/future
577            boolean isLatestFutureDepAvailable = coordActionInput.checkUnResolvedInput(actionXml, actionConf);
578            if (isLatestFutureDepAvailable) {
579                String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
580                        actionBean.getId());
581                actionXml.replace(0, actionXml.length(), newActionXml);
582            }
583        }
584
585        return actionXml.toString();
586    }
587
588    /**
589     * Materialize all <input-events>/<data-in> or <output-events>/<data-out>
590     * tags Create uris for resolved instances. Create unresolved instance for
591     * latest()/future().
592     *
593     * @param events
594     * @param appInst
595     * @param conf
596     * @throws Exception
597     */
598    public static Map<String, StringBuilder> materializeDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf
599            ) throws Exception {
600
601        if (events == null) {
602            return null;
603        }
604        StringBuilder unresolvedList = new StringBuilder();
605        Map<String, StringBuilder> dependencyMap = new HashMap<String, StringBuilder>();
606        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
607        StringBuilder pullMissingDep = null;
608        StringBuilder pushMissingDep = null;
609
610        for (Element event : events) {
611            StringBuilder instances = new StringBuilder();
612            ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
613            // Handle list of instance tag
614            resolveInstances(event, instances, appInst, conf, eval);
615            // Handle start-instance and end-instance
616            resolveInstanceRange(event, instances, appInst, conf, eval);
617            // Separate out the unresolved instances
618            String resolvedList = separateResolvedAndUnresolved(event, instances);
619            if (!resolvedList.isEmpty()) {
620                Element uri = event.getChild("dataset", event.getNamespace()).getChild("uri-template",
621                        event.getNamespace());
622                String uriTemplate = uri.getText();
623                URI baseURI = uriService.getAuthorityWithScheme(uriTemplate);
624                URIHandler handler = uriService.getURIHandler(baseURI);
625                if (handler.getDependencyType(baseURI).equals(DependencyType.PULL)) {
626                    pullMissingDep = (pullMissingDep == null) ? new StringBuilder(resolvedList) : pullMissingDep.append(
627                            CoordELFunctions.INSTANCE_SEPARATOR).append(resolvedList);
628                }
629                else {
630                    pushMissingDep = (pushMissingDep == null) ? new StringBuilder(resolvedList) : pushMissingDep.append(
631                            CoordELFunctions.INSTANCE_SEPARATOR).append(resolvedList);
632                }
633            }
634
635            String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INST_TAG, event.getNamespace());
636            if (tmpUnresolved != null) {
637                if (unresolvedList.length() > 0) {
638                    unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR);
639                }
640                unresolvedList.append(tmpUnresolved);
641            }
642        }
643        if (unresolvedList.length() > 0) {
644            if (pullMissingDep == null) {
645                pullMissingDep = new StringBuilder();
646            }
647            pullMissingDep.append(RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedList);
648        }
649        dependencyMap.put(DependencyType.PULL.name(), pullMissingDep);
650        dependencyMap.put(DependencyType.PUSH.name(), pushMissingDep);
651        return dependencyMap;
652    }
653
654    /**
655     * Get resolved string from missDepList
656     *
657     * @param missDepList
658     * @param resolved
659     * @param unresolved
660     * @return resolved string
661     */
662    public static String getResolvedList(String missDepList, StringBuilder resolved, StringBuilder unresolved) {
663        if (missDepList != null) {
664            int index = missDepList.indexOf(RESOLVED_UNRESOLVED_SEPARATOR);
665            if (index < 0) {
666                resolved.append(missDepList);
667            }
668            else {
669                resolved.append(missDepList.substring(0, index));
670                unresolved.append(missDepList.substring(index + RESOLVED_UNRESOLVED_SEPARATOR.length()));
671            }
672        }
673        return resolved.toString();
674    }
675
676    /**
677     * Get the next action time after a given time
678     *
679     * @param targetDate
680     * @param coordJob
681     * @return the next valid action time
682     */
683    public static Date getNextValidActionTimeForCronFrequency(Date targetDate, CoordinatorJobBean coordJob) throws ParseException {
684
685        String freq = coordJob.getFrequency();
686        TimeZone tz = DateUtils.getOozieProcessingTimeZone();
687        String[] cronArray = freq.split(" ");
688        Date nextTime = null;
689
690        // Current CronExpression doesn't support operations
691        // where both date of months and day of weeks are specified.
692        // As a result, we need to split this scenario into two cases
693        // and return the earlier time
694        if (!cronArray[2].trim().equals("?") && !cronArray[4].trim().equals("?")) {
695
696            // When any one of day of month or day of week fields is a wildcard
697            // we need to replace the wildcard with "?"
698            if (cronArray[2].trim().equals("*") || cronArray[4].trim().equals("*")) {
699                if (cronArray[2].trim().equals("*")) {
700                    cronArray[2] = "?";
701                }
702                else {
703                    cronArray[4] = "?";
704                }
705                freq= StringUtils.join(cronArray, " ");
706
707                // The cronExpression class takes second
708                // as the first field where oozie is operating on
709                // minute basis
710                CronExpression expr = new CronExpression("0 " + freq);
711                expr.setTimeZone(tz);
712                nextTime = expr.getNextValidTimeAfter(targetDate);
713            }
714            // If both fields are specified by non-wildcards,
715            // we need to split it into two expressions
716            else {
717                String[] cronArray1 = freq.split(" ");
718                String[] cronArray2 = freq.split(" ");
719
720                cronArray1[2] = "?";
721                cronArray2[4] = "?";
722
723                String freq1 = StringUtils.join(cronArray1, " ");
724                String freq2 = StringUtils.join(cronArray2, " ");
725
726                // The cronExpression class takes second
727                // as the first field where oozie is operating on
728                // minute basis
729                CronExpression expr1 = new CronExpression("0 " + freq1);
730                expr1.setTimeZone(tz);
731                CronExpression expr2 = new CronExpression("0 " + freq2);
732                expr2.setTimeZone(tz);
733                nextTime = expr1.getNextValidTimeAfter(targetDate);
734                Date nextTime2 = expr2.getNextValidTimeAfter(targetDate);
735                nextTime = nextTime.compareTo(nextTime2) < 0 ? nextTime: nextTime2;
736            }
737        }
738        else {
739            // The cronExpression class takes second
740            // as the first field where oozie is operating on
741            // minute basis
742            CronExpression expr  = new CronExpression("0 " + freq);
743            expr.setTimeZone(tz);
744            nextTime = expr.getNextValidTimeAfter(targetDate);
745        }
746
747        return nextTime;
748    }
749
750}