001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.coord.input.logic;
020
021import java.util.Arrays;
022import java.util.List;
023import java.util.Map;
024
025import org.apache.oozie.CoordinatorActionBean;
026import org.apache.oozie.client.CoordinatorJob;
027import org.apache.oozie.coord.input.dependency.CoordInputInstance;
028import org.apache.oozie.coord.input.dependency.CoordPullInputDependency;
029import org.apache.oozie.coord.input.dependency.CoordPushInputDependency;
030
031public class CoordInputLogicEvaluatorPhaseValidate implements CoordInputLogicEvaluator {
032
033    CoordPullInputDependency coordPullInputDependency;
034    CoordPushInputDependency coordPushInputDependency;
035
036    protected Map<String, List<CoordInputInstance>> dependencyMap;
037    protected CoordinatorActionBean coordAction = null;
038    protected CoordinatorJob coordJob = null;
039
040    public CoordInputLogicEvaluatorPhaseValidate(CoordinatorActionBean coordAction) {
041        this.coordAction = coordAction;
042        coordPullInputDependency = (CoordPullInputDependency) coordAction.getPullInputDependencies();
043        coordPushInputDependency = (CoordPushInputDependency) coordAction.getPushInputDependencies();
044
045    }
046
047    @Override
048    public CoordInputLogicEvaluatorResult evalInput(String dataSet, int min, int wait) {
049        getDataSetLen(dataSet);
050        return new CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.FALSE);
051    }
052
053    @Override
054    public CoordInputLogicEvaluatorResult evalCombineInput(String[] inputSets, int min, int wait) {
055        if (inputSets.length <= 1) {
056            throw new RuntimeException("Combine should have at least two input sets. DataSets : "
057                    + Arrays.toString(inputSets));
058        }
059        int firstInputSetLen = getDataSetLen(inputSets[0]);
060        for (int i = 1; i < inputSets.length; i++) {
061            if (getDataSetLen(inputSets[i]) != firstInputSetLen) {
062                throw new RuntimeException("Combine should have same range. DataSets : " + Arrays.toString(inputSets));
063            }
064            if (coordPullInputDependency.getUnResolvedDependency(inputSets[i]) != null) {
065                throw new RuntimeException("Combine is not supported for latest/future");
066            }
067        }
068        return new CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.FALSE);
069    }
070
071    private int getDataSetLen(String dataset) {
072        if (coordAction.getPullInputDependencies() != null) {
073            if (coordPullInputDependency.getDependencyMap().get(dataset) != null) {
074                return coordPullInputDependency.getDependencyMap().get(dataset).size();
075            }
076
077            if (coordPullInputDependency.getUnResolvedDependency(dataset) != null) {
078                return 1;
079            }
080
081        }
082        if (coordAction.getPushInputDependencies() != null) {
083            if (coordPushInputDependency.getDependencyMap().get(dataset) != null) {
084                return coordPushInputDependency.getDependencyMap().get(dataset).size();
085            }
086        }
087        throw new RuntimeException(" Data set not found : " + dataset);
088    }
089}