001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.dependency.hcat; 020 021import java.net.URISyntaxException; 022import java.net.URL; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.HashSet; 027import java.util.Iterator; 028import java.util.Map; 029import java.util.Map.Entry; 030import java.util.Set; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.ConcurrentMap; 033 034import net.sf.ehcache.Cache; 035import net.sf.ehcache.CacheException; 036import net.sf.ehcache.CacheManager; 037import net.sf.ehcache.Ehcache; 038import net.sf.ehcache.Element; 039import net.sf.ehcache.config.CacheConfiguration; 040import net.sf.ehcache.event.CacheEventListener; 041 042import org.apache.hadoop.conf.Configuration; 043import org.apache.oozie.service.HCatAccessorService; 044import org.apache.oozie.service.PartitionDependencyManagerService; 045import org.apache.oozie.service.Services; 046import org.apache.oozie.util.HCatURI; 047import org.apache.oozie.util.XLog; 048 049public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEventListener { 050 051 private static XLog LOG = XLog.getLog(EhcacheHCatDependencyCache.class); 052 private static String TABLE_DELIMITER = "#"; 053 private static String PARTITION_DELIMITER = ";"; 054 055 public static String CONF_CACHE_NAME = PartitionDependencyManagerService.CONF_PREFIX + "cache.ehcache.name"; 056 057 private CacheManager cacheManager; 058 059 /** 060 * Map of server to EhCache which has key as db#table#pk1;pk2#val;val2 and value as WaitingActions (list of 061 * WaitingAction) which is Serializable (for overflowToDisk) 062 */ 063 private ConcurrentMap<String, Cache> missingDepsByServer; 064 065 private CacheConfiguration cacheConfig; 066 /** 067 * Map of server#db#table - sorted part key pattern - count of different partition values (count 068 * of elements in the cache) still missing for a partition key pattern. This count is used to 069 * quickly determine if there are any more missing dependencies for a table. When the count 070 * becomes 0, we unregister from notifications as there are no more missing dependencies for 071 * that table. 072 */ 073 private ConcurrentMap<String, ConcurrentMap<String, SettableInteger>> partKeyPatterns; 074 /** 075 * Map of actionIDs and collection of available URIs 076 */ 077 private ConcurrentMap<String, Collection<String>> availableDeps; 078 079 @Override 080 public void init(Configuration conf) { 081 String cacheName = conf.get(CONF_CACHE_NAME); 082 URL cacheConfigURL; 083 if (cacheName == null) { 084 cacheConfigURL = this.getClass().getClassLoader().getResource("ehcache-default.xml"); 085 cacheName = "dependency-default"; 086 } 087 else { 088 cacheConfigURL = this.getClass().getClassLoader().getResource("ehcache.xml"); 089 } 090 if (cacheConfigURL == null) { 091 throw new IllegalStateException("ehcache.xml is not found in classpath"); 092 } 093 cacheManager = CacheManager.newInstance(cacheConfigURL); 094 final Cache specifiedCache = cacheManager.getCache(cacheName); 095 if (specifiedCache == null) { 096 throw new IllegalStateException("Cache " + cacheName + " configured in " + CONF_CACHE_NAME 097 + " is not found"); 098 } 099 cacheConfig = specifiedCache.getCacheConfiguration(); 100 missingDepsByServer = new ConcurrentHashMap<String, Cache>(); 101 partKeyPatterns = new ConcurrentHashMap<String, ConcurrentMap<String, SettableInteger>>(); 102 availableDeps = new ConcurrentHashMap<String, Collection<String>>(); 103 } 104 105 @Override 106 public void addMissingDependency(HCatURI hcatURI, String actionID) { 107 108 // Create cache for the server if we don't have one 109 Cache missingCache = missingDepsByServer.get(hcatURI.getServer()); 110 if (missingCache == null) { 111 CacheConfiguration clonedConfig = cacheConfig.clone(); 112 clonedConfig.setName(hcatURI.getServer()); 113 missingCache = new Cache(clonedConfig); 114 Cache exists = missingDepsByServer.putIfAbsent(hcatURI.getServer(), missingCache); 115 if (exists == null) { 116 cacheManager.addCache(missingCache); 117 missingCache.getCacheEventNotificationService().registerListener(this); 118 } 119 else { 120 missingCache.dispose(); //discard 121 } 122 } 123 124 // Add hcat uri into the missingCache 125 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); 126 String partKeys = sortedPKV.getPartKeys(); 127 String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER 128 + partKeys + TABLE_DELIMITER + sortedPKV.getPartVals(); 129 boolean newlyAdded = true; 130 synchronized (missingCache) { 131 Element element = missingCache.get(missingKey); 132 if (element == null) { 133 WaitingActions waitingActions = new WaitingActions(); 134 element = new Element(missingKey, waitingActions); 135 Element exists = missingCache.putIfAbsent(element); 136 if (exists != null) { 137 newlyAdded = false; 138 waitingActions = (WaitingActions) exists.getObjectValue(); 139 } 140 waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString())); 141 } 142 else { 143 newlyAdded = false; 144 WaitingActions waitingActions = (WaitingActions) element.getObjectValue(); 145 waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString())); 146 } 147 } 148 149 // Increment count for the partition key pattern 150 if (newlyAdded) { 151 String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER 152 + hcatURI.getTable(); 153 synchronized (partKeyPatterns) { 154 ConcurrentMap<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey); 155 if (patternCounts == null) { 156 patternCounts = new ConcurrentHashMap<String, SettableInteger>(); 157 partKeyPatterns.put(tableKey, patternCounts); 158 } 159 SettableInteger count = patternCounts.get(partKeys); 160 if (count == null) { 161 patternCounts.put(partKeys, new SettableInteger(1)); 162 } 163 else { 164 count.increment(); 165 } 166 } 167 } 168 } 169 170 @Override 171 public boolean removeMissingDependency(HCatURI hcatURI, String actionID) { 172 173 Cache missingCache = missingDepsByServer.get(hcatURI.getServer()); 174 if (missingCache == null) { 175 LOG.warn("Remove missing dependency - Missing cache entry for server - uri={0}, actionID={1}", 176 hcatURI.toURIString(), actionID); 177 return false; 178 } 179 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); 180 String partKeys = sortedPKV.getPartKeys(); 181 String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER + 182 partKeys + TABLE_DELIMITER + sortedPKV.getPartVals(); 183 boolean decrement = false; 184 boolean removed = false; 185 synchronized (missingCache) { 186 Element element = missingCache.get(missingKey); 187 if (element == null) { 188 LOG.warn("Remove missing dependency - Missing cache entry - uri={0}, actionID={1}", 189 hcatURI.toURIString(), actionID); 190 return false; 191 } 192 Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue()).getWaitingActions(); 193 removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString())); 194 if (!removed) { 195 LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}", 196 hcatURI.toURIString(), actionID); 197 } 198 if (waitingActions.isEmpty()) { 199 missingCache.remove(missingKey); 200 decrement = true; 201 } 202 } 203 // Decrement partition key pattern count if the cache entry is removed 204 if (decrement) { 205 String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER 206 + hcatURI.getTable(); 207 decrementPartKeyPatternCount(tableKey, partKeys, hcatURI.toURIString()); 208 } 209 return removed; 210 } 211 212 @Override 213 public Collection<String> getWaitingActions(HCatURI hcatURI) { 214 Collection<String> actionIDs = null; 215 Cache missingCache = missingDepsByServer.get(hcatURI.getServer()); 216 if (missingCache != null) { 217 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); 218 String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER 219 + sortedPKV.getPartKeys() + TABLE_DELIMITER + sortedPKV.getPartVals(); 220 Element element = missingCache.get(missingKey); 221 if (element != null) { 222 WaitingActions waitingActions = (WaitingActions) element.getObjectValue(); 223 actionIDs = new ArrayList<String>(); 224 String uriString = hcatURI.getURI().toString(); 225 for (WaitingAction action : waitingActions.getWaitingActions()) { 226 if (action.getDependencyURI().equals(uriString)) { 227 actionIDs.add(action.getActionID()); 228 } 229 } 230 } 231 } 232 return actionIDs; 233 } 234 235 @Override 236 public Collection<String> markDependencyAvailable(String server, String db, String table, 237 Map<String, String> partitions) { 238 String tableKey = server + TABLE_DELIMITER + db + TABLE_DELIMITER + table; 239 synchronized (partKeyPatterns) { 240 Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey); 241 if (patternCounts == null) { 242 LOG.warn("Got partition available notification for " + tableKey 243 + ". Unexpected as no matching partition keys. Unregistering topic"); 244 unregisterFromNotifications(server, db, table); 245 return null; 246 } 247 Cache missingCache = missingDepsByServer.get(server); 248 if (missingCache == null) { 249 LOG.warn("Got partition available notification for " + tableKey 250 + ". Unexpected. Missing server entry in cache. Unregistering topic"); 251 partKeyPatterns.remove(tableKey); 252 unregisterFromNotifications(server, db, table); 253 return null; 254 } 255 Collection<String> actionsWithAvailDep = new HashSet<String>(); 256 StringBuilder partValSB = new StringBuilder(); 257 // If partition patterns are date, date;country and date;country;state, 258 // construct the partition values for each pattern and for the matching value in the 259 // missingCache, get the waiting actions and mark it as available. 260 for (Entry<String, SettableInteger> entry : patternCounts.entrySet()) { 261 String[] partKeys = entry.getKey().split(PARTITION_DELIMITER); 262 partValSB.setLength(0); 263 for (String key : partKeys) { 264 partValSB.append(partitions.get(key)).append(PARTITION_DELIMITER); 265 } 266 partValSB.setLength(partValSB.length() - 1); 267 String missingKey = db + TABLE_DELIMITER + table + TABLE_DELIMITER + entry.getKey() + TABLE_DELIMITER 268 + partValSB.toString(); 269 boolean removed = false; 270 Element element = null; 271 synchronized (missingCache) { 272 element = missingCache.get(missingKey); 273 if (element != null) { 274 missingCache.remove(missingKey); 275 removed = true; 276 } 277 } 278 if (removed) { 279 decrementPartKeyPatternCount(tableKey, entry.getKey(), server + TABLE_DELIMITER + missingKey); 280 // Add the removed entry to available dependencies 281 Collection<WaitingAction> wActions = ((WaitingActions) element.getObjectValue()) 282 .getWaitingActions(); 283 for (WaitingAction wAction : wActions) { 284 String actionID = wAction.getActionID(); 285 actionsWithAvailDep.add(actionID); 286 Collection<String> depURIs = availableDeps.get(actionID); 287 if (depURIs == null) { 288 depURIs = new ArrayList<String>(); 289 Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs); 290 if (existing != null) { 291 depURIs = existing; 292 } 293 } 294 synchronized (depURIs) { 295 depURIs.add(wAction.getDependencyURI()); 296 availableDeps.put(actionID, depURIs); 297 } 298 } 299 } 300 } 301 return actionsWithAvailDep; 302 } 303 } 304 305 @Override 306 public Collection<String> getAvailableDependencyURIs(String actionID) { 307 Collection<String> available = availableDeps.get(actionID); 308 if (available != null) { 309 // Return a copy 310 available = new ArrayList<String>(available); 311 } 312 return available; 313 } 314 315 @Override 316 public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) { 317 if (!availableDeps.containsKey(actionID)) { 318 return false; 319 } 320 else { 321 Collection<String> availList = availableDeps.get(actionID); 322 if (!availList.removeAll(dependencyURIs)) { 323 return false; 324 } 325 synchronized (availList) { 326 if (availList.isEmpty()) { 327 availableDeps.remove(actionID); 328 } 329 } 330 } 331 return true; 332 } 333 334 @Override 335 public void destroy() { 336 availableDeps.clear(); 337 cacheManager.shutdown(); 338 } 339 340 @Override 341 public Object clone() throws CloneNotSupportedException { 342 throw new CloneNotSupportedException(); 343 } 344 345 @Override 346 public void dispose() { 347 } 348 349 @Override 350 public void notifyElementExpired(Ehcache cache, Element element) { 351 // Invoked when timeToIdleSeconds or timeToLiveSeconds is met 352 String missingDepKey = (String) element.getObjectKey(); 353 LOG.info("Cache entry [{0}] of cache [{1}] expired", missingDepKey, cache.getName()); 354 onExpiryOrEviction(cache, element, missingDepKey); 355 } 356 357 @Override 358 public void notifyElementPut(Ehcache arg0, Element arg1) throws CacheException { 359 360 } 361 362 @Override 363 public void notifyElementRemoved(Ehcache arg0, Element arg1) throws CacheException { 364 } 365 366 @Override 367 public void notifyElementUpdated(Ehcache arg0, Element arg1) throws CacheException { 368 } 369 370 @Override 371 public void notifyRemoveAll(Ehcache arg0) { 372 } 373 374 @Override 375 public void notifyElementEvicted(Ehcache cache, Element element) { 376 // Invoked when maxElementsInMemory is met 377 String missingDepKey = (String) element.getObjectKey(); 378 LOG.info("Cache entry [{0}] of cache [{1}] evicted", missingDepKey, cache.getName()); 379 onExpiryOrEviction(cache, element, missingDepKey); 380 } 381 382 private void onExpiryOrEviction(Ehcache cache, Element element, String missingDepKey) { 383 int partValIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER); 384 int partKeyIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER, partValIndex - 1); 385 // server#db#table. Name of the cache is that of the server. 386 String tableKey = cache.getName() + TABLE_DELIMITER + missingDepKey.substring(0, partKeyIndex); 387 String partKeys = missingDepKey.substring(partKeyIndex + 1, partValIndex); 388 decrementPartKeyPatternCount(tableKey, partKeys, missingDepKey); 389 } 390 391 /** 392 * Decrement partition key pattern count, once a hcat URI is removed from the cache 393 * 394 * @param tableKey key identifying the table - server#db#table 395 * @param partKeys partition key pattern 396 * @param hcatURI URI with the partition key pattern 397 */ 398 private void decrementPartKeyPatternCount(String tableKey, String partKeys, String hcatURI) { 399 synchronized (partKeyPatterns) { 400 Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey); 401 if (patternCounts == null) { 402 LOG.warn("Removed dependency - Missing cache entry - uri={0}. " 403 + "But no corresponding pattern key table entry", hcatURI); 404 } 405 else { 406 SettableInteger count = patternCounts.get(partKeys); 407 if (count == null) { 408 LOG.warn("Removed dependency - Missing cache entry - uri={0}. " 409 + "But no corresponding pattern key entry", hcatURI); 410 } 411 else { 412 count.decrement(); 413 if (count.getValue() == 0) { 414 patternCounts.remove(partKeys); 415 } 416 if (patternCounts.isEmpty()) { 417 partKeyPatterns.remove(tableKey); 418 String[] tableDetails = tableKey.split(TABLE_DELIMITER); 419 unregisterFromNotifications(tableDetails[0], tableDetails[1], tableDetails[2]); 420 } 421 } 422 } 423 } 424 } 425 426 private void unregisterFromNotifications(String server, String db, String table) { 427 // Close JMS session. Stop listening on topic 428 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 429 hcatService.unregisterFromNotification(server, db, table); 430 } 431 432 private static class SortedPKV { 433 private StringBuilder partKeys; 434 private StringBuilder partVals; 435 436 public SortedPKV(Map<String, String> partitions) { 437 this.partKeys = new StringBuilder(); 438 this.partVals = new StringBuilder(); 439 ArrayList<String> keys = new ArrayList<String>(partitions.keySet()); 440 Collections.sort(keys); 441 for (String key : keys) { 442 this.partKeys.append(key).append(PARTITION_DELIMITER); 443 this.partVals.append(partitions.get(key)).append(PARTITION_DELIMITER); 444 } 445 this.partKeys.setLength(partKeys.length() - 1); 446 this.partVals.setLength(partVals.length() - 1); 447 } 448 449 public String getPartKeys() { 450 return partKeys.toString(); 451 } 452 453 public String getPartVals() { 454 return partVals.toString(); 455 } 456 457 } 458 459 private static class SettableInteger { 460 private int value; 461 462 public SettableInteger(int value) { 463 this.value = value; 464 } 465 466 public int getValue() { 467 return value; 468 } 469 470 public void increment() { 471 value++; 472 } 473 474 public void decrement() { 475 value--; 476 } 477 } 478 479 @Override 480 public void removeNonWaitingCoordActions(Set<String> staleActions) { 481 Iterator<String> serverItr = missingDepsByServer.keySet().iterator(); 482 while (serverItr.hasNext()) { 483 String server = serverItr.next(); 484 Cache missingCache = missingDepsByServer.get(server); 485 if (missingCache == null) { 486 continue; 487 } 488 synchronized (missingCache) { 489 for (Object key : missingCache.getKeys()) { 490 Element element = missingCache.get(key); 491 if (element == null) { 492 continue; 493 } 494 Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue()) 495 .getWaitingActions(); 496 Iterator<WaitingAction> wactionItr = waitingActions.iterator(); 497 HCatURI hcatURI = null; 498 while(wactionItr.hasNext()) { 499 WaitingAction waction = wactionItr.next(); 500 if(staleActions.contains(waction.getActionID())) { 501 try { 502 hcatURI = new HCatURI(waction.getDependencyURI()); 503 wactionItr.remove(); 504 } 505 catch (URISyntaxException e) { 506 continue; 507 } 508 } 509 } 510 if (waitingActions.isEmpty() && hcatURI != null) { 511 missingCache.remove(key); 512 // Decrement partition key pattern count if the cache entry is removed 513 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); 514 String partKeys = sortedPKV.getPartKeys(); 515 String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER 516 + hcatURI.getTable(); 517 String hcatURIStr = hcatURI.toURIString(); 518 decrementPartKeyPatternCount(tableKey, partKeys, hcatURIStr); 519 } 520 } 521 } 522 } 523 } 524 525 @Override 526 public void removeCoordActionWithDependenciesAvailable(String coordAction) { 527 // to be implemented when reverse-lookup data structure for purging is added 528 } 529 530}