001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.dependency.hcat;
019    
020    import java.util.ArrayList;
021    import java.util.Collection;
022    import java.util.Collections;
023    import java.util.HashMap;
024    import java.util.HashSet;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Map.Entry;
028    import java.util.concurrent.ConcurrentHashMap;
029    import java.util.concurrent.ConcurrentMap;
030    
031    import org.apache.hadoop.conf.Configuration;
032    import org.apache.oozie.service.HCatAccessorService;
033    import org.apache.oozie.service.Services;
034    import org.apache.oozie.util.HCatURI;
035    import org.apache.oozie.util.XLog;
036    
037    public class SimpleHCatDependencyCache implements HCatDependencyCache {
038    
039        private static XLog LOG = XLog.getLog(SimpleHCatDependencyCache.class);
040        private static String DELIMITER = ";";
041    
042        /**
043         * Map of server;db;table - sorter partition key order (country;dt;state) - sorted partition
044         * value (us;20120101;CA) - Collection of waiting actions (actionID and original hcat uri as
045         * string).
046         */
047        private ConcurrentMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>> missingDeps;
048    
049        /**
050         * Map of actionIDs and collection of available URIs
051         */
052        private ConcurrentMap<String, Collection<String>> availableDeps;
053    
054        // TODO:
055        // Gather and print stats on cache hits and misses.
056    
057        @Override
058        public void init(Configuration conf) {
059            missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>>();
060            availableDeps = new ConcurrentHashMap<String, Collection<String>>();
061        }
062    
063        @Override
064        public void addMissingDependency(HCatURI hcatURI, String actionID) {
065            String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
066            SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
067            // Partition keys seperated by ;. For eg: date;country;state
068            String partKey = sortedPKV.getPartKeys();
069            // Partition values seperated by ;. For eg: 20120101;US;CA
070            String partVal = sortedPKV.getPartVals();
071            ConcurrentMap<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
072            if (partKeyPatterns == null) {
073                partKeyPatterns = new ConcurrentHashMap<String, Map<String, Collection<WaitingAction>>>();
074                ConcurrentMap<String, Map<String, Collection<WaitingAction>>> existingMap = missingDeps.putIfAbsent(
075                        tableKey, partKeyPatterns);
076                if (existingMap != null) {
077                    partKeyPatterns = existingMap;
078                }
079            }
080            synchronized (partKeyPatterns) {
081                missingDeps.put(tableKey, partKeyPatterns); // To handle race condition with removal of partKeyPatterns
082                Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
083                if (partValues == null) {
084                    partValues = new HashMap<String, Collection<WaitingAction>>();
085                    partKeyPatterns.put(partKey, partValues);
086                }
087                Collection<WaitingAction> waitingActions = partValues.get(partVal);
088                if (waitingActions == null) {
089                    waitingActions = new HashSet<WaitingAction>();
090                    partValues.put(partVal, waitingActions);
091                }
092                waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
093            }
094        }
095    
096        @Override
097        public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
098            String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
099            SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
100            String partKey = sortedPKV.getPartKeys();
101            String partVal = sortedPKV.getPartVals();
102            Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
103            if (partKeyPatterns == null) {
104                LOG.warn("Remove missing dependency - Missing table entry - uri={0}, actionID={1}",
105                        hcatURI.toURIString(), actionID);
106                return false;
107            }
108            synchronized(partKeyPatterns) {
109                Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
110                if (partValues == null) {
111                    LOG.warn("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}",
112                            hcatURI.toURIString(), actionID);
113                    return false;
114                }
115                Collection<WaitingAction> waitingActions = partValues.get(partVal);
116                if (waitingActions == null) {
117                    LOG.warn("Remove missing dependency - Missing partition value - uri={0}, actionID={1}",
118                            hcatURI.toURIString(), actionID);
119                    return false;
120                }
121                boolean removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString()));
122                if (!removed) {
123                    LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
124                            hcatURI.toURIString(), actionID);
125                }
126                if (waitingActions.isEmpty()) {
127                    partValues.remove(partVal);
128                    if (partValues.isEmpty()) {
129                        partKeyPatterns.remove(partKey);
130                    }
131                    if (partKeyPatterns.isEmpty()) {
132                        missingDeps.remove(tableKey);
133                        // Close JMS session. Stop listening on topic
134                        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
135                        hcatService.unregisterFromNotification(hcatURI);
136                    }
137                }
138                return removed;
139            }
140        }
141    
142        @Override
143        public Collection<String> getWaitingActions(HCatURI hcatURI) {
144            String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
145            SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
146            String partKey = sortedPKV.getPartKeys();
147            String partVal = sortedPKV.getPartVals();
148            Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
149            if (partKeyPatterns == null) {
150                return null;
151            }
152            Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
153            if (partValues == null) {
154                return null;
155            }
156            Collection<WaitingAction> waitingActions = partValues.get(partVal);
157            if (waitingActions == null) {
158                return null;
159            }
160            Collection<String> actionIDs = new ArrayList<String>();
161            String uriString = hcatURI.toURIString();
162            for (WaitingAction action : waitingActions) {
163                if (action.getDependencyURI().equals(uriString)) {
164                    actionIDs.add(action.getActionID());
165                }
166            }
167            return actionIDs;
168        }
169    
170        @Override
171        public Collection<String> markDependencyAvailable(String server, String db, String table,
172                Map<String, String> partitions) {
173            String tableKey = server + DELIMITER + db + DELIMITER + table;
174            Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
175            if (partKeyPatterns == null) {
176                LOG.warn("Got partition available notification for " + tableKey
177                        + ". Unexpected and should not be listening to topic. Unregistering topic");
178                HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
179                hcatService.unregisterFromNotification(server, db, table);
180                return null;
181            }
182            Collection<String> actionsWithAvailDep = new HashSet<String>();
183            List<String> partKeysToRemove = new ArrayList<String>();
184            StringBuilder partValSB = new StringBuilder();
185            synchronized (partKeyPatterns) {
186                // If partition patterns are date, date;country and date;country;state,
187                // construct the partition values for each pattern from the available partitions map and
188                // for the matching value in the dependency map, get the waiting actions.
189                for (Entry<String, Map<String, Collection<WaitingAction>>> entry : partKeyPatterns.entrySet()) {
190                    String[] partKeys = entry.getKey().split(DELIMITER);
191                    partValSB.setLength(0);
192                    for (String key : partKeys) {
193                        partValSB.append(partitions.get(key)).append(DELIMITER);
194                    }
195                    partValSB.setLength(partValSB.length() - 1);
196    
197                    Map<String, Collection<WaitingAction>> partValues = entry.getValue();
198                    String partVal = partValSB.toString();
199                    Collection<WaitingAction> wActions = entry.getValue().get(partVal);
200                    if (wActions == null)
201                        continue;
202                    for (WaitingAction wAction : wActions) {
203                        String actionID = wAction.getActionID();
204                        actionsWithAvailDep.add(actionID);
205                        Collection<String> depURIs = availableDeps.get(actionID);
206                        if (depURIs == null) {
207                            depURIs = new ArrayList<String>();
208                            Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
209                            if (existing != null) {
210                                depURIs = existing;
211                            }
212                        }
213                        synchronized (depURIs) {
214                            depURIs.add(wAction.getDependencyURI());
215                            availableDeps.put(actionID, depURIs);
216                        }
217                    }
218                    partValues.remove(partVal);
219                    if (partValues.isEmpty()) {
220                        partKeysToRemove.add(entry.getKey());
221                    }
222                }
223                for (String partKey : partKeysToRemove) {
224                    partKeyPatterns.remove(partKey);
225                }
226                if (partKeyPatterns.isEmpty()) {
227                    missingDeps.remove(tableKey);
228                    // Close JMS session. Stop listening on topic
229                    HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
230                    hcatService.unregisterFromNotification(server, db, table);
231                }
232            }
233            return actionsWithAvailDep;
234        }
235    
236        @Override
237        public Collection<String> getAvailableDependencyURIs(String actionID) {
238            Collection<String> available = availableDeps.get(actionID);
239            if (available !=  null) {
240                // Return a copy
241                available = new ArrayList<String>(available);
242            }
243            return available;
244        }
245    
246        @Override
247        public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
248            if (!availableDeps.containsKey(actionID)) {
249                return false;
250            }
251            else {
252                Collection<String> availList = availableDeps.get(actionID);
253                if (!availList.removeAll(dependencyURIs)) {
254                    return false;
255                }
256                synchronized (availList) {
257                    if (availList.isEmpty()) {
258                        availableDeps.remove(actionID);
259                    }
260                }
261            }
262            return true;
263        }
264    
265        @Override
266        public void destroy() {
267            missingDeps.clear();
268            availableDeps.clear();
269        }
270    
271        private static class SortedPKV {
272            private StringBuilder partKeys;
273            private StringBuilder partVals;
274    
275            public SortedPKV(Map<String, String> partitions) {
276                this.partKeys = new StringBuilder();
277                this.partVals = new StringBuilder();
278                ArrayList<String> keys = new ArrayList<String>(partitions.keySet());
279                Collections.sort(keys);
280                for (String key : keys) {
281                    this.partKeys.append(key).append(DELIMITER);
282                    this.partVals.append(partitions.get(key)).append(DELIMITER);
283                }
284                this.partKeys.setLength(partKeys.length() - 1);
285                this.partVals.setLength(partVals.length() - 1);
286            }
287    
288            public String getPartKeys() {
289                return partKeys.toString();
290            }
291    
292            public String getPartVals() {
293                return partVals.toString();
294            }
295    
296        }
297    
298    }