This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.dependency.hcat;
019
020 import java.net.URL;
021 import java.util.ArrayList;
022 import java.util.Collection;
023 import java.util.Collections;
024 import java.util.HashSet;
025 import java.util.Map;
026 import java.util.Map.Entry;
027 import java.util.concurrent.ConcurrentHashMap;
028 import java.util.concurrent.ConcurrentMap;
029
030 import net.sf.ehcache.Cache;
031 import net.sf.ehcache.CacheException;
032 import net.sf.ehcache.CacheManager;
033 import net.sf.ehcache.Ehcache;
034 import net.sf.ehcache.Element;
035 import net.sf.ehcache.config.CacheConfiguration;
036 import net.sf.ehcache.event.CacheEventListener;
037
038 import org.apache.hadoop.conf.Configuration;
039 import org.apache.oozie.service.HCatAccessorService;
040 import org.apache.oozie.service.PartitionDependencyManagerService;
041 import org.apache.oozie.service.Services;
042 import org.apache.oozie.util.HCatURI;
043 import org.apache.oozie.util.XLog;
044
045 public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEventListener {
046
047 private static XLog LOG = XLog.getLog(EhcacheHCatDependencyCache.class);
048 private static String TABLE_DELIMITER = "#";
049 private static String PARTITION_DELIMITER = ";";
050
051 public static String CONF_CACHE_NAME = PartitionDependencyManagerService.CONF_PREFIX + "cache.ehcache.name";
052
053 private CacheManager cacheManager;
054
055 /**
056 * Map of server to EhCache which has key as db#table#pk1;pk2#val;val2 and value as WaitingActions (list of
057 * WaitingAction) which is Serializable (for overflowToDisk)
058 */
059 private ConcurrentMap<String, Cache> missingDepsByServer;
060
061 private CacheConfiguration cacheConfig;
062 /**
063 * Map of server#db#table - sorted part key pattern - count of different partition values (count
064 * of elements in the cache) still missing for a partition key pattern. This count is used to
065 * quickly determine if there are any more missing dependencies for a table. When the count
066 * becomes 0, we unregister from notifications as there are no more missing dependencies for
067 * that table.
068 */
069 private ConcurrentMap<String, ConcurrentMap<String, SettableInteger>> partKeyPatterns;
070 /**
071 * Map of actionIDs and collection of available URIs
072 */
073 private ConcurrentMap<String, Collection<String>> availableDeps;
074
075 @Override
076 public void init(Configuration conf) {
077 String cacheName = conf.get(CONF_CACHE_NAME);
078 URL cacheConfigURL;
079 if (cacheName == null) {
080 cacheConfigURL = this.getClass().getClassLoader().getResource("ehcache-default.xml");
081 cacheName = "dependency-default";
082 }
083 else {
084 cacheConfigURL = this.getClass().getClassLoader().getResource("ehcache.xml");
085 }
086 if (cacheConfigURL == null) {
087 throw new IllegalStateException("ehcache.xml is not found in classpath");
088 }
089 cacheManager = CacheManager.newInstance(cacheConfigURL);
090 final Cache specifiedCache = cacheManager.getCache(cacheName);
091 if (specifiedCache == null) {
092 throw new IllegalStateException("Cache " + cacheName + " configured in " + CONF_CACHE_NAME
093 + " is not found");
094 }
095 cacheConfig = specifiedCache.getCacheConfiguration();
096 missingDepsByServer = new ConcurrentHashMap<String, Cache>();
097 partKeyPatterns = new ConcurrentHashMap<String, ConcurrentMap<String, SettableInteger>>();
098 availableDeps = new ConcurrentHashMap<String, Collection<String>>();
099 }
100
101 @Override
102 public void addMissingDependency(HCatURI hcatURI, String actionID) {
103
104 // Create cache for the server if we don't have one
105 Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
106 if (missingCache == null) {
107 CacheConfiguration clonedConfig = cacheConfig.clone();
108 clonedConfig.setName(hcatURI.getServer());
109 missingCache = new Cache(clonedConfig);
110 Cache exists = missingDepsByServer.putIfAbsent(hcatURI.getServer(), missingCache);
111 if (exists == null) {
112 cacheManager.addCache(missingCache);
113 missingCache.getCacheEventNotificationService().registerListener(this);
114 }
115 else {
116 missingCache.dispose(); //discard
117 }
118 }
119
120 // Add hcat uri into the missingCache
121 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
122 String partKeys = sortedPKV.getPartKeys();
123 String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
124 + partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
125 boolean newlyAdded = true;
126 synchronized (missingCache) {
127 Element element = missingCache.get(missingKey);
128 if (element == null) {
129 WaitingActions waitingActions = new WaitingActions();
130 element = new Element(missingKey, waitingActions);
131 Element exists = missingCache.putIfAbsent(element);
132 if (exists != null) {
133 newlyAdded = false;
134 waitingActions = (WaitingActions) exists.getObjectValue();
135 }
136 waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
137 }
138 else {
139 newlyAdded = false;
140 WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
141 waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
142 }
143 }
144
145 // Increment count for the partition key pattern
146 if (newlyAdded) {
147 String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
148 + hcatURI.getTable();
149 synchronized (partKeyPatterns) {
150 ConcurrentMap<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
151 if (patternCounts == null) {
152 patternCounts = new ConcurrentHashMap<String, SettableInteger>();
153 partKeyPatterns.put(tableKey, patternCounts);
154 }
155 SettableInteger count = patternCounts.get(partKeys);
156 if (count == null) {
157 patternCounts.put(partKeys, new SettableInteger(1));
158 }
159 else {
160 count.increment();
161 }
162 }
163 }
164 }
165
166 @Override
167 public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
168
169 Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
170 if (missingCache == null) {
171 LOG.warn("Remove missing dependency - Missing cache entry for server - uri={0}, actionID={1}",
172 hcatURI.toURIString(), actionID);
173 return false;
174 }
175 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
176 String partKeys = sortedPKV.getPartKeys();
177 String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER +
178 partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
179 boolean decrement = false;
180 boolean removed = false;
181 synchronized (missingCache) {
182 Element element = missingCache.get(missingKey);
183 if (element == null) {
184 LOG.warn("Remove missing dependency - Missing cache entry - uri={0}, actionID={1}",
185 hcatURI.toURIString(), actionID);
186 return false;
187 }
188 Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue()).getWaitingActions();
189 removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString()));
190 if (!removed) {
191 LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
192 hcatURI.toURIString(), actionID);
193 }
194 if (waitingActions.isEmpty()) {
195 missingCache.remove(missingKey);
196 decrement = true;
197 }
198 }
199 // Decrement partition key pattern count if the cache entry is removed
200 if (decrement) {
201 String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
202 + hcatURI.getTable();
203 decrementPartKeyPatternCount(tableKey, partKeys, hcatURI.toURIString());
204 }
205 return removed;
206 }
207
208 @Override
209 public Collection<String> getWaitingActions(HCatURI hcatURI) {
210 Collection<String> actionIDs = null;
211 Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
212 if (missingCache != null) {
213 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
214 String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
215 + sortedPKV.getPartKeys() + TABLE_DELIMITER + sortedPKV.getPartVals();
216 Element element = missingCache.get(missingKey);
217 if (element != null) {
218 WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
219 actionIDs = new ArrayList<String>();
220 String uriString = hcatURI.getURI().toString();
221 for (WaitingAction action : waitingActions.getWaitingActions()) {
222 if (action.getDependencyURI().equals(uriString)) {
223 actionIDs.add(action.getActionID());
224 }
225 }
226 }
227 }
228 return actionIDs;
229 }
230
231 @Override
232 public Collection<String> markDependencyAvailable(String server, String db, String table,
233 Map<String, String> partitions) {
234 String tableKey = server + TABLE_DELIMITER + db + TABLE_DELIMITER + table;
235 synchronized (partKeyPatterns) {
236 Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
237 if (patternCounts == null) {
238 LOG.warn("Got partition available notification for " + tableKey
239 + ". Unexpected as no matching partition keys. Unregistering topic");
240 unregisterFromNotifications(server, db, table);
241 return null;
242 }
243 Cache missingCache = missingDepsByServer.get(server);
244 if (missingCache == null) {
245 LOG.warn("Got partition available notification for " + tableKey
246 + ". Unexpected. Missing server entry in cache. Unregistering topic");
247 partKeyPatterns.remove(tableKey);
248 unregisterFromNotifications(server, db, table);
249 return null;
250 }
251 Collection<String> actionsWithAvailDep = new HashSet<String>();
252 StringBuilder partValSB = new StringBuilder();
253 // If partition patterns are date, date;country and date;country;state,
254 // construct the partition values for each pattern and for the matching value in the
255 // missingCache, get the waiting actions and mark it as available.
256 for (Entry<String, SettableInteger> entry : patternCounts.entrySet()) {
257 String[] partKeys = entry.getKey().split(PARTITION_DELIMITER);
258 partValSB.setLength(0);
259 for (String key : partKeys) {
260 partValSB.append(partitions.get(key)).append(PARTITION_DELIMITER);
261 }
262 partValSB.setLength(partValSB.length() - 1);
263 String missingKey = db + TABLE_DELIMITER + table + TABLE_DELIMITER + entry.getKey() + TABLE_DELIMITER
264 + partValSB.toString();
265 boolean removed = false;
266 Element element = null;
267 synchronized (missingCache) {
268 element = missingCache.get(missingKey);
269 if (element != null) {
270 missingCache.remove(missingKey);
271 removed = true;
272 }
273 }
274 if (removed) {
275 decrementPartKeyPatternCount(tableKey, entry.getKey(), server + TABLE_DELIMITER + missingKey);
276 // Add the removed entry to available dependencies
277 Collection<WaitingAction> wActions = ((WaitingActions) element.getObjectValue())
278 .getWaitingActions();
279 for (WaitingAction wAction : wActions) {
280 String actionID = wAction.getActionID();
281 actionsWithAvailDep.add(actionID);
282 Collection<String> depURIs = availableDeps.get(actionID);
283 if (depURIs == null) {
284 depURIs = new ArrayList<String>();
285 Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
286 if (existing != null) {
287 depURIs = existing;
288 }
289 }
290 synchronized (depURIs) {
291 depURIs.add(wAction.getDependencyURI());
292 availableDeps.put(actionID, depURIs);
293 }
294 }
295 }
296 }
297 return actionsWithAvailDep;
298 }
299 }
300
301 @Override
302 public Collection<String> getAvailableDependencyURIs(String actionID) {
303 Collection<String> available = availableDeps.get(actionID);
304 if (available != null) {
305 // Return a copy
306 available = new ArrayList<String>(available);
307 }
308 return available;
309 }
310
311 @Override
312 public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
313 if (!availableDeps.containsKey(actionID)) {
314 return false;
315 }
316 else {
317 Collection<String> availList = availableDeps.get(actionID);
318 if (!availList.removeAll(dependencyURIs)) {
319 return false;
320 }
321 synchronized (availList) {
322 if (availList.isEmpty()) {
323 availableDeps.remove(actionID);
324 }
325 }
326 }
327 return true;
328 }
329
330 @Override
331 public void destroy() {
332 availableDeps.clear();
333 cacheManager.shutdown();
334 }
335
336 @Override
337 public Object clone() throws CloneNotSupportedException {
338 throw new CloneNotSupportedException();
339 }
340
341 @Override
342 public void dispose() {
343 }
344
345 @Override
346 public void notifyElementExpired(Ehcache cache, Element element) {
347 // Invoked when timeToIdleSeconds or timeToLiveSeconds is met
348 String missingDepKey = (String) element.getObjectKey();
349 LOG.info("Cache entry [{0}] of cache [{1}] expired", missingDepKey, cache.getName());
350 onExpiryOrEviction(cache, element, missingDepKey);
351 }
352
353 @Override
354 public void notifyElementPut(Ehcache arg0, Element arg1) throws CacheException {
355
356 }
357
358 @Override
359 public void notifyElementRemoved(Ehcache arg0, Element arg1) throws CacheException {
360 }
361
362 @Override
363 public void notifyElementUpdated(Ehcache arg0, Element arg1) throws CacheException {
364 }
365
366 @Override
367 public void notifyRemoveAll(Ehcache arg0) {
368 }
369
370 @Override
371 public void notifyElementEvicted(Ehcache cache, Element element) {
372 // Invoked when maxElementsInMemory is met
373 String missingDepKey = (String) element.getObjectKey();
374 LOG.info("Cache entry [{0}] of cache [{1}] evicted", missingDepKey, cache.getName());
375 onExpiryOrEviction(cache, element, missingDepKey);
376 }
377
378 private void onExpiryOrEviction(Ehcache cache, Element element, String missingDepKey) {
379 int partValIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER);
380 int partKeyIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER, partValIndex - 1);
381 // server#db#table. Name of the cache is that of the server.
382 String tableKey = cache.getName() + TABLE_DELIMITER + missingDepKey.substring(0, partKeyIndex);
383 String partKeys = missingDepKey.substring(partKeyIndex + 1, partValIndex);
384 decrementPartKeyPatternCount(tableKey, partKeys, missingDepKey);
385 }
386
387 /**
388 * Decrement partition key pattern count, once a hcat URI is removed from the cache
389 *
390 * @param tableKey key identifying the table - server#db#table
391 * @param partKeys partition key pattern
392 * @param hcatURI URI with the partition key pattern
393 */
394 private void decrementPartKeyPatternCount(String tableKey, String partKeys, String hcatURI) {
395 synchronized (partKeyPatterns) {
396 Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
397 if (patternCounts == null) {
398 LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
399 + "But no corresponding pattern key table entry", hcatURI);
400 }
401 else {
402 SettableInteger count = patternCounts.get(partKeys);
403 if (count == null) {
404 LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
405 + "But no corresponding pattern key entry", hcatURI);
406 }
407 else {
408 count.decrement();
409 if (count.getValue() == 0) {
410 patternCounts.remove(partKeys);
411 }
412 if (patternCounts.isEmpty()) {
413 partKeyPatterns.remove(tableKey);
414 String[] tableDetails = tableKey.split(TABLE_DELIMITER);
415 unregisterFromNotifications(tableDetails[0], tableDetails[1], tableDetails[2]);
416 }
417 }
418 }
419 }
420 }
421
422 private void unregisterFromNotifications(String server, String db, String table) {
423 // Close JMS session. Stop listening on topic
424 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
425 hcatService.unregisterFromNotification(server, db, table);
426 }
427
428 private static class SortedPKV {
429 private StringBuilder partKeys;
430 private StringBuilder partVals;
431
432 public SortedPKV(Map<String, String> partitions) {
433 this.partKeys = new StringBuilder();
434 this.partVals = new StringBuilder();
435 ArrayList<String> keys = new ArrayList<String>(partitions.keySet());
436 Collections.sort(keys);
437 for (String key : keys) {
438 this.partKeys.append(key).append(PARTITION_DELIMITER);
439 this.partVals.append(partitions.get(key)).append(PARTITION_DELIMITER);
440 }
441 this.partKeys.setLength(partKeys.length() - 1);
442 this.partVals.setLength(partVals.length() - 1);
443 }
444
445 public String getPartKeys() {
446 return partKeys.toString();
447 }
448
449 public String getPartVals() {
450 return partVals.toString();
451 }
452
453 }
454
455 private static class SettableInteger {
456 private int value;
457
458 public SettableInteger(int value) {
459 this.value = value;
460 }
461
462 public int getValue() {
463 return value;
464 }
465
466 public void increment() {
467 value++;
468 }
469
470 public void decrement() {
471 value--;
472 }
473 }
474
475 }