/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.dependency.hcat;

import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheException;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element;
import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.event.CacheEventListener;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.service.HCatAccessorService;
import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XLog;

/**
 * {@link HCatDependencyCache} implementation backed by Ehcache.
 * <p>
 * Missing dependencies are kept in one Ehcache {@link Cache} per HCat server; this class registers itself as
 * listener on each of those caches so that expired or evicted entries are reflected in the partition key
 * pattern counts. Dependencies that became available are kept in an in-memory map keyed by action ID.
 */
public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEventListener {

    private static final XLog LOG = XLog.getLog(EhcacheHCatDependencyCache.class);
    private static final String TABLE_DELIMITER = "#";
    private static final String PARTITION_DELIMITER = ";";

    public static String CONF_CACHE_NAME = PartitionDependencyManagerService.CONF_PREFIX + "cache.ehcache.name";

    private CacheManager cacheManager;

    /**
     * Map of server to EhCache which has key as db#table#pk1;pk2#val;val2 and value as WaitingActions (list of
     * WaitingAction) which is Serializable (for overflowToDisk)
     */
    private ConcurrentMap<String, Cache> missingDepsByServer;

    private CacheConfiguration cacheConfig;
    /**
     * Map of server#db#table - sorted part key pattern - count of different partition values (count
     * of elements in the cache) still missing for a partition key pattern. This count is used to
     * quickly determine if there are any more missing dependencies for a table. When the count
     * becomes 0, we unregister from notifications as there are no more missing dependencies for
     * that table.
     */
    private ConcurrentMap<String, ConcurrentMap<String, SettableInteger>> partKeyPatterns;
    /**
     * Map of actionIDs and collection of available URIs
     */
    private ConcurrentMap<String, Collection<String>> availableDeps;

    /**
     * Loads the Ehcache configuration from the classpath and prepares the internal maps.
     * <p>
     * When {@link #CONF_CACHE_NAME} is not set, <code>ehcache-default.xml</code> and the cache named
     * <code>dependency-default</code> are used; otherwise <code>ehcache.xml</code> and the configured cache name.
     * The configuration of that cache is kept as the template cloned for every per-server cache.
     *
     * @param conf configuration to read {@link #CONF_CACHE_NAME} from
     * @throws IllegalStateException if the configuration resource or the named cache cannot be found
     */
    @Override
    public void init(Configuration conf) {
        String cacheName = conf.get(CONF_CACHE_NAME);
        String cacheConfigResource;
        if (cacheName == null) {
            cacheConfigResource = "ehcache-default.xml";
            cacheName = "dependency-default";
        }
        else {
            cacheConfigResource = "ehcache.xml";
        }
        URL cacheConfigURL = this.getClass().getClassLoader().getResource(cacheConfigResource);
        if (cacheConfigURL == null) {
            // Name the resource that was actually looked up, not always ehcache.xml
            throw new IllegalStateException(cacheConfigResource + " is not found in classpath");
        }
        cacheManager = CacheManager.newInstance(cacheConfigURL);
        final Cache specifiedCache = cacheManager.getCache(cacheName);
        if (specifiedCache == null) {
            throw new IllegalStateException("Cache " + cacheName + " configured in " + CONF_CACHE_NAME
                    + " is not found");
        }
        cacheConfig = specifiedCache.getCacheConfiguration();
        missingDepsByServer = new ConcurrentHashMap<String, Cache>();
        partKeyPatterns = new ConcurrentHashMap<String, ConcurrentMap<String, SettableInteger>>();
        availableDeps = new ConcurrentHashMap<String, Collection<String>>();
    }

    /**
     * Records that the given action is waiting for the given HCat partition.
     * <p>
     * The per-server cache is created on first use. If the cache entry for the partition is new, the count
     * of the corresponding partition key pattern is incremented.
     *
     * @param hcatURI dependency URI
     * @param actionID ID of the action waiting for the dependency
     */
    @Override
    public void addMissingDependency(HCatURI hcatURI, String actionID) {

        // Create cache for the server if we don't have one
        Cache missingCache = getOrCreateMissingCache(hcatURI.getServer());

        // Add hcat uri into the missingCache
        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
        String partKeys = sortedPKV.getPartKeys();
        String missingKey = missingKey(hcatURI.getDb(), hcatURI.getTable(), partKeys, sortedPKV.getPartVals());
        boolean newlyAdded = true;
        synchronized (missingCache) {
            Element element = missingCache.get(missingKey);
            if (element == null) {
                WaitingActions waitingActions = new WaitingActions();
                element = new Element(missingKey, waitingActions);
                Element exists = missingCache.putIfAbsent(element);
                if (exists != null) {
                    newlyAdded = false;
                    waitingActions = (WaitingActions) exists.getObjectValue();
                }
                waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
            }
            else {
                newlyAdded = false;
                WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
                waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
            }
        }

        // Increment count for the partition key pattern
        if (newlyAdded) {
            String tableKey = tableKey(hcatURI.getServer(), hcatURI.getDb(), hcatURI.getTable());
            synchronized (partKeyPatterns) {
                ConcurrentMap<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
                if (patternCounts == null) {
                    patternCounts = new ConcurrentHashMap<String, SettableInteger>();
                    partKeyPatterns.put(tableKey, patternCounts);
                }
                SettableInteger count = patternCounts.get(partKeys);
                if (count == null) {
                    patternCounts.put(partKeys, new SettableInteger(1));
                }
                else {
                    count.increment();
                }
            }
        }
    }

    /**
     * Returns the cache holding the missing dependencies of a server, creating it on first use.
     * <p>
     * Creation happens under a lock and the cache is put into {@link #missingDepsByServer} only after it has
     * been added to the cache manager and the listener is registered, so callers never get hold of a cache
     * that is disposed or not yet added to the manager.
     *
     * @param server HCat server the cache belongs to; also used as the cache name
     * @return the cache registered for the server
     */
    private Cache getOrCreateMissingCache(String server) {
        Cache missingCache = missingDepsByServer.get(server);
        if (missingCache == null) {
            synchronized (missingDepsByServer) {
                // Re-check: another thread may have created the cache while we waited for the lock
                missingCache = missingDepsByServer.get(server);
                if (missingCache == null) {
                    CacheConfiguration clonedConfig = cacheConfig.clone();
                    clonedConfig.setName(server);
                    missingCache = new Cache(clonedConfig);
                    cacheManager.addCache(missingCache);
                    missingCache.getCacheEventNotificationService().registerListener(this);
                    missingDepsByServer.put(server, missingCache);
                }
            }
        }
        return missingCache;
    }

    /**
     * Removes the given action from the actions waiting for the given HCat partition.
     * <p>
     * If no action is left waiting, the cache entry is removed and the partition key pattern count
     * is decremented.
     *
     * @param hcatURI dependency URI
     * @param actionID ID of the action to remove
     * @return true if the action was found and removed, false otherwise
     */
    @Override
    public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {

        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
        if (missingCache == null) {
            LOG.warn("Remove missing dependency - Missing cache entry for server - uri={0}, actionID={1}",
                    hcatURI.toURIString(), actionID);
            return false;
        }
        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
        String partKeys = sortedPKV.getPartKeys();
        String missingKey = missingKey(hcatURI.getDb(), hcatURI.getTable(), partKeys, sortedPKV.getPartVals());
        boolean decrement = false;
        boolean removed = false;
        synchronized (missingCache) {
            Element element = missingCache.get(missingKey);
            if (element == null) {
                LOG.warn("Remove missing dependency - Missing cache entry - uri={0}, actionID={1}",
                        hcatURI.toURIString(), actionID);
                return false;
            }
            Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue()).getWaitingActions();
            removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString()));
            if (!removed) {
                LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
                        hcatURI.toURIString(), actionID);
            }
            if (waitingActions.isEmpty()) {
                missingCache.remove(missingKey);
                decrement = true;
            }
        }
        // Decrement partition key pattern count if the cache entry is removed
        if (decrement) {
            String tableKey = tableKey(hcatURI.getServer(), hcatURI.getDb(), hcatURI.getTable());
            decrementPartKeyPatternCount(tableKey, partKeys, hcatURI.toURIString());
        }
        return removed;
    }

    /**
     * Returns the IDs of the actions waiting for exactly the given dependency URI.
     *
     * @param hcatURI dependency URI
     * @return action IDs whose dependency URI equals the string form of <code>hcatURI.getURI()</code>;
     *         null if there is no cache for the server or no entry for the partition
     */
    @Override
    public Collection<String> getWaitingActions(HCatURI hcatURI) {
        Collection<String> actionIDs = null;
        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
        if (missingCache != null) {
            SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
            String missingKey = missingKey(hcatURI.getDb(), hcatURI.getTable(), sortedPKV.getPartKeys(),
                    sortedPKV.getPartVals());
            Element element = missingCache.get(missingKey);
            if (element != null) {
                WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
                actionIDs = new ArrayList<String>();
                String uriString = hcatURI.getURI().toString();
                for (WaitingAction action : waitingActions.getWaitingActions()) {
                    if (action.getDependencyURI().equals(uriString)) {
                        actionIDs.add(action.getActionID());
                    }
                }
            }
        }
        return actionIDs;
    }

    /**
     * Marks a partition as available: for every partition key pattern known for the table, the matching
     * entry (if any) is removed from the missing cache and its waiting actions get the dependency URI
     * added to their available dependencies.
     * <p>
     * If nothing is tracked for the table or the server, the notification topic is unregistered.
     *
     * @param server HCat server
     * @param db database name
     * @param table table name
     * @param partitions partition key to value map of the partition that became available
     * @return IDs of the actions for which a dependency became available; null if nothing is tracked for
     *         the table or server
     */
    @Override
    public Collection<String> markDependencyAvailable(String server, String db, String table,
            Map<String, String> partitions) {
        String tableKey = tableKey(server, db, table);
        synchronized (partKeyPatterns) {
            Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
            if (patternCounts == null) {
                LOG.warn("Got partition available notification for " + tableKey
                        + ". Unexpected as no matching partition keys. Unregistering topic");
                unregisterFromNotifications(server, db, table);
                return null;
            }
            Cache missingCache = missingDepsByServer.get(server);
            if (missingCache == null) {
                LOG.warn("Got partition available notification for " + tableKey
                        + ". Unexpected. Missing server entry in cache. Unregistering topic");
                partKeyPatterns.remove(tableKey);
                unregisterFromNotifications(server, db, table);
                return null;
            }
            Collection<String> actionsWithAvailDep = new HashSet<String>();
            StringBuilder partValSB = new StringBuilder();
            // If partition patterns are date, date;country and date;country;state,
            // construct the partition values for each pattern and for the matching value in the
            // missingCache, get the waiting actions and mark it as available.
            for (Entry<String, SettableInteger> entry : patternCounts.entrySet()) {
                String[] partKeys = entry.getKey().split(PARTITION_DELIMITER);
                partValSB.setLength(0);
                for (String key : partKeys) {
                    partValSB.append(partitions.get(key)).append(PARTITION_DELIMITER);
                }
                // Drop the trailing delimiter
                partValSB.setLength(partValSB.length() - 1);
                String missingKey = missingKey(db, table, entry.getKey(), partValSB.toString());
                boolean removed = false;
                Element element = null;
                synchronized (missingCache) {
                    element = missingCache.get(missingKey);
                    if (element != null) {
                        missingCache.remove(missingKey);
                        removed = true;
                    }
                }
                if (removed) {
                    decrementPartKeyPatternCount(tableKey, entry.getKey(), server + TABLE_DELIMITER + missingKey);
                    // Add the removed entry to available dependencies
                    Collection<WaitingAction> wActions = ((WaitingActions) element.getObjectValue())
                            .getWaitingActions();
                    for (WaitingAction wAction : wActions) {
                        String actionID = wAction.getActionID();
                        actionsWithAvailDep.add(actionID);
                        Collection<String> depURIs = availableDeps.get(actionID);
                        if (depURIs == null) {
                            depURIs = new ArrayList<String>();
                            Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
                            if (existing != null) {
                                depURIs = existing;
                            }
                        }
                        synchronized (depURIs) {
                            depURIs.add(wAction.getDependencyURI());
                            // Put again in case the list was emptied and unmapped in the meantime
                            availableDeps.put(actionID, depURIs);
                        }
                    }
                }
            }
            return actionsWithAvailDep;
        }
    }

    /**
     * Returns the dependency URIs that became available for an action.
     *
     * @param actionID ID of the action
     * @return a copy of the available dependency URIs, or null if there are none recorded for the action
     */
    @Override
    public Collection<String> getAvailableDependencyURIs(String actionID) {
        Collection<String> available = availableDeps.get(actionID);
        if (available != null) {
            // Return a copy, taken under the same lock that guards additions to the list
            synchronized (available) {
                available = new ArrayList<String>(available);
            }
        }
        return available;
    }

    /**
     * Removes the given URIs from the available dependencies of an action. The action's entry is dropped
     * once no available URI is left.
     *
     * @param actionID ID of the action
     * @param dependencyURIs URIs to remove
     * @return true if at least one URI was removed; false if the action has no available dependencies or
     *         none of the given URIs was present
     */
    @Override
    public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
        // Single lookup: a containsKey/get pair could see the entry vanish in between
        Collection<String> availList = availableDeps.get(actionID);
        if (availList == null) {
            return false;
        }
        // Mutate under the same lock that guards additions to the list
        synchronized (availList) {
            if (!availList.removeAll(dependencyURIs)) {
                return false;
            }
            if (availList.isEmpty()) {
                availableDeps.remove(actionID);
            }
        }
        return true;
    }

    /**
     * Clears the available dependencies and shuts down the cache manager.
     */
    @Override
    public void destroy() {
        availableDeps.clear();
        cacheManager.shutdown();
    }

    /**
     * Cloning is not supported.
     *
     * @throws CloneNotSupportedException always
     */
    @Override
    public Object clone() throws CloneNotSupportedException {
        throw new CloneNotSupportedException();
    }

    /** Listener callback; nothing to release. */
    @Override
    public void dispose() {
    }

    /**
     * Listener callback for an expired element; updates the partition key pattern count.
     */
    @Override
    public void notifyElementExpired(Ehcache cache, Element element) {
        // Invoked when timeToIdleSeconds or timeToLiveSeconds is met
        String missingDepKey = (String) element.getObjectKey();
        LOG.info("Cache entry [{0}] of cache [{1}] expired", missingDepKey, cache.getName());
        onExpiryOrEviction(cache, element, missingDepKey);
    }

    /** Listener callback; intentionally a no-op. */
    @Override
    public void notifyElementPut(Ehcache arg0, Element arg1) throws CacheException {

    }

    /** Listener callback; intentionally a no-op. */
    @Override
    public void notifyElementRemoved(Ehcache arg0, Element arg1) throws CacheException {
    }

    /** Listener callback; intentionally a no-op. */
    @Override
    public void notifyElementUpdated(Ehcache arg0, Element arg1) throws CacheException {
    }

    /** Listener callback; intentionally a no-op. */
    @Override
    public void notifyRemoveAll(Ehcache arg0) {
    }

    /**
     * Listener callback for an evicted element; updates the partition key pattern count.
     */
    @Override
    public void notifyElementEvicted(Ehcache cache, Element element) {
        // Invoked when maxElementsInMemory is met
        String missingDepKey = (String) element.getObjectKey();
        LOG.info("Cache entry [{0}] of cache [{1}] evicted", missingDepKey, cache.getName());
        onExpiryOrEviction(cache, element, missingDepKey);
    }

    /**
     * Splits a missing dependency key (db#table#partKeys#partVals) back into table key and partition key
     * pattern and decrements the count for that pattern.
     *
     * @param cache cache the element belonged to; its name is the server
     * @param element element that expired or was evicted
     * @param missingDepKey key of the element
     */
    private void onExpiryOrEviction(Ehcache cache, Element element, String missingDepKey) {
        int partValIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER);
        int partKeyIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER, partValIndex - 1);
        // server#db#table. Name of the cache is that of the server.
        String tableKey = cache.getName() + TABLE_DELIMITER + missingDepKey.substring(0, partKeyIndex);
        String partKeys = missingDepKey.substring(partKeyIndex + 1, partValIndex);
        decrementPartKeyPatternCount(tableKey, partKeys, missingDepKey);
    }

    /**
     * Decrement partition key pattern count, once a hcat URI is removed from the cache. When the last
     * pattern of a table is gone, the table entry is removed and the notification topic unregistered.
     *
     * @param tableKey key identifying the table - server#db#table
     * @param partKeys partition key pattern
     * @param hcatURI URI with the partition key pattern
     */
    private void decrementPartKeyPatternCount(String tableKey, String partKeys, String hcatURI) {
        synchronized (partKeyPatterns) {
            Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
            if (patternCounts == null) {
                LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
                        + "But no corresponding pattern key table entry", hcatURI);
            }
            else {
                SettableInteger count = patternCounts.get(partKeys);
                if (count == null) {
                    LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
                            + "But no corresponding pattern key entry", hcatURI);
                }
                else {
                    count.decrement();
                    if (count.getValue() == 0) {
                        patternCounts.remove(partKeys);
                    }
                    if (patternCounts.isEmpty()) {
                        partKeyPatterns.remove(tableKey);
                        String[] tableDetails = tableKey.split(TABLE_DELIMITER);
                        unregisterFromNotifications(tableDetails[0], tableDetails[1], tableDetails[2]);
                    }
                }
            }
        }
    }

    private void unregisterFromNotifications(String server, String db, String table) {
        // Close JMS session. Stop listening on topic
        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
        hcatService.unregisterFromNotification(server, db, table);
    }

    /**
     * Builds the key of a table in {@link #partKeyPatterns}: server#db#table.
     */
    private static String tableKey(String server, String db, String table) {
        return server + TABLE_DELIMITER + db + TABLE_DELIMITER + table;
    }

    /**
     * Builds the key of an element in a per-server missing cache: db#table#partKeys#partVals.
     */
    private static String missingKey(String db, String table, String partKeys, String partVals) {
        return db + TABLE_DELIMITER + table + TABLE_DELIMITER + partKeys + TABLE_DELIMITER + partVals;
    }

    /**
     * Partition keys sorted by name and the partition values in the matching order, each joined with
     * the partition delimiter.
     */
    private static class SortedPKV {
        private StringBuilder partKeys;
        private StringBuilder partVals;

        public SortedPKV(Map<String, String> partitions) {
            this.partKeys = new StringBuilder();
            this.partVals = new StringBuilder();
            ArrayList<String> keys = new ArrayList<String>(partitions.keySet());
            Collections.sort(keys);
            for (String key : keys) {
                this.partKeys.append(key).append(PARTITION_DELIMITER);
                this.partVals.append(partitions.get(key)).append(PARTITION_DELIMITER);
            }
            // Drop the trailing delimiter
            this.partKeys.setLength(partKeys.length() - 1);
            this.partVals.setLength(partVals.length() - 1);
        }

        public String getPartKeys() {
            return partKeys.toString();
        }

        public String getPartVals() {
            return partVals.toString();
        }

    }

    /**
     * Mutable int counter. It is not synchronized itself; within this class it is only modified while
     * holding the lock on the partition key pattern map.
     */
    private static class SettableInteger {
        private int value;

        public SettableInteger(int value) {
            this.value = value;
        }

        public int getValue() {
            return value;
        }

        public void increment() {
            value++;
        }

        public void decrement() {
            value--;
        }
    }

    /**
     * Removes the given stale actions from the waiting actions of every missing dependency on every
     * server. Entries left without waiting actions are removed from the cache and the partition key
     * pattern count is decremented.
     *
     * @param staleActions IDs of the actions that are no longer waiting
     */
    @Override
    public void removeNonWaitingCoordActions(Set<String> staleActions) {
        for (Cache missingCache : missingDepsByServer.values()) {
            synchronized (missingCache) {
                for (Object key : missingCache.getKeys()) {
                    Element element = missingCache.get(key);
                    if (element == null) {
                        continue;
                    }
                    Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue())
                            .getWaitingActions();
                    Iterator<WaitingAction> wactionItr = waitingActions.iterator();
                    HCatURI hcatURI = null;
                    while (wactionItr.hasNext()) {
                        WaitingAction waction = wactionItr.next();
                        if (staleActions.contains(waction.getActionID())) {
                            try {
                                hcatURI = new HCatURI(waction.getDependencyURI());
                                wactionItr.remove();
                            }
                            catch (URISyntaxException e) {
                                // Leave the entry in place, but do not hide the problem
                                LOG.warn("Remove non-waiting actions - Invalid dependency URI - uri={0}, actionID={1}",
                                        waction.getDependencyURI(), waction.getActionID(), e);
                            }
                        }
                    }
                    if (waitingActions.isEmpty() && hcatURI != null) {
                        missingCache.remove(key);
                        // Decrement partition key pattern count if the cache entry is removed
                        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
                        String partKeys = sortedPKV.getPartKeys();
                        String tableKey = tableKey(hcatURI.getServer(), hcatURI.getDb(), hcatURI.getTable());
                        String hcatURIStr = hcatURI.toURIString();
                        decrementPartKeyPatternCount(tableKey, partKeys, hcatURIStr);
                    }
                }
            }
        }
    }

    @Override
    public void removeCoordActionWithDependenciesAvailable(String coordAction) {
        // to be implemented when reverse-lookup data structure for purging is added
    }

}