001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.coord.input.logic;
020
021import java.io.IOException;
022import java.net.URISyntaxException;
023import java.util.ArrayList;
024import java.util.List;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.oozie.CoordinatorActionBean;
028import org.apache.oozie.ErrorCode;
029import org.apache.oozie.coord.input.dependency.AbstractCoordInputDependency;
030import org.apache.oozie.coord.input.dependency.CoordInputInstance;
031import org.apache.oozie.dependency.URIHandler;
032import org.apache.oozie.dependency.URIHandlerException;
033import org.apache.oozie.service.Services;
034import org.apache.oozie.service.URIHandlerService;
035import org.apache.oozie.util.ELEvaluator;
036
037public class CoordInputLogicEvaluatorPhaseThree extends CoordInputLogicEvaluatorPhaseOne {
038
039    ELEvaluator eval;
040
041    public CoordInputLogicEvaluatorPhaseThree(CoordinatorActionBean coordAction, ELEvaluator eval) {
042        super(coordAction, (AbstractCoordInputDependency) coordAction.getPullInputDependencies());
043        this.eval = eval;
044    }
045
046    public CoordInputLogicEvaluatorResult evalInput(String dataSet, int min, int wait) {
047        return getResultFromPullPush(coordAction, dataSet, min);
048
049    }
050
051    public CoordInputLogicEvaluatorResult evalCombineInput(String[] inputSets, int min, int wait) {
052        return combine(coordInputDependency, inputSets, min, wait);
053    }
054
055    public CoordInputLogicEvaluatorResult combine(AbstractCoordInputDependency coordInputDependency,
056            String[] inputSets, int min, int wait) {
057
058        List<String> availableList = new ArrayList<String>();
059
060        if (coordInputDependency.getDependencyMap().get(inputSets[0]) == null) {
061            return new CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.FALSE);
062        }
063
064        try {
065            String firstInputSet = inputSets[0];
066            List<CoordInputInstance> firstInputSetList = coordInputDependency.getDependencyMap().get(firstInputSet);
067            for (int i = 0; i < firstInputSetList.size(); i++) {
068                CoordInputInstance coordInputInstance = firstInputSetList.get(i);
069                if (!coordInputInstance.isAvailable()) {
070                    for (int j = 1; j < inputSets.length; j++) {
071                        if (coordInputDependency.getDependencyMap().get(inputSets[j]).get(i).isAvailable()) {
072                            availableList.add(getPathWithoutDoneFlag(
073                                    coordInputDependency.getDependencyMap().get(inputSets[j]).get(i)
074                                            .getInputDataInstance(), inputSets[j]));
075                        }
076                    }
077                }
078
079                else {
080                    availableList.add(getPathWithoutDoneFlag(coordInputInstance.getInputDataInstance(), firstInputSet));
081                }
082            }
083        }
084        catch (Exception e) {
085            log.error(e);
086            throw new RuntimeException(ErrorCode.E1028.format("Error executing combine function " + e.getMessage()));
087        }
088        boolean allFound = availableList.size() == coordInputDependency.getDependencyMap().get(inputSets[0]).size();
089        return getEvalResult(allFound, min, wait, availableList);
090    }
091
092    protected boolean pathExists(String sPath, Configuration actionConf) throws IOException, URISyntaxException,
093            URIHandlerException {
094        return false;
095    }
096
097    public boolean isInputWaitElapsed(int timeInMin) {
098        return true;
099    }
100
101    public String getListAsString(List<String> input, String dataSet) {
102        if (input == null || input.isEmpty()) {
103            return "";
104        }
105        StringBuilder sb = new StringBuilder();
106        try {
107
108            for (int i = 1; i < input.size(); i++) {
109                sb.append(getPathWithoutDoneFlag(input.get(i - 1), dataSet)).append(",");
110            }
111            sb.append(getPathWithoutDoneFlag(input.get(input.size() - 1), dataSet));
112        }
113        catch (URIHandlerException e) {
114            log.error(e);
115            throw new RuntimeException(ErrorCode.E1028.format("Error finding path without done flag " + e.getMessage()));
116        }
117
118        return sb.toString();
119    }
120
121    private String getPathWithoutDoneFlag(String sPath, String dataSet) throws URIHandlerException {
122        if (dataSet == null) {
123            return sPath;
124        }
125        URIHandlerService service = Services.get().get(URIHandlerService.class);
126        URIHandler handler = service.getURIHandler(sPath);
127        return handler.getURIWithoutDoneFlag(sPath, eval.getVariable(".datain." + dataSet + ".doneFlag").toString());
128    }
129
130}