001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.dependency.hcat; 019 020import java.net.URISyntaxException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Map; 029import java.util.Map.Entry; 030import java.util.Set; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.ConcurrentMap; 033 034import org.apache.hadoop.conf.Configuration; 035import org.apache.oozie.service.HCatAccessorService; 036import org.apache.oozie.service.Services; 037import org.apache.oozie.util.HCatURI; 038import org.apache.oozie.util.XLog; 039 040public class SimpleHCatDependencyCache implements HCatDependencyCache { 041 042 private static XLog LOG = XLog.getLog(SimpleHCatDependencyCache.class); 043 private static String DELIMITER = ";"; 044 045 /** 046 * Map of server;db;table - sorter partition key order (country;dt;state) - sorted partition 047 * value (us;20120101;CA) - Collection of waiting actions (actionID and original hcat uri as 048 * string). 049 */ 050 private ConcurrentMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>> missingDeps; 051 052 /** 053 * Map of actionIDs and collection of available URIs 054 */ 055 private ConcurrentMap<String, Collection<String>> availableDeps; 056 057 /** 058 * Map of actionIDs and partitions for reverse-lookup in purging 059 */ 060 private ConcurrentMap<String, ConcurrentMap<String, Collection<String>>> actionPartitionMap; 061 062 @Override 063 public void init(Configuration conf) { 064 missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>>(); 065 availableDeps = new ConcurrentHashMap<String, Collection<String>>(); 066 actionPartitionMap = new ConcurrentHashMap<String, ConcurrentMap<String, Collection<String>>>(); 067 } 068 069 @Override 070 public void addMissingDependency(HCatURI hcatURI, String actionID) { 071 String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable(); 072 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); 073 // Partition keys seperated by ;. For eg: date;country;state 074 String partKey = sortedPKV.getPartKeys(); 075 // Partition values seperated by ;. For eg: 20120101;US;CA 076 String partVal = sortedPKV.getPartVals(); 077 ConcurrentMap<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey); 078 if (partKeyPatterns == null) { 079 partKeyPatterns = new ConcurrentHashMap<String, Map<String, Collection<WaitingAction>>>(); 080 ConcurrentMap<String, Map<String, Collection<WaitingAction>>> existingMap = missingDeps.putIfAbsent( 081 tableKey, partKeyPatterns); 082 if (existingMap != null) { 083 partKeyPatterns = existingMap; 084 } 085 } 086 ConcurrentMap<String, Collection<String>> partitionMap = actionPartitionMap.get(actionID); 087 if (partitionMap == null) { 088 partitionMap = new ConcurrentHashMap<String, Collection<String>>(); 089 ConcurrentMap<String, Collection<String>> existingPartMap = actionPartitionMap.putIfAbsent(actionID, 090 partitionMap); 091 if (existingPartMap != null) { 092 partitionMap = existingPartMap; 093 } 094 } 095 synchronized (partitionMap) { 096 Collection<String> partKeys = partitionMap.get(tableKey); 097 if (partKeys == null) { 098 partKeys = new ArrayList<String>(); 099 } 100 partKeys.add(partKey); 101 partitionMap.put(tableKey, partKeys); 102 } 103 synchronized (partKeyPatterns) { 104 missingDeps.put(tableKey, partKeyPatterns); // To handle race condition with removal of partKeyPatterns 105 Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey); 106 if (partValues == null) { 107 partValues = new HashMap<String, Collection<WaitingAction>>(); 108 partKeyPatterns.put(partKey, partValues); 109 } 110 Collection<WaitingAction> waitingActions = partValues.get(partVal); 111 if (waitingActions == null) { 112 waitingActions = new HashSet<WaitingAction>(); 113 partValues.put(partVal, waitingActions); 114 } 115 waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString())); 116 } 117 } 118 119 @Override 120 public boolean removeMissingDependency(HCatURI hcatURI, String actionID) { 121 String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable(); 122 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); 123 String partKey = sortedPKV.getPartKeys(); 124 String partVal = sortedPKV.getPartVals(); 125 Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey); 126 if (partKeyPatterns == null) { 127 LOG.warn("Remove missing dependency - Missing table entry - uri={0}, actionID={1}", 128 hcatURI.toURIString(), actionID); 129 return false; 130 } 131 ConcurrentMap<String, Collection<String>> partitionMap = actionPartitionMap.get(actionID); 132 if (partitionMap != null) { 133 synchronized (partitionMap) { 134 Collection<String> partKeys = partitionMap.get(tableKey); 135 if (partKeys != null) { 136 partKeys.remove(partKey); 137 } 138 if (partKeys.size() == 0) { 139 partitionMap.remove(tableKey); 140 } 141 if (partitionMap.size() == 0) { 142 actionPartitionMap.remove(actionID); 143 } 144 } 145 } 146 147 synchronized(partKeyPatterns) { 148 Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey); 149 if (partValues == null) { 150 LOG.warn("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}", 151 hcatURI.toURIString(), actionID); 152 return false; 153 } 154 Collection<WaitingAction> waitingActions = partValues.get(partVal); 155 if (waitingActions == null) { 156 LOG.warn("Remove missing dependency - Missing partition value - uri={0}, actionID={1}", 157 hcatURI.toURIString(), actionID); 158 return false; 159 } 160 boolean removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString())); 161 if (!removed) { 162 LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}", 163 hcatURI.toURIString(), actionID); 164 } 165 if (waitingActions.isEmpty()) { 166 partValues.remove(partVal); 167 if (partValues.isEmpty()) { 168 partKeyPatterns.remove(partKey); 169 } 170 if (partKeyPatterns.isEmpty()) { 171 missingDeps.remove(tableKey); 172 // Close JMS session. Stop listening on topic 173 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 174 hcatService.unregisterFromNotification(hcatURI); 175 } 176 } 177 return removed; 178 } 179 } 180 181 @Override 182 public Collection<String> getWaitingActions(HCatURI hcatURI) { 183 String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable(); 184 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); 185 String partKey = sortedPKV.getPartKeys(); 186 String partVal = sortedPKV.getPartVals(); 187 Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey); 188 if (partKeyPatterns == null) { 189 return null; 190 } 191 Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey); 192 if (partValues == null) { 193 return null; 194 } 195 Collection<WaitingAction> waitingActions = partValues.get(partVal); 196 if (waitingActions == null) { 197 return null; 198 } 199 Collection<String> actionIDs = new ArrayList<String>(); 200 String uriString = hcatURI.toURIString(); 201 for (WaitingAction action : waitingActions) { 202 if (action.getDependencyURI().equals(uriString)) { 203 actionIDs.add(action.getActionID()); 204 } 205 } 206 return actionIDs; 207 } 208 209 @Override 210 public Collection<String> markDependencyAvailable(String server, String db, String table, 211 Map<String, String> partitions) { 212 String tableKey = server + DELIMITER + db + DELIMITER + table; 213 Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey); 214 if (partKeyPatterns == null) { 215 LOG.warn("Got partition available notification for " + tableKey 216 + ". Unexpected and should not be listening to topic. Unregistering topic"); 217 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 218 hcatService.unregisterFromNotification(server, db, table); 219 return null; 220 } 221 Collection<String> actionsWithAvailDep = new HashSet<String>(); 222 List<String> partKeysToRemove = new ArrayList<String>(); 223 StringBuilder partValSB = new StringBuilder(); 224 synchronized (partKeyPatterns) { 225 // If partition patterns are date, date;country and date;country;state, 226 // construct the partition values for each pattern from the available partitions map and 227 // for the matching value in the dependency map, get the waiting actions. 228 for (Entry<String, Map<String, Collection<WaitingAction>>> entry : partKeyPatterns.entrySet()) { 229 String[] partKeys = entry.getKey().split(DELIMITER); 230 partValSB.setLength(0); 231 for (String key : partKeys) { 232 partValSB.append(partitions.get(key)).append(DELIMITER); 233 } 234 partValSB.setLength(partValSB.length() - 1); 235 236 Map<String, Collection<WaitingAction>> partValues = entry.getValue(); 237 String partVal = partValSB.toString(); 238 Collection<WaitingAction> wActions = entry.getValue().get(partVal); 239 if (wActions == null) 240 continue; 241 for (WaitingAction wAction : wActions) { 242 String actionID = wAction.getActionID(); 243 actionsWithAvailDep.add(actionID); 244 Collection<String> depURIs = availableDeps.get(actionID); 245 if (depURIs == null) { 246 depURIs = new ArrayList<String>(); 247 Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs); 248 if (existing != null) { 249 depURIs = existing; 250 } 251 } 252 synchronized (depURIs) { 253 depURIs.add(wAction.getDependencyURI()); 254 availableDeps.put(actionID, depURIs); 255 } 256 } 257 partValues.remove(partVal); 258 if (partValues.isEmpty()) { 259 partKeysToRemove.add(entry.getKey()); 260 } 261 } 262 for (String partKey : partKeysToRemove) { 263 partKeyPatterns.remove(partKey); 264 } 265 if (partKeyPatterns.isEmpty()) { 266 missingDeps.remove(tableKey); 267 // Close JMS session. Stop listening on topic 268 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 269 hcatService.unregisterFromNotification(server, db, table); 270 } 271 } 272 return actionsWithAvailDep; 273 } 274 275 @Override 276 public Collection<String> getAvailableDependencyURIs(String actionID) { 277 Collection<String> available = availableDeps.get(actionID); 278 if (available != null) { 279 // Return a copy 280 available = new ArrayList<String>(available); 281 } 282 return available; 283 } 284 285 @Override 286 public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) { 287 if (!availableDeps.containsKey(actionID)) { 288 return false; 289 } 290 else { 291 Collection<String> availList = availableDeps.get(actionID); 292 if (!availList.removeAll(dependencyURIs)) { 293 return false; 294 } 295 synchronized (availList) { 296 if (availList.isEmpty()) { 297 availableDeps.remove(actionID); 298 } 299 } 300 } 301 return true; 302 } 303 304 @Override 305 public void destroy() { 306 missingDeps.clear(); 307 availableDeps.clear(); 308 } 309 310 private static class SortedPKV { 311 private StringBuilder partKeys; 312 private StringBuilder partVals; 313 314 public SortedPKV(Map<String, String> partitions) { 315 this.partKeys = new StringBuilder(); 316 this.partVals = new StringBuilder(); 317 ArrayList<String> keys = new ArrayList<String>(partitions.keySet()); 318 Collections.sort(keys); 319 for (String key : keys) { 320 this.partKeys.append(key).append(DELIMITER); 321 this.partVals.append(partitions.get(key)).append(DELIMITER); 322 } 323 this.partKeys.setLength(partKeys.length() - 1); 324 this.partVals.setLength(partVals.length() - 1); 325 } 326 327 public String getPartKeys() { 328 return partKeys.toString(); 329 } 330 331 public String getPartVals() { 332 return partVals.toString(); 333 } 334 } 335 336 private HCatURI removePartitions(String coordActionId, Collection<String> partKeys, 337 Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns) { 338 HCatURI hcatUri = null; 339 for (String partKey : partKeys) { 340 Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey); 341 Iterator<String> partValItr = partValues.keySet().iterator(); 342 while (partValItr.hasNext()) { 343 String partVal = partValItr.next(); 344 Collection<WaitingAction> waitingActions = partValues.get(partVal); 345 if (waitingActions != null) { 346 Iterator<WaitingAction> waitItr = waitingActions.iterator(); 347 while (waitItr.hasNext()) { 348 WaitingAction waction = waitItr.next(); 349 if (coordActionId.contains(waction.getActionID())) { 350 waitItr.remove(); 351 if (hcatUri == null) { 352 try { 353 hcatUri = new HCatURI(waction.getDependencyURI()); 354 } 355 catch (URISyntaxException e) { 356 continue; 357 } 358 } 359 } 360 } 361 } 362 // delete partition value with no waiting actions 363 if (waitingActions.size() == 0) { 364 partValItr.remove(); 365 } 366 } 367 if (partValues.size() == 0) { 368 partKeyPatterns.remove(partKey); 369 } 370 } 371 return hcatUri; 372 } 373 374 @Override 375 public void removeNonWaitingCoordActions(Set<String> coordActions) { 376 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 377 for (String coordActionId : coordActions) { 378 LOG.info("Removing non waiting coord action {0} from partition dependency map", coordActionId); 379 synchronized (actionPartitionMap) { 380 Map<String, Collection<String>> partitionMap = actionPartitionMap.get(coordActionId); 381 if (partitionMap != null) { 382 Iterator<String> tableItr = partitionMap.keySet().iterator(); 383 while (tableItr.hasNext()) { 384 String tableKey = tableItr.next(); 385 HCatURI hcatUri = null; 386 Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey); 387 if (partKeyPatterns != null) { 388 synchronized (partKeyPatterns) { 389 Collection<String> partKeys = partitionMap.get(tableKey); 390 if (partKeys != null) { 391 hcatUri = removePartitions(coordActionId, partKeys, partKeyPatterns); 392 } 393 } 394 if (partKeyPatterns.size() == 0) { 395 tableItr.remove(); 396 if (hcatUri != null) { 397 // Close JMS session. Stop listening on topic 398 hcatService.unregisterFromNotification(hcatUri); 399 } 400 } 401 } 402 } 403 } 404 actionPartitionMap.remove(coordActionId); 405 } 406 } 407 } 408 409 @Override 410 public void removeCoordActionWithDependenciesAvailable(String coordAction) { 411 actionPartitionMap.remove(coordAction); 412 } 413}