001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.dependency.hcat;
020
021import java.net.URISyntaxException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Map.Entry;
031import java.util.Set;
032import java.util.concurrent.ConcurrentHashMap;
033import java.util.concurrent.ConcurrentMap;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.oozie.service.HCatAccessorService;
037import org.apache.oozie.service.Services;
038import org.apache.oozie.util.HCatURI;
039import org.apache.oozie.util.XLog;
040
041public class SimpleHCatDependencyCache implements HCatDependencyCache {
042
043    private static XLog LOG = XLog.getLog(SimpleHCatDependencyCache.class);
044    private static String DELIMITER = ";";
045
046    /**
047     * Map of server;db;table - sorter partition key order (country;dt;state) - sorted partition
048     * value (us;20120101;CA) - Collection of waiting actions (actionID and original hcat uri as
049     * string).
050     */
051    private ConcurrentMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>> missingDeps;
052
053    /**
054     * Map of actionIDs and collection of available URIs
055     */
056    private ConcurrentMap<String, Collection<String>> availableDeps;
057
058    /**
059     * Map of actionIDs and partitions for reverse-lookup in purging
060     */
061    private ConcurrentMap<String, ConcurrentMap<String, Collection<String>>> actionPartitionMap;
062
063    @Override
064    public void init(Configuration conf) {
065        missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>>();
066        availableDeps = new ConcurrentHashMap<String, Collection<String>>();
067        actionPartitionMap = new ConcurrentHashMap<String, ConcurrentMap<String, Collection<String>>>();
068    }
069
070    @Override
071    public void addMissingDependency(HCatURI hcatURI, String actionID) {
072        String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
073        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
074        // Partition keys seperated by ;. For eg: date;country;state
075        String partKey = sortedPKV.getPartKeys();
076        // Partition values seperated by ;. For eg: 20120101;US;CA
077        String partVal = sortedPKV.getPartVals();
078        ConcurrentMap<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
079        if (partKeyPatterns == null) {
080            partKeyPatterns = new ConcurrentHashMap<String, Map<String, Collection<WaitingAction>>>();
081            ConcurrentMap<String, Map<String, Collection<WaitingAction>>> existingMap = missingDeps.putIfAbsent(
082                    tableKey, partKeyPatterns);
083            if (existingMap != null) {
084                partKeyPatterns = existingMap;
085            }
086        }
087        ConcurrentMap<String, Collection<String>> partitionMap = actionPartitionMap.get(actionID);
088        if (partitionMap == null) {
089            partitionMap = new ConcurrentHashMap<String, Collection<String>>();
090            ConcurrentMap<String, Collection<String>> existingPartMap = actionPartitionMap.putIfAbsent(actionID,
091                    partitionMap);
092            if (existingPartMap != null) {
093                partitionMap = existingPartMap;
094            }
095        }
096        synchronized (partitionMap) {
097            Collection<String> partKeys = partitionMap.get(tableKey);
098            if (partKeys == null) {
099                partKeys = new ArrayList<String>();
100            }
101            partKeys.add(partKey);
102            partitionMap.put(tableKey, partKeys);
103        }
104        synchronized (partKeyPatterns) {
105            missingDeps.put(tableKey, partKeyPatterns); // To handle race condition with removal of partKeyPatterns
106            Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
107            if (partValues == null) {
108                partValues = new HashMap<String, Collection<WaitingAction>>();
109                partKeyPatterns.put(partKey, partValues);
110            }
111            Collection<WaitingAction> waitingActions = partValues.get(partVal);
112            if (waitingActions == null) {
113                waitingActions = new HashSet<WaitingAction>();
114                partValues.put(partVal, waitingActions);
115            }
116            waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
117        }
118    }
119
120    @Override
121    public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
122        String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
123        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
124        String partKey = sortedPKV.getPartKeys();
125        String partVal = sortedPKV.getPartVals();
126        Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
127        if (partKeyPatterns == null) {
128            LOG.warn("Remove missing dependency - Missing table entry - uri={0}, actionID={1}",
129                    hcatURI.toURIString(), actionID);
130            return false;
131        }
132        ConcurrentMap<String, Collection<String>> partitionMap = actionPartitionMap.get(actionID);
133        if (partitionMap != null) {
134            synchronized (partitionMap) {
135                Collection<String> partKeys = partitionMap.get(tableKey);
136                if (partKeys != null) {
137                    partKeys.remove(partKey);
138                }
139                if (partKeys.size() == 0) {
140                    partitionMap.remove(tableKey);
141                }
142                if (partitionMap.size() == 0) {
143                    actionPartitionMap.remove(actionID);
144                }
145            }
146        }
147
148        synchronized(partKeyPatterns) {
149            Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
150            if (partValues == null) {
151                LOG.warn("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}",
152                        hcatURI.toURIString(), actionID);
153                return false;
154            }
155            Collection<WaitingAction> waitingActions = partValues.get(partVal);
156            if (waitingActions == null) {
157                LOG.warn("Remove missing dependency - Missing partition value - uri={0}, actionID={1}",
158                        hcatURI.toURIString(), actionID);
159                return false;
160            }
161            boolean removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString()));
162            if (!removed) {
163                LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
164                        hcatURI.toURIString(), actionID);
165            }
166            if (waitingActions.isEmpty()) {
167                partValues.remove(partVal);
168                if (partValues.isEmpty()) {
169                    partKeyPatterns.remove(partKey);
170                }
171                if (partKeyPatterns.isEmpty()) {
172                    missingDeps.remove(tableKey);
173                    // Close JMS session. Stop listening on topic
174                    HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
175                    hcatService.unregisterFromNotification(hcatURI);
176                }
177            }
178            return removed;
179        }
180    }
181
182    @Override
183    public Collection<String> getWaitingActions(HCatURI hcatURI) {
184        String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
185        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
186        String partKey = sortedPKV.getPartKeys();
187        String partVal = sortedPKV.getPartVals();
188        Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
189        if (partKeyPatterns == null) {
190            return null;
191        }
192        Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
193        if (partValues == null) {
194            return null;
195        }
196        Collection<WaitingAction> waitingActions = partValues.get(partVal);
197        if (waitingActions == null) {
198            return null;
199        }
200        Collection<String> actionIDs = new ArrayList<String>();
201        String uriString = hcatURI.toURIString();
202        for (WaitingAction action : waitingActions) {
203            if (action.getDependencyURI().equals(uriString)) {
204                actionIDs.add(action.getActionID());
205            }
206        }
207        return actionIDs;
208    }
209
210    @Override
211    public Collection<String> markDependencyAvailable(String server, String db, String table,
212            Map<String, String> partitions) {
213        String tableKey = server + DELIMITER + db + DELIMITER + table;
214        Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
215        if (partKeyPatterns == null) {
216            LOG.warn("Got partition available notification for " + tableKey
217                    + ". Unexpected and should not be listening to topic. Unregistering topic");
218            HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
219            hcatService.unregisterFromNotification(server, db, table);
220            return null;
221        }
222        Collection<String> actionsWithAvailDep = new HashSet<String>();
223        List<String> partKeysToRemove = new ArrayList<String>();
224        StringBuilder partValSB = new StringBuilder();
225        synchronized (partKeyPatterns) {
226            // If partition patterns are date, date;country and date;country;state,
227            // construct the partition values for each pattern from the available partitions map and
228            // for the matching value in the dependency map, get the waiting actions.
229            for (Entry<String, Map<String, Collection<WaitingAction>>> entry : partKeyPatterns.entrySet()) {
230                String[] partKeys = entry.getKey().split(DELIMITER);
231                partValSB.setLength(0);
232                for (String key : partKeys) {
233                    partValSB.append(partitions.get(key)).append(DELIMITER);
234                }
235                partValSB.setLength(partValSB.length() - 1);
236
237                Map<String, Collection<WaitingAction>> partValues = entry.getValue();
238                String partVal = partValSB.toString();
239                Collection<WaitingAction> wActions = entry.getValue().get(partVal);
240                if (wActions == null)
241                    continue;
242                for (WaitingAction wAction : wActions) {
243                    String actionID = wAction.getActionID();
244                    actionsWithAvailDep.add(actionID);
245                    Collection<String> depURIs = availableDeps.get(actionID);
246                    if (depURIs == null) {
247                        depURIs = new ArrayList<String>();
248                        Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
249                        if (existing != null) {
250                            depURIs = existing;
251                        }
252                    }
253                    synchronized (depURIs) {
254                        depURIs.add(wAction.getDependencyURI());
255                        availableDeps.put(actionID, depURIs);
256                    }
257                }
258                partValues.remove(partVal);
259                if (partValues.isEmpty()) {
260                    partKeysToRemove.add(entry.getKey());
261                }
262            }
263            for (String partKey : partKeysToRemove) {
264                partKeyPatterns.remove(partKey);
265            }
266            if (partKeyPatterns.isEmpty()) {
267                missingDeps.remove(tableKey);
268                // Close JMS session. Stop listening on topic
269                HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
270                hcatService.unregisterFromNotification(server, db, table);
271            }
272        }
273        return actionsWithAvailDep;
274    }
275
276    @Override
277    public Collection<String> getAvailableDependencyURIs(String actionID) {
278        Collection<String> available = availableDeps.get(actionID);
279        if (available !=  null) {
280            // Return a copy
281            available = new ArrayList<String>(available);
282        }
283        return available;
284    }
285
286    @Override
287    public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
288        if (!availableDeps.containsKey(actionID)) {
289            return false;
290        }
291        else {
292            Collection<String> availList = availableDeps.get(actionID);
293            if (!availList.removeAll(dependencyURIs)) {
294                return false;
295            }
296            synchronized (availList) {
297                if (availList.isEmpty()) {
298                    availableDeps.remove(actionID);
299                }
300            }
301        }
302        return true;
303    }
304
305    @Override
306    public void destroy() {
307        missingDeps.clear();
308        availableDeps.clear();
309    }
310
311    private static class SortedPKV {
312        private StringBuilder partKeys;
313        private StringBuilder partVals;
314
315        public SortedPKV(Map<String, String> partitions) {
316            this.partKeys = new StringBuilder();
317            this.partVals = new StringBuilder();
318            ArrayList<String> keys = new ArrayList<String>(partitions.keySet());
319            Collections.sort(keys);
320            for (String key : keys) {
321                this.partKeys.append(key).append(DELIMITER);
322                this.partVals.append(partitions.get(key)).append(DELIMITER);
323            }
324            this.partKeys.setLength(partKeys.length() - 1);
325            this.partVals.setLength(partVals.length() - 1);
326        }
327
328        public String getPartKeys() {
329            return partKeys.toString();
330        }
331
332        public String getPartVals() {
333            return partVals.toString();
334        }
335    }
336
337    private HCatURI removePartitions(String coordActionId, Collection<String> partKeys,
338            Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns) {
339        HCatURI hcatUri = null;
340        for (String partKey : partKeys) {
341            Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
342            Iterator<String> partValItr = partValues.keySet().iterator();
343            while (partValItr.hasNext()) {
344                String partVal = partValItr.next();
345                Collection<WaitingAction> waitingActions = partValues.get(partVal);
346                if (waitingActions != null) {
347                    Iterator<WaitingAction> waitItr = waitingActions.iterator();
348                    while (waitItr.hasNext()) {
349                        WaitingAction waction = waitItr.next();
350                        if (coordActionId.contains(waction.getActionID())) {
351                            waitItr.remove();
352                            if (hcatUri == null) {
353                                try {
354                                    hcatUri = new HCatURI(waction.getDependencyURI());
355                                }
356                                catch (URISyntaxException e) {
357                                    continue;
358                                }
359                            }
360                        }
361                    }
362                }
363                // delete partition value with no waiting actions
364                if (waitingActions.size() == 0) {
365                    partValItr.remove();
366                }
367            }
368            if (partValues.size() == 0) {
369                partKeyPatterns.remove(partKey);
370            }
371        }
372        return hcatUri;
373    }
374
375    @Override
376    public void removeNonWaitingCoordActions(Set<String> coordActions) {
377        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
378        for (String coordActionId : coordActions) {
379            LOG.info("Removing non waiting coord action {0} from partition dependency map", coordActionId);
380            synchronized (actionPartitionMap) {
381                Map<String, Collection<String>> partitionMap = actionPartitionMap.get(coordActionId);
382                if (partitionMap != null) {
383                    Iterator<String> tableItr = partitionMap.keySet().iterator();
384                    while (tableItr.hasNext()) {
385                        String tableKey = tableItr.next();
386                        HCatURI hcatUri = null;
387                        Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
388                        if (partKeyPatterns != null) {
389                            synchronized (partKeyPatterns) {
390                                Collection<String> partKeys = partitionMap.get(tableKey);
391                                if (partKeys != null) {
392                                    hcatUri = removePartitions(coordActionId, partKeys, partKeyPatterns);
393                                }
394                            }
395                            if (partKeyPatterns.size() == 0) {
396                                tableItr.remove();
397                                if (hcatUri != null) {
398                                    // Close JMS session. Stop listening on topic
399                                    hcatService.unregisterFromNotification(hcatUri);
400                                }
401                            }
402                        }
403                    }
404                }
405                actionPartitionMap.remove(coordActionId);
406            }
407        }
408    }
409
410    @Override
411    public void removeCoordActionWithDependenciesAvailable(String coordAction) {
412        actionPartitionMap.remove(coordAction);
413    }
414}