/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.coord.input.logic;

import java.io.IOException;
import java.io.StringReader;
import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.command.coord.CoordCommandUtils;
import org.apache.oozie.coord.input.dependency.AbstractCoordInputDependency;
import org.apache.oozie.coord.input.dependency.CoordInputDependency;
import org.apache.oozie.coord.input.dependency.CoordInputInstance;
import org.apache.oozie.coord.input.dependency.CoordPullInputDependency;
import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorResult.STATUS;
import org.apache.oozie.dependency.URIHandlerException;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

/**
 * Phase one checks every dependency except the unresolved ones; unresolved dependencies are checked as part of
 * phase two. Phase three only reads dependencies from the data set and performs no hdfs/hcat check.
 */
public class CoordInputLogicEvaluatorPhaseOne implements CoordInputLogicEvaluator {

    protected AbstractCoordInputDependency coordInputDependency;
    protected Map<String, List<CoordInputInstance>> dependencyMap;
    protected CoordinatorActionBean coordAction = null;
    protected XLog log = XLog.getLog(getClass());

    /**
     * Creates an evaluator that works on the pull input dependencies of the given action.
     *
     * @param coordAction the coordinator action being evaluated
     */
    public CoordInputLogicEvaluatorPhaseOne(CoordinatorActionBean coordAction) {
        this(coordAction, coordAction.getPullInputDependencies());
    }

    /**
     * Creates an evaluator that works on an explicitly supplied dependency object.
     *
     * @param coordAction the coordinator action being evaluated
     * @param coordInputDependency the dependencies to evaluate; it is cast to {@link AbstractCoordInputDependency}
     */
    public CoordInputLogicEvaluatorPhaseOne(CoordinatorActionBean coordAction, CoordInputDependency coordInputDependency) {
        this.coordAction = coordAction;
        this.coordInputDependency = (AbstractCoordInputDependency) coordInputDependency;
        this.dependencyMap = this.coordInputDependency.getDependencyMap();
        LogUtils.setLogInfo(coordAction.getId());
    }

    /**
     * Evaluates a single data set against the dependencies this evaluator was created with.
     *
     * @param dataSet the data set name
     * @param min the minimum number of instances required; a negative value means "not specified"
     * @param wait the wait time in minutes
     * @return the coord input logic evaluator result
     */
    public CoordInputLogicEvaluatorResult evalInput(String dataSet, int min, int wait) {
        return input(coordInputDependency, dataSet, min, wait);
    }

    /**
     * Evaluate input function with min and wait.
     * <p>
     * A data set that is missing from the dependency map is either reported as
     * {@link STATUS#PHASE_TWO_EVALUATION} (when it is an unresolved pull dependency) or resolved from the already
     * recorded pull/push availability. Otherwise every instance that is not yet marked available is checked with
     * {@link #pathExists(String, Configuration)} and recorded as available when it exists.
     *
     * @param coordInputDependency the dependencies to evaluate
     * @param dataSet the data set name
     * @param min the minimum number of instances required; a negative value means "not specified"
     * @param wait the wait time in minutes
     * @return the coord input logic evaluator result
     * @throws RuntimeException with an E1028 message (and the original exception as cause) if the check fails
     */
    public CoordInputLogicEvaluatorResult input(AbstractCoordInputDependency coordInputDependency, String dataSet,
            int min, int wait) {

        List<CoordInputInstance> inputInstances = coordInputDependency.getDependencyMap().get(dataSet);
        if (inputInstances == null) {
            if (((CoordPullInputDependency) coordAction.getPullInputDependencies()).getUnResolvedDependency(dataSet) != null) {
                log.debug("Data set [{0}] is unresolved set, will get resolved in phasetwo", dataSet);
                CoordInputLogicEvaluatorResult retData = new CoordInputLogicEvaluatorResult();
                retData.setStatus(CoordInputLogicEvaluatorResult.STATUS.PHASE_TWO_EVALUATION);
                return retData;
            }
            return getResultFromPullPush(coordAction, dataSet, min);
        }

        List<String> availableList = new ArrayList<String>();
        boolean allFound = true;
        try {
            Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
            for (int i = 0; i < inputInstances.size(); i++) {
                CoordInputInstance coordInputInstance = inputInstances.get(i);
                if (coordInputInstance.isAvailable()) {
                    availableList.add(coordInputInstance.getInputDataInstance());
                    continue;
                }
                if (pathExists(coordInputInstance.getInputDataInstance(), actionConf)) {
                    availableList.add(coordInputInstance.getInputDataInstance());
                    coordInputDependency.addToAvailableDependencies(dataSet, coordInputInstance);
                }
                else {
                    log.debug("[{0} is not found ", coordInputInstance.getInputDataInstance());
                    allFound = false;
                    // Stop looking for dependencies, if min is not specified.
                    if (min < 0) {
                        break;
                    }
                }
            }
        }
        catch (Exception e) {
            log.error(e);
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new RuntimeException(ErrorCode.E1028.format("Error executing input function " + e.getMessage()), e);
        }
        CoordInputLogicEvaluatorResult retData = getEvalResult(allFound, min, wait, availableList);

        log.debug("Resolved status of Data set [{0}] with min [{1}] and wait [{2}] = [{3}]", dataSet, min, wait,
                retData.getStatus());
        return retData;
    }

    /**
     * Tells whether the given wait time has elapsed for the current action. The elapsed time is measured from the
     * later of the action's nominal time and its created time.
     *
     * @param timeInMin the wait time in minutes; {@code -1} is always treated as elapsed
     * @return {@code true} if at least {@code timeInMin} whole minutes have passed
     */
    public boolean isInputWaitElapsed(int timeInMin) {
        if (timeInMin == -1) {
            return true;
        }
        long startTime = Math.max(coordAction.getNominalTime().getTime(), coordAction.getCreatedTime().getTime());
        long waitingTime = (new Date().getTime() - startTime) / (60 * 1000);
        return timeInMin <= waitingTime;
    }

    /**
     * Evaluates a combination of data sets against the dependencies this evaluator was created with.
     *
     * @param inputSets the data set names; the first one is the primary set
     * @param min the minimum number of instances required; a negative value means "not specified"
     * @param wait the wait time in minutes
     * @return the coord input logic evaluator result
     */
    public CoordInputLogicEvaluatorResult evalCombineInput(String[] inputSets, int min, int wait) {
        return combine(coordInputDependency, inputSets, min, wait);
    }

    /**
     * Evaluate combine function with min and wait.
     * <p>
     * For every instance position of the first data set, the instance of the first data set is used when it is
     * available; otherwise the instance at the same position of the first following data set that is available is
     * used instead. Each position therefore contributes at most one entry to the result.
     *
     * @param coordInputDependency the dependencies to evaluate
     * @param inputSets the data set names; the first one is the primary set
     * @param min the minimum number of instances required; a negative value means "not specified"
     * @param wait the wait time in minutes
     * @return the coord input logic evaluator result; {@link STATUS#TIMED_WAITING} if the first data set is not in
     *         the dependency map
     * @throws RuntimeException with an E1028 message (and the original exception as cause) if the check fails
     */
    public CoordInputLogicEvaluatorResult combine(AbstractCoordInputDependency coordInputDependency,
            String[] inputSets, int min, int wait) {

        List<CoordInputInstance> firstInputSetList = coordInputDependency.getDependencyMap().get(inputSets[0]);
        if (firstInputSetList == null) {
            return new CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.TIMED_WAITING);
        }

        List<String> availableList = new ArrayList<String>();
        // Tracked per position: counting entries of availableList is not reliable as a completeness check.
        boolean allFound = true;
        try {
            Configuration jobConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
            String firstInputSet = inputSets[0];
            for (int i = 0; i < firstInputSetList.size(); i++) {
                CoordInputInstance coordInputInstance = firstInputSetList.get(i);
                boolean found;
                if (coordInputInstance.isAvailable()) {
                    availableList.add(coordInputInstance.getInputDataInstance());
                    found = true;
                }
                else if (pathExists(coordInputInstance.getInputDataInstance(), jobConf)) {
                    coordInputDependency.addToAvailableDependencies(firstInputSet, coordInputInstance);
                    availableList.add(coordInputInstance.getInputDataInstance());
                    found = true;
                }
                else {
                    log.debug(MessageFormat.format("{0} is not found. Looking from other datasets.",
                            coordInputInstance.getInputDataInstance()));
                    found = addFirstAvailableAlternative(coordInputDependency, inputSets, i, jobConf, availableList);
                }

                if (!found) {
                    allFound = false;
                    // Stop looking for dependencies, if min is not specified.
                    if (min < 0) {
                        break;
                    }
                }
            }
        }
        catch (Exception e) {
            log.error(e);
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new RuntimeException(ErrorCode.E1028.format("Error executing combine function " + e.getMessage()), e);
        }
        CoordInputLogicEvaluatorResult retData = getEvalResult(allFound, min, wait, availableList);
        log.debug("Resolved status of Data set [{0}] with min [{1}] and wait [{2}] = [{3}]",
                Arrays.toString(inputSets), min, wait, retData.getStatus());
        return retData;
    }

    /**
     * Looks through the non-primary data sets, in order, for an instance at the given position that is already
     * marked available or whose path exists. The first match is recorded as available and appended to
     * {@code availableList}; later data sets are not consulted.
     *
     * @return {@code true} if an alternative instance was found and recorded
     */
    private boolean addFirstAvailableAlternative(AbstractCoordInputDependency coordInputDependency, String[] inputSets,
            int index, Configuration jobConf, List<String> availableList) throws IOException, URISyntaxException,
            URIHandlerException {
        for (int j = 1; j < inputSets.length; j++) {
            CoordInputInstance alternative = coordInputDependency.getDependencyMap().get(inputSets[j]).get(index);
            if (alternative.isAvailable() || pathExists(alternative.getInputDataInstance(), jobConf)) {
                coordInputDependency.addToAvailableDependencies(inputSets[j], alternative);
                availableList.add(alternative.getInputDataInstance());
                log.debug(MessageFormat.format("{0} is found.", alternative.getInputDataInstance()));
                return true;
            }
        }
        return false;
    }

    /**
     * Builds a configuration from the run configuration stored on the coordinator action.
     *
     * @return the parsed configuration
     * @throws IOException if the run configuration cannot be read
     */
    public Configuration getConf() throws IOException {
        return new XConfiguration(new StringReader(coordAction.getRunConf()));
    }

    /**
     * Joins the entries of a list with commas.
     *
     * @param list the entries to join; may be {@code null}
     * @param dataset not used by this implementation
     * @return the comma separated entries, or an empty string for a {@code null} or empty list
     */
    public String getListAsString(List<String> list, String dataset) {
        if (list == null || list.isEmpty()) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < list.size(); i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(list.get(i));
        }
        return sb.toString();
    }

    /**
     * Converts the outcome of an availability check into an evaluator result.
     *
     * @param found whether every instance was found
     * @param min the minimum number of instances required
     * @param wait the wait time in minutes
     * @param availableList the instances that were found
     * @return {@link STATUS#TIMED_WAITING} while not everything is found and a positive wait has not elapsed;
     *         {@link STATUS#TRUE} when everything is found, when a positive {@code min} is satisfied, or when
     *         {@code min} is 0; otherwise a result whose status is left at its default
     */
    protected CoordInputLogicEvaluatorResult getEvalResult(boolean found, int min, int wait, List<String> availableList) {
        if (!found && wait > 0 && !isInputWaitElapsed(wait)) {
            return new CoordInputLogicEvaluatorResult(STATUS.TIMED_WAITING);
        }

        CoordInputLogicEvaluatorResult retData = new CoordInputLogicEvaluatorResult();
        if (found || (min > 0 && availableList.size() >= min)) {
            retData.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE);
            retData.setDataSets(getListAsString(availableList, null));
        }

        if (min == 0) {
            retData.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE);
        }

        return retData;
    }

    /**
     * Checks whether a dependency path exists by delegating to {@link CoordCommandUtils#pathExists}.
     *
     * @param sPath the path to check
     * @param jobConf the configuration passed to the check
     * @return {@code true} if the path exists
     * @throws IOException propagated from the underlying check
     * @throws URISyntaxException propagated from the underlying check
     * @throws URIHandlerException propagated from the underlying check
     */
    protected boolean pathExists(String sPath, Configuration jobConf) throws IOException, URISyntaxException,
            URIHandlerException {
        return CoordCommandUtils.pathExists(sPath, jobConf);
    }

    /**
     * Builds a result for a data set from the availability already recorded on the pull and the push dependencies
     * of the action. The data sets of both partial results are appended; the status is chosen with the precedence
     * waiting, phase-two evaluation, true, false.
     *
     * @param coordAction the coordinator action whose dependencies are read
     * @param dataSet the data set name
     * @param min the minimum number of instances required
     * @return the combined result
     */
    public CoordInputLogicEvaluatorResult getResultFromPullPush(CoordinatorActionBean coordAction, String dataSet, int min) {
        CoordInputLogicEvaluatorResult result = new CoordInputLogicEvaluatorResult();
        CoordInputLogicEvaluatorResult pullResult = getEvalResult(
                (AbstractCoordInputDependency) coordAction.getPullInputDependencies(), dataSet, min);
        CoordInputLogicEvaluatorResult pushResult = getEvalResult(
                (AbstractCoordInputDependency) coordAction.getPushInputDependencies(), dataSet, min);
        result.appendDataSets(pullResult.getDataSets());
        result.appendDataSets(pushResult.getDataSets());

        if (pullResult.isWaiting() || pushResult.isWaiting()) {
            result.setStatus(STATUS.TIMED_WAITING);
        }
        else if (pullResult.isPhaseTwoEvaluation() || pushResult.isPhaseTwoEvaluation()) {
            result.setStatus(STATUS.PHASE_TWO_EVALUATION);
        }
        else if (pullResult.isTrue() || pushResult.isTrue()) {
            result.setStatus(STATUS.TRUE);
        }
        else {
            result.setStatus(STATUS.FALSE);
        }
        return result;
    }

    /**
     * Gets evaluator Result
     *
     * @param coordInputDependencies the coord dependencies
     * @param dataSet the data set
     * @param min the min
     * @return the coord input logic evaluator result
     */
    public CoordInputLogicEvaluatorResult getEvalResult(AbstractCoordInputDependency coordInputDependencies,
            String dataSet, int min) {
        CoordInputLogicEvaluatorResult result = new CoordInputLogicEvaluatorResult();

        // Fetch once and treat a missing list like an empty one so the size check below cannot dereference null.
        List<String> available = coordInputDependencies.getAvailableDependencies(dataSet);
        boolean noneAvailable = available == null || available.isEmpty();
        int availableCount = noneAvailable ? 0 : available.size();

        if (noneAvailable) {
            if (min == 0) {
                result.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE);
            }
            else {
                result.setStatus(CoordInputLogicEvaluatorResult.STATUS.FALSE);
            }
        }

        if (min > -1 && availableCount >= min) {
            result.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE);
            result.appendDataSets(getListAsString(available, dataSet));
        }
        else if (coordInputDependencies.isDataSetResolved(dataSet)) {
            result.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE);
            result.appendDataSets(getListAsString(available, dataSet));
        }
        return result;
    }
}