001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.dependency.hcat; 019 020 import java.util.ArrayList; 021 import java.util.Collection; 022 import java.util.Collections; 023 import java.util.HashMap; 024 import java.util.HashSet; 025 import java.util.List; 026 import java.util.Map; 027 import java.util.Map.Entry; 028 import java.util.concurrent.ConcurrentHashMap; 029 import java.util.concurrent.ConcurrentMap; 030 031 import org.apache.hadoop.conf.Configuration; 032 import org.apache.oozie.service.HCatAccessorService; 033 import org.apache.oozie.service.Services; 034 import org.apache.oozie.util.HCatURI; 035 import org.apache.oozie.util.XLog; 036 037 public class SimpleHCatDependencyCache implements HCatDependencyCache { 038 039 private static XLog LOG = XLog.getLog(SimpleHCatDependencyCache.class); 040 private static String DELIMITER = ";"; 041 042 /** 043 * Map of server;db;table - sorter partition key order (country;dt;state) - sorted partition 044 * value (us;20120101;CA) - Collection of waiting actions (actionID and original hcat uri as 045 * string). 046 */ 047 private ConcurrentMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>> missingDeps; 048 049 /** 050 * Map of actionIDs and collection of available URIs 051 */ 052 private ConcurrentMap<String, Collection<String>> availableDeps; 053 054 // TODO: 055 // Gather and print stats on cache hits and misses. 056 057 @Override 058 public void init(Configuration conf) { 059 missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>>(); 060 availableDeps = new ConcurrentHashMap<String, Collection<String>>(); 061 } 062 063 @Override 064 public void addMissingDependency(HCatURI hcatURI, String actionID) { 065 String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable(); 066 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); 067 // Partition keys seperated by ;. For eg: date;country;state 068 String partKey = sortedPKV.getPartKeys(); 069 // Partition values seperated by ;. For eg: 20120101;US;CA 070 String partVal = sortedPKV.getPartVals(); 071 ConcurrentMap<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey); 072 if (partKeyPatterns == null) { 073 partKeyPatterns = new ConcurrentHashMap<String, Map<String, Collection<WaitingAction>>>(); 074 ConcurrentMap<String, Map<String, Collection<WaitingAction>>> existingMap = missingDeps.putIfAbsent( 075 tableKey, partKeyPatterns); 076 if (existingMap != null) { 077 partKeyPatterns = existingMap; 078 } 079 } 080 synchronized (partKeyPatterns) { 081 missingDeps.put(tableKey, partKeyPatterns); // To handle race condition with removal of partKeyPatterns 082 Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey); 083 if (partValues == null) { 084 partValues = new HashMap<String, Collection<WaitingAction>>(); 085 partKeyPatterns.put(partKey, partValues); 086 } 087 Collection<WaitingAction> waitingActions = partValues.get(partVal); 088 if (waitingActions == null) { 089 waitingActions = new HashSet<WaitingAction>(); 090 partValues.put(partVal, waitingActions); 091 } 092 waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString())); 093 } 094 } 095 096 @Override 097 public boolean removeMissingDependency(HCatURI hcatURI, String actionID) { 098 String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable(); 099 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); 100 String partKey = sortedPKV.getPartKeys(); 101 String partVal = sortedPKV.getPartVals(); 102 Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey); 103 if (partKeyPatterns == null) { 104 LOG.warn("Remove missing dependency - Missing table entry - uri={0}, actionID={1}", 105 hcatURI.toURIString(), actionID); 106 return false; 107 } 108 synchronized(partKeyPatterns) { 109 Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey); 110 if (partValues == null) { 111 LOG.warn("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}", 112 hcatURI.toURIString(), actionID); 113 return false; 114 } 115 Collection<WaitingAction> waitingActions = partValues.get(partVal); 116 if (waitingActions == null) { 117 LOG.warn("Remove missing dependency - Missing partition value - uri={0}, actionID={1}", 118 hcatURI.toURIString(), actionID); 119 return false; 120 } 121 boolean removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString())); 122 if (!removed) { 123 LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}", 124 hcatURI.toURIString(), actionID); 125 } 126 if (waitingActions.isEmpty()) { 127 partValues.remove(partVal); 128 if (partValues.isEmpty()) { 129 partKeyPatterns.remove(partKey); 130 } 131 if (partKeyPatterns.isEmpty()) { 132 missingDeps.remove(tableKey); 133 // Close JMS session. Stop listening on topic 134 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 135 hcatService.unregisterFromNotification(hcatURI); 136 } 137 } 138 return removed; 139 } 140 } 141 142 @Override 143 public Collection<String> getWaitingActions(HCatURI hcatURI) { 144 String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable(); 145 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); 146 String partKey = sortedPKV.getPartKeys(); 147 String partVal = sortedPKV.getPartVals(); 148 Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey); 149 if (partKeyPatterns == null) { 150 return null; 151 } 152 Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey); 153 if (partValues == null) { 154 return null; 155 } 156 Collection<WaitingAction> waitingActions = partValues.get(partVal); 157 if (waitingActions == null) { 158 return null; 159 } 160 Collection<String> actionIDs = new ArrayList<String>(); 161 String uriString = hcatURI.toURIString(); 162 for (WaitingAction action : waitingActions) { 163 if (action.getDependencyURI().equals(uriString)) { 164 actionIDs.add(action.getActionID()); 165 } 166 } 167 return actionIDs; 168 } 169 170 @Override 171 public Collection<String> markDependencyAvailable(String server, String db, String table, 172 Map<String, String> partitions) { 173 String tableKey = server + DELIMITER + db + DELIMITER + table; 174 Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey); 175 if (partKeyPatterns == null) { 176 LOG.warn("Got partition available notification for " + tableKey 177 + ". Unexpected and should not be listening to topic. Unregistering topic"); 178 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 179 hcatService.unregisterFromNotification(server, db, table); 180 return null; 181 } 182 Collection<String> actionsWithAvailDep = new HashSet<String>(); 183 List<String> partKeysToRemove = new ArrayList<String>(); 184 StringBuilder partValSB = new StringBuilder(); 185 synchronized (partKeyPatterns) { 186 // If partition patterns are date, date;country and date;country;state, 187 // construct the partition values for each pattern from the available partitions map and 188 // for the matching value in the dependency map, get the waiting actions. 189 for (Entry<String, Map<String, Collection<WaitingAction>>> entry : partKeyPatterns.entrySet()) { 190 String[] partKeys = entry.getKey().split(DELIMITER); 191 partValSB.setLength(0); 192 for (String key : partKeys) { 193 partValSB.append(partitions.get(key)).append(DELIMITER); 194 } 195 partValSB.setLength(partValSB.length() - 1); 196 197 Map<String, Collection<WaitingAction>> partValues = entry.getValue(); 198 String partVal = partValSB.toString(); 199 Collection<WaitingAction> wActions = entry.getValue().get(partVal); 200 if (wActions == null) 201 continue; 202 for (WaitingAction wAction : wActions) { 203 String actionID = wAction.getActionID(); 204 actionsWithAvailDep.add(actionID); 205 Collection<String> depURIs = availableDeps.get(actionID); 206 if (depURIs == null) { 207 depURIs = new ArrayList<String>(); 208 Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs); 209 if (existing != null) { 210 depURIs = existing; 211 } 212 } 213 synchronized (depURIs) { 214 depURIs.add(wAction.getDependencyURI()); 215 availableDeps.put(actionID, depURIs); 216 } 217 } 218 partValues.remove(partVal); 219 if (partValues.isEmpty()) { 220 partKeysToRemove.add(entry.getKey()); 221 } 222 } 223 for (String partKey : partKeysToRemove) { 224 partKeyPatterns.remove(partKey); 225 } 226 if (partKeyPatterns.isEmpty()) { 227 missingDeps.remove(tableKey); 228 // Close JMS session. Stop listening on topic 229 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 230 hcatService.unregisterFromNotification(server, db, table); 231 } 232 } 233 return actionsWithAvailDep; 234 } 235 236 @Override 237 public Collection<String> getAvailableDependencyURIs(String actionID) { 238 Collection<String> available = availableDeps.get(actionID); 239 if (available != null) { 240 // Return a copy 241 available = new ArrayList<String>(available); 242 } 243 return available; 244 } 245 246 @Override 247 public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) { 248 if (!availableDeps.containsKey(actionID)) { 249 return false; 250 } 251 else { 252 Collection<String> availList = availableDeps.get(actionID); 253 if (!availList.removeAll(dependencyURIs)) { 254 return false; 255 } 256 synchronized (availList) { 257 if (availList.isEmpty()) { 258 availableDeps.remove(actionID); 259 } 260 } 261 } 262 return true; 263 } 264 265 @Override 266 public void destroy() { 267 missingDeps.clear(); 268 availableDeps.clear(); 269 } 270 271 private static class SortedPKV { 272 private StringBuilder partKeys; 273 private StringBuilder partVals; 274 275 public SortedPKV(Map<String, String> partitions) { 276 this.partKeys = new StringBuilder(); 277 this.partVals = new StringBuilder(); 278 ArrayList<String> keys = new ArrayList<String>(partitions.keySet()); 279 Collections.sort(keys); 280 for (String key : keys) { 281 this.partKeys.append(key).append(DELIMITER); 282 this.partVals.append(partitions.get(key)).append(DELIMITER); 283 } 284 this.partKeys.setLength(partKeys.length() - 1); 285 this.partVals.setLength(partVals.length() - 1); 286 } 287 288 public String getPartKeys() { 289 return partKeys.toString(); 290 } 291 292 public String getPartVals() { 293 return partVals.toString(); 294 } 295 296 } 297 298 }