001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.dependency.hcat; 019 020 import java.net.URL; 021 import java.util.ArrayList; 022 import java.util.Collection; 023 import java.util.Collections; 024 import java.util.HashSet; 025 import java.util.Map; 026 import java.util.Map.Entry; 027 import java.util.concurrent.ConcurrentHashMap; 028 import java.util.concurrent.ConcurrentMap; 029 030 import net.sf.ehcache.Cache; 031 import net.sf.ehcache.CacheException; 032 import net.sf.ehcache.CacheManager; 033 import net.sf.ehcache.Ehcache; 034 import net.sf.ehcache.Element; 035 import net.sf.ehcache.config.CacheConfiguration; 036 import net.sf.ehcache.event.CacheEventListener; 037 038 import org.apache.hadoop.conf.Configuration; 039 import org.apache.oozie.service.HCatAccessorService; 040 import org.apache.oozie.service.PartitionDependencyManagerService; 041 import org.apache.oozie.service.Services; 042 import org.apache.oozie.util.HCatURI; 043 import org.apache.oozie.util.XLog; 044 045 public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEventListener { 046 047 private static XLog LOG = XLog.getLog(EhcacheHCatDependencyCache.class); 048 private static String TABLE_DELIMITER = "#"; 049 private static String PARTITION_DELIMITER = ";"; 050 051 public static String CONF_CACHE_NAME = PartitionDependencyManagerService.CONF_PREFIX + "cache.ehcache.name"; 052 053 private CacheManager cacheManager; 054 055 /** 056 * Map of server to EhCache which has key as db#table#pk1;pk2#val;val2 and value as WaitingActions (list of 057 * WaitingAction) which is Serializable (for overflowToDisk) 058 */ 059 private ConcurrentMap<String, Cache> missingDepsByServer; 060 061 private CacheConfiguration cacheConfig; 062 /** 063 * Map of server#db#table - sorted part key pattern - count of different partition values (count 064 * of elements in the cache) still missing for a partition key pattern. This count is used to 065 * quickly determine if there are any more missing dependencies for a table. When the count 066 * becomes 0, we unregister from notifications as there are no more missing dependencies for 067 * that table. 068 */ 069 private ConcurrentMap<String, ConcurrentMap<String, SettableInteger>> partKeyPatterns; 070 /** 071 * Map of actionIDs and collection of available URIs 072 */ 073 private ConcurrentMap<String, Collection<String>> availableDeps; 074 075 @Override 076 public void init(Configuration conf) { 077 String cacheName = conf.get(CONF_CACHE_NAME); 078 URL cacheConfigURL; 079 if (cacheName == null) { 080 cacheConfigURL = this.getClass().getClassLoader().getResource("ehcache-default.xml"); 081 cacheName = "dependency-default"; 082 } 083 else { 084 cacheConfigURL = this.getClass().getClassLoader().getResource("ehcache.xml"); 085 } 086 if (cacheConfigURL == null) { 087 throw new IllegalStateException("ehcache.xml is not found in classpath"); 088 } 089 cacheManager = CacheManager.newInstance(cacheConfigURL); 090 final Cache specifiedCache = cacheManager.getCache(cacheName); 091 if (specifiedCache == null) { 092 throw new IllegalStateException("Cache " + cacheName + " configured in " + CONF_CACHE_NAME 093 + " is not found"); 094 } 095 cacheConfig = specifiedCache.getCacheConfiguration(); 096 missingDepsByServer = new ConcurrentHashMap<String, Cache>(); 097 partKeyPatterns = new ConcurrentHashMap<String, ConcurrentMap<String, SettableInteger>>(); 098 availableDeps = new ConcurrentHashMap<String, Collection<String>>(); 099 } 100 101 @Override 102 public void addMissingDependency(HCatURI hcatURI, String actionID) { 103 104 // Create cache for the server if we don't have one 105 Cache missingCache = missingDepsByServer.get(hcatURI.getServer()); 106 if (missingCache == null) { 107 CacheConfiguration clonedConfig = cacheConfig.clone(); 108 clonedConfig.setName(hcatURI.getServer()); 109 missingCache = new Cache(clonedConfig); 110 Cache exists = missingDepsByServer.putIfAbsent(hcatURI.getServer(), missingCache); 111 if (exists == null) { 112 cacheManager.addCache(missingCache); 113 missingCache.getCacheEventNotificationService().registerListener(this); 114 } 115 else { 116 missingCache.dispose(); //discard 117 } 118 } 119 120 // Add hcat uri into the missingCache 121 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); 122 String partKeys = sortedPKV.getPartKeys(); 123 String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER 124 + partKeys + TABLE_DELIMITER + sortedPKV.getPartVals(); 125 boolean newlyAdded = true; 126 synchronized (missingCache) { 127 Element element = missingCache.get(missingKey); 128 if (element == null) { 129 WaitingActions waitingActions = new WaitingActions(); 130 element = new Element(missingKey, waitingActions); 131 Element exists = missingCache.putIfAbsent(element); 132 if (exists != null) { 133 newlyAdded = false; 134 waitingActions = (WaitingActions) exists.getObjectValue(); 135 } 136 waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString())); 137 } 138 else { 139 newlyAdded = false; 140 WaitingActions waitingActions = (WaitingActions) element.getObjectValue(); 141 waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString())); 142 } 143 } 144 145 // Increment count for the partition key pattern 146 if (newlyAdded) { 147 String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER 148 + hcatURI.getTable(); 149 synchronized (partKeyPatterns) { 150 ConcurrentMap<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey); 151 if (patternCounts == null) { 152 patternCounts = new ConcurrentHashMap<String, SettableInteger>(); 153 partKeyPatterns.put(tableKey, patternCounts); 154 } 155 SettableInteger count = patternCounts.get(partKeys); 156 if (count == null) { 157 patternCounts.put(partKeys, new SettableInteger(1)); 158 } 159 else { 160 count.increment(); 161 } 162 } 163 } 164 } 165 166 @Override 167 public boolean removeMissingDependency(HCatURI hcatURI, String actionID) { 168 169 Cache missingCache = missingDepsByServer.get(hcatURI.getServer()); 170 if (missingCache == null) { 171 LOG.warn("Remove missing dependency - Missing cache entry for server - uri={0}, actionID={1}", 172 hcatURI.toURIString(), actionID); 173 return false; 174 } 175 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); 176 String partKeys = sortedPKV.getPartKeys(); 177 String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER + 178 partKeys + TABLE_DELIMITER + sortedPKV.getPartVals(); 179 boolean decrement = false; 180 boolean removed = false; 181 synchronized (missingCache) { 182 Element element = missingCache.get(missingKey); 183 if (element == null) { 184 LOG.warn("Remove missing dependency - Missing cache entry - uri={0}, actionID={1}", 185 hcatURI.toURIString(), actionID); 186 return false; 187 } 188 Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue()).getWaitingActions(); 189 removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString())); 190 if (!removed) { 191 LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}", 192 hcatURI.toURIString(), actionID); 193 } 194 if (waitingActions.isEmpty()) { 195 missingCache.remove(missingKey); 196 decrement = true; 197 } 198 } 199 // Decrement partition key pattern count if the cache entry is removed 200 if (decrement) { 201 String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER 202 + hcatURI.getTable(); 203 decrementPartKeyPatternCount(tableKey, partKeys, hcatURI.toURIString()); 204 } 205 return removed; 206 } 207 208 @Override 209 public Collection<String> getWaitingActions(HCatURI hcatURI) { 210 Collection<String> actionIDs = null; 211 Cache missingCache = missingDepsByServer.get(hcatURI.getServer()); 212 if (missingCache != null) { 213 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); 214 String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER 215 + sortedPKV.getPartKeys() + TABLE_DELIMITER + sortedPKV.getPartVals(); 216 Element element = missingCache.get(missingKey); 217 if (element != null) { 218 WaitingActions waitingActions = (WaitingActions) element.getObjectValue(); 219 actionIDs = new ArrayList<String>(); 220 String uriString = hcatURI.getURI().toString(); 221 for (WaitingAction action : waitingActions.getWaitingActions()) { 222 if (action.getDependencyURI().equals(uriString)) { 223 actionIDs.add(action.getActionID()); 224 } 225 } 226 } 227 } 228 return actionIDs; 229 } 230 231 @Override 232 public Collection<String> markDependencyAvailable(String server, String db, String table, 233 Map<String, String> partitions) { 234 String tableKey = server + TABLE_DELIMITER + db + TABLE_DELIMITER + table; 235 synchronized (partKeyPatterns) { 236 Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey); 237 if (patternCounts == null) { 238 LOG.warn("Got partition available notification for " + tableKey 239 + ". Unexpected as no matching partition keys. Unregistering topic"); 240 unregisterFromNotifications(server, db, table); 241 return null; 242 } 243 Cache missingCache = missingDepsByServer.get(server); 244 if (missingCache == null) { 245 LOG.warn("Got partition available notification for " + tableKey 246 + ". Unexpected. Missing server entry in cache. Unregistering topic"); 247 partKeyPatterns.remove(tableKey); 248 unregisterFromNotifications(server, db, table); 249 return null; 250 } 251 Collection<String> actionsWithAvailDep = new HashSet<String>(); 252 StringBuilder partValSB = new StringBuilder(); 253 // If partition patterns are date, date;country and date;country;state, 254 // construct the partition values for each pattern and for the matching value in the 255 // missingCache, get the waiting actions and mark it as available. 256 for (Entry<String, SettableInteger> entry : patternCounts.entrySet()) { 257 String[] partKeys = entry.getKey().split(PARTITION_DELIMITER); 258 partValSB.setLength(0); 259 for (String key : partKeys) { 260 partValSB.append(partitions.get(key)).append(PARTITION_DELIMITER); 261 } 262 partValSB.setLength(partValSB.length() - 1); 263 String missingKey = db + TABLE_DELIMITER + table + TABLE_DELIMITER + entry.getKey() + TABLE_DELIMITER 264 + partValSB.toString(); 265 boolean removed = false; 266 Element element = null; 267 synchronized (missingCache) { 268 element = missingCache.get(missingKey); 269 if (element != null) { 270 missingCache.remove(missingKey); 271 removed = true; 272 } 273 } 274 if (removed) { 275 decrementPartKeyPatternCount(tableKey, entry.getKey(), server + TABLE_DELIMITER + missingKey); 276 // Add the removed entry to available dependencies 277 Collection<WaitingAction> wActions = ((WaitingActions) element.getObjectValue()) 278 .getWaitingActions(); 279 for (WaitingAction wAction : wActions) { 280 String actionID = wAction.getActionID(); 281 actionsWithAvailDep.add(actionID); 282 Collection<String> depURIs = availableDeps.get(actionID); 283 if (depURIs == null) { 284 depURIs = new ArrayList<String>(); 285 Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs); 286 if (existing != null) { 287 depURIs = existing; 288 } 289 } 290 synchronized (depURIs) { 291 depURIs.add(wAction.getDependencyURI()); 292 availableDeps.put(actionID, depURIs); 293 } 294 } 295 } 296 } 297 return actionsWithAvailDep; 298 } 299 } 300 301 @Override 302 public Collection<String> getAvailableDependencyURIs(String actionID) { 303 Collection<String> available = availableDeps.get(actionID); 304 if (available != null) { 305 // Return a copy 306 available = new ArrayList<String>(available); 307 } 308 return available; 309 } 310 311 @Override 312 public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) { 313 if (!availableDeps.containsKey(actionID)) { 314 return false; 315 } 316 else { 317 Collection<String> availList = availableDeps.get(actionID); 318 if (!availList.removeAll(dependencyURIs)) { 319 return false; 320 } 321 synchronized (availList) { 322 if (availList.isEmpty()) { 323 availableDeps.remove(actionID); 324 } 325 } 326 } 327 return true; 328 } 329 330 @Override 331 public void destroy() { 332 availableDeps.clear(); 333 cacheManager.shutdown(); 334 } 335 336 @Override 337 public Object clone() throws CloneNotSupportedException { 338 throw new CloneNotSupportedException(); 339 } 340 341 @Override 342 public void dispose() { 343 } 344 345 @Override 346 public void notifyElementExpired(Ehcache cache, Element element) { 347 // Invoked when timeToIdleSeconds or timeToLiveSeconds is met 348 String missingDepKey = (String) element.getObjectKey(); 349 LOG.info("Cache entry [{0}] of cache [{1}] expired", missingDepKey, cache.getName()); 350 onExpiryOrEviction(cache, element, missingDepKey); 351 } 352 353 @Override 354 public void notifyElementPut(Ehcache arg0, Element arg1) throws CacheException { 355 356 } 357 358 @Override 359 public void notifyElementRemoved(Ehcache arg0, Element arg1) throws CacheException { 360 } 361 362 @Override 363 public void notifyElementUpdated(Ehcache arg0, Element arg1) throws CacheException { 364 } 365 366 @Override 367 public void notifyRemoveAll(Ehcache arg0) { 368 } 369 370 @Override 371 public void notifyElementEvicted(Ehcache cache, Element element) { 372 // Invoked when maxElementsInMemory is met 373 String missingDepKey = (String) element.getObjectKey(); 374 LOG.info("Cache entry [{0}] of cache [{1}] evicted", missingDepKey, cache.getName()); 375 onExpiryOrEviction(cache, element, missingDepKey); 376 } 377 378 private void onExpiryOrEviction(Ehcache cache, Element element, String missingDepKey) { 379 int partValIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER); 380 int partKeyIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER, partValIndex - 1); 381 // server#db#table. Name of the cache is that of the server. 382 String tableKey = cache.getName() + TABLE_DELIMITER + missingDepKey.substring(0, partKeyIndex); 383 String partKeys = missingDepKey.substring(partKeyIndex + 1, partValIndex); 384 decrementPartKeyPatternCount(tableKey, partKeys, missingDepKey); 385 } 386 387 /** 388 * Decrement partition key pattern count, once a hcat URI is removed from the cache 389 * 390 * @param tableKey key identifying the table - server#db#table 391 * @param partKeys partition key pattern 392 * @param hcatURI URI with the partition key pattern 393 */ 394 private void decrementPartKeyPatternCount(String tableKey, String partKeys, String hcatURI) { 395 synchronized (partKeyPatterns) { 396 Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey); 397 if (patternCounts == null) { 398 LOG.warn("Removed dependency - Missing cache entry - uri={0}. " 399 + "But no corresponding pattern key table entry", hcatURI); 400 } 401 else { 402 SettableInteger count = patternCounts.get(partKeys); 403 if (count == null) { 404 LOG.warn("Removed dependency - Missing cache entry - uri={0}. " 405 + "But no corresponding pattern key entry", hcatURI); 406 } 407 else { 408 count.decrement(); 409 if (count.getValue() == 0) { 410 patternCounts.remove(partKeys); 411 } 412 if (patternCounts.isEmpty()) { 413 partKeyPatterns.remove(tableKey); 414 String[] tableDetails = tableKey.split(TABLE_DELIMITER); 415 unregisterFromNotifications(tableDetails[0], tableDetails[1], tableDetails[2]); 416 } 417 } 418 } 419 } 420 } 421 422 private void unregisterFromNotifications(String server, String db, String table) { 423 // Close JMS session. Stop listening on topic 424 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 425 hcatService.unregisterFromNotification(server, db, table); 426 } 427 428 private static class SortedPKV { 429 private StringBuilder partKeys; 430 private StringBuilder partVals; 431 432 public SortedPKV(Map<String, String> partitions) { 433 this.partKeys = new StringBuilder(); 434 this.partVals = new StringBuilder(); 435 ArrayList<String> keys = new ArrayList<String>(partitions.keySet()); 436 Collections.sort(keys); 437 for (String key : keys) { 438 this.partKeys.append(key).append(PARTITION_DELIMITER); 439 this.partVals.append(partitions.get(key)).append(PARTITION_DELIMITER); 440 } 441 this.partKeys.setLength(partKeys.length() - 1); 442 this.partVals.setLength(partVals.length() - 1); 443 } 444 445 public String getPartKeys() { 446 return partKeys.toString(); 447 } 448 449 public String getPartVals() { 450 return partVals.toString(); 451 } 452 453 } 454 455 private static class SettableInteger { 456 private int value; 457 458 public SettableInteger(int value) { 459 this.value = value; 460 } 461 462 public int getValue() { 463 return value; 464 } 465 466 public void increment() { 467 value++; 468 } 469 470 public void decrement() { 471 value--; 472 } 473 } 474 475 }