001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.dependency.hcat;
019    
020    import java.net.URL;
021    import java.util.ArrayList;
022    import java.util.Collection;
023    import java.util.Collections;
024    import java.util.HashSet;
025    import java.util.Map;
026    import java.util.Map.Entry;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.ConcurrentMap;
029    
030    import net.sf.ehcache.Cache;
031    import net.sf.ehcache.CacheException;
032    import net.sf.ehcache.CacheManager;
033    import net.sf.ehcache.Ehcache;
034    import net.sf.ehcache.Element;
035    import net.sf.ehcache.config.CacheConfiguration;
036    import net.sf.ehcache.event.CacheEventListener;
037    
038    import org.apache.hadoop.conf.Configuration;
039    import org.apache.oozie.service.HCatAccessorService;
040    import org.apache.oozie.service.PartitionDependencyManagerService;
041    import org.apache.oozie.service.Services;
042    import org.apache.oozie.util.HCatURI;
043    import org.apache.oozie.util.XLog;
044    
045    public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEventListener {
046    
047        private static XLog LOG = XLog.getLog(EhcacheHCatDependencyCache.class);
048        private static String TABLE_DELIMITER = "#";
049        private static String PARTITION_DELIMITER = ";";
050    
051        public static String CONF_CACHE_NAME = PartitionDependencyManagerService.CONF_PREFIX + "cache.ehcache.name";
052    
053        private CacheManager cacheManager;
054    
055        /**
056         * Map of server to EhCache which has key as db#table#pk1;pk2#val;val2 and value as WaitingActions (list of
057         * WaitingAction) which is Serializable (for overflowToDisk)
058         */
059        private ConcurrentMap<String, Cache> missingDepsByServer;
060    
061        private CacheConfiguration cacheConfig;
062        /**
063         * Map of server#db#table - sorted part key pattern - count of different partition values (count
064         * of elements in the cache) still missing for a partition key pattern. This count is used to
065         * quickly determine if there are any more missing dependencies for a table. When the count
066         * becomes 0, we unregister from notifications as there are no more missing dependencies for
067         * that table.
068         */
069        private ConcurrentMap<String, ConcurrentMap<String, SettableInteger>> partKeyPatterns;
070        /**
071         * Map of actionIDs and collection of available URIs
072         */
073        private ConcurrentMap<String, Collection<String>> availableDeps;
074    
075        @Override
076        public void init(Configuration conf) {
077            String cacheName = conf.get(CONF_CACHE_NAME);
078            URL cacheConfigURL;
079            if (cacheName == null) {
080                cacheConfigURL = this.getClass().getClassLoader().getResource("ehcache-default.xml");
081                cacheName = "dependency-default";
082            }
083            else {
084                cacheConfigURL = this.getClass().getClassLoader().getResource("ehcache.xml");
085            }
086            if (cacheConfigURL == null) {
087                throw new IllegalStateException("ehcache.xml is not found in classpath");
088            }
089            cacheManager = CacheManager.newInstance(cacheConfigURL);
090            final Cache specifiedCache = cacheManager.getCache(cacheName);
091            if (specifiedCache == null) {
092                throw new IllegalStateException("Cache " + cacheName + " configured in " + CONF_CACHE_NAME
093                        + " is not found");
094            }
095            cacheConfig = specifiedCache.getCacheConfiguration();
096            missingDepsByServer = new ConcurrentHashMap<String, Cache>();
097            partKeyPatterns = new ConcurrentHashMap<String, ConcurrentMap<String, SettableInteger>>();
098            availableDeps = new ConcurrentHashMap<String, Collection<String>>();
099        }
100    
101        @Override
102        public void addMissingDependency(HCatURI hcatURI, String actionID) {
103    
104            // Create cache for the server if we don't have one
105            Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
106            if (missingCache == null) {
107                CacheConfiguration clonedConfig = cacheConfig.clone();
108                clonedConfig.setName(hcatURI.getServer());
109                missingCache = new Cache(clonedConfig);
110                Cache exists = missingDepsByServer.putIfAbsent(hcatURI.getServer(), missingCache);
111                if (exists == null) {
112                    cacheManager.addCache(missingCache);
113                    missingCache.getCacheEventNotificationService().registerListener(this);
114                }
115                else {
116                    missingCache.dispose(); //discard
117                }
118            }
119    
120            // Add hcat uri into the missingCache
121            SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
122            String partKeys = sortedPKV.getPartKeys();
123            String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
124                    + partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
125            boolean newlyAdded = true;
126            synchronized (missingCache) {
127                Element element = missingCache.get(missingKey);
128                if (element == null) {
129                    WaitingActions waitingActions = new WaitingActions();
130                    element = new Element(missingKey, waitingActions);
131                    Element exists = missingCache.putIfAbsent(element);
132                    if (exists != null) {
133                        newlyAdded = false;
134                        waitingActions = (WaitingActions) exists.getObjectValue();
135                    }
136                    waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
137                }
138                else {
139                    newlyAdded = false;
140                    WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
141                    waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
142                }
143            }
144    
145            // Increment count for the partition key pattern
146            if (newlyAdded) {
147                String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
148                        + hcatURI.getTable();
149                synchronized (partKeyPatterns) {
150                    ConcurrentMap<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
151                    if (patternCounts == null) {
152                        patternCounts = new ConcurrentHashMap<String, SettableInteger>();
153                        partKeyPatterns.put(tableKey, patternCounts);
154                    }
155                    SettableInteger count = patternCounts.get(partKeys);
156                    if (count == null) {
157                        patternCounts.put(partKeys, new SettableInteger(1));
158                    }
159                    else {
160                        count.increment();
161                    }
162                }
163            }
164        }
165    
166        @Override
167        public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
168    
169            Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
170            if (missingCache == null) {
171                LOG.warn("Remove missing dependency - Missing cache entry for server - uri={0}, actionID={1}",
172                        hcatURI.toURIString(), actionID);
173                return false;
174            }
175            SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
176            String partKeys = sortedPKV.getPartKeys();
177            String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER +
178                    partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
179            boolean decrement = false;
180            boolean removed = false;
181            synchronized (missingCache) {
182                Element element = missingCache.get(missingKey);
183                if (element == null) {
184                    LOG.warn("Remove missing dependency - Missing cache entry - uri={0}, actionID={1}",
185                            hcatURI.toURIString(), actionID);
186                    return false;
187                }
188                Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue()).getWaitingActions();
189                removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString()));
190                if (!removed) {
191                    LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
192                            hcatURI.toURIString(), actionID);
193                }
194                if (waitingActions.isEmpty()) {
195                    missingCache.remove(missingKey);
196                    decrement = true;
197                }
198            }
199            // Decrement partition key pattern count if the cache entry is removed
200            if (decrement) {
201                String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
202                        + hcatURI.getTable();
203                decrementPartKeyPatternCount(tableKey, partKeys, hcatURI.toURIString());
204            }
205            return removed;
206        }
207    
208        @Override
209        public Collection<String> getWaitingActions(HCatURI hcatURI) {
210            Collection<String> actionIDs = null;
211            Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
212            if (missingCache != null) {
213                SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
214                String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
215                        + sortedPKV.getPartKeys() + TABLE_DELIMITER + sortedPKV.getPartVals();
216                Element element = missingCache.get(missingKey);
217                if (element != null) {
218                    WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
219                    actionIDs = new ArrayList<String>();
220                    String uriString = hcatURI.getURI().toString();
221                    for (WaitingAction action : waitingActions.getWaitingActions()) {
222                        if (action.getDependencyURI().equals(uriString)) {
223                            actionIDs.add(action.getActionID());
224                        }
225                    }
226                }
227            }
228            return actionIDs;
229        }
230    
231        @Override
232        public Collection<String> markDependencyAvailable(String server, String db, String table,
233                Map<String, String> partitions) {
234            String tableKey = server + TABLE_DELIMITER + db + TABLE_DELIMITER + table;
235            synchronized (partKeyPatterns) {
236                Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
237                if (patternCounts == null) {
238                    LOG.warn("Got partition available notification for " + tableKey
239                            + ". Unexpected as no matching partition keys. Unregistering topic");
240                    unregisterFromNotifications(server, db, table);
241                    return null;
242                }
243                Cache missingCache = missingDepsByServer.get(server);
244                if (missingCache == null) {
245                    LOG.warn("Got partition available notification for " + tableKey
246                            + ". Unexpected. Missing server entry in cache. Unregistering topic");
247                    partKeyPatterns.remove(tableKey);
248                    unregisterFromNotifications(server, db, table);
249                    return null;
250                }
251                Collection<String> actionsWithAvailDep = new HashSet<String>();
252                StringBuilder partValSB = new StringBuilder();
253                // If partition patterns are date, date;country and date;country;state,
254                // construct the partition values for each pattern and for the matching value in the
255                // missingCache, get the waiting actions and mark it as available.
256                for (Entry<String, SettableInteger> entry : patternCounts.entrySet()) {
257                    String[] partKeys = entry.getKey().split(PARTITION_DELIMITER);
258                    partValSB.setLength(0);
259                    for (String key : partKeys) {
260                        partValSB.append(partitions.get(key)).append(PARTITION_DELIMITER);
261                    }
262                    partValSB.setLength(partValSB.length() - 1);
263                    String missingKey = db + TABLE_DELIMITER + table + TABLE_DELIMITER + entry.getKey() + TABLE_DELIMITER
264                            + partValSB.toString();
265                    boolean removed = false;
266                    Element element = null;
267                    synchronized (missingCache) {
268                        element = missingCache.get(missingKey);
269                        if (element != null) {
270                            missingCache.remove(missingKey);
271                            removed = true;
272                        }
273                    }
274                    if (removed) {
275                        decrementPartKeyPatternCount(tableKey, entry.getKey(), server + TABLE_DELIMITER + missingKey);
276                        // Add the removed entry to available dependencies
277                        Collection<WaitingAction> wActions = ((WaitingActions) element.getObjectValue())
278                                .getWaitingActions();
279                        for (WaitingAction wAction : wActions) {
280                            String actionID = wAction.getActionID();
281                            actionsWithAvailDep.add(actionID);
282                            Collection<String> depURIs = availableDeps.get(actionID);
283                            if (depURIs == null) {
284                                depURIs = new ArrayList<String>();
285                                Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
286                                if (existing != null) {
287                                    depURIs = existing;
288                                }
289                            }
290                            synchronized (depURIs) {
291                                depURIs.add(wAction.getDependencyURI());
292                                availableDeps.put(actionID, depURIs);
293                            }
294                        }
295                    }
296                }
297                return actionsWithAvailDep;
298            }
299        }
300    
301        @Override
302        public Collection<String> getAvailableDependencyURIs(String actionID) {
303            Collection<String> available = availableDeps.get(actionID);
304            if (available !=  null) {
305                // Return a copy
306                available = new ArrayList<String>(available);
307            }
308            return available;
309        }
310    
311        @Override
312        public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
313            if (!availableDeps.containsKey(actionID)) {
314                return false;
315            }
316            else {
317                Collection<String> availList = availableDeps.get(actionID);
318                if (!availList.removeAll(dependencyURIs)) {
319                    return false;
320                }
321                synchronized (availList) {
322                    if (availList.isEmpty()) {
323                        availableDeps.remove(actionID);
324                    }
325                }
326            }
327            return true;
328        }
329    
330        @Override
331        public void destroy() {
332            availableDeps.clear();
333            cacheManager.shutdown();
334        }
335    
336        @Override
337        public Object clone() throws CloneNotSupportedException {
338            throw new CloneNotSupportedException();
339        }
340    
341        @Override
342        public void dispose() {
343        }
344    
345        @Override
346        public void notifyElementExpired(Ehcache cache, Element element) {
347            // Invoked when timeToIdleSeconds or timeToLiveSeconds is met
348            String missingDepKey = (String) element.getObjectKey();
349            LOG.info("Cache entry [{0}] of cache [{1}] expired", missingDepKey, cache.getName());
350            onExpiryOrEviction(cache, element, missingDepKey);
351        }
352    
353        @Override
354        public void notifyElementPut(Ehcache arg0, Element arg1) throws CacheException {
355    
356        }
357    
358        @Override
359        public void notifyElementRemoved(Ehcache arg0, Element arg1) throws CacheException {
360        }
361    
362        @Override
363        public void notifyElementUpdated(Ehcache arg0, Element arg1) throws CacheException {
364        }
365    
366        @Override
367        public void notifyRemoveAll(Ehcache arg0) {
368        }
369    
370        @Override
371        public void notifyElementEvicted(Ehcache cache, Element element) {
372            // Invoked when maxElementsInMemory is met
373            String missingDepKey = (String) element.getObjectKey();
374            LOG.info("Cache entry [{0}] of cache [{1}] evicted", missingDepKey, cache.getName());
375            onExpiryOrEviction(cache, element, missingDepKey);
376        }
377    
378        private void onExpiryOrEviction(Ehcache cache, Element element, String missingDepKey) {
379            int partValIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER);
380            int partKeyIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER, partValIndex - 1);
381            // server#db#table. Name of the cache is that of the server.
382            String tableKey = cache.getName() + TABLE_DELIMITER + missingDepKey.substring(0, partKeyIndex);
383            String partKeys = missingDepKey.substring(partKeyIndex + 1, partValIndex);
384            decrementPartKeyPatternCount(tableKey, partKeys, missingDepKey);
385        }
386    
387        /**
388         * Decrement partition key pattern count, once a hcat URI is removed from the cache
389         *
390         * @param tableKey key identifying the table - server#db#table
391         * @param partKeys partition key pattern
392         * @param hcatURI URI with the partition key pattern
393         */
394        private void decrementPartKeyPatternCount(String tableKey, String partKeys, String hcatURI) {
395            synchronized (partKeyPatterns) {
396                Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
397                if (patternCounts == null) {
398                    LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
399                            + "But no corresponding pattern key table entry", hcatURI);
400                }
401                else {
402                    SettableInteger count = patternCounts.get(partKeys);
403                    if (count == null) {
404                        LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
405                                + "But no corresponding pattern key entry", hcatURI);
406                    }
407                    else {
408                        count.decrement();
409                        if (count.getValue() == 0) {
410                            patternCounts.remove(partKeys);
411                        }
412                        if (patternCounts.isEmpty()) {
413                            partKeyPatterns.remove(tableKey);
414                            String[] tableDetails = tableKey.split(TABLE_DELIMITER);
415                            unregisterFromNotifications(tableDetails[0], tableDetails[1], tableDetails[2]);
416                        }
417                    }
418                }
419            }
420        }
421    
422        private void unregisterFromNotifications(String server, String db, String table) {
423            // Close JMS session. Stop listening on topic
424            HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
425            hcatService.unregisterFromNotification(server, db, table);
426        }
427    
428        private static class SortedPKV {
429            private StringBuilder partKeys;
430            private StringBuilder partVals;
431    
432            public SortedPKV(Map<String, String> partitions) {
433                this.partKeys = new StringBuilder();
434                this.partVals = new StringBuilder();
435                ArrayList<String> keys = new ArrayList<String>(partitions.keySet());
436                Collections.sort(keys);
437                for (String key : keys) {
438                    this.partKeys.append(key).append(PARTITION_DELIMITER);
439                    this.partVals.append(partitions.get(key)).append(PARTITION_DELIMITER);
440                }
441                this.partKeys.setLength(partKeys.length() - 1);
442                this.partVals.setLength(partVals.length() - 1);
443            }
444    
445            public String getPartKeys() {
446                return partKeys.toString();
447            }
448    
449            public String getPartVals() {
450                return partVals.toString();
451            }
452    
453        }
454    
455        private static class SettableInteger {
456            private int value;
457    
458            public SettableInteger(int value) {
459                this.value = value;
460            }
461    
462            public int getValue() {
463                return value;
464            }
465    
466            public void increment() {
467                value++;
468            }
469    
470            public void decrement() {
471                value--;
472            }
473        }
474    
475    }