001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.dependency.hcat; 020 021import java.net.URISyntaxException; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.Iterator; 028import java.util.List; 029import java.util.Map; 030import java.util.Map.Entry; 031import java.util.Set; 032import java.util.concurrent.ConcurrentHashMap; 033import java.util.concurrent.ConcurrentMap; 034 035import org.apache.hadoop.conf.Configuration; 036import org.apache.oozie.service.HCatAccessorService; 037import org.apache.oozie.service.Services; 038import org.apache.oozie.util.HCatURI; 039import org.apache.oozie.util.XLog; 040 041public class SimpleHCatDependencyCache implements HCatDependencyCache { 042 043 private static XLog LOG = XLog.getLog(SimpleHCatDependencyCache.class); 044 private static String DELIMITER = ";"; 045 046 /** 047 * Map of server;db;table - sorter partition key order (country;dt;state) - sorted partition 048 * value (us;20120101;CA) - Collection of waiting actions (actionID and original hcat uri as 049 * string). 050 */ 051 private ConcurrentMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>> missingDeps; 052 053 /** 054 * Map of actionIDs and collection of available URIs 055 */ 056 private ConcurrentMap<String, Collection<String>> availableDeps; 057 058 /** 059 * Map of actionIDs and partitions for reverse-lookup in purging 060 */ 061 private ConcurrentMap<String, ConcurrentMap<String, Collection<String>>> actionPartitionMap; 062 063 @Override 064 public void init(Configuration conf) { 065 missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>>(); 066 availableDeps = new ConcurrentHashMap<String, Collection<String>>(); 067 actionPartitionMap = new ConcurrentHashMap<String, ConcurrentMap<String, Collection<String>>>(); 068 } 069 070 @Override 071 public void addMissingDependency(HCatURI hcatURI, String actionID) { 072 String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable(); 073 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); 074 // Partition keys seperated by ;. For eg: date;country;state 075 String partKey = sortedPKV.getPartKeys(); 076 // Partition values seperated by ;. For eg: 20120101;US;CA 077 String partVal = sortedPKV.getPartVals(); 078 ConcurrentMap<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey); 079 if (partKeyPatterns == null) { 080 partKeyPatterns = new ConcurrentHashMap<String, Map<String, Collection<WaitingAction>>>(); 081 ConcurrentMap<String, Map<String, Collection<WaitingAction>>> existingMap = missingDeps.putIfAbsent( 082 tableKey, partKeyPatterns); 083 if (existingMap != null) { 084 partKeyPatterns = existingMap; 085 } 086 } 087 ConcurrentMap<String, Collection<String>> partitionMap = actionPartitionMap.get(actionID); 088 if (partitionMap == null) { 089 partitionMap = new ConcurrentHashMap<String, Collection<String>>(); 090 ConcurrentMap<String, Collection<String>> existingPartMap = actionPartitionMap.putIfAbsent(actionID, 091 partitionMap); 092 if (existingPartMap != null) { 093 partitionMap = existingPartMap; 094 } 095 } 096 synchronized (partitionMap) { 097 Collection<String> partKeys = partitionMap.get(tableKey); 098 if (partKeys == null) { 099 partKeys = new ArrayList<String>(); 100 } 101 partKeys.add(partKey); 102 partitionMap.put(tableKey, partKeys); 103 } 104 synchronized (partKeyPatterns) { 105 missingDeps.put(tableKey, partKeyPatterns); // To handle race condition with removal of partKeyPatterns 106 Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey); 107 if (partValues == null) { 108 partValues = new HashMap<String, Collection<WaitingAction>>(); 109 partKeyPatterns.put(partKey, partValues); 110 } 111 Collection<WaitingAction> waitingActions = partValues.get(partVal); 112 if (waitingActions == null) { 113 waitingActions = new HashSet<WaitingAction>(); 114 partValues.put(partVal, waitingActions); 115 } 116 waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString())); 117 } 118 } 119 120 @Override 121 public boolean removeMissingDependency(HCatURI hcatURI, String actionID) { 122 String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable(); 123 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); 124 String partKey = sortedPKV.getPartKeys(); 125 String partVal = sortedPKV.getPartVals(); 126 Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey); 127 if (partKeyPatterns == null) { 128 LOG.warn("Remove missing dependency - Missing table entry - uri={0}, actionID={1}", 129 hcatURI.toURIString(), actionID); 130 return false; 131 } 132 ConcurrentMap<String, Collection<String>> partitionMap = actionPartitionMap.get(actionID); 133 if (partitionMap != null) { 134 synchronized (partitionMap) { 135 Collection<String> partKeys = partitionMap.get(tableKey); 136 if (partKeys != null) { 137 partKeys.remove(partKey); 138 } 139 if (partKeys.size() == 0) { 140 partitionMap.remove(tableKey); 141 } 142 if (partitionMap.size() == 0) { 143 actionPartitionMap.remove(actionID); 144 } 145 } 146 } 147 148 synchronized(partKeyPatterns) { 149 Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey); 150 if (partValues == null) { 151 LOG.warn("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}", 152 hcatURI.toURIString(), actionID); 153 return false; 154 } 155 Collection<WaitingAction> waitingActions = partValues.get(partVal); 156 if (waitingActions == null) { 157 LOG.warn("Remove missing dependency - Missing partition value - uri={0}, actionID={1}", 158 hcatURI.toURIString(), actionID); 159 return false; 160 } 161 boolean removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString())); 162 if (!removed) { 163 LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}", 164 hcatURI.toURIString(), actionID); 165 } 166 if (waitingActions.isEmpty()) { 167 partValues.remove(partVal); 168 if (partValues.isEmpty()) { 169 partKeyPatterns.remove(partKey); 170 } 171 if (partKeyPatterns.isEmpty()) { 172 missingDeps.remove(tableKey); 173 // Close JMS session. Stop listening on topic 174 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 175 hcatService.unregisterFromNotification(hcatURI); 176 } 177 } 178 return removed; 179 } 180 } 181 182 @Override 183 public Collection<String> getWaitingActions(HCatURI hcatURI) { 184 String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable(); 185 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); 186 String partKey = sortedPKV.getPartKeys(); 187 String partVal = sortedPKV.getPartVals(); 188 Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey); 189 if (partKeyPatterns == null) { 190 return null; 191 } 192 Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey); 193 if (partValues == null) { 194 return null; 195 } 196 Collection<WaitingAction> waitingActions = partValues.get(partVal); 197 if (waitingActions == null) { 198 return null; 199 } 200 Collection<String> actionIDs = new ArrayList<String>(); 201 String uriString = hcatURI.toURIString(); 202 for (WaitingAction action : waitingActions) { 203 if (action.getDependencyURI().equals(uriString)) { 204 actionIDs.add(action.getActionID()); 205 } 206 } 207 return actionIDs; 208 } 209 210 @Override 211 public Collection<String> markDependencyAvailable(String server, String db, String table, 212 Map<String, String> partitions) { 213 String tableKey = server + DELIMITER + db + DELIMITER + table; 214 Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey); 215 if (partKeyPatterns == null) { 216 LOG.warn("Got partition available notification for " + tableKey 217 + ". Unexpected and should not be listening to topic. Unregistering topic"); 218 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 219 hcatService.unregisterFromNotification(server, db, table); 220 return null; 221 } 222 Collection<String> actionsWithAvailDep = new HashSet<String>(); 223 List<String> partKeysToRemove = new ArrayList<String>(); 224 StringBuilder partValSB = new StringBuilder(); 225 synchronized (partKeyPatterns) { 226 // If partition patterns are date, date;country and date;country;state, 227 // construct the partition values for each pattern from the available partitions map and 228 // for the matching value in the dependency map, get the waiting actions. 229 for (Entry<String, Map<String, Collection<WaitingAction>>> entry : partKeyPatterns.entrySet()) { 230 String[] partKeys = entry.getKey().split(DELIMITER); 231 partValSB.setLength(0); 232 for (String key : partKeys) { 233 partValSB.append(partitions.get(key)).append(DELIMITER); 234 } 235 partValSB.setLength(partValSB.length() - 1); 236 237 Map<String, Collection<WaitingAction>> partValues = entry.getValue(); 238 String partVal = partValSB.toString(); 239 Collection<WaitingAction> wActions = entry.getValue().get(partVal); 240 if (wActions == null) 241 continue; 242 for (WaitingAction wAction : wActions) { 243 String actionID = wAction.getActionID(); 244 actionsWithAvailDep.add(actionID); 245 Collection<String> depURIs = availableDeps.get(actionID); 246 if (depURIs == null) { 247 depURIs = new ArrayList<String>(); 248 Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs); 249 if (existing != null) { 250 depURIs = existing; 251 } 252 } 253 synchronized (depURIs) { 254 depURIs.add(wAction.getDependencyURI()); 255 availableDeps.put(actionID, depURIs); 256 } 257 } 258 partValues.remove(partVal); 259 if (partValues.isEmpty()) { 260 partKeysToRemove.add(entry.getKey()); 261 } 262 } 263 for (String partKey : partKeysToRemove) { 264 partKeyPatterns.remove(partKey); 265 } 266 if (partKeyPatterns.isEmpty()) { 267 missingDeps.remove(tableKey); 268 // Close JMS session. Stop listening on topic 269 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 270 hcatService.unregisterFromNotification(server, db, table); 271 } 272 } 273 return actionsWithAvailDep; 274 } 275 276 @Override 277 public Collection<String> getAvailableDependencyURIs(String actionID) { 278 Collection<String> available = availableDeps.get(actionID); 279 if (available != null) { 280 // Return a copy 281 available = new ArrayList<String>(available); 282 } 283 return available; 284 } 285 286 @Override 287 public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) { 288 if (!availableDeps.containsKey(actionID)) { 289 return false; 290 } 291 else { 292 Collection<String> availList = availableDeps.get(actionID); 293 if (!availList.removeAll(dependencyURIs)) { 294 return false; 295 } 296 synchronized (availList) { 297 if (availList.isEmpty()) { 298 availableDeps.remove(actionID); 299 } 300 } 301 } 302 return true; 303 } 304 305 @Override 306 public void destroy() { 307 missingDeps.clear(); 308 availableDeps.clear(); 309 } 310 311 private static class SortedPKV { 312 private StringBuilder partKeys; 313 private StringBuilder partVals; 314 315 public SortedPKV(Map<String, String> partitions) { 316 this.partKeys = new StringBuilder(); 317 this.partVals = new StringBuilder(); 318 ArrayList<String> keys = new ArrayList<String>(partitions.keySet()); 319 Collections.sort(keys); 320 for (String key : keys) { 321 this.partKeys.append(key).append(DELIMITER); 322 this.partVals.append(partitions.get(key)).append(DELIMITER); 323 } 324 this.partKeys.setLength(partKeys.length() - 1); 325 this.partVals.setLength(partVals.length() - 1); 326 } 327 328 public String getPartKeys() { 329 return partKeys.toString(); 330 } 331 332 public String getPartVals() { 333 return partVals.toString(); 334 } 335 } 336 337 private HCatURI removePartitions(String coordActionId, Collection<String> partKeys, 338 Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns) { 339 HCatURI hcatUri = null; 340 for (String partKey : partKeys) { 341 Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey); 342 Iterator<String> partValItr = partValues.keySet().iterator(); 343 while (partValItr.hasNext()) { 344 String partVal = partValItr.next(); 345 Collection<WaitingAction> waitingActions = partValues.get(partVal); 346 if (waitingActions != null) { 347 Iterator<WaitingAction> waitItr = waitingActions.iterator(); 348 while (waitItr.hasNext()) { 349 WaitingAction waction = waitItr.next(); 350 if (coordActionId.contains(waction.getActionID())) { 351 waitItr.remove(); 352 if (hcatUri == null) { 353 try { 354 hcatUri = new HCatURI(waction.getDependencyURI()); 355 } 356 catch (URISyntaxException e) { 357 continue; 358 } 359 } 360 } 361 } 362 } 363 // delete partition value with no waiting actions 364 if (waitingActions.size() == 0) { 365 partValItr.remove(); 366 } 367 } 368 if (partValues.size() == 0) { 369 partKeyPatterns.remove(partKey); 370 } 371 } 372 return hcatUri; 373 } 374 375 @Override 376 public void removeNonWaitingCoordActions(Set<String> coordActions) { 377 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 378 for (String coordActionId : coordActions) { 379 LOG.info("Removing non waiting coord action {0} from partition dependency map", coordActionId); 380 synchronized (actionPartitionMap) { 381 Map<String, Collection<String>> partitionMap = actionPartitionMap.get(coordActionId); 382 if (partitionMap != null) { 383 Iterator<String> tableItr = partitionMap.keySet().iterator(); 384 while (tableItr.hasNext()) { 385 String tableKey = tableItr.next(); 386 HCatURI hcatUri = null; 387 Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey); 388 if (partKeyPatterns != null) { 389 synchronized (partKeyPatterns) { 390 Collection<String> partKeys = partitionMap.get(tableKey); 391 if (partKeys != null) { 392 hcatUri = removePartitions(coordActionId, partKeys, partKeyPatterns); 393 } 394 } 395 if (partKeyPatterns.size() == 0) { 396 tableItr.remove(); 397 if (hcatUri != null) { 398 // Close JMS session. Stop listening on topic 399 hcatService.unregisterFromNotification(hcatUri); 400 } 401 } 402 } 403 } 404 } 405 actionPartitionMap.remove(coordActionId); 406 } 407 } 408 } 409 410 @Override 411 public void removeCoordActionWithDependenciesAvailable(String coordAction) { 412 actionPartitionMap.remove(coordAction); 413 } 414}