001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.dependency.hcat;
019
020import java.net.URISyntaxException;
021import java.net.URL;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.Map;
028import java.util.Map.Entry;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.ConcurrentMap;
032
033import net.sf.ehcache.Cache;
034import net.sf.ehcache.CacheException;
035import net.sf.ehcache.CacheManager;
036import net.sf.ehcache.Ehcache;
037import net.sf.ehcache.Element;
038import net.sf.ehcache.config.CacheConfiguration;
039import net.sf.ehcache.event.CacheEventListener;
040
041import org.apache.hadoop.conf.Configuration;
042import org.apache.oozie.service.HCatAccessorService;
043import org.apache.oozie.service.PartitionDependencyManagerService;
044import org.apache.oozie.service.Services;
045import org.apache.oozie.util.HCatURI;
046import org.apache.oozie.util.XLog;
047
048public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEventListener {
049
050    private static XLog LOG = XLog.getLog(EhcacheHCatDependencyCache.class);
051    private static String TABLE_DELIMITER = "#";
052    private static String PARTITION_DELIMITER = ";";
053
054    public static String CONF_CACHE_NAME = PartitionDependencyManagerService.CONF_PREFIX + "cache.ehcache.name";
055
056    private CacheManager cacheManager;
057
058    /**
059     * Map of server to EhCache which has key as db#table#pk1;pk2#val;val2 and value as WaitingActions (list of
060     * WaitingAction) which is Serializable (for overflowToDisk)
061     */
062    private ConcurrentMap<String, Cache> missingDepsByServer;
063
064    private CacheConfiguration cacheConfig;
065    /**
066     * Map of server#db#table - sorted part key pattern - count of different partition values (count
067     * of elements in the cache) still missing for a partition key pattern. This count is used to
068     * quickly determine if there are any more missing dependencies for a table. When the count
069     * becomes 0, we unregister from notifications as there are no more missing dependencies for
070     * that table.
071     */
072    private ConcurrentMap<String, ConcurrentMap<String, SettableInteger>> partKeyPatterns;
073    /**
074     * Map of actionIDs and collection of available URIs
075     */
076    private ConcurrentMap<String, Collection<String>> availableDeps;
077
078    @Override
079    public void init(Configuration conf) {
080        String cacheName = conf.get(CONF_CACHE_NAME);
081        URL cacheConfigURL;
082        if (cacheName == null) {
083            cacheConfigURL = this.getClass().getClassLoader().getResource("ehcache-default.xml");
084            cacheName = "dependency-default";
085        }
086        else {
087            cacheConfigURL = this.getClass().getClassLoader().getResource("ehcache.xml");
088        }
089        if (cacheConfigURL == null) {
090            throw new IllegalStateException("ehcache.xml is not found in classpath");
091        }
092        cacheManager = CacheManager.newInstance(cacheConfigURL);
093        final Cache specifiedCache = cacheManager.getCache(cacheName);
094        if (specifiedCache == null) {
095            throw new IllegalStateException("Cache " + cacheName + " configured in " + CONF_CACHE_NAME
096                    + " is not found");
097        }
098        cacheConfig = specifiedCache.getCacheConfiguration();
099        missingDepsByServer = new ConcurrentHashMap<String, Cache>();
100        partKeyPatterns = new ConcurrentHashMap<String, ConcurrentMap<String, SettableInteger>>();
101        availableDeps = new ConcurrentHashMap<String, Collection<String>>();
102    }
103
104    @Override
105    public void addMissingDependency(HCatURI hcatURI, String actionID) {
106
107        // Create cache for the server if we don't have one
108        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
109        if (missingCache == null) {
110            CacheConfiguration clonedConfig = cacheConfig.clone();
111            clonedConfig.setName(hcatURI.getServer());
112            missingCache = new Cache(clonedConfig);
113            Cache exists = missingDepsByServer.putIfAbsent(hcatURI.getServer(), missingCache);
114            if (exists == null) {
115                cacheManager.addCache(missingCache);
116                missingCache.getCacheEventNotificationService().registerListener(this);
117            }
118            else {
119                missingCache.dispose(); //discard
120            }
121        }
122
123        // Add hcat uri into the missingCache
124        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
125        String partKeys = sortedPKV.getPartKeys();
126        String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
127                + partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
128        boolean newlyAdded = true;
129        synchronized (missingCache) {
130            Element element = missingCache.get(missingKey);
131            if (element == null) {
132                WaitingActions waitingActions = new WaitingActions();
133                element = new Element(missingKey, waitingActions);
134                Element exists = missingCache.putIfAbsent(element);
135                if (exists != null) {
136                    newlyAdded = false;
137                    waitingActions = (WaitingActions) exists.getObjectValue();
138                }
139                waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
140            }
141            else {
142                newlyAdded = false;
143                WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
144                waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
145            }
146        }
147
148        // Increment count for the partition key pattern
149        if (newlyAdded) {
150            String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
151                    + hcatURI.getTable();
152            synchronized (partKeyPatterns) {
153                ConcurrentMap<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
154                if (patternCounts == null) {
155                    patternCounts = new ConcurrentHashMap<String, SettableInteger>();
156                    partKeyPatterns.put(tableKey, patternCounts);
157                }
158                SettableInteger count = patternCounts.get(partKeys);
159                if (count == null) {
160                    patternCounts.put(partKeys, new SettableInteger(1));
161                }
162                else {
163                    count.increment();
164                }
165            }
166        }
167    }
168
169    @Override
170    public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
171
172        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
173        if (missingCache == null) {
174            LOG.warn("Remove missing dependency - Missing cache entry for server - uri={0}, actionID={1}",
175                    hcatURI.toURIString(), actionID);
176            return false;
177        }
178        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
179        String partKeys = sortedPKV.getPartKeys();
180        String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER +
181                partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
182        boolean decrement = false;
183        boolean removed = false;
184        synchronized (missingCache) {
185            Element element = missingCache.get(missingKey);
186            if (element == null) {
187                LOG.warn("Remove missing dependency - Missing cache entry - uri={0}, actionID={1}",
188                        hcatURI.toURIString(), actionID);
189                return false;
190            }
191            Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue()).getWaitingActions();
192            removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString()));
193            if (!removed) {
194                LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
195                        hcatURI.toURIString(), actionID);
196            }
197            if (waitingActions.isEmpty()) {
198                missingCache.remove(missingKey);
199                decrement = true;
200            }
201        }
202        // Decrement partition key pattern count if the cache entry is removed
203        if (decrement) {
204            String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
205                    + hcatURI.getTable();
206            decrementPartKeyPatternCount(tableKey, partKeys, hcatURI.toURIString());
207        }
208        return removed;
209    }
210
211    @Override
212    public Collection<String> getWaitingActions(HCatURI hcatURI) {
213        Collection<String> actionIDs = null;
214        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
215        if (missingCache != null) {
216            SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
217            String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
218                    + sortedPKV.getPartKeys() + TABLE_DELIMITER + sortedPKV.getPartVals();
219            Element element = missingCache.get(missingKey);
220            if (element != null) {
221                WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
222                actionIDs = new ArrayList<String>();
223                String uriString = hcatURI.getURI().toString();
224                for (WaitingAction action : waitingActions.getWaitingActions()) {
225                    if (action.getDependencyURI().equals(uriString)) {
226                        actionIDs.add(action.getActionID());
227                    }
228                }
229            }
230        }
231        return actionIDs;
232    }
233
234    @Override
235    public Collection<String> markDependencyAvailable(String server, String db, String table,
236            Map<String, String> partitions) {
237        String tableKey = server + TABLE_DELIMITER + db + TABLE_DELIMITER + table;
238        synchronized (partKeyPatterns) {
239            Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
240            if (patternCounts == null) {
241                LOG.warn("Got partition available notification for " + tableKey
242                        + ". Unexpected as no matching partition keys. Unregistering topic");
243                unregisterFromNotifications(server, db, table);
244                return null;
245            }
246            Cache missingCache = missingDepsByServer.get(server);
247            if (missingCache == null) {
248                LOG.warn("Got partition available notification for " + tableKey
249                        + ". Unexpected. Missing server entry in cache. Unregistering topic");
250                partKeyPatterns.remove(tableKey);
251                unregisterFromNotifications(server, db, table);
252                return null;
253            }
254            Collection<String> actionsWithAvailDep = new HashSet<String>();
255            StringBuilder partValSB = new StringBuilder();
256            // If partition patterns are date, date;country and date;country;state,
257            // construct the partition values for each pattern and for the matching value in the
258            // missingCache, get the waiting actions and mark it as available.
259            for (Entry<String, SettableInteger> entry : patternCounts.entrySet()) {
260                String[] partKeys = entry.getKey().split(PARTITION_DELIMITER);
261                partValSB.setLength(0);
262                for (String key : partKeys) {
263                    partValSB.append(partitions.get(key)).append(PARTITION_DELIMITER);
264                }
265                partValSB.setLength(partValSB.length() - 1);
266                String missingKey = db + TABLE_DELIMITER + table + TABLE_DELIMITER + entry.getKey() + TABLE_DELIMITER
267                        + partValSB.toString();
268                boolean removed = false;
269                Element element = null;
270                synchronized (missingCache) {
271                    element = missingCache.get(missingKey);
272                    if (element != null) {
273                        missingCache.remove(missingKey);
274                        removed = true;
275                    }
276                }
277                if (removed) {
278                    decrementPartKeyPatternCount(tableKey, entry.getKey(), server + TABLE_DELIMITER + missingKey);
279                    // Add the removed entry to available dependencies
280                    Collection<WaitingAction> wActions = ((WaitingActions) element.getObjectValue())
281                            .getWaitingActions();
282                    for (WaitingAction wAction : wActions) {
283                        String actionID = wAction.getActionID();
284                        actionsWithAvailDep.add(actionID);
285                        Collection<String> depURIs = availableDeps.get(actionID);
286                        if (depURIs == null) {
287                            depURIs = new ArrayList<String>();
288                            Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
289                            if (existing != null) {
290                                depURIs = existing;
291                            }
292                        }
293                        synchronized (depURIs) {
294                            depURIs.add(wAction.getDependencyURI());
295                            availableDeps.put(actionID, depURIs);
296                        }
297                    }
298                }
299            }
300            return actionsWithAvailDep;
301        }
302    }
303
304    @Override
305    public Collection<String> getAvailableDependencyURIs(String actionID) {
306        Collection<String> available = availableDeps.get(actionID);
307        if (available !=  null) {
308            // Return a copy
309            available = new ArrayList<String>(available);
310        }
311        return available;
312    }
313
314    @Override
315    public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
316        if (!availableDeps.containsKey(actionID)) {
317            return false;
318        }
319        else {
320            Collection<String> availList = availableDeps.get(actionID);
321            if (!availList.removeAll(dependencyURIs)) {
322                return false;
323            }
324            synchronized (availList) {
325                if (availList.isEmpty()) {
326                    availableDeps.remove(actionID);
327                }
328            }
329        }
330        return true;
331    }
332
333    @Override
334    public void destroy() {
335        availableDeps.clear();
336        cacheManager.shutdown();
337    }
338
339    @Override
340    public Object clone() throws CloneNotSupportedException {
341        throw new CloneNotSupportedException();
342    }
343
344    @Override
345    public void dispose() {
346    }
347
348    @Override
349    public void notifyElementExpired(Ehcache cache, Element element) {
350        // Invoked when timeToIdleSeconds or timeToLiveSeconds is met
351        String missingDepKey = (String) element.getObjectKey();
352        LOG.info("Cache entry [{0}] of cache [{1}] expired", missingDepKey, cache.getName());
353        onExpiryOrEviction(cache, element, missingDepKey);
354    }
355
356    @Override
357    public void notifyElementPut(Ehcache arg0, Element arg1) throws CacheException {
358
359    }
360
361    @Override
362    public void notifyElementRemoved(Ehcache arg0, Element arg1) throws CacheException {
363    }
364
365    @Override
366    public void notifyElementUpdated(Ehcache arg0, Element arg1) throws CacheException {
367    }
368
369    @Override
370    public void notifyRemoveAll(Ehcache arg0) {
371    }
372
373    @Override
374    public void notifyElementEvicted(Ehcache cache, Element element) {
375        // Invoked when maxElementsInMemory is met
376        String missingDepKey = (String) element.getObjectKey();
377        LOG.info("Cache entry [{0}] of cache [{1}] evicted", missingDepKey, cache.getName());
378        onExpiryOrEviction(cache, element, missingDepKey);
379    }
380
381    private void onExpiryOrEviction(Ehcache cache, Element element, String missingDepKey) {
382        int partValIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER);
383        int partKeyIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER, partValIndex - 1);
384        // server#db#table. Name of the cache is that of the server.
385        String tableKey = cache.getName() + TABLE_DELIMITER + missingDepKey.substring(0, partKeyIndex);
386        String partKeys = missingDepKey.substring(partKeyIndex + 1, partValIndex);
387        decrementPartKeyPatternCount(tableKey, partKeys, missingDepKey);
388    }
389
390    /**
391     * Decrement partition key pattern count, once a hcat URI is removed from the cache
392     *
393     * @param tableKey key identifying the table - server#db#table
394     * @param partKeys partition key pattern
395     * @param hcatURI URI with the partition key pattern
396     */
397    private void decrementPartKeyPatternCount(String tableKey, String partKeys, String hcatURI) {
398        synchronized (partKeyPatterns) {
399            Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
400            if (patternCounts == null) {
401                LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
402                        + "But no corresponding pattern key table entry", hcatURI);
403            }
404            else {
405                SettableInteger count = patternCounts.get(partKeys);
406                if (count == null) {
407                    LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
408                            + "But no corresponding pattern key entry", hcatURI);
409                }
410                else {
411                    count.decrement();
412                    if (count.getValue() == 0) {
413                        patternCounts.remove(partKeys);
414                    }
415                    if (patternCounts.isEmpty()) {
416                        partKeyPatterns.remove(tableKey);
417                        String[] tableDetails = tableKey.split(TABLE_DELIMITER);
418                        unregisterFromNotifications(tableDetails[0], tableDetails[1], tableDetails[2]);
419                    }
420                }
421            }
422        }
423    }
424
425    private void unregisterFromNotifications(String server, String db, String table) {
426        // Close JMS session. Stop listening on topic
427        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
428        hcatService.unregisterFromNotification(server, db, table);
429    }
430
431    private static class SortedPKV {
432        private StringBuilder partKeys;
433        private StringBuilder partVals;
434
435        public SortedPKV(Map<String, String> partitions) {
436            this.partKeys = new StringBuilder();
437            this.partVals = new StringBuilder();
438            ArrayList<String> keys = new ArrayList<String>(partitions.keySet());
439            Collections.sort(keys);
440            for (String key : keys) {
441                this.partKeys.append(key).append(PARTITION_DELIMITER);
442                this.partVals.append(partitions.get(key)).append(PARTITION_DELIMITER);
443            }
444            this.partKeys.setLength(partKeys.length() - 1);
445            this.partVals.setLength(partVals.length() - 1);
446        }
447
448        public String getPartKeys() {
449            return partKeys.toString();
450        }
451
452        public String getPartVals() {
453            return partVals.toString();
454        }
455
456    }
457
458    private static class SettableInteger {
459        private int value;
460
461        public SettableInteger(int value) {
462            this.value = value;
463        }
464
465        public int getValue() {
466            return value;
467        }
468
469        public void increment() {
470            value++;
471        }
472
473        public void decrement() {
474            value--;
475        }
476    }
477
478    @Override
479    public void removeNonWaitingCoordActions(Set<String> staleActions) {
480        Iterator<String> serverItr = missingDepsByServer.keySet().iterator();
481        while (serverItr.hasNext()) {
482            String server = serverItr.next();
483            Cache missingCache = missingDepsByServer.get(server);
484            if (missingCache == null) {
485                continue;
486            }
487            synchronized (missingCache) {
488                for (Object key : missingCache.getKeys()) {
489                    Element element = missingCache.get(key);
490                    if (element == null) {
491                        continue;
492                    }
493                    Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue())
494                            .getWaitingActions();
495                    Iterator<WaitingAction> wactionItr = waitingActions.iterator();
496                    HCatURI hcatURI = null;
497                    while(wactionItr.hasNext()) {
498                        WaitingAction waction = wactionItr.next();
499                        if(staleActions.contains(waction.getActionID())) {
500                            try {
501                                hcatURI = new HCatURI(waction.getDependencyURI());
502                                wactionItr.remove();
503                            }
504                            catch (URISyntaxException e) {
505                                continue;
506                            }
507                        }
508                    }
509                    if (waitingActions.isEmpty() && hcatURI != null) {
510                        missingCache.remove(key);
511                        // Decrement partition key pattern count if the cache entry is removed
512                        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
513                        String partKeys = sortedPKV.getPartKeys();
514                        String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
515                                + hcatURI.getTable();
516                        String hcatURIStr = hcatURI.toURIString();
517                        decrementPartKeyPatternCount(tableKey, partKeys, hcatURIStr);
518                    }
519                }
520            }
521        }
522    }
523
524    @Override
525    public void removeCoordActionWithDependenciesAvailable(String coordAction) {
526        // to be implemented when reverse-lookup data structure for purging is added
527    }
528
529}