001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.util.Collection; 021import java.util.Date; 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.Map; 025import java.util.Set; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.util.ReflectionUtils; 031import org.apache.oozie.CoordinatorActionBean; 032import org.apache.oozie.client.CoordinatorAction; 033import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency; 034import org.apache.oozie.dependency.hcat.HCatDependencyCache; 035import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache; 036import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 037import org.apache.oozie.executor.jpa.JPAExecutorException; 038import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 039import org.apache.oozie.util.HCatURI; 040import org.apache.oozie.util.XLog; 041 042import com.google.common.annotations.VisibleForTesting; 043 044/** 045 * Module that functions like a caching service to maintain partition dependency mappings 046 */ 047public class PartitionDependencyManagerService implements Service { 048 049 public static final String CONF_PREFIX = Service.CONF_PREFIX + "PartitionDependencyManagerService."; 050 public static final String CACHE_MANAGER_IMPL = CONF_PREFIX + "cache.manager.impl"; 051 public static final String CACHE_PURGE_INTERVAL = CONF_PREFIX + "cache.purge.interval"; 052 public static final String CACHE_PURGE_TTL = CONF_PREFIX + "cache.purge.ttl"; 053 054 private static XLog LOG = XLog.getLog(PartitionDependencyManagerService.class); 055 056 private HCatDependencyCache dependencyCache; 057 058 /** 059 * Keep timestamp when missing dependencies of a coord action are registered 060 */ 061 private ConcurrentMap<String, Long> registeredCoordActionMap; 062 063 private boolean purgeEnabled = false; 064 065 @Override 066 public void init(Services services) throws ServiceException { 067 init(services.getConf()); 068 } 069 070 private void init(Configuration conf) throws ServiceException { 071 Class<?> defaultClass = conf.getClass(CACHE_MANAGER_IMPL, null); 072 dependencyCache = (defaultClass == null) ? new SimpleHCatDependencyCache() 073 : (HCatDependencyCache) ReflectionUtils.newInstance(defaultClass, null); 074 dependencyCache.init(conf); 075 LOG.info("PartitionDependencyManagerService initialized. Dependency cache is {0} ", dependencyCache.getClass() 076 .getName()); 077 purgeEnabled = Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode(); 078 if (purgeEnabled) { 079 Runnable purgeThread = new CachePurgeWorker(dependencyCache); 080 // schedule runnable by default every 10 min 081 Services.get() 082 .get(SchedulerService.class) 083 .schedule(purgeThread, 10, Services.get().getConf().getInt(CACHE_PURGE_INTERVAL, 600), 084 SchedulerService.Unit.SEC); 085 registeredCoordActionMap = new ConcurrentHashMap<String, Long>(); 086 } 087 } 088 089 private class CachePurgeWorker implements Runnable { 090 HCatDependencyCache cache; 091 public CachePurgeWorker(HCatDependencyCache cache) { 092 this.cache = cache; 093 } 094 095 @Override 096 public void run() { 097 if (Thread.currentThread().isInterrupted()) { 098 return; 099 } 100 try { 101 purgeMissingDependency(Services.get().getConf().getInt(CACHE_PURGE_TTL, 1800)); 102 } 103 catch (Throwable error) { 104 XLog.getLog(PartitionDependencyManagerService.class).debug("Throwable in CachePurgeWorker thread run : ", error); 105 } 106 } 107 108 private void purgeMissingDependency(int timeToLive) { 109 long currentTime = new Date().getTime(); 110 Set<String> staleActions = new HashSet<String>(); 111 Iterator<String> actionItr = registeredCoordActionMap.keySet().iterator(); 112 while(actionItr.hasNext()){ 113 String actionId = actionItr.next(); 114 Long regTime = registeredCoordActionMap.get(actionId); 115 if(regTime < (currentTime - timeToLive * 1000)){ 116 CoordinatorActionBean caBean = null; 117 try { 118 caBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_STATUS, actionId); 119 } 120 catch (JPAExecutorException e) { 121 LOG.warn("Error in checking coord action:" + actionId + "to purge, skipping", e); 122 } 123 if(caBean != null && !caBean.getStatus().equals(CoordinatorAction.Status.WAITING)){ 124 staleActions.add(actionId); 125 actionItr.remove(); 126 } 127 } 128 } 129 dependencyCache.removeNonWaitingCoordActions(staleActions); 130 } 131 } 132 133 @Override 134 public void destroy() { 135 dependencyCache.destroy(); 136 } 137 138 @Override 139 public Class<? extends Service> getInterface() { 140 return PartitionDependencyManagerService.class; 141 } 142 143 /** 144 * Add a missing partition dependency and the actionID waiting on it 145 * 146 * @param hcatURI dependency URI 147 * @param actionID ID of action which is waiting for the dependency 148 */ 149 public void addMissingDependency(HCatURI hcatURI, String actionID) { 150 if (purgeEnabled) { 151 registeredCoordActionMap.put(actionID, new Date().getTime()); 152 } 153 dependencyCache.addMissingDependency(hcatURI, actionID); 154 } 155 156 /** 157 * Remove a missing partition dependency associated with a actionID 158 * 159 * @param hcatURI dependency URI 160 * @param actionID ID of action which is waiting for the dependency 161 * @return true if successful, else false 162 */ 163 public boolean removeMissingDependency(HCatURI hcatURI, String actionID) { 164 return dependencyCache.removeMissingDependency(hcatURI, actionID); 165 } 166 167 /** 168 * Get the list of actionIDs waiting for a partition 169 * 170 * @param hcatURI dependency URI 171 * @return list of actionIDs 172 */ 173 public Collection<String> getWaitingActions(HCatURI hcatURI) { 174 return dependencyCache.getWaitingActions(hcatURI); 175 } 176 177 /** 178 * Mark a partition dependency as available 179 * 180 * @param server host:port of the server 181 * @param db name of the database 182 * @param table name of the table 183 * @param partitions list of available partitions 184 * @return list of actionIDs for which the dependency is now available 185 */ 186 public void partitionAvailable(String server, String db, String table, Map<String, String> partitions) { 187 Collection<String> actionsWithAvailableDep = dependencyCache.markDependencyAvailable(server, db, table, 188 partitions); 189 if (actionsWithAvailableDep != null) { 190 for (String actionID : actionsWithAvailableDep) { 191 boolean ret = Services.get().get(CallableQueueService.class) 192 .queue(new CoordActionUpdatePushMissingDependency(actionID), 100); 193 if (ret == false) { 194 XLog.getLog(getClass()).warn( 195 "Unable to queue the callable commands for PartitionDependencyManagerService for actionID " 196 + actionID + ".Most possibly command queue is full. Queue size is :" 197 + Services.get().get(CallableQueueService.class).queueSize()); 198 } 199 } 200 } 201 } 202 203 /** 204 * Get a list of available dependency URIs for a actionID 205 * 206 * @param actionID action id 207 * @return list of available dependency URIs 208 */ 209 public Collection<String> getAvailableDependencyURIs(String actionID) { 210 return dependencyCache.getAvailableDependencyURIs(actionID); 211 } 212 213 /** 214 * Remove the list of available dependency URIs for a actionID once the missing dependencies are processed. 215 * 216 * @param actionID action id 217 * @param dependencyURIs set of dependency URIs 218 * @return true if successful, else false 219 */ 220 public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) { 221 return dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs); 222 } 223 224 /** 225 * Remove a coord action from dependency cache when all push missing dependencies available 226 * 227 * @param actionID action id 228 * @param dependencyURIs set of dependency URIs 229 * @return true if successful, else false 230 */ 231 public void removeCoordActionWithDependenciesAvailable(String actionID) { 232 if (purgeEnabled) { 233 registeredCoordActionMap.remove(actionID); 234 } 235 dependencyCache.removeCoordActionWithDependenciesAvailable(actionID); 236 } 237 238 @VisibleForTesting 239 public void runCachePurgeWorker() { 240 new CachePurgeWorker(dependencyCache).run(); 241 } 242}