This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.dependency.hcat;
019
020 import java.util.ArrayList;
021 import java.util.Collection;
022 import java.util.Collections;
023 import java.util.HashMap;
024 import java.util.HashSet;
025 import java.util.List;
026 import java.util.Map;
027 import java.util.Map.Entry;
028 import java.util.concurrent.ConcurrentHashMap;
029 import java.util.concurrent.ConcurrentMap;
030
031 import org.apache.hadoop.conf.Configuration;
032 import org.apache.oozie.service.HCatAccessorService;
033 import org.apache.oozie.service.Services;
034 import org.apache.oozie.util.HCatURI;
035 import org.apache.oozie.util.XLog;
036
037 public class SimpleHCatDependencyCache implements HCatDependencyCache {
038
039 private static XLog LOG = XLog.getLog(SimpleHCatDependencyCache.class);
040 private static String DELIMITER = ";";
041
042 /**
043 * Map of server;db;table - sorter partition key order (country;dt;state) - sorted partition
044 * value (us;20120101;CA) - Collection of waiting actions (actionID and original hcat uri as
045 * string).
046 */
047 private ConcurrentMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>> missingDeps;
048
049 /**
050 * Map of actionIDs and collection of available URIs
051 */
052 private ConcurrentMap<String, Collection<String>> availableDeps;
053
054 // TODO:
055 // Gather and print stats on cache hits and misses.
056
057 @Override
058 public void init(Configuration conf) {
059 missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>>();
060 availableDeps = new ConcurrentHashMap<String, Collection<String>>();
061 }
062
063 @Override
064 public void addMissingDependency(HCatURI hcatURI, String actionID) {
065 String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
066 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
067 // Partition keys seperated by ;. For eg: date;country;state
068 String partKey = sortedPKV.getPartKeys();
069 // Partition values seperated by ;. For eg: 20120101;US;CA
070 String partVal = sortedPKV.getPartVals();
071 ConcurrentMap<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
072 if (partKeyPatterns == null) {
073 partKeyPatterns = new ConcurrentHashMap<String, Map<String, Collection<WaitingAction>>>();
074 ConcurrentMap<String, Map<String, Collection<WaitingAction>>> existingMap = missingDeps.putIfAbsent(
075 tableKey, partKeyPatterns);
076 if (existingMap != null) {
077 partKeyPatterns = existingMap;
078 }
079 }
080 synchronized (partKeyPatterns) {
081 missingDeps.put(tableKey, partKeyPatterns); // To handle race condition with removal of partKeyPatterns
082 Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
083 if (partValues == null) {
084 partValues = new HashMap<String, Collection<WaitingAction>>();
085 partKeyPatterns.put(partKey, partValues);
086 }
087 Collection<WaitingAction> waitingActions = partValues.get(partVal);
088 if (waitingActions == null) {
089 waitingActions = new HashSet<WaitingAction>();
090 partValues.put(partVal, waitingActions);
091 }
092 waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
093 }
094 }
095
096 @Override
097 public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
098 String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
099 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
100 String partKey = sortedPKV.getPartKeys();
101 String partVal = sortedPKV.getPartVals();
102 Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
103 if (partKeyPatterns == null) {
104 LOG.warn("Remove missing dependency - Missing table entry - uri={0}, actionID={1}",
105 hcatURI.toURIString(), actionID);
106 return false;
107 }
108 synchronized(partKeyPatterns) {
109 Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
110 if (partValues == null) {
111 LOG.warn("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}",
112 hcatURI.toURIString(), actionID);
113 return false;
114 }
115 Collection<WaitingAction> waitingActions = partValues.get(partVal);
116 if (waitingActions == null) {
117 LOG.warn("Remove missing dependency - Missing partition value - uri={0}, actionID={1}",
118 hcatURI.toURIString(), actionID);
119 return false;
120 }
121 boolean removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString()));
122 if (!removed) {
123 LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
124 hcatURI.toURIString(), actionID);
125 }
126 if (waitingActions.isEmpty()) {
127 partValues.remove(partVal);
128 if (partValues.isEmpty()) {
129 partKeyPatterns.remove(partKey);
130 }
131 if (partKeyPatterns.isEmpty()) {
132 missingDeps.remove(tableKey);
133 // Close JMS session. Stop listening on topic
134 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
135 hcatService.unregisterFromNotification(hcatURI);
136 }
137 }
138 return removed;
139 }
140 }
141
142 @Override
143 public Collection<String> getWaitingActions(HCatURI hcatURI) {
144 String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
145 SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
146 String partKey = sortedPKV.getPartKeys();
147 String partVal = sortedPKV.getPartVals();
148 Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
149 if (partKeyPatterns == null) {
150 return null;
151 }
152 Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
153 if (partValues == null) {
154 return null;
155 }
156 Collection<WaitingAction> waitingActions = partValues.get(partVal);
157 if (waitingActions == null) {
158 return null;
159 }
160 Collection<String> actionIDs = new ArrayList<String>();
161 String uriString = hcatURI.toURIString();
162 for (WaitingAction action : waitingActions) {
163 if (action.getDependencyURI().equals(uriString)) {
164 actionIDs.add(action.getActionID());
165 }
166 }
167 return actionIDs;
168 }
169
170 @Override
171 public Collection<String> markDependencyAvailable(String server, String db, String table,
172 Map<String, String> partitions) {
173 String tableKey = server + DELIMITER + db + DELIMITER + table;
174 Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
175 if (partKeyPatterns == null) {
176 LOG.warn("Got partition available notification for " + tableKey
177 + ". Unexpected and should not be listening to topic. Unregistering topic");
178 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
179 hcatService.unregisterFromNotification(server, db, table);
180 return null;
181 }
182 Collection<String> actionsWithAvailDep = new HashSet<String>();
183 List<String> partKeysToRemove = new ArrayList<String>();
184 StringBuilder partValSB = new StringBuilder();
185 synchronized (partKeyPatterns) {
186 // If partition patterns are date, date;country and date;country;state,
187 // construct the partition values for each pattern from the available partitions map and
188 // for the matching value in the dependency map, get the waiting actions.
189 for (Entry<String, Map<String, Collection<WaitingAction>>> entry : partKeyPatterns.entrySet()) {
190 String[] partKeys = entry.getKey().split(DELIMITER);
191 partValSB.setLength(0);
192 for (String key : partKeys) {
193 partValSB.append(partitions.get(key)).append(DELIMITER);
194 }
195 partValSB.setLength(partValSB.length() - 1);
196
197 Map<String, Collection<WaitingAction>> partValues = entry.getValue();
198 String partVal = partValSB.toString();
199 Collection<WaitingAction> wActions = entry.getValue().get(partVal);
200 if (wActions == null)
201 continue;
202 for (WaitingAction wAction : wActions) {
203 String actionID = wAction.getActionID();
204 actionsWithAvailDep.add(actionID);
205 Collection<String> depURIs = availableDeps.get(actionID);
206 if (depURIs == null) {
207 depURIs = new ArrayList<String>();
208 Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
209 if (existing != null) {
210 depURIs = existing;
211 }
212 }
213 synchronized (depURIs) {
214 depURIs.add(wAction.getDependencyURI());
215 availableDeps.put(actionID, depURIs);
216 }
217 }
218 partValues.remove(partVal);
219 if (partValues.isEmpty()) {
220 partKeysToRemove.add(entry.getKey());
221 }
222 }
223 for (String partKey : partKeysToRemove) {
224 partKeyPatterns.remove(partKey);
225 }
226 if (partKeyPatterns.isEmpty()) {
227 missingDeps.remove(tableKey);
228 // Close JMS session. Stop listening on topic
229 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
230 hcatService.unregisterFromNotification(server, db, table);
231 }
232 }
233 return actionsWithAvailDep;
234 }
235
236 @Override
237 public Collection<String> getAvailableDependencyURIs(String actionID) {
238 Collection<String> available = availableDeps.get(actionID);
239 if (available != null) {
240 // Return a copy
241 available = new ArrayList<String>(available);
242 }
243 return available;
244 }
245
246 @Override
247 public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
248 if (!availableDeps.containsKey(actionID)) {
249 return false;
250 }
251 else {
252 Collection<String> availList = availableDeps.get(actionID);
253 if (!availList.removeAll(dependencyURIs)) {
254 return false;
255 }
256 synchronized (availList) {
257 if (availList.isEmpty()) {
258 availableDeps.remove(actionID);
259 }
260 }
261 }
262 return true;
263 }
264
265 @Override
266 public void destroy() {
267 missingDeps.clear();
268 availableDeps.clear();
269 }
270
271 private static class SortedPKV {
272 private StringBuilder partKeys;
273 private StringBuilder partVals;
274
275 public SortedPKV(Map<String, String> partitions) {
276 this.partKeys = new StringBuilder();
277 this.partVals = new StringBuilder();
278 ArrayList<String> keys = new ArrayList<String>(partitions.keySet());
279 Collections.sort(keys);
280 for (String key : keys) {
281 this.partKeys.append(key).append(DELIMITER);
282 this.partVals.append(partitions.get(key)).append(DELIMITER);
283 }
284 this.partKeys.setLength(partKeys.length() - 1);
285 this.partVals.setLength(partVals.length() - 1);
286 }
287
288 public String getPartKeys() {
289 return partKeys.toString();
290 }
291
292 public String getPartVals() {
293 return partVals.toString();
294 }
295
296 }
297
298 }