001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.dependency.hcat;
019
020import java.net.URISyntaxException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.Set;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ConcurrentMap;
033
034import org.apache.hadoop.conf.Configuration;
035import org.apache.oozie.service.HCatAccessorService;
036import org.apache.oozie.service.Services;
037import org.apache.oozie.util.HCatURI;
038import org.apache.oozie.util.XLog;
039
040public class SimpleHCatDependencyCache implements HCatDependencyCache {
041
042    private static XLog LOG = XLog.getLog(SimpleHCatDependencyCache.class);
043    private static String DELIMITER = ";";
044
045    /**
046     * Map of server;db;table - sorter partition key order (country;dt;state) - sorted partition
047     * value (us;20120101;CA) - Collection of waiting actions (actionID and original hcat uri as
048     * string).
049     */
050    private ConcurrentMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>> missingDeps;
051
052    /**
053     * Map of actionIDs and collection of available URIs
054     */
055    private ConcurrentMap<String, Collection<String>> availableDeps;
056
057    /**
058     * Map of actionIDs and partitions for reverse-lookup in purging
059     */
060    private ConcurrentMap<String, ConcurrentMap<String, Collection<String>>> actionPartitionMap;
061
062    @Override
063    public void init(Configuration conf) {
064        missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>>();
065        availableDeps = new ConcurrentHashMap<String, Collection<String>>();
066        actionPartitionMap = new ConcurrentHashMap<String, ConcurrentMap<String, Collection<String>>>();
067    }
068
069    @Override
070    public void addMissingDependency(HCatURI hcatURI, String actionID) {
071        String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
072        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
073        // Partition keys seperated by ;. For eg: date;country;state
074        String partKey = sortedPKV.getPartKeys();
075        // Partition values seperated by ;. For eg: 20120101;US;CA
076        String partVal = sortedPKV.getPartVals();
077        ConcurrentMap<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
078        if (partKeyPatterns == null) {
079            partKeyPatterns = new ConcurrentHashMap<String, Map<String, Collection<WaitingAction>>>();
080            ConcurrentMap<String, Map<String, Collection<WaitingAction>>> existingMap = missingDeps.putIfAbsent(
081                    tableKey, partKeyPatterns);
082            if (existingMap != null) {
083                partKeyPatterns = existingMap;
084            }
085        }
086        ConcurrentMap<String, Collection<String>> partitionMap = actionPartitionMap.get(actionID);
087        if (partitionMap == null) {
088            partitionMap = new ConcurrentHashMap<String, Collection<String>>();
089            ConcurrentMap<String, Collection<String>> existingPartMap = actionPartitionMap.putIfAbsent(actionID,
090                    partitionMap);
091            if (existingPartMap != null) {
092                partitionMap = existingPartMap;
093            }
094        }
095        synchronized (partitionMap) {
096            Collection<String> partKeys = partitionMap.get(tableKey);
097            if (partKeys == null) {
098                partKeys = new ArrayList<String>();
099            }
100            partKeys.add(partKey);
101            partitionMap.put(tableKey, partKeys);
102        }
103        synchronized (partKeyPatterns) {
104            missingDeps.put(tableKey, partKeyPatterns); // To handle race condition with removal of partKeyPatterns
105            Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
106            if (partValues == null) {
107                partValues = new HashMap<String, Collection<WaitingAction>>();
108                partKeyPatterns.put(partKey, partValues);
109            }
110            Collection<WaitingAction> waitingActions = partValues.get(partVal);
111            if (waitingActions == null) {
112                waitingActions = new HashSet<WaitingAction>();
113                partValues.put(partVal, waitingActions);
114            }
115            waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
116        }
117    }
118
119    @Override
120    public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
121        String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
122        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
123        String partKey = sortedPKV.getPartKeys();
124        String partVal = sortedPKV.getPartVals();
125        Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
126        if (partKeyPatterns == null) {
127            LOG.warn("Remove missing dependency - Missing table entry - uri={0}, actionID={1}",
128                    hcatURI.toURIString(), actionID);
129            return false;
130        }
131        ConcurrentMap<String, Collection<String>> partitionMap = actionPartitionMap.get(actionID);
132        if (partitionMap != null) {
133            synchronized (partitionMap) {
134                Collection<String> partKeys = partitionMap.get(tableKey);
135                if (partKeys != null) {
136                    partKeys.remove(partKey);
137                }
138                if (partKeys.size() == 0) {
139                    partitionMap.remove(tableKey);
140                }
141                if (partitionMap.size() == 0) {
142                    actionPartitionMap.remove(actionID);
143                }
144            }
145        }
146
147        synchronized(partKeyPatterns) {
148            Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
149            if (partValues == null) {
150                LOG.warn("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}",
151                        hcatURI.toURIString(), actionID);
152                return false;
153            }
154            Collection<WaitingAction> waitingActions = partValues.get(partVal);
155            if (waitingActions == null) {
156                LOG.warn("Remove missing dependency - Missing partition value - uri={0}, actionID={1}",
157                        hcatURI.toURIString(), actionID);
158                return false;
159            }
160            boolean removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString()));
161            if (!removed) {
162                LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
163                        hcatURI.toURIString(), actionID);
164            }
165            if (waitingActions.isEmpty()) {
166                partValues.remove(partVal);
167                if (partValues.isEmpty()) {
168                    partKeyPatterns.remove(partKey);
169                }
170                if (partKeyPatterns.isEmpty()) {
171                    missingDeps.remove(tableKey);
172                    // Close JMS session. Stop listening on topic
173                    HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
174                    hcatService.unregisterFromNotification(hcatURI);
175                }
176            }
177            return removed;
178        }
179    }
180
181    @Override
182    public Collection<String> getWaitingActions(HCatURI hcatURI) {
183        String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
184        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
185        String partKey = sortedPKV.getPartKeys();
186        String partVal = sortedPKV.getPartVals();
187        Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
188        if (partKeyPatterns == null) {
189            return null;
190        }
191        Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
192        if (partValues == null) {
193            return null;
194        }
195        Collection<WaitingAction> waitingActions = partValues.get(partVal);
196        if (waitingActions == null) {
197            return null;
198        }
199        Collection<String> actionIDs = new ArrayList<String>();
200        String uriString = hcatURI.toURIString();
201        for (WaitingAction action : waitingActions) {
202            if (action.getDependencyURI().equals(uriString)) {
203                actionIDs.add(action.getActionID());
204            }
205        }
206        return actionIDs;
207    }
208
209    @Override
210    public Collection<String> markDependencyAvailable(String server, String db, String table,
211            Map<String, String> partitions) {
212        String tableKey = server + DELIMITER + db + DELIMITER + table;
213        Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
214        if (partKeyPatterns == null) {
215            LOG.warn("Got partition available notification for " + tableKey
216                    + ". Unexpected and should not be listening to topic. Unregistering topic");
217            HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
218            hcatService.unregisterFromNotification(server, db, table);
219            return null;
220        }
221        Collection<String> actionsWithAvailDep = new HashSet<String>();
222        List<String> partKeysToRemove = new ArrayList<String>();
223        StringBuilder partValSB = new StringBuilder();
224        synchronized (partKeyPatterns) {
225            // If partition patterns are date, date;country and date;country;state,
226            // construct the partition values for each pattern from the available partitions map and
227            // for the matching value in the dependency map, get the waiting actions.
228            for (Entry<String, Map<String, Collection<WaitingAction>>> entry : partKeyPatterns.entrySet()) {
229                String[] partKeys = entry.getKey().split(DELIMITER);
230                partValSB.setLength(0);
231                for (String key : partKeys) {
232                    partValSB.append(partitions.get(key)).append(DELIMITER);
233                }
234                partValSB.setLength(partValSB.length() - 1);
235
236                Map<String, Collection<WaitingAction>> partValues = entry.getValue();
237                String partVal = partValSB.toString();
238                Collection<WaitingAction> wActions = entry.getValue().get(partVal);
239                if (wActions == null)
240                    continue;
241                for (WaitingAction wAction : wActions) {
242                    String actionID = wAction.getActionID();
243                    actionsWithAvailDep.add(actionID);
244                    Collection<String> depURIs = availableDeps.get(actionID);
245                    if (depURIs == null) {
246                        depURIs = new ArrayList<String>();
247                        Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
248                        if (existing != null) {
249                            depURIs = existing;
250                        }
251                    }
252                    synchronized (depURIs) {
253                        depURIs.add(wAction.getDependencyURI());
254                        availableDeps.put(actionID, depURIs);
255                    }
256                }
257                partValues.remove(partVal);
258                if (partValues.isEmpty()) {
259                    partKeysToRemove.add(entry.getKey());
260                }
261            }
262            for (String partKey : partKeysToRemove) {
263                partKeyPatterns.remove(partKey);
264            }
265            if (partKeyPatterns.isEmpty()) {
266                missingDeps.remove(tableKey);
267                // Close JMS session. Stop listening on topic
268                HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
269                hcatService.unregisterFromNotification(server, db, table);
270            }
271        }
272        return actionsWithAvailDep;
273    }
274
275    @Override
276    public Collection<String> getAvailableDependencyURIs(String actionID) {
277        Collection<String> available = availableDeps.get(actionID);
278        if (available !=  null) {
279            // Return a copy
280            available = new ArrayList<String>(available);
281        }
282        return available;
283    }
284
285    @Override
286    public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
287        if (!availableDeps.containsKey(actionID)) {
288            return false;
289        }
290        else {
291            Collection<String> availList = availableDeps.get(actionID);
292            if (!availList.removeAll(dependencyURIs)) {
293                return false;
294            }
295            synchronized (availList) {
296                if (availList.isEmpty()) {
297                    availableDeps.remove(actionID);
298                }
299            }
300        }
301        return true;
302    }
303
304    @Override
305    public void destroy() {
306        missingDeps.clear();
307        availableDeps.clear();
308    }
309
310    private static class SortedPKV {
311        private StringBuilder partKeys;
312        private StringBuilder partVals;
313
314        public SortedPKV(Map<String, String> partitions) {
315            this.partKeys = new StringBuilder();
316            this.partVals = new StringBuilder();
317            ArrayList<String> keys = new ArrayList<String>(partitions.keySet());
318            Collections.sort(keys);
319            for (String key : keys) {
320                this.partKeys.append(key).append(DELIMITER);
321                this.partVals.append(partitions.get(key)).append(DELIMITER);
322            }
323            this.partKeys.setLength(partKeys.length() - 1);
324            this.partVals.setLength(partVals.length() - 1);
325        }
326
327        public String getPartKeys() {
328            return partKeys.toString();
329        }
330
331        public String getPartVals() {
332            return partVals.toString();
333        }
334    }
335
336    private HCatURI removePartitions(String coordActionId, Collection<String> partKeys,
337            Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns) {
338        HCatURI hcatUri = null;
339        for (String partKey : partKeys) {
340            Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
341            Iterator<String> partValItr = partValues.keySet().iterator();
342            while (partValItr.hasNext()) {
343                String partVal = partValItr.next();
344                Collection<WaitingAction> waitingActions = partValues.get(partVal);
345                if (waitingActions != null) {
346                    Iterator<WaitingAction> waitItr = waitingActions.iterator();
347                    while (waitItr.hasNext()) {
348                        WaitingAction waction = waitItr.next();
349                        if (coordActionId.contains(waction.getActionID())) {
350                            waitItr.remove();
351                            if (hcatUri == null) {
352                                try {
353                                    hcatUri = new HCatURI(waction.getDependencyURI());
354                                }
355                                catch (URISyntaxException e) {
356                                    continue;
357                                }
358                            }
359                        }
360                    }
361                }
362                // delete partition value with no waiting actions
363                if (waitingActions.size() == 0) {
364                    partValItr.remove();
365                }
366            }
367            if (partValues.size() == 0) {
368                partKeyPatterns.remove(partKey);
369            }
370        }
371        return hcatUri;
372    }
373
374    @Override
375    public void removeNonWaitingCoordActions(Set<String> coordActions) {
376        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
377        for (String coordActionId : coordActions) {
378            LOG.info("Removing non waiting coord action {0} from partition dependency map", coordActionId);
379            synchronized (actionPartitionMap) {
380                Map<String, Collection<String>> partitionMap = actionPartitionMap.get(coordActionId);
381                if (partitionMap != null) {
382                    Iterator<String> tableItr = partitionMap.keySet().iterator();
383                    while (tableItr.hasNext()) {
384                        String tableKey = tableItr.next();
385                        HCatURI hcatUri = null;
386                        Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
387                        if (partKeyPatterns != null) {
388                            synchronized (partKeyPatterns) {
389                                Collection<String> partKeys = partitionMap.get(tableKey);
390                                if (partKeys != null) {
391                                    hcatUri = removePartitions(coordActionId, partKeys, partKeyPatterns);
392                                }
393                            }
394                            if (partKeyPatterns.size() == 0) {
395                                tableItr.remove();
396                                if (hcatUri != null) {
397                                    // Close JMS session. Stop listening on topic
398                                    hcatService.unregisterFromNotification(hcatUri);
399                                }
400                            }
401                        }
402                    }
403                }
404                actionPartitionMap.remove(coordActionId);
405            }
406        }
407    }
408
409    @Override
410    public void removeCoordActionWithDependenciesAvailable(String coordAction) {
411        actionPartitionMap.remove(coordAction);
412    }
413}