/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.coord.input.dependency;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Writable;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordCommandUtils;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil;
import org.apache.oozie.dependency.ActionDependency;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.WritableUtils;
import org.jdom.Element;
import org.jdom.JDOMException;

/**
 * Base class for coordinator input dependencies.
 * <p>
 * The persisted state is {@link #isDependencyMet} plus {@link #dependencyMap}, which maps a name
 * (called {@code dataSet} or {@code inputEventName} by the methods of this class) to its list of
 * {@link CoordInputInstance}s. From that map two working indexes are derived: the input data
 * instance strings that are still missing and those that are available, both keyed by the same
 * name. The indexes are rebuilt by {@link #generateDependencies()} and are not written by
 * {@link #write(DataOutput)}.
 */
public abstract class AbstractCoordInputDependency implements Writable, CoordInputDependency {
    /** Explicit "dependency met" flag; serialized by {@link #write(DataOutput)}. */
    protected boolean isDependencyMet = false;
    /*
     * Transient variables only used for processing, not stored in DB.
     */
    /** Name -> input data instance strings that are not yet available. Keys with no entries are removed. */
    protected transient Map<String, List<String>> missingDependenciesSet = new HashMap<String, List<String>>();
    /** Name -> input data instance strings that have been marked available. */
    protected transient Map<String, List<String>> availableDependenciesSet = new HashMap<String, List<String>>();
    /** Name -> all input instances registered under that name; serialized by {@link #write(DataOutput)}. */
    protected Map<String, List<CoordInputInstance>> dependencyMap = new HashMap<String, List<CoordInputInstance>>();

    /** Creates an instance with empty maps. */
    public AbstractCoordInputDependency() {
    }

    /**
     * Creates an instance backed by the given map (stored by reference, not copied) and builds the
     * missing/available indexes from it.
     *
     * @param dependencyMap name -> input instances
     */
    public AbstractCoordInputDependency(Map<String, List<CoordInputInstance>> dependencyMap) {
        this.dependencyMap = dependencyMap;
        generateDependencies();
    }

    /**
     * Registers (or replaces) the instance list for an input event name. Only
     * {@link #dependencyMap} is updated; the missing/available indexes are not touched here.
     *
     * @param inputEventName key under which the list is stored
     * @param inputInstanceList instances to store
     */
    public void addInputInstanceList(String inputEventName, List<CoordInputInstance> inputInstanceList) {
        dependencyMap.put(inputEventName, inputInstanceList);
    }

    /**
     * @return the live dependency map (not a copy)
     */
    public Map<String, List<CoordInputInstance>> getDependencyMap() {
        return dependencyMap;
    }

    /**
     * Replaces the dependency map. The missing/available indexes are not rebuilt by this call.
     *
     * @param dependencyMap name -> input instances
     */
    public void setDependencyMap(Map<String, List<CoordInputInstance>> dependencyMap) {
        this.dependencyMap = dependencyMap;
    }

    /**
     * Marks the instance as available, records its input data instance string in the available
     * index for {@code dataSet}, and removes it from the missing index.
     *
     * @param dataSet key of the indexes to update
     * @param coordInputInstance instance that became available
     */
    public void addToAvailableDependencies(String dataSet, CoordInputInstance coordInputInstance) {
        coordInputInstance.setAvailability(true);
        List<String> availableSet = availableDependenciesSet.get(dataSet);
        if (availableSet == null) {
            availableSet = new ArrayList<String>();
            availableDependenciesSet.put(dataSet, availableSet);
        }
        availableSet.add(coordInputInstance.getInputDataInstance());
        removeFromMissingDependencies(dataSet, coordInputInstance);
    }

    /**
     * Marks the instance as available and removes its input data instance string from the missing
     * index for {@code dataSet}. When the list for that key becomes empty the key itself is dropped,
     * so an empty missing index means nothing is missing.
     *
     * @param dataSet key of the missing index to update
     * @param coordInputInstance instance to remove
     */
    public void removeFromMissingDependencies(String dataSet, CoordInputInstance coordInputInstance) {
        coordInputInstance.setAvailability(true);
        List<String> missingSet = missingDependenciesSet.get(dataSet);
        if (missingSet != null) {
            missingSet.remove(coordInputInstance.getInputDataInstance());
            if (missingSet.isEmpty()) {
                missingDependenciesSet.remove(dataSet);
            }
        }
    }

    /**
     * Records the instance's input data instance string in the missing index for {@code dataSet}.
     * The instance's availability flag is not modified.
     *
     * @param dataSet key of the missing index to update
     * @param coordInputInstance instance that is missing
     */
    public void addToMissingDependencies(String dataSet, CoordInputInstance coordInputInstance) {
        List<String> missingSet = missingDependenciesSet.get(dataSet);
        if (missingSet == null) {
            missingSet = new ArrayList<String>();
            missingDependenciesSet.put(dataSet, missingSet);
        }
        missingSet.add(coordInputInstance.getInputDataInstance());
    }

    /**
     * Rebuilds the missing and available indexes from scratch by classifying every instance in
     * {@link #dependencyMap} according to {@link CoordInputInstance#isAvailable()}.
     *
     * @throws RuntimeException wrapping any exception raised while rebuilding
     */
    protected void generateDependencies() {
        try {
            missingDependenciesSet = new HashMap<String, List<String>>();
            availableDependenciesSet = new HashMap<String, List<String>>();

            for (Entry<String, List<CoordInputInstance>> entry : dependencyMap.entrySet()) {
                String key = entry.getKey();
                for (CoordInputInstance coordInputInstance : entry.getValue()) {
                    if (coordInputInstance.isAvailable()) {
                        addToAvailableDependencies(key, coordInputInstance);
                    }
                    else {
                        addToMissingDependencies(key, coordInputInstance);
                    }
                }
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @param dataSet key to look up
     * @return the live list of available input data instance strings for {@code dataSet}, or a new
     *         empty list when nothing is recorded for that key
     */
    public List<String> getAvailableDependencies(String dataSet) {
        List<String> available = availableDependenciesSet.get(dataSet);
        if (available != null) {
            return available;
        }
        return new ArrayList<String>();
    }

    /**
     * Returns the missing input data instance strings of one key, each followed by "#".
     *
     * @param dataSet key to look up
     * @return the joined string; empty when nothing is missing for {@code dataSet}
     */
    public String getMissingDependencies(String dataSet) {
        StringBuilder sb = new StringBuilder();
        // The key is absent once all of its instances are available (or if it was never added),
        // so the lookup may legitimately return null; the helper treats that as "nothing missing".
        appendDependencies(sb, missingDependenciesSet.get(dataSet));
        return sb.toString();
    }

    /**
     * Splits {@code availableSet} on {@link CoordELFunctions#INSTANCE_SEPARATOR}, creates an
     * available {@link CoordInputInstance} for each piece, appends it to the instance list of
     * {@code dataSet} (creating the list if needed) and records it in the available index.
     *
     * @param dataSet key under which the instances are registered
     * @param availableSet separator-delimited input data instance strings
     */
    public void addToAvailableDependencies(String dataSet, String availableSet) {
        List<CoordInputInstance> list = dependencyMap.get(dataSet);
        if (list == null) {
            list = new ArrayList<CoordInputInstance>();
            dependencyMap.put(dataSet, list);
        }

        for (String available : availableSet.split(CoordELFunctions.INSTANCE_SEPARATOR)) {
            CoordInputInstance coordInstance = new CoordInputInstance(available, true);
            list.add(coordInstance);
            addToAvailableDependencies(dataSet, coordInstance);
        }
    }

    /**
     * @return every missing input data instance string across all keys, each followed by "#";
     *         empty when nothing is missing
     */
    public String getMissingDependencies() {
        StringBuilder sb = new StringBuilder();
        if (missingDependenciesSet != null) {
            for (List<String> dependenciesList : missingDependenciesSet.values()) {
                appendDependencies(sb, dependenciesList);
            }
        }
        return sb.toString();
    }

    /**
     * @return a new list holding every missing input data instance string across all keys
     */
    public List<String> getMissingDependenciesAsList() {
        List<String> missingDependencies = new ArrayList<String>();
        for (List<String> dependenciesList : missingDependenciesSet.values()) {
            missingDependencies.addAll(dependenciesList);
        }
        return missingDependencies;
    }

    /**
     * @return a new list holding every available input data instance string across all keys
     */
    public List<String> getAvailableDependenciesAsList() {
        List<String> availableDependencies = new ArrayList<String>();
        for (List<String> dependenciesList : availableDependenciesSet.values()) {
            availableDependencies.addAll(dependenciesList);
        }
        return availableDependencies;
    }

    /**
     * Serializes this object to a string: the factory's magic number followed by the
     * {@link Writable} byte form decoded with {@code CoordInputDependencyFactory.CHAR_ENCODING}.
     *
     * @return the serialized form
     * @throws IOException if writing the byte form fails
     */
    public String serialize() throws IOException {
        return CoordInputDependencyFactory.getMagicNumber()
                + new String(WritableUtils.toByteArray(this), CoordInputDependencyFactory.CHAR_ENCODING);
    }

    /**
     * Joins the given strings, following each one with "#".
     *
     * @param dataSets strings to join; a {@code null} list yields an empty string
     * @return the joined string
     */
    public String getListAsString(List<String> dataSets) {
        StringBuilder sb = new StringBuilder();
        appendDependencies(sb, dataSets);
        return sb.toString();
    }

    /**
     * @param isDependencyMeet new value of the explicit "dependency met" flag
     */
    public void setDependencyMet(boolean isDependencyMeet) {
        this.isDependencyMet = isDependencyMeet;
    }

    /**
     * @return {@code true} when the missing index is empty or the explicit flag has been set
     */
    public boolean isDependencyMet() {
        return missingDependenciesSet.isEmpty() || isDependencyMet;
    }

    /**
     * @return always {@code false} in this base class
     */
    public boolean isUnResolvedDependencyMet() {
        return false;
    }

    /**
     * Marks as available every registered instance whose input data instance string is contained
     * in {@code availableList}.
     *
     * @param availableList input data instance strings that are now available
     */
    @Override
    public void addToAvailableDependencies(Collection<String> availableList) {
        for (Entry<String, List<CoordInputInstance>> dependenciesList : dependencyMap.entrySet()) {
            for (CoordInputInstance coordInputInstance : dependenciesList.getValue()) {
                if (availableList.contains(coordInputInstance.getInputDataInstance())) {
                    addToAvailableDependencies(dependenciesList.getKey(), coordInputInstance);
                }
            }
        }
    }

    /**
     * Evaluates the push dependencies of {@code coordAction} through
     * {@link CoordInputLogicEvaluatorUtil}; when the evaluation succeeds the action's push input
     * dependencies are flagged as met.
     *
     * @param coordAction action whose push dependencies are checked
     * @param registerForNotification not used by this implementation
     * @return the missing and available push dependencies of the action after the check
     * @throws CommandException propagated from the evaluation
     * @throws IOException propagated from the evaluation
     * @throws JDOMException propagated from the evaluation
     */
    @Override
    public ActionDependency checkPushMissingDependencies(CoordinatorActionBean coordAction,
            boolean registerForNotification) throws CommandException, IOException,
            JDOMException {
        boolean status = new CoordInputLogicEvaluatorUtil(coordAction).checkPushDependencies();
        if (status) {
            coordAction.getPushInputDependencies().setDependencyMet(true);
        }
        return new ActionDependency(coordAction.getPushInputDependencies().getMissingDependenciesAsList(), coordAction
                .getPushInputDependencies().getAvailableDependenciesAsList());
    }

    /**
     * Evaluates the pull dependencies of {@code coordAction} through
     * {@link CoordInputLogicEvaluatorUtil}; when the evaluation succeeds the action's pull input
     * dependencies are flagged as met.
     *
     * @param coordAction action whose pull dependencies are checked
     * @param existList not used by this implementation
     * @param nonExistList not used by this implementation
     * @return the result of the evaluation
     * @throws IOException propagated from the evaluation
     * @throws JDOMException propagated from the evaluation
     */
    public boolean checkPullMissingDependencies(CoordinatorActionBean coordAction,
            StringBuilder existList, StringBuilder nonExistList) throws IOException, JDOMException {
        boolean status = new CoordInputLogicEvaluatorUtil(coordAction).checkPullMissingDependencies();
        if (status) {
            coordAction.getPullInputDependencies().setDependencyMet(true);
        }
        return status;
    }

    /**
     * Tells whether the missing dependencies differ from a previously captured snapshot.
     *
     * @param nonExistList not used by this implementation
     * @param missingDependencies previously captured result of {@link #getMissingDependencies()}
     * @param nonResolvedList not used by this implementation
     * @param status not used by this implementation
     * @return {@code true} when the snapshot is null/empty or differs from the current
     *         {@link #getMissingDependencies()} value
     */
    public boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies,
            StringBuilder nonResolvedList, boolean status) {
        return StringUtils.isEmpty(missingDependencies)
                || !missingDependencies.equals(getMissingDependencies());
    }

    /**
     * Registers the unresolved instances declared on the action XML's {@code data-in} elements via
     * {@code addUnResolvedList} and then evaluates them through
     * {@link CoordInputLogicEvaluatorUtil#checkUnResolved(Date)}.
     *
     * @param coordAction action being checked
     * @param eAction action XML; its {@code action-actual-time} attribute supplies the evaluation
     *        time, falling back to the current time when the attribute is absent
     * @return {@code true} when the action has no {@code input-events} element, otherwise the
     *         result of the evaluation
     * @throws Exception propagated from date parsing or from the evaluation
     */
    @SuppressWarnings("unchecked")
    public boolean checkUnresolved(CoordinatorActionBean coordAction, Element eAction)
            throws Exception {
        String actualTimeStr = eAction.getAttributeValue("action-actual-time");
        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
        // The time is resolved before the early return below so that a malformed attribute is
        // still reported even when there are no input events.
        Date actualTime = (actualTimeStr == null) ? new Date() : DateUtils.parseDateOozieTZ(actualTimeStr);
        if (inputList == null) {
            return true;
        }
        List<Element> eDataEvents = inputList.getChildren("data-in", eAction.getNamespace());
        for (Element dEvent : eDataEvents) {
            Element unResolvedElement = dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG,
                    dEvent.getNamespace());
            if (unResolvedElement == null) {
                continue;
            }
            String unResolvedInstance = unResolvedElement.getTextTrim();
            String name = dEvent.getAttribute("name").getValue();
            addUnResolvedList(name, unResolvedInstance);
        }
        return new CoordInputLogicEvaluatorUtil(coordAction).checkUnResolved(actualTime);
    }

    /**
     * Writes, in order: {@code INTERNAL_VERSION_ID}, the explicit "dependency met" flag and the
     * dependency map. The missing/available indexes are not written.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeStringAsBytes(out, INTERNAL_VERSION_ID);
        out.writeBoolean(isDependencyMet);
        WritableUtils.writeMapWithList(out, dependencyMap);
    }

    /**
     * Reads the fields in the order produced by {@link #write(DataOutput)} and rebuilds the
     * missing/available indexes from the restored dependency map.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        // The leading string written by write() is consumed to advance the stream; its value is not used.
        WritableUtils.readBytesAsString(in);
        this.isDependencyMet = in.readBoolean();
        dependencyMap = WritableUtils.readMapWithList(in, CoordInputInstance.class);
        generateDependencies();
    }

    /**
     * @param dataSet key to check
     * @return {@code true} when {@code dataSet} is registered in the dependency map and the number
     *         of its available entries equals the number of its registered instances
     */
    public boolean isDataSetResolved(String dataSet) {
        List<String> available = getAvailableDependencies(dataSet);
        List<CoordInputInstance> registered = getDependencyMap().get(dataSet);
        if (available == null || registered == null) {
            return false;
        }
        return available.size() == registered.size();
    }

    /**
     * Appends every entry of {@code dependencies} to {@code sb}, each followed by "#".
     * A {@code null} list is treated as empty.
     */
    private static void appendDependencies(StringBuilder sb, List<String> dependencies) {
        if (dependencies == null) {
            return;
        }
        for (String dependency : dependencies) {
            sb.append(dependency).append("#");
        }
    }

}