001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.dependency.hcat;
020
021import java.net.URISyntaxException;
022import java.net.URL;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Iterator;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.Set;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ConcurrentMap;
033
034import net.sf.ehcache.Cache;
035import net.sf.ehcache.CacheException;
036import net.sf.ehcache.CacheManager;
037import net.sf.ehcache.Ehcache;
038import net.sf.ehcache.Element;
039import net.sf.ehcache.config.CacheConfiguration;
040import net.sf.ehcache.event.CacheEventListener;
041
042import org.apache.hadoop.conf.Configuration;
043import org.apache.oozie.service.HCatAccessorService;
044import org.apache.oozie.service.PartitionDependencyManagerService;
045import org.apache.oozie.service.Services;
046import org.apache.oozie.util.HCatURI;
047import org.apache.oozie.util.XLog;
048
049public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEventListener {
050
    /** Logger used by this class. */
    private static XLog LOG = XLog.getLog(EhcacheHCatDependencyCache.class);
    /** Separator between server, db, table, partition key pattern and partition values in the keys built here. */
    private static String TABLE_DELIMITER = "#";
    /** Separator between individual partition keys (and between individual partition values) inside a key. */
    private static String PARTITION_DELIMITER = ";";

    /** Configuration property holding the name of the Ehcache cache to look up in {@link #init(Configuration)}. */
    public static String CONF_CACHE_NAME = PartitionDependencyManagerService.CONF_PREFIX + "cache.ehcache.name";

    /** Ehcache manager created in {@link #init(Configuration)} and shut down in {@link #destroy()}. */
    private CacheManager cacheManager;

    /**
     * Map of server to EhCache which has key as db#table#pk1;pk2#val;val2 and value as WaitingActions (list of
     * WaitingAction) which is Serializable (for overflowToDisk)
     */
    private ConcurrentMap<String, Cache> missingDepsByServer;

    /** Configuration of the cache selected in {@link #init(Configuration)}; cloned for every per-server cache. */
    private CacheConfiguration cacheConfig;
    /**
     * Map of server#db#table - sorted part key pattern - count of different partition values (count
     * of elements in the cache) still missing for a partition key pattern. This count is used to
     * quickly determine if there are any more missing dependencies for a table. When the count
     * becomes 0, we unregister from notifications as there are no more missing dependencies for
     * that table.
     */
    private ConcurrentMap<String, ConcurrentMap<String, SettableInteger>> partKeyPatterns;
    /**
     * Map of actionIDs and collection of available URIs
     */
    private ConcurrentMap<String, Collection<String>> availableDeps;
078
079    @Override
080    public void init(Configuration conf) {
081        String cacheName = conf.get(CONF_CACHE_NAME);
082        URL cacheConfigURL;
083        if (cacheName == null) {
084            cacheConfigURL = this.getClass().getClassLoader().getResource("ehcache-default.xml");
085            cacheName = "dependency-default";
086        }
087        else {
088            cacheConfigURL = this.getClass().getClassLoader().getResource("ehcache.xml");
089        }
090        if (cacheConfigURL == null) {
091            throw new IllegalStateException("ehcache.xml is not found in classpath");
092        }
093        cacheManager = CacheManager.newInstance(cacheConfigURL);
094        final Cache specifiedCache = cacheManager.getCache(cacheName);
095        if (specifiedCache == null) {
096            throw new IllegalStateException("Cache " + cacheName + " configured in " + CONF_CACHE_NAME
097                    + " is not found");
098        }
099        cacheConfig = specifiedCache.getCacheConfiguration();
100        missingDepsByServer = new ConcurrentHashMap<String, Cache>();
101        partKeyPatterns = new ConcurrentHashMap<String, ConcurrentMap<String, SettableInteger>>();
102        availableDeps = new ConcurrentHashMap<String, Collection<String>>();
103    }
104
105    @Override
106    public void addMissingDependency(HCatURI hcatURI, String actionID) {
107
108        // Create cache for the server if we don't have one
109        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
110        if (missingCache == null) {
111            CacheConfiguration clonedConfig = cacheConfig.clone();
112            clonedConfig.setName(hcatURI.getServer());
113            missingCache = new Cache(clonedConfig);
114            Cache exists = missingDepsByServer.putIfAbsent(hcatURI.getServer(), missingCache);
115            if (exists == null) {
116                cacheManager.addCache(missingCache);
117                missingCache.getCacheEventNotificationService().registerListener(this);
118            }
119            else {
120                missingCache.dispose(); //discard
121            }
122        }
123
124        // Add hcat uri into the missingCache
125        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
126        String partKeys = sortedPKV.getPartKeys();
127        String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
128                + partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
129        boolean newlyAdded = true;
130        synchronized (missingCache) {
131            Element element = missingCache.get(missingKey);
132            if (element == null) {
133                WaitingActions waitingActions = new WaitingActions();
134                element = new Element(missingKey, waitingActions);
135                Element exists = missingCache.putIfAbsent(element);
136                if (exists != null) {
137                    newlyAdded = false;
138                    waitingActions = (WaitingActions) exists.getObjectValue();
139                }
140                waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
141            }
142            else {
143                newlyAdded = false;
144                WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
145                waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
146            }
147        }
148
149        // Increment count for the partition key pattern
150        if (newlyAdded) {
151            String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
152                    + hcatURI.getTable();
153            synchronized (partKeyPatterns) {
154                ConcurrentMap<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
155                if (patternCounts == null) {
156                    patternCounts = new ConcurrentHashMap<String, SettableInteger>();
157                    partKeyPatterns.put(tableKey, patternCounts);
158                }
159                SettableInteger count = patternCounts.get(partKeys);
160                if (count == null) {
161                    patternCounts.put(partKeys, new SettableInteger(1));
162                }
163                else {
164                    count.increment();
165                }
166            }
167        }
168    }
169
170    @Override
171    public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
172
173        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
174        if (missingCache == null) {
175            LOG.warn("Remove missing dependency - Missing cache entry for server - uri={0}, actionID={1}",
176                    hcatURI.toURIString(), actionID);
177            return false;
178        }
179        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
180        String partKeys = sortedPKV.getPartKeys();
181        String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER +
182                partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
183        boolean decrement = false;
184        boolean removed = false;
185        synchronized (missingCache) {
186            Element element = missingCache.get(missingKey);
187            if (element == null) {
188                LOG.warn("Remove missing dependency - Missing cache entry - uri={0}, actionID={1}",
189                        hcatURI.toURIString(), actionID);
190                return false;
191            }
192            Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue()).getWaitingActions();
193            removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString()));
194            if (!removed) {
195                LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
196                        hcatURI.toURIString(), actionID);
197            }
198            if (waitingActions.isEmpty()) {
199                missingCache.remove(missingKey);
200                decrement = true;
201            }
202        }
203        // Decrement partition key pattern count if the cache entry is removed
204        if (decrement) {
205            String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
206                    + hcatURI.getTable();
207            decrementPartKeyPatternCount(tableKey, partKeys, hcatURI.toURIString());
208        }
209        return removed;
210    }
211
212    @Override
213    public Collection<String> getWaitingActions(HCatURI hcatURI) {
214        Collection<String> actionIDs = null;
215        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
216        if (missingCache != null) {
217            SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
218            String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
219                    + sortedPKV.getPartKeys() + TABLE_DELIMITER + sortedPKV.getPartVals();
220            Element element = missingCache.get(missingKey);
221            if (element != null) {
222                WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
223                actionIDs = new ArrayList<String>();
224                String uriString = hcatURI.getURI().toString();
225                for (WaitingAction action : waitingActions.getWaitingActions()) {
226                    if (action.getDependencyURI().equals(uriString)) {
227                        actionIDs.add(action.getActionID());
228                    }
229                }
230            }
231        }
232        return actionIDs;
233    }
234
    /**
     * Handles the availability of a partition of a table: for every partition key pattern tracked for the
     * table, the matching entry (if any) is removed from the server's cache, the pattern count is
     * decremented, and the dependency URI of each action that was waiting on the entry is recorded in the
     * available dependencies map.
     *
     * @param server server name; also the key of the per-server cache
     * @param db database name
     * @param table table name
     * @param partitions partition key to partition value map of the partition that became available
     * @return IDs of the actions for which a dependency became available (possibly empty), or null when no
     *         partition key patterns are tracked for the table or there is no cache for the server
     */
    @Override
    public Collection<String> markDependencyAvailable(String server, String db, String table,
            Map<String, String> partitions) {
        String tableKey = server + TABLE_DELIMITER + db + TABLE_DELIMITER + table;
        // The whole operation runs under the partKeyPatterns lock; the per-server cache lock is taken inside it
        synchronized (partKeyPatterns) {
            Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
            if (patternCounts == null) {
                LOG.warn("Got partition available notification for " + tableKey
                        + ". Unexpected as no matching partition keys. Unregistering topic");
                unregisterFromNotifications(server, db, table);
                return null;
            }
            Cache missingCache = missingDepsByServer.get(server);
            if (missingCache == null) {
                LOG.warn("Got partition available notification for " + tableKey
                        + ". Unexpected. Missing server entry in cache. Unregistering topic");
                // Pattern counts without a backing cache are dropped
                partKeyPatterns.remove(tableKey);
                unregisterFromNotifications(server, db, table);
                return null;
            }
            Collection<String> actionsWithAvailDep = new HashSet<String>();
            StringBuilder partValSB = new StringBuilder();
            // If partition patterns are date, date;country and date;country;state,
            // construct the partition values for each pattern and for the matching value in the
            // missingCache, get the waiting actions and mark it as available.
            for (Entry<String, SettableInteger> entry : patternCounts.entrySet()) {
                String[] partKeys = entry.getKey().split(PARTITION_DELIMITER);
                partValSB.setLength(0);
                for (String key : partKeys) {
                    // A key that is absent from partitions is appended as the text "null"
                    partValSB.append(partitions.get(key)).append(PARTITION_DELIMITER);
                }
                // Drop the trailing delimiter
                partValSB.setLength(partValSB.length() - 1);
                String missingKey = db + TABLE_DELIMITER + table + TABLE_DELIMITER + entry.getKey() + TABLE_DELIMITER
                        + partValSB.toString();
                boolean removed = false;
                Element element = null;
                synchronized (missingCache) {
                    element = missingCache.get(missingKey);
                    if (element != null) {
                        missingCache.remove(missingKey);
                        removed = true;
                    }
                }
                if (removed) {
                    decrementPartKeyPatternCount(tableKey, entry.getKey(), server + TABLE_DELIMITER + missingKey);
                    // Add the removed entry to available dependencies
                    Collection<WaitingAction> wActions = ((WaitingActions) element.getObjectValue())
                            .getWaitingActions();
                    for (WaitingAction wAction : wActions) {
                        String actionID = wAction.getActionID();
                        actionsWithAvailDep.add(actionID);
                        Collection<String> depURIs = availableDeps.get(actionID);
                        if (depURIs == null) {
                            depURIs = new ArrayList<String>();
                            Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
                            if (existing != null) {
                                // Another thread registered a list first; use that one
                                depURIs = existing;
                            }
                        }
                        synchronized (depURIs) {
                            depURIs.add(wAction.getDependencyURI());
                            // NOTE(review): presumably re-put so the mapping is restored if it was removed
                            // concurrently after the lookup above - confirm
                            availableDeps.put(actionID, depURIs);
                        }
                    }
                }
            }
            return actionsWithAvailDep;
        }
    }
304
305    @Override
306    public Collection<String> getAvailableDependencyURIs(String actionID) {
307        Collection<String> available = availableDeps.get(actionID);
308        if (available !=  null) {
309            // Return a copy
310            available = new ArrayList<String>(available);
311        }
312        return available;
313    }
314
315    @Override
316    public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
317        if (!availableDeps.containsKey(actionID)) {
318            return false;
319        }
320        else {
321            Collection<String> availList = availableDeps.get(actionID);
322            if (!availList.removeAll(dependencyURIs)) {
323                return false;
324            }
325            synchronized (availList) {
326                if (availList.isEmpty()) {
327                    availableDeps.remove(actionID);
328                }
329            }
330        }
331        return true;
332    }
333
334    @Override
335    public void destroy() {
336        availableDeps.clear();
337        cacheManager.shutdown();
338    }
339
    /**
     * Cloning is not supported by this listener.
     *
     * @throws CloneNotSupportedException always
     */
    @Override
    public Object clone() throws CloneNotSupportedException {
        throw new CloneNotSupportedException();
    }
344
    /**
     * Listener callback; intentionally does nothing.
     */
    @Override
    public void dispose() {
    }
348
349    @Override
350    public void notifyElementExpired(Ehcache cache, Element element) {
351        // Invoked when timeToIdleSeconds or timeToLiveSeconds is met
352        String missingDepKey = (String) element.getObjectKey();
353        LOG.info("Cache entry [{0}] of cache [{1}] expired", missingDepKey, cache.getName());
354        onExpiryOrEviction(cache, element, missingDepKey);
355    }
356
    /**
     * Listener callback for an element put; intentionally does nothing.
     */
    @Override
    public void notifyElementPut(Ehcache arg0, Element arg1) throws CacheException {

    }
361
    /**
     * Listener callback for an element removal; intentionally does nothing.
     */
    @Override
    public void notifyElementRemoved(Ehcache arg0, Element arg1) throws CacheException {
    }
365
    /**
     * Listener callback for an element update; intentionally does nothing.
     */
    @Override
    public void notifyElementUpdated(Ehcache arg0, Element arg1) throws CacheException {
    }
369
    /**
     * Listener callback for removal of all elements; intentionally does nothing.
     */
    @Override
    public void notifyRemoveAll(Ehcache arg0) {
    }
373
374    @Override
375    public void notifyElementEvicted(Ehcache cache, Element element) {
376        // Invoked when maxElementsInMemory is met
377        String missingDepKey = (String) element.getObjectKey();
378        LOG.info("Cache entry [{0}] of cache [{1}] evicted", missingDepKey, cache.getName());
379        onExpiryOrEviction(cache, element, missingDepKey);
380    }
381
382    private void onExpiryOrEviction(Ehcache cache, Element element, String missingDepKey) {
383        int partValIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER);
384        int partKeyIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER, partValIndex - 1);
385        // server#db#table. Name of the cache is that of the server.
386        String tableKey = cache.getName() + TABLE_DELIMITER + missingDepKey.substring(0, partKeyIndex);
387        String partKeys = missingDepKey.substring(partKeyIndex + 1, partValIndex);
388        decrementPartKeyPatternCount(tableKey, partKeys, missingDepKey);
389    }
390
391    /**
392     * Decrement partition key pattern count, once a hcat URI is removed from the cache
393     *
394     * @param tableKey key identifying the table - server#db#table
395     * @param partKeys partition key pattern
396     * @param hcatURI URI with the partition key pattern
397     */
398    private void decrementPartKeyPatternCount(String tableKey, String partKeys, String hcatURI) {
399        synchronized (partKeyPatterns) {
400            Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
401            if (patternCounts == null) {
402                LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
403                        + "But no corresponding pattern key table entry", hcatURI);
404            }
405            else {
406                SettableInteger count = patternCounts.get(partKeys);
407                if (count == null) {
408                    LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
409                            + "But no corresponding pattern key entry", hcatURI);
410                }
411                else {
412                    count.decrement();
413                    if (count.getValue() == 0) {
414                        patternCounts.remove(partKeys);
415                    }
416                    if (patternCounts.isEmpty()) {
417                        partKeyPatterns.remove(tableKey);
418                        String[] tableDetails = tableKey.split(TABLE_DELIMITER);
419                        unregisterFromNotifications(tableDetails[0], tableDetails[1], tableDetails[2]);
420                    }
421                }
422            }
423        }
424    }
425
426    private void unregisterFromNotifications(String server, String db, String table) {
427        // Close JMS session. Stop listening on topic
428        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
429        hcatService.unregisterFromNotification(server, db, table);
430    }
431
432    private static class SortedPKV {
433        private StringBuilder partKeys;
434        private StringBuilder partVals;
435
436        public SortedPKV(Map<String, String> partitions) {
437            this.partKeys = new StringBuilder();
438            this.partVals = new StringBuilder();
439            ArrayList<String> keys = new ArrayList<String>(partitions.keySet());
440            Collections.sort(keys);
441            for (String key : keys) {
442                this.partKeys.append(key).append(PARTITION_DELIMITER);
443                this.partVals.append(partitions.get(key)).append(PARTITION_DELIMITER);
444            }
445            this.partKeys.setLength(partKeys.length() - 1);
446            this.partVals.setLength(partVals.length() - 1);
447        }
448
449        public String getPartKeys() {
450            return partKeys.toString();
451        }
452
453        public String getPartVals() {
454            return partVals.toString();
455        }
456
457    }
458
459    private static class SettableInteger {
460        private int value;
461
462        public SettableInteger(int value) {
463            this.value = value;
464        }
465
466        public int getValue() {
467            return value;
468        }
469
470        public void increment() {
471            value++;
472        }
473
474        public void decrement() {
475            value--;
476        }
477    }
478
479    @Override
480    public void removeNonWaitingCoordActions(Set<String> staleActions) {
481        Iterator<String> serverItr = missingDepsByServer.keySet().iterator();
482        while (serverItr.hasNext()) {
483            String server = serverItr.next();
484            Cache missingCache = missingDepsByServer.get(server);
485            if (missingCache == null) {
486                continue;
487            }
488            synchronized (missingCache) {
489                for (Object key : missingCache.getKeys()) {
490                    Element element = missingCache.get(key);
491                    if (element == null) {
492                        continue;
493                    }
494                    Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue())
495                            .getWaitingActions();
496                    Iterator<WaitingAction> wactionItr = waitingActions.iterator();
497                    HCatURI hcatURI = null;
498                    while(wactionItr.hasNext()) {
499                        WaitingAction waction = wactionItr.next();
500                        if(staleActions.contains(waction.getActionID())) {
501                            try {
502                                hcatURI = new HCatURI(waction.getDependencyURI());
503                                wactionItr.remove();
504                            }
505                            catch (URISyntaxException e) {
506                                continue;
507                            }
508                        }
509                    }
510                    if (waitingActions.isEmpty() && hcatURI != null) {
511                        missingCache.remove(key);
512                        // Decrement partition key pattern count if the cache entry is removed
513                        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
514                        String partKeys = sortedPKV.getPartKeys();
515                        String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
516                                + hcatURI.getTable();
517                        String hcatURIStr = hcatURI.toURIString();
518                        decrementPartKeyPatternCount(tableKey, partKeys, hcatURIStr);
519                    }
520                }
521            }
522        }
523    }
524
    /**
     * Currently a no-op.
     *
     * @param coordAction ID of the coordinator action (unused for now)
     */
    @Override
    public void removeCoordActionWithDependenciesAvailable(String coordAction) {
        // to be implemented when reverse-lookup data structure for purging is added
    }
529
530}