001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.StringReader;
021    import java.util.Date;
022    import java.util.List;
023    
024    import org.apache.hadoop.conf.Configuration;
025    import org.apache.oozie.CoordinatorActionBean;
026    import org.apache.oozie.ErrorCode;
027    import org.apache.oozie.client.CoordinatorAction;
028    import org.apache.oozie.command.CommandException;
029    import org.apache.oozie.coord.CoordELEvaluator;
030    import org.apache.oozie.coord.CoordELFunctions;
031    import org.apache.oozie.coord.CoordUtils;
032    import org.apache.oozie.coord.CoordinatorJobException;
033    import org.apache.oozie.coord.SyncCoordAction;
034    import org.apache.oozie.coord.TimeUnit;
035    import org.apache.oozie.service.Services;
036    import org.apache.oozie.service.UUIDService;
037    import org.apache.oozie.util.DateUtils;
038    import org.apache.oozie.util.ELEvaluator;
039    import org.apache.oozie.util.XConfiguration;
040    import org.apache.oozie.util.XmlUtils;
041    import org.jdom.Element;
042    
043    public class CoordCommandUtils {
044        public static int CURRENT = 0;
045        public static int LATEST = 1;
046        public static int FUTURE = 2;
047        public static int UNEXPECTED = -1;
048        public static final String RESOLVED_UNRESOLVED_SEPARATOR = ";";
049    
050        /**
051         * parse a function like coord:latest(n)/future() and return the 'n'.
052         * <p/>
053         * @param function
054         * @param event
055         * @param appInst
056         * @param conf
057         * @param restArg
058         * @return int instanceNumber
059         * @throws Exception
060         */
061        public static int getInstanceNumber(String function, Element event, SyncCoordAction appInst, Configuration conf,
062                StringBuilder restArg) throws Exception {
063            ELEvaluator eval = CoordELEvaluator
064                    .createInstancesELEvaluator("coord-action-create-inst", event, appInst, conf);
065            String newFunc = CoordELFunctions.evalAndWrap(eval, function);
066            int funcType = getFuncType(newFunc);
067            if (funcType == CURRENT || funcType == LATEST) {
068                return parseOneArg(newFunc);
069            }
070            else {
071                return parseMoreArgs(newFunc, restArg);
072            }
073        }
074    
075        private static int parseOneArg(String funcName) throws Exception {
076            int firstPos = funcName.indexOf("(");
077            int lastPos = funcName.lastIndexOf(")");
078            if (firstPos >= 0 && lastPos > firstPos) {
079                String tmp = funcName.substring(firstPos + 1, lastPos).trim();
080                if (tmp.length() > 0) {
081                    return (int) Double.parseDouble(tmp);
082                }
083            }
084            throw new RuntimeException("Unformatted function :" + funcName);
085        }
086    
087        private static int parseMoreArgs(String funcName, StringBuilder restArg) throws Exception {
088            int firstPos = funcName.indexOf("(");
089            int secondPos = funcName.lastIndexOf(",");
090            int lastPos = funcName.lastIndexOf(")");
091            if (firstPos >= 0 && secondPos > firstPos) {
092                String tmp = funcName.substring(firstPos + 1, secondPos).trim();
093                if (tmp.length() > 0) {
094                    restArg.append(funcName.substring(secondPos + 1, lastPos).trim());
095                    return (int) Double.parseDouble(tmp);
096                }
097            }
098            throw new RuntimeException("Unformatted function :" + funcName);
099        }
100    
101        /**
102         * @param EL function name
103         * @return type of EL function
104         */
105        public static int getFuncType(String function) {
106            if (function.indexOf("current") >= 0) {
107                return CURRENT;
108            }
109            else if (function.indexOf("latest") >= 0) {
110                return LATEST;
111            }
112            else if (function.indexOf("future") >= 0) {
113                return FUTURE;
114            }
115            return UNEXPECTED;
116            // throw new RuntimeException("Unexpected instance name "+ function);
117        }
118    
119        /**
120         * @param startInst: EL function name
121         * @param endInst: EL function name
122         * @throws CommandException if both are not the same function
123         */
124        public static void checkIfBothSameType(String startInst, String endInst) throws CommandException {
125            if (getFuncType(startInst) != getFuncType(endInst)) {
126                throw new CommandException(ErrorCode.E1010,
127                        " start-instance and end-instance both should be either latest or current or future\n"
128                                + " start " + startInst + " and end " + endInst);
129            }
130        }
131    
132        /**
133         * Resolve list of <instance> </instance> tags.
134         *
135         * @param event
136         * @param instances
137         * @param actionInst
138         * @param conf
139         * @param eval: ELEvalautor
140         * @throws Exception
141         */
142        public static void resolveInstances(Element event, StringBuilder instances, SyncCoordAction actionInst,
143                Configuration conf, ELEvaluator eval) throws Exception {
144            for (Element eInstance : (List<Element>) event.getChildren("instance", event.getNamespace())) {
145                if (instances.length() > 0) {
146                    instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
147                }
148                instances.append(materializeInstance(event, eInstance.getTextTrim(), actionInst, conf, eval));
149            }
150            event.removeChildren("instance", event.getNamespace());
151        }
152    
153        /**
154         * Resolve <start-instance> <end-insatnce> tag. Don't resolve any
155         * latest()/future()
156         *
157         * @param event
158         * @param instances
159         * @param appInst
160         * @param conf
161         * @param eval: ELEvalautor
162         * @throws Exception
163         */
164        public static void resolveInstanceRange(Element event, StringBuilder instances, SyncCoordAction appInst,
165                Configuration conf, ELEvaluator eval) throws Exception {
166            Element eStartInst = event.getChild("start-instance", event.getNamespace());
167            Element eEndInst = event.getChild("end-instance", event.getNamespace());
168            if (eStartInst != null && eEndInst != null) {
169                String strStart = eStartInst.getTextTrim();
170                String strEnd = eEndInst.getTextTrim();
171                checkIfBothSameType(strStart, strEnd);
172                StringBuilder restArg = new StringBuilder(); // To store rest
173                                                             // arguments for
174                                                             // future
175                                                             // function
176                int startIndex = getInstanceNumber(strStart, event, appInst, conf, restArg);
177                restArg.delete(0, restArg.length());
178                int endIndex = getInstanceNumber(strEnd, event, appInst, conf, restArg);
179                if (startIndex > endIndex) {
180                    throw new CommandException(ErrorCode.E1010,
181                            " start-instance should be equal or earlier than the end-instance \n"
182                                    + XmlUtils.prettyPrint(event));
183                }
184                int funcType = getFuncType(strStart);
185                if (funcType == CURRENT) {
186                    // Everything could be resolved NOW. no latest() ELs
187                    for (int i = endIndex; i >= startIndex; i--) {
188                        String matInstance = materializeInstance(event, "${coord:current(" + i + ")}", appInst, conf, eval);
189                        if (matInstance == null || matInstance.length() == 0) {
190                            // Earlier than dataset's initial instance
191                            break;
192                        }
193                        if (instances.length() > 0) {
194                            instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
195                        }
196                        instances.append(matInstance);
197                    }
198                }
199                else { // latest(n)/future() EL is present
200                    for (; startIndex <= endIndex; startIndex++) {
201                        if (instances.length() > 0) {
202                            instances.append(CoordELFunctions.INSTANCE_SEPARATOR);
203                        }
204                        if (funcType == LATEST) {
205                            instances.append("${coord:latest(" + startIndex + ")}");
206                        }
207                        else { // For future
208                            instances.append("${coord:future(" + startIndex + ",'" + restArg + "')}");
209                        }
210                    }
211                }
212                // Remove start-instance and end-instances
213                event.removeChild("start-instance", event.getNamespace());
214                event.removeChild("end-instance", event.getNamespace());
215            }
216        }
217    
218        /**
219         * Materialize one instance like current(-2)
220         *
221         * @param event : <data-in>
222         * @param expr : instance like current(-1)
223         * @param appInst : application specific info
224         * @param conf
225         * @param evalInst :ELEvaluator
226         * @return materialized date string
227         * @throws Exception
228         */
229        public static String materializeInstance(Element event, String expr, SyncCoordAction appInst, Configuration conf,
230                ELEvaluator evalInst) throws Exception {
231            if (event == null) {
232                return null;
233            }
234            // ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event,
235            // appInst, conf);
236            return CoordELFunctions.evalAndWrap(evalInst, expr);
237        }
238    
239        /**
240         * Create two new tags with <uris> and <unresolved-instances>.
241         *
242         * @param event
243         * @param instances
244         * @param dependencyList
245         * @throws Exception
246         */
247        public static void separateResolvedAndUnresolved(Element event, StringBuilder instances, StringBuffer dependencyList)
248                throws Exception {
249            StringBuilder unresolvedInstances = new StringBuilder();
250            StringBuilder urisWithDoneFlag = new StringBuilder();
251            String uris = createEarlyURIs(event, instances.toString(), unresolvedInstances, urisWithDoneFlag);
252            if (uris.length() > 0) {
253                Element uriInstance = new Element("uris", event.getNamespace());
254                uriInstance.addContent(uris);
255                event.getContent().add(1, uriInstance);
256                if (dependencyList.length() > 0) {
257                    dependencyList.append(CoordELFunctions.INSTANCE_SEPARATOR);
258                }
259                dependencyList.append(urisWithDoneFlag);
260            }
261            if (unresolvedInstances.length() > 0) {
262                Element elemInstance = new Element("unresolved-instances", event.getNamespace());
263                elemInstance.addContent(unresolvedInstances.toString());
264                event.getContent().add(1, elemInstance);
265            }
266        }
267    
268        /**
269         * The function create a list of URIs separated by "," using the instances
270         * time stamp and URI-template
271         *
272         * @param event : <data-in> event
273         * @param instances : List of time stamp separated by ","
274         * @param unresolvedInstances : list of instance with latest function
275         * @param urisWithDoneFlag : list of URIs with the done flag appended
276         * @return : list of URIs separated by ";" as a string.
277         * @throws Exception
278         */
279        public static String createEarlyURIs(Element event, String instances, StringBuilder unresolvedInstances,
280                StringBuilder urisWithDoneFlag) throws Exception {
281            if (instances == null || instances.length() == 0) {
282                return "";
283            }
284            String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
285            StringBuilder uris = new StringBuilder();
286    
287            Element doneFlagElement = event.getChild("dataset", event.getNamespace()).getChild("done-flag",
288                    event.getNamespace());
289            String doneFlag = CoordUtils.getDoneFlag(doneFlagElement);
290    
291            for (int i = 0; i < instanceList.length; i++) {
292                if(instanceList[i].trim().length() == 0) {
293                    continue;
294                }
295                int funcType = getFuncType(instanceList[i]);
296                if (funcType == LATEST || funcType == FUTURE) {
297                    if (unresolvedInstances.length() > 0) {
298                        unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
299                    }
300                    unresolvedInstances.append(instanceList[i]);
301                    continue;
302                }
303                ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
304                if (uris.length() > 0) {
305                    uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
306                    urisWithDoneFlag.append(CoordELFunctions.INSTANCE_SEPARATOR);
307                }
308    
309                String uriPath = CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace())
310                        .getChild("uri-template", event.getNamespace()).getTextTrim());
311                uris.append(uriPath);
312                if (doneFlag.length() > 0) {
313                    uriPath += "/" + doneFlag;
314                }
315                urisWithDoneFlag.append(uriPath);
316            }
317            return uris.toString();
318        }
319    
320        /**
321         * @param eSla
322         * @param nominalTime
323         * @param conf
324         * @return boolean to determine whether the SLA element is present or not
325         * @throws CoordinatorJobException
326         */
327        public static boolean materializeSLA(Element eSla, Date nominalTime, Configuration conf)
328                throws CoordinatorJobException {
329            if (eSla == null) {
330                // eAppXml.getNamespace("sla"));
331                return false;
332            }
333            try {
334                ELEvaluator evalSla = CoordELEvaluator.createSLAEvaluator(nominalTime, conf);
335                List<Element> elemList = eSla.getChildren();
336                for (Element elem : elemList) {
337                    String updated;
338                    try {
339                        updated = CoordELFunctions.evalAndWrap(evalSla, elem.getText().trim());
340                    }
341                    catch (Exception e) {
342                        throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
343                    }
344                    elem.removeContent();
345                    elem.addContent(updated);
346                }
347            }
348            catch (Exception e) {
349                throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
350            }
351            return true;
352        }
353    
354        /**
355         * Materialize one instance for specific nominal time. It includes: 1.
356         * Materialize data events (i.e. <data-in> and <data-out>) 2. Materialize
357         * data properties (i.e dataIn(<DS>) and dataOut(<DS>) 3. remove 'start' and
358         * 'end' tag 4. Add 'instance_number' and 'nominal-time' tag
359         *
360         * @param jobId coordinator job id
361         * @param dryrun true if it is dryrun
362         * @param eAction frequency unexploded-job
363         * @param nominalTime materialization time
364         * @param actualTime action actual time
365         * @param instanceCount instance numbers
366         * @param conf job configuration
367         * @param actionBean CoordinatorActionBean to materialize
368         * @return one materialized action for specific nominal time
369         * @throws Exception
370         */
371        @SuppressWarnings("unchecked")
372        public static String materializeOneInstance(String jobId, boolean dryrun, Element eAction, Date nominalTime,
373                Date actualTime, int instanceCount, Configuration conf, CoordinatorActionBean actionBean) throws Exception {
374            String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, instanceCount + "");
375            SyncCoordAction appInst = new SyncCoordAction();
376            appInst.setActionId(actionId);
377            appInst.setName(eAction.getAttributeValue("name"));
378            appInst.setNominalTime(nominalTime);
379            appInst.setActualTime(actualTime);
380            int frequency = Integer.parseInt(eAction.getAttributeValue("frequency"));
381            appInst.setFrequency(frequency);
382            appInst.setTimeUnit(TimeUnit.valueOf(eAction.getAttributeValue("freq_timeunit")));
383            appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone")));
384            appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration")));
385    
386            StringBuffer dependencyList = new StringBuffer();
387    
388            Element inputList = eAction.getChild("input-events", eAction.getNamespace());
389            List<Element> dataInList = null;
390            if (inputList != null) {
391                dataInList = inputList.getChildren("data-in", eAction.getNamespace());
392                materializeDataEvents(dataInList, appInst, conf, dependencyList);
393            }
394    
395            Element outputList = eAction.getChild("output-events", eAction.getNamespace());
396            List<Element> dataOutList = null;
397            if (outputList != null) {
398                dataOutList = outputList.getChildren("data-out", eAction.getNamespace());
399                StringBuffer tmp = new StringBuffer();
400                // no dependency checks
401                materializeDataEvents(dataOutList, appInst, conf, tmp);
402            }
403    
404            eAction.removeAttribute("start");
405            eAction.removeAttribute("end");
406            eAction.setAttribute("instance-number", Integer.toString(instanceCount));
407            eAction.setAttribute("action-nominal-time", DateUtils.formatDateUTC(nominalTime));
408            eAction.setAttribute("action-actual-time", DateUtils.formatDateUTC(actualTime));
409    
410            boolean isSla = CoordCommandUtils.materializeSLA(eAction.getChild("action", eAction.getNamespace()).getChild(
411                    "info", eAction.getNamespace("sla")), nominalTime, conf);
412    
413            // Setting up action bean
414            actionBean.setCreatedConf(XmlUtils.prettyPrint(conf).toString());
415            actionBean.setRunConf(XmlUtils.prettyPrint(conf).toString());
416            actionBean.setCreatedTime(actualTime);
417            actionBean.setJobId(jobId);
418            actionBean.setId(actionId);
419            actionBean.setLastModifiedTime(new Date());
420            actionBean.setStatus(CoordinatorAction.Status.WAITING);
421            actionBean.setActionNumber(instanceCount);
422            actionBean.setMissingDependencies(dependencyList.toString());
423            actionBean.setNominalTime(nominalTime);
424            if (isSla == true) {
425                actionBean.setSlaXml(XmlUtils.prettyPrint(
426                        eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")))
427                        .toString());
428            }
429    
430            // actionBean.setTrackerUri(trackerUri);//TOOD:
431            // actionBean.setConsoleUrl(consoleUrl); //TODO:
432            // actionBean.setType(type);//TODO:
433            // actionBean.setErrorInfo(errorCode, errorMessage); //TODO:
434            // actionBean.setExternalStatus(externalStatus);//TODO
435            if (!dryrun) {
436                return XmlUtils.prettyPrint(eAction).toString();
437            }
438            else {
439                String action = XmlUtils.prettyPrint(eAction).toString();
440                CoordActionInputCheckXCommand coordActionInput = new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId());
441                StringBuilder actionXml = new StringBuilder(action);
442                StringBuilder existList = new StringBuilder();
443                StringBuilder nonExistList = new StringBuilder();
444                StringBuilder nonResolvedList = new StringBuilder();
445                getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList);
446                Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
447                coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf);
448                return actionXml.toString();
449            }
450        }
451    
452        /**
453         * Materialize all <input-events>/<data-in> or <output-events>/<data-out>
454         * tags Create uris for resolved instances. Create unresolved instance for
455         * latest()/future().
456         *
457         * @param events
458         * @param appInst
459         * @param conf
460         * @throws Exception
461         */
462        public static void materializeDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf,
463                StringBuffer dependencyList) throws Exception {
464    
465            if (events == null) {
466                return;
467            }
468            StringBuffer unresolvedList = new StringBuffer();
469            for (Element event : events) {
470                StringBuilder instances = new StringBuilder();
471                ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
472                // Handle list of instance tag
473                resolveInstances(event, instances, appInst, conf, eval);
474                // Handle start-instance and end-instance
475                resolveInstanceRange(event, instances, appInst, conf, eval);
476                // Separate out the unresolved instances
477                separateResolvedAndUnresolved(event, instances, dependencyList);
478                String tmpUnresolved = event.getChildTextTrim("unresolved-instances", event.getNamespace());
479                if (tmpUnresolved != null) {
480                    if (unresolvedList.length() > 0) {
481                        unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR);
482                    }
483                    unresolvedList.append(tmpUnresolved);
484                }
485            }
486            if (unresolvedList.length() > 0) {
487                dependencyList.append(RESOLVED_UNRESOLVED_SEPARATOR);
488                dependencyList.append(unresolvedList);
489            }
490            return;
491        }
492    
493        /**
494         * Get resolved string from missDepList
495         *
496         * @param missDepList
497         * @param resolved
498         * @param unresolved
499         * @return resolved string
500         */
501        public static String getResolvedList(String missDepList, StringBuilder resolved, StringBuilder unresolved) {
502            if (missDepList != null) {
503                int index = missDepList.indexOf(RESOLVED_UNRESOLVED_SEPARATOR);
504                if (index < 0) {
505                    resolved.append(missDepList);
506                }
507                else {
508                    resolved.append(missDepList.substring(0, index));
509                    unresolved.append(missDepList.substring(index + 1));
510                }
511            }
512            return resolved.toString();
513        }
514    
515    }