001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.StringReader;
021    import java.util.Calendar;
022    import java.util.Date;
023    import java.util.List;
024    
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.oozie.CoordinatorActionBean;
027    import org.apache.oozie.ErrorCode;
028    import org.apache.oozie.client.CoordinatorAction;
029    import org.apache.oozie.command.CommandException;
030    import org.apache.oozie.coord.CoordELEvaluator;
031    import org.apache.oozie.coord.CoordELFunctions;
032    import org.apache.oozie.coord.CoordUtils;
033    import org.apache.oozie.coord.CoordinatorJobException;
034    import org.apache.oozie.coord.SyncCoordAction;
035    import org.apache.oozie.coord.TimeUnit;
036    import org.apache.oozie.service.Services;
037    import org.apache.oozie.service.UUIDService;
038    import org.apache.oozie.util.DateUtils;
039    import org.apache.oozie.util.ELEvaluator;
040    import org.apache.oozie.util.XConfiguration;
041    import org.apache.oozie.util.XmlUtils;
042    import org.jdom.Element;
043    
044    public class CoordCommandUtils {
045        public static int CURRENT = 0;
046        public static int LATEST = 1;
047        public static int FUTURE = 2;
048        public static int OFFSET = 3;
049        public static int UNEXPECTED = -1;
050        public static final String RESOLVED_UNRESOLVED_SEPARATOR = ";";
051    
052        /**
053         * parse a function like coord:latest(n)/future() and return the 'n'.
054         * <p/>
055         * @param function
056         * @param event
057         * @param appInst
058         * @param conf
059         * @param restArg
060         * @return int instanceNumber
061         * @throws Exception
062         */
063        public static int getInstanceNumber(String function, Element event, SyncCoordAction appInst, Configuration conf,
064                StringBuilder restArg) throws Exception {
065            ELEvaluator eval = CoordELEvaluator
066                    .createInstancesELEvaluator("coord-action-create-inst", event, appInst, conf);
067            String newFunc = CoordELFunctions.evalAndWrap(eval, function);
068            int funcType = getFuncType(newFunc);
069            if (funcType == CURRENT || funcType == LATEST) {
070                return parseOneArg(newFunc);
071            }
072            else {
073                return parseMoreArgs(newFunc, restArg);
074            }
075        }
076    
077        private static int parseOneArg(String funcName) throws Exception {
078            int firstPos = funcName.indexOf("(");
079            int lastPos = funcName.lastIndexOf(")");
080            if (firstPos >= 0 && lastPos > firstPos) {
081                String tmp = funcName.substring(firstPos + 1, lastPos).trim();
082                if (tmp.length() > 0) {
083                    return (int) Double.parseDouble(tmp);
084                }
085            }
086            throw new RuntimeException("Unformatted function :" + funcName);
087        }
088    
089        private static int parseMoreArgs(String funcName, StringBuilder restArg) throws Exception {
090            int firstPos = funcName.indexOf("(");
091            int secondPos = funcName.lastIndexOf(",");
092            int lastPos = funcName.lastIndexOf(")");
093            if (firstPos >= 0 && secondPos > firstPos) {
094                String tmp = funcName.substring(firstPos + 1, secondPos).trim();
095                if (tmp.length() > 0) {
096                    restArg.append(funcName.substring(secondPos + 1, lastPos).trim());
097                    return (int) Double.parseDouble(tmp);
098                }
099            }
100            throw new RuntimeException("Unformatted function :" + funcName);
101        }
102    
103        /**
104         * @param EL function name
105         * @return type of EL function
106         */
107        public static int getFuncType(String function) {
108            if (function.indexOf("current") >= 0) {
109                return CURRENT;
110            }
111            else if (function.indexOf("latest") >= 0) {
112                return LATEST;
113            }
114            else if (function.indexOf("future") >= 0) {
115                return FUTURE;
116            }
117            else if (function.indexOf("offset") >= 0) {
118                return OFFSET;
119            }
120            return UNEXPECTED;
121            // throw new RuntimeException("Unexpected instance name "+ function);
122        }
123    
124        /**
125         * @param startInst: EL function name
126         * @param endInst: EL function name
127         * @throws CommandException if both are not the same function
128         */
129        public static void checkIfBothSameType(String startInst, String endInst) throws CommandException {
130            if (getFuncType(startInst) != getFuncType(endInst)) {
131                throw new CommandException(ErrorCode.E1010,
132                        " start-instance and end-instance both should be either latest or current or future or offset\n"
133                                + " start " + startInst + " and end " + endInst);
134            }
135        }
136    
137        /**
138         * Resolve list of <instance> </instance> tags.
139         *
140         * @param event
141         * @param instances
142         * @param actionInst
143         * @param conf
144         * @param eval: ELEvalautor
145         * @throws Exception
146         */
147        public static void resolveInstances(Element event, StringBuilder instances, SyncCoordAction actionInst,
148                Configuration conf, ELEvaluator eval) throws Exception {
149            for (Element eInstance : (List<Element>) event.getChildren("instance", event.getNamespace())) {
150                if (instances.length() > 0) {
151                    instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
152                }
153                instances.append(materializeInstance(event, eInstance.getTextTrim(), actionInst, conf, eval));
154            }
155            event.removeChildren("instance", event.getNamespace());
156        }
157    
158        /**
159         * Resolve <start-instance> <end-insatnce> tag. Don't resolve any
160         * latest()/future()
161         *
162         * @param event
163         * @param instances
164         * @param appInst
165         * @param conf
166         * @param eval: ELEvalautor
167         * @throws Exception
168         */
169        public static void resolveInstanceRange(Element event, StringBuilder instances, SyncCoordAction appInst,
170                Configuration conf, ELEvaluator eval) throws Exception {
171            Element eStartInst = event.getChild("start-instance", event.getNamespace());
172            Element eEndInst = event.getChild("end-instance", event.getNamespace());
173            if (eStartInst != null && eEndInst != null) {
174                String strStart = eStartInst.getTextTrim();
175                String strEnd = eEndInst.getTextTrim();
176                checkIfBothSameType(strStart, strEnd);
177                StringBuilder restArg = new StringBuilder(); // To store rest
178                                                             // arguments for
179                                                             // future
180                                                             // function
181                int startIndex = getInstanceNumber(strStart, event, appInst, conf, restArg);
182                String startRestArg = restArg.toString();
183                restArg.delete(0, restArg.length());
184                int endIndex = getInstanceNumber(strEnd, event, appInst, conf, restArg);
185                String endRestArg = restArg.toString();
186                int funcType = getFuncType(strStart);
187                if (funcType == OFFSET) {
188                    TimeUnit startU = TimeUnit.valueOf(startRestArg);
189                    TimeUnit endU = TimeUnit.valueOf(endRestArg);
190                    if (startU.getCalendarUnit() * startIndex > endU.getCalendarUnit() * endIndex) {
191                        throw new CommandException(ErrorCode.E1010,
192                                " start-instance should be equal or earlier than the end-instance \n"
193                                        + XmlUtils.prettyPrint(event));
194                    }
195                    Calendar startCal = CoordELFunctions.resolveOffsetRawTime(startIndex, startU, eval);
196                    Calendar endCal = CoordELFunctions.resolveOffsetRawTime(endIndex, endU, eval);
197                    if (startCal != null && endCal != null) {
198                        List<Integer> expandedFreqs = CoordELFunctions.expandOffsetTimes(startCal, endCal, eval);
199                        for (int i = expandedFreqs.size() - 1; i >= 0; i--) {
200                            String matInstance = materializeInstance(event, "${coord:offset(" + expandedFreqs.get(i) + ", \"MINUTE\")}",
201                                                                        appInst, conf, eval);
202                            if (matInstance == null || matInstance.length() == 0) {
203                                // Earlier than dataset's initial instance
204                                break;
205                            }
206                            if (instances.length() > 0) {
207                                instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
208                            }
209                            instances.append(matInstance);
210                        }
211                    }
212                }
213                else {
214                    if (startIndex > endIndex) {
215                        throw new CommandException(ErrorCode.E1010,
216                                " start-instance should be equal or earlier than the end-instance \n"
217                                        + XmlUtils.prettyPrint(event));
218                    }
219                    if (funcType == CURRENT) {
220                        // Everything could be resolved NOW. no latest() ELs
221                        for (int i = endIndex; i >= startIndex; i--) {
222                            String matInstance = materializeInstance(event, "${coord:current(" + i + ")}", appInst, conf, eval);
223                            if (matInstance == null || matInstance.length() == 0) {
224                                // Earlier than dataset's initial instance
225                                break;
226                            }
227                            if (instances.length() > 0) {
228                                instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
229                            }
230                            instances.append(matInstance);
231                        }
232                    }
233                    else { // latest(n)/future() EL is present
234                        if (funcType == LATEST) {
235                            instances.append("${coord:latestRange(").append(startIndex).append(",").append(endIndex)
236                                    .append(")}");
237                        }
238                        else if (funcType == FUTURE) {
239                            instances.append("${coord:futureRange(").append(startIndex).append(",").append(endIndex)
240                                    .append(",'").append(endRestArg).append("')}");
241                        }
242                    }
243                }
244                // Remove start-instance and end-instances
245                event.removeChild("start-instance", event.getNamespace());
246                event.removeChild("end-instance", event.getNamespace());
247            }
248        }
249    
250        /**
251         * Materialize one instance like current(-2)
252         *
253         * @param event : <data-in>
254         * @param expr : instance like current(-1)
255         * @param appInst : application specific info
256         * @param conf
257         * @param evalInst :ELEvaluator
258         * @return materialized date string
259         * @throws Exception
260         */
261        public static String materializeInstance(Element event, String expr, SyncCoordAction appInst, Configuration conf,
262                ELEvaluator evalInst) throws Exception {
263            if (event == null) {
264                return null;
265            }
266            // ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event,
267            // appInst, conf);
268            return CoordELFunctions.evalAndWrap(evalInst, expr);
269        }
270    
271        /**
272         * Create two new tags with <uris> and <unresolved-instances>.
273         *
274         * @param event
275         * @param instances
276         * @param dependencyList
277         * @throws Exception
278         */
279        public static void separateResolvedAndUnresolved(Element event, StringBuilder instances, StringBuffer dependencyList)
280                throws Exception {
281            StringBuilder unresolvedInstances = new StringBuilder();
282            StringBuilder urisWithDoneFlag = new StringBuilder();
283            String uris = createEarlyURIs(event, instances.toString(), unresolvedInstances, urisWithDoneFlag);
284            if (uris.length() > 0) {
285                Element uriInstance = new Element("uris", event.getNamespace());
286                uriInstance.addContent(uris);
287                event.getContent().add(1, uriInstance);
288                if (dependencyList.length() > 0) {
289                    dependencyList.append(CoordELFunctions.INSTANCE_SEPARATOR);
290                }
291                dependencyList.append(urisWithDoneFlag);
292            }
293            if (unresolvedInstances.length() > 0) {
294                Element elemInstance = new Element("unresolved-instances", event.getNamespace());
295                elemInstance.addContent(unresolvedInstances.toString());
296                event.getContent().add(1, elemInstance);
297            }
298        }
299    
300        /**
301         * The function create a list of URIs separated by "," using the instances
302         * time stamp and URI-template
303         *
304         * @param event : <data-in> event
305         * @param instances : List of time stamp separated by ","
306         * @param unresolvedInstances : list of instance with latest function
307         * @param urisWithDoneFlag : list of URIs with the done flag appended
308         * @return : list of URIs separated by ";" as a string.
309         * @throws Exception
310         */
311        public static String createEarlyURIs(Element event, String instances, StringBuilder unresolvedInstances,
312                StringBuilder urisWithDoneFlag) throws Exception {
313            if (instances == null || instances.length() == 0) {
314                return "";
315            }
316            String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
317            StringBuilder uris = new StringBuilder();
318    
319            Element doneFlagElement = event.getChild("dataset", event.getNamespace()).getChild("done-flag",
320                    event.getNamespace());
321            String doneFlag = CoordUtils.getDoneFlag(doneFlagElement);
322    
323            for (int i = 0; i < instanceList.length; i++) {
324                if(instanceList[i].trim().length() == 0) {
325                    continue;
326                }
327                int funcType = getFuncType(instanceList[i]);
328                if (funcType == LATEST || funcType == FUTURE) {
329                    if (unresolvedInstances.length() > 0) {
330                        unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
331                    }
332                    unresolvedInstances.append(instanceList[i]);
333                    continue;
334                }
335                ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
336                if (uris.length() > 0) {
337                    uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
338                    urisWithDoneFlag.append(CoordELFunctions.INSTANCE_SEPARATOR);
339                }
340    
341                String uriPath = CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace())
342                        .getChild("uri-template", event.getNamespace()).getTextTrim());
343                uris.append(uriPath);
344                if (doneFlag.length() > 0) {
345                    uriPath += "/" + doneFlag;
346                }
347                urisWithDoneFlag.append(uriPath);
348            }
349            return uris.toString();
350        }
351    
352        /**
353         * @param eSla
354         * @param nominalTime
355         * @param conf
356         * @return boolean to determine whether the SLA element is present or not
357         * @throws CoordinatorJobException
358         */
359        public static boolean materializeSLA(Element eSla, Date nominalTime, Configuration conf)
360                throws CoordinatorJobException {
361            if (eSla == null) {
362                // eAppXml.getNamespace("sla"));
363                return false;
364            }
365            try {
366                ELEvaluator evalSla = CoordELEvaluator.createSLAEvaluator(nominalTime, conf);
367                List<Element> elemList = eSla.getChildren();
368                for (Element elem : elemList) {
369                    String updated;
370                    try {
371                        updated = CoordELFunctions.evalAndWrap(evalSla, elem.getText().trim());
372                    }
373                    catch (Exception e) {
374                        throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
375                    }
376                    elem.removeContent();
377                    elem.addContent(updated);
378                }
379            }
380            catch (Exception e) {
381                throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
382            }
383            return true;
384        }
385    
386        /**
387         * Materialize one instance for specific nominal time. It includes: 1.
388         * Materialize data events (i.e. <data-in> and <data-out>) 2. Materialize
389         * data properties (i.e dataIn(<DS>) and dataOut(<DS>) 3. remove 'start' and
390         * 'end' tag 4. Add 'instance_number' and 'nominal-time' tag
391         *
392         * @param jobId coordinator job id
393         * @param dryrun true if it is dryrun
394         * @param eAction frequency unexploded-job
395         * @param nominalTime materialization time
396         * @param actualTime action actual time
397         * @param instanceCount instance numbers
398         * @param conf job configuration
399         * @param actionBean CoordinatorActionBean to materialize
400         * @return one materialized action for specific nominal time
401         * @throws Exception
402         */
403        @SuppressWarnings("unchecked")
404        public static String materializeOneInstance(String jobId, boolean dryrun, Element eAction, Date nominalTime,
405                Date actualTime, int instanceCount, Configuration conf, CoordinatorActionBean actionBean) throws Exception {
406            String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, instanceCount + "");
407            SyncCoordAction appInst = new SyncCoordAction();
408            appInst.setActionId(actionId);
409            appInst.setName(eAction.getAttributeValue("name"));
410            appInst.setNominalTime(nominalTime);
411            appInst.setActualTime(actualTime);
412            int frequency = Integer.parseInt(eAction.getAttributeValue("frequency"));
413            appInst.setFrequency(frequency);
414            appInst.setTimeUnit(TimeUnit.valueOf(eAction.getAttributeValue("freq_timeunit")));
415            appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone")));
416            appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration")));
417    
418            StringBuffer dependencyList = new StringBuffer();
419    
420            Element inputList = eAction.getChild("input-events", eAction.getNamespace());
421            List<Element> dataInList = null;
422            if (inputList != null) {
423                dataInList = inputList.getChildren("data-in", eAction.getNamespace());
424                materializeDataEvents(dataInList, appInst, conf, dependencyList);
425            }
426    
427            Element outputList = eAction.getChild("output-events", eAction.getNamespace());
428            List<Element> dataOutList = null;
429            if (outputList != null) {
430                dataOutList = outputList.getChildren("data-out", eAction.getNamespace());
431                StringBuffer tmp = new StringBuffer();
432                // no dependency checks
433                materializeDataEvents(dataOutList, appInst, conf, tmp);
434            }
435    
436            eAction.removeAttribute("start");
437            eAction.removeAttribute("end");
438            eAction.setAttribute("instance-number", Integer.toString(instanceCount));
439            eAction.setAttribute("action-nominal-time", DateUtils.formatDateOozieTZ(nominalTime));
440            eAction.setAttribute("action-actual-time", DateUtils.formatDateOozieTZ(actualTime));
441    
442            boolean isSla = CoordCommandUtils.materializeSLA(eAction.getChild("action", eAction.getNamespace()).getChild(
443                    "info", eAction.getNamespace("sla")), nominalTime, conf);
444    
445            // Setting up action bean
446            actionBean.setCreatedConf(XmlUtils.prettyPrint(conf).toString());
447            actionBean.setRunConf(XmlUtils.prettyPrint(conf).toString());
448            actionBean.setCreatedTime(actualTime);
449            actionBean.setJobId(jobId);
450            actionBean.setId(actionId);
451            actionBean.setLastModifiedTime(new Date());
452            actionBean.setStatus(CoordinatorAction.Status.WAITING);
453            actionBean.setActionNumber(instanceCount);
454            actionBean.setMissingDependencies(dependencyList.toString());
455            actionBean.setNominalTime(nominalTime);
456            if (isSla == true) {
457                actionBean.setSlaXml(XmlUtils.prettyPrint(
458                        eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")))
459                        .toString());
460            }
461    
462            // actionBean.setTrackerUri(trackerUri);//TOOD:
463            // actionBean.setConsoleUrl(consoleUrl); //TODO:
464            // actionBean.setType(type);//TODO:
465            // actionBean.setErrorInfo(errorCode, errorMessage); //TODO:
466            // actionBean.setExternalStatus(externalStatus);//TODO
467            if (!dryrun) {
468                return XmlUtils.prettyPrint(eAction).toString();
469            }
470            else {
471                String action = XmlUtils.prettyPrint(eAction).toString();
472                CoordActionInputCheckXCommand coordActionInput = new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId());
473                StringBuilder actionXml = new StringBuilder(action);
474                StringBuilder existList = new StringBuilder();
475                StringBuilder nonExistList = new StringBuilder();
476                StringBuilder nonResolvedList = new StringBuilder();
477                getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList);
478                Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
479                coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf);
480                return actionXml.toString();
481            }
482        }
483    
484        /**
485         * Materialize all <input-events>/<data-in> or <output-events>/<data-out>
486         * tags Create uris for resolved instances. Create unresolved instance for
487         * latest()/future().
488         *
489         * @param events
490         * @param appInst
491         * @param conf
492         * @throws Exception
493         */
494        public static void materializeDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf,
495                StringBuffer dependencyList) throws Exception {
496    
497            if (events == null) {
498                return;
499            }
500            StringBuffer unresolvedList = new StringBuffer();
501            for (Element event : events) {
502                StringBuilder instances = new StringBuilder();
503                ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
504                // Handle list of instance tag
505                resolveInstances(event, instances, appInst, conf, eval);
506                // Handle start-instance and end-instance
507                resolveInstanceRange(event, instances, appInst, conf, eval);
508                // Separate out the unresolved instances
509                separateResolvedAndUnresolved(event, instances, dependencyList);
510                String tmpUnresolved = event.getChildTextTrim("unresolved-instances", event.getNamespace());
511                if (tmpUnresolved != null) {
512                    if (unresolvedList.length() > 0) {
513                        unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR);
514                    }
515                    unresolvedList.append(tmpUnresolved);
516                }
517            }
518            if (unresolvedList.length() > 0) {
519                dependencyList.append(RESOLVED_UNRESOLVED_SEPARATOR);
520                dependencyList.append(unresolvedList);
521            }
522            return;
523        }
524    
525        /**
526         * Get resolved string from missDepList
527         *
528         * @param missDepList
529         * @param resolved
530         * @param unresolved
531         * @return resolved string
532         */
533        public static String getResolvedList(String missDepList, StringBuilder resolved, StringBuilder unresolved) {
534            if (missDepList != null) {
535                int index = missDepList.indexOf(RESOLVED_UNRESOLVED_SEPARATOR);
536                if (index < 0) {
537                    resolved.append(missDepList);
538                }
539                else {
540                    resolved.append(missDepList.substring(0, index));
541                    unresolved.append(missDepList.substring(index + 1));
542                }
543            }
544            return resolved.toString();
545        }
546    
547    }