001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.coord.input.logic; 020 021import java.util.Arrays; 022import java.util.List; 023import java.util.Map; 024 025import org.apache.oozie.CoordinatorActionBean; 026import org.apache.oozie.client.CoordinatorJob; 027import org.apache.oozie.coord.input.dependency.CoordInputInstance; 028import org.apache.oozie.coord.input.dependency.CoordPullInputDependency; 029import org.apache.oozie.coord.input.dependency.CoordPushInputDependency; 030 031public class CoordInputLogicEvaluatorPhaseValidate implements CoordInputLogicEvaluator { 032 033 CoordPullInputDependency coordPullInputDependency; 034 CoordPushInputDependency coordPushInputDependency; 035 036 protected Map<String, List<CoordInputInstance>> dependencyMap; 037 protected CoordinatorActionBean coordAction = null; 038 protected CoordinatorJob coordJob = null; 039 040 public CoordInputLogicEvaluatorPhaseValidate(CoordinatorActionBean coordAction) { 041 this.coordAction = coordAction; 042 coordPullInputDependency = (CoordPullInputDependency) coordAction.getPullInputDependencies(); 043 coordPushInputDependency = (CoordPushInputDependency) coordAction.getPushInputDependencies(); 044 045 } 046 047 @Override 048 public CoordInputLogicEvaluatorResult evalInput(String dataSet, int min, int wait) { 049 getDataSetLen(dataSet); 050 return new CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.FALSE); 051 } 052 053 @Override 054 public CoordInputLogicEvaluatorResult evalCombineInput(String[] inputSets, int min, int wait) { 055 if (inputSets.length <= 1) { 056 throw new RuntimeException("Combine should have at least two input sets. DataSets : " 057 + Arrays.toString(inputSets)); 058 } 059 int firstInputSetLen = getDataSetLen(inputSets[0]); 060 for (int i = 1; i < inputSets.length; i++) { 061 if (getDataSetLen(inputSets[i]) != firstInputSetLen) { 062 throw new RuntimeException("Combine should have same range. DataSets : " + Arrays.toString(inputSets)); 063 } 064 if (coordPullInputDependency.getUnResolvedDependency(inputSets[i]) != null) { 065 throw new RuntimeException("Combine is not supported for latest/future"); 066 } 067 } 068 return new CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.FALSE); 069 } 070 071 private int getDataSetLen(String dataset) { 072 if (coordAction.getPullInputDependencies() != null) { 073 if (coordPullInputDependency.getDependencyMap().get(dataset) != null) { 074 return coordPullInputDependency.getDependencyMap().get(dataset).size(); 075 } 076 077 if (coordPullInputDependency.getUnResolvedDependency(dataset) != null) { 078 return 1; 079 } 080 081 } 082 if (coordAction.getPushInputDependencies() != null) { 083 if (coordPushInputDependency.getDependencyMap().get(dataset) != null) { 084 return coordPushInputDependency.getDependencyMap().get(dataset).size(); 085 } 086 } 087 throw new RuntimeException(" Data set not found : " + dataset); 088 } 089}