001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.coord.input.logic; 020 021import java.io.IOException; 022import java.net.URISyntaxException; 023import java.util.ArrayList; 024import java.util.List; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.oozie.CoordinatorActionBean; 028import org.apache.oozie.ErrorCode; 029import org.apache.oozie.coord.input.dependency.AbstractCoordInputDependency; 030import org.apache.oozie.coord.input.dependency.CoordInputInstance; 031import org.apache.oozie.dependency.URIHandler; 032import org.apache.oozie.dependency.URIHandlerException; 033import org.apache.oozie.service.Services; 034import org.apache.oozie.service.URIHandlerService; 035import org.apache.oozie.util.ELEvaluator; 036 037public class CoordInputLogicEvaluatorPhaseThree extends CoordInputLogicEvaluatorPhaseOne { 038 039 ELEvaluator eval; 040 041 public CoordInputLogicEvaluatorPhaseThree(CoordinatorActionBean coordAction, ELEvaluator eval) { 042 super(coordAction, (AbstractCoordInputDependency) coordAction.getPullInputDependencies()); 043 this.eval = eval; 044 } 045 046 public CoordInputLogicEvaluatorResult evalInput(String dataSet, int min, int wait) { 047 return getResultFromPullPush(coordAction, dataSet, min); 048 049 } 050 051 public CoordInputLogicEvaluatorResult evalCombineInput(String[] inputSets, int min, int wait) { 052 return combine(coordInputDependency, inputSets, min, wait); 053 } 054 055 public CoordInputLogicEvaluatorResult combine(AbstractCoordInputDependency coordInputDependency, 056 String[] inputSets, int min, int wait) { 057 058 List<String> availableList = new ArrayList<String>(); 059 060 if (coordInputDependency.getDependencyMap().get(inputSets[0]) == null) { 061 return new CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.FALSE); 062 } 063 064 try { 065 String firstInputSet = inputSets[0]; 066 List<CoordInputInstance> firstInputSetList = coordInputDependency.getDependencyMap().get(firstInputSet); 067 for (int i = 0; i < firstInputSetList.size(); i++) { 068 CoordInputInstance coordInputInstance = firstInputSetList.get(i); 069 if (!coordInputInstance.isAvailable()) { 070 for (int j = 1; j < inputSets.length; j++) { 071 if (coordInputDependency.getDependencyMap().get(inputSets[j]).get(i).isAvailable()) { 072 availableList.add(getPathWithoutDoneFlag( 073 coordInputDependency.getDependencyMap().get(inputSets[j]).get(i) 074 .getInputDataInstance(), inputSets[j])); 075 } 076 } 077 } 078 079 else { 080 availableList.add(getPathWithoutDoneFlag(coordInputInstance.getInputDataInstance(), firstInputSet)); 081 } 082 } 083 } 084 catch (Exception e) { 085 log.error(e); 086 throw new RuntimeException(ErrorCode.E1028.format("Error executing combine function " + e.getMessage())); 087 } 088 boolean allFound = availableList.size() == coordInputDependency.getDependencyMap().get(inputSets[0]).size(); 089 return getEvalResult(allFound, min, wait, availableList); 090 } 091 092 protected boolean pathExists(String sPath, Configuration actionConf) throws IOException, URISyntaxException, 093 URIHandlerException { 094 return false; 095 } 096 097 public boolean isInputWaitElapsed(int timeInMin) { 098 return true; 099 } 100 101 public String getListAsString(List<String> input, String dataSet) { 102 if (input == null || input.isEmpty()) { 103 return ""; 104 } 105 StringBuilder sb = new StringBuilder(); 106 try { 107 108 for (int i = 1; i < input.size(); i++) { 109 sb.append(getPathWithoutDoneFlag(input.get(i - 1), dataSet)).append(","); 110 } 111 sb.append(getPathWithoutDoneFlag(input.get(input.size() - 1), dataSet)); 112 } 113 catch (URIHandlerException e) { 114 log.error(e); 115 throw new RuntimeException(ErrorCode.E1028.format("Error finding path without done flag " + e.getMessage())); 116 } 117 118 return sb.toString(); 119 } 120 121 private String getPathWithoutDoneFlag(String sPath, String dataSet) throws URIHandlerException { 122 if (dataSet == null) { 123 return sPath; 124 } 125 URIHandlerService service = Services.get().get(URIHandlerService.class); 126 URIHandler handler = service.getURIHandler(sPath); 127 return handler.getURIWithoutDoneFlag(sPath, eval.getVariable(".datain." + dataSet + ".doneFlag").toString()); 128 } 129 130}