001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.coord.input.logic; 020 021import java.text.MessageFormat; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Date; 025import java.util.List; 026 027import org.apache.commons.lang.StringUtils; 028import org.apache.oozie.CoordinatorActionBean; 029import org.apache.oozie.coord.CoordELConstants; 030import org.apache.oozie.coord.CoordELEvaluator; 031import org.apache.oozie.coord.CoordELFunctions; 032import org.apache.oozie.coord.input.dependency.AbstractCoordInputDependency; 033import org.apache.oozie.coord.input.dependency.CoordPullInputDependency; 034import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorResult.STATUS; 035import org.apache.oozie.dependency.DependencyChecker; 036import org.apache.oozie.util.ELEvaluator; 037import org.apache.oozie.util.XmlUtils; 038import org.jdom.Element; 039import org.jdom.JDOMException; 040 041public class CoordInputLogicEvaluatorPhaseTwo extends CoordInputLogicEvaluatorPhaseOne { 042 043 Date actualTime; 044 045 public CoordInputLogicEvaluatorPhaseTwo(CoordinatorActionBean coordAction, Date actualTime) { 046 super(coordAction); 047 this.actualTime = actualTime; 048 } 049 050 public CoordInputLogicEvaluatorPhaseTwo(CoordinatorActionBean coordAction, 051 AbstractCoordInputDependency coordInputDependency) { 052 super(coordAction, coordInputDependency); 053 } 054 055 @Override 056 public CoordInputLogicEvaluatorResult evalInput(String dataSet, int min, int wait) { 057 try { 058 CoordPullInputDependency coordPullInputDependency = (CoordPullInputDependency) coordInputDependency; 059 ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, coordAction.getNominalTime(), 060 getInputSetEvent(dataSet), getConf()); 061 if (coordPullInputDependency.getUnResolvedDependency(dataSet) == null) { 062 return super.evalInput(dataSet, min, wait); 063 064 } 065 else { 066 cleanPreviousCheckData(coordPullInputDependency, dataSet); 067 List<String> unresolvedList = coordPullInputDependency.getUnResolvedDependency(dataSet) 068 .getDependencies(); 069 for (String unresolved : unresolvedList) { 070 String resolvedPath = ""; 071 072 CoordELFunctions.evalAndWrap(eval, unresolved); 073 boolean isResolved = (Boolean) eval.getVariable(CoordELConstants.IS_RESOLVED); 074 075 coordPullInputDependency.setDependencyMap(dependencyMap); 076 if (eval.getVariable(CoordELConstants.RESOLVED_PATH) != null) { 077 resolvedPath = eval.getVariable(CoordELConstants.RESOLVED_PATH).toString(); 078 } 079 if (resolvedPath != null) { 080 resolvedPath = getEvalResult(isResolved, min, wait, 081 Arrays.asList(DependencyChecker.dependenciesAsArray(resolvedPath.toString()))) 082 .getDataSets(); 083 084 } 085 086 log.trace(MessageFormat.format("Return data is {0}", resolvedPath)); 087 log.debug(MessageFormat.format("Resolved status of Data set {0} with min {1} and wait {2} = {3}", 088 dataSet, min, wait, !StringUtils.isEmpty(resolvedPath))); 089 090 if ((isInputWaitElapsed(wait) || isResolved) && !StringUtils.isEmpty(resolvedPath)) { 091 coordPullInputDependency.addResolvedList(dataSet, resolvedPath.toString()); 092 } 093 else { 094 cleanPreviousCheckData(coordPullInputDependency, dataSet); 095 if (!isInputWaitElapsed(wait)) { 096 return new CoordInputLogicEvaluatorResult( 097 CoordInputLogicEvaluatorResult.STATUS.TIMED_WAITING); 098 } 099 else { 100 return new CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.FALSE); 101 } 102 } 103 } 104 coordPullInputDependency.getUnResolvedDependency(dataSet).setResolved(true); 105 return new CoordInputLogicEvaluatorResult(STATUS.TRUE, getListAsString(coordPullInputDependency 106 .getUnResolvedDependency(dataSet).getResolvedList(), dataSet)); 107 108 } 109 } 110 catch (Exception e) { 111 throw new RuntimeException(" event not found" + e, e); 112 113 } 114 115 } 116 117 private void cleanPreviousCheckData(CoordPullInputDependency coordPullInputDependency, String dataSet) { 118 // Previous check might have resolved and added resolved list. Cleanup any resolved list stored by previous 119 // check. 120 if (coordPullInputDependency.getUnResolvedDependency(dataSet) != null) { 121 coordPullInputDependency.getUnResolvedDependency(dataSet).setResolvedList(new ArrayList<String>()); 122 } 123 124 } 125 126 @Override 127 public CoordInputLogicEvaluatorResult evalCombineInput(String[] inputSets, int min, int wait) { 128 throw new RuntimeException("Combine is not supported for latest/future"); 129 130 } 131 132 @SuppressWarnings("unchecked") 133 private Element getInputSetEvent(String name) throws JDOMException { 134 Element eAction = XmlUtils.parseXml(coordAction.getActionXml().toString()); 135 Element inputList = eAction.getChild("input-events", eAction.getNamespace()); 136 List<Element> eDataEvents = inputList.getChildren("data-in", eAction.getNamespace()); 137 for (Element dEvent : eDataEvents) { 138 if (dEvent.getAttribute("name").getValue().equals(name)) { 139 return dEvent; 140 } 141 } 142 throw new RuntimeException("Event not found"); 143 } 144}