001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.coord.input.logic;
020
021import java.text.MessageFormat;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Date;
025import java.util.List;
026
027import org.apache.commons.lang.StringUtils;
028import org.apache.oozie.CoordinatorActionBean;
029import org.apache.oozie.coord.CoordELConstants;
030import org.apache.oozie.coord.CoordELEvaluator;
031import org.apache.oozie.coord.CoordELFunctions;
032import org.apache.oozie.coord.input.dependency.AbstractCoordInputDependency;
033import org.apache.oozie.coord.input.dependency.CoordPullInputDependency;
034import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorResult.STATUS;
035import org.apache.oozie.dependency.DependencyChecker;
036import org.apache.oozie.util.ELEvaluator;
037import org.apache.oozie.util.XmlUtils;
038import org.jdom.Element;
039import org.jdom.JDOMException;
040
041public class CoordInputLogicEvaluatorPhaseTwo extends CoordInputLogicEvaluatorPhaseOne {
042
043    Date actualTime;
044
045    public CoordInputLogicEvaluatorPhaseTwo(CoordinatorActionBean coordAction, Date actualTime) {
046        super(coordAction);
047        this.actualTime = actualTime;
048    }
049
050    public CoordInputLogicEvaluatorPhaseTwo(CoordinatorActionBean coordAction,
051            AbstractCoordInputDependency coordInputDependency) {
052        super(coordAction, coordInputDependency);
053    }
054
055    @Override
056    public CoordInputLogicEvaluatorResult evalInput(String dataSet, int min, int wait) {
057        try {
058            CoordPullInputDependency coordPullInputDependency = (CoordPullInputDependency) coordInputDependency;
059            ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, coordAction.getNominalTime(),
060                    getInputSetEvent(dataSet), getConf());
061            if (coordPullInputDependency.getUnResolvedDependency(dataSet) == null) {
062                return super.evalInput(dataSet, min, wait);
063
064            }
065            else {
066                cleanPreviousCheckData(coordPullInputDependency, dataSet);
067                List<String> unresolvedList = coordPullInputDependency.getUnResolvedDependency(dataSet)
068                        .getDependencies();
069                for (String unresolved : unresolvedList) {
070                    String resolvedPath = "";
071
072                    CoordELFunctions.evalAndWrap(eval, unresolved);
073                    boolean isResolved = (Boolean) eval.getVariable(CoordELConstants.IS_RESOLVED);
074
075                    coordPullInputDependency.setDependencyMap(dependencyMap);
076                    if (eval.getVariable(CoordELConstants.RESOLVED_PATH) != null) {
077                        resolvedPath = eval.getVariable(CoordELConstants.RESOLVED_PATH).toString();
078                    }
079                    if (resolvedPath != null) {
080                        resolvedPath = getEvalResult(isResolved, min, wait,
081                                Arrays.asList(DependencyChecker.dependenciesAsArray(resolvedPath.toString())))
082                                .getDataSets();
083
084                    }
085
086                    log.trace(MessageFormat.format("Return data is {0}", resolvedPath));
087                    log.debug(MessageFormat.format("Resolved status of Data set {0} with min {1} and wait {2}  =  {3}",
088                            dataSet, min, wait, !StringUtils.isEmpty(resolvedPath)));
089
090                    if ((isInputWaitElapsed(wait) || isResolved) && !StringUtils.isEmpty(resolvedPath)) {
091                        coordPullInputDependency.addResolvedList(dataSet, resolvedPath.toString());
092                    }
093                    else {
094                        cleanPreviousCheckData(coordPullInputDependency, dataSet);
095                        if (!isInputWaitElapsed(wait)) {
096                            return new CoordInputLogicEvaluatorResult(
097                                    CoordInputLogicEvaluatorResult.STATUS.TIMED_WAITING);
098                        }
099                        else {
100                            return new CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.FALSE);
101                        }
102                    }
103                }
104                coordPullInputDependency.getUnResolvedDependency(dataSet).setResolved(true);
105                return new CoordInputLogicEvaluatorResult(STATUS.TRUE, getListAsString(coordPullInputDependency
106                        .getUnResolvedDependency(dataSet).getResolvedList(), dataSet));
107
108            }
109        }
110        catch (Exception e) {
111            throw new RuntimeException(" event not found" + e, e);
112
113        }
114
115    }
116
117    private void cleanPreviousCheckData(CoordPullInputDependency coordPullInputDependency, String dataSet) {
118        // Previous check might have resolved and added resolved list. Cleanup any resolved list stored by previous
119        // check.
120        if (coordPullInputDependency.getUnResolvedDependency(dataSet) != null) {
121            coordPullInputDependency.getUnResolvedDependency(dataSet).setResolvedList(new ArrayList<String>());
122        }
123
124    }
125
126    @Override
127    public CoordInputLogicEvaluatorResult evalCombineInput(String[] inputSets, int min, int wait) {
128        throw new RuntimeException("Combine is not supported for latest/future");
129
130    }
131
132    @SuppressWarnings("unchecked")
133    private Element getInputSetEvent(String name) throws JDOMException {
134        Element eAction = XmlUtils.parseXml(coordAction.getActionXml().toString());
135        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
136        List<Element> eDataEvents = inputList.getChildren("data-in", eAction.getNamespace());
137        for (Element dEvent : eDataEvents) {
138            if (dEvent.getAttribute("name").getValue().equals(name)) {
139                return dEvent;
140            }
141        }
142        throw new RuntimeException("Event not found");
143    }
144}