001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.text.MessageFormat; 022import java.util.Collection; 023import java.util.Date; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.Map; 027import java.util.Set; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030 031import org.apache.commons.cli.ParseException; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.util.ReflectionUtils; 034import org.apache.oozie.CoordinatorActionBean; 035import org.apache.oozie.ErrorCode; 036import org.apache.oozie.client.CoordinatorAction; 037import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency; 038import org.apache.oozie.dependency.hcat.HCatDependencyCache; 039import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache; 040import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 041import org.apache.oozie.executor.jpa.JPAExecutorException; 042import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 043import org.apache.oozie.util.HCatURI; 044import org.apache.oozie.util.XLog; 045 046import com.google.common.annotations.VisibleForTesting; 047 048/** 049 * Module that functions like a caching service to maintain partition dependency mappings 050 */ 051public class PartitionDependencyManagerService implements Service { 052 053 public static final String CONF_PREFIX = Service.CONF_PREFIX + "PartitionDependencyManagerService."; 054 public static final String CACHE_MANAGER_IMPL = CONF_PREFIX + "cache.manager.impl"; 055 public static final String CACHE_PURGE_INTERVAL = CONF_PREFIX + "cache.purge.interval"; 056 public static final String CACHE_PURGE_TTL = CONF_PREFIX + "cache.purge.ttl"; 057 058 private static XLog LOG = XLog.getLog(PartitionDependencyManagerService.class); 059 060 private HCatDependencyCache dependencyCache; 061 062 /** 063 * Keep timestamp when missing dependencies of a coord action are registered 064 */ 065 private ConcurrentMap<String, Long> registeredCoordActionMap; 066 067 private boolean purgeEnabled = false; 068 069 @Override 070 public void init(Services services) throws ServiceException { 071 init(services.getConf()); 072 } 073 074 private void init(Configuration conf) throws ServiceException { 075 Class<?> defaultClass = conf.getClass(CACHE_MANAGER_IMPL, null); 076 dependencyCache = (defaultClass == null) ? new SimpleHCatDependencyCache() 077 : (HCatDependencyCache) ReflectionUtils.newInstance(defaultClass, null); 078 dependencyCache.init(conf); 079 LOG.info("PartitionDependencyManagerService initialized. Dependency cache is {0} ", dependencyCache.getClass() 080 .getName()); 081 purgeEnabled = Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode(); 082 if (purgeEnabled) { 083 Runnable purgeThread = new CachePurgeWorker(dependencyCache); 084 // schedule runnable by default every 10 min 085 Services.get() 086 .get(SchedulerService.class) 087 .schedule(purgeThread, 10, Services.get().getConf().getInt(CACHE_PURGE_INTERVAL, 600), 088 SchedulerService.Unit.SEC); 089 registeredCoordActionMap = new ConcurrentHashMap<String, Long>(); 090 } 091 } 092 093 private class CachePurgeWorker implements Runnable { 094 HCatDependencyCache cache; 095 public CachePurgeWorker(HCatDependencyCache cache) { 096 this.cache = cache; 097 } 098 099 @Override 100 public void run() { 101 if (Thread.currentThread().isInterrupted()) { 102 return; 103 } 104 try { 105 purgeMissingDependency(Services.get().getConf().getInt(CACHE_PURGE_TTL, 1800)); 106 } 107 catch (Throwable error) { 108 XLog.getLog(PartitionDependencyManagerService.class).debug("Throwable in CachePurgeWorker thread run : ", error); 109 } 110 } 111 112 private void purgeMissingDependency(int timeToLive) { 113 long currentTime = new Date().getTime(); 114 Set<String> staleActions = new HashSet<String>(); 115 Iterator<String> actionItr = registeredCoordActionMap.keySet().iterator(); 116 while(actionItr.hasNext()){ 117 String actionId = actionItr.next(); 118 Long regTime = registeredCoordActionMap.get(actionId); 119 if(regTime < (currentTime - timeToLive * 1000)){ 120 CoordinatorActionBean caBean = null; 121 try { 122 caBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_STATUS, actionId); 123 } 124 catch (JPAExecutorException e) { 125 if (e.getErrorCode() == ErrorCode.E0605) { 126 LOG.info(MessageFormat.format( 127 "Coord action {0} is not in database, deleting it from cache", actionId)); 128 staleActions.add(actionId); 129 actionItr.remove(); 130 } 131 else { 132 LOG.warn("Error in checking coord action:" + actionId + "to purge, skipping", e); 133 } 134 } 135 if(caBean != null && !caBean.getStatus().equals(CoordinatorAction.Status.WAITING)){ 136 staleActions.add(actionId); 137 actionItr.remove(); 138 } 139 } 140 } 141 dependencyCache.removeNonWaitingCoordActions(staleActions); 142 } 143 } 144 145 @Override 146 public void destroy() { 147 dependencyCache.destroy(); 148 } 149 150 @Override 151 public Class<? extends Service> getInterface() { 152 return PartitionDependencyManagerService.class; 153 } 154 155 /** 156 * Add a missing partition dependency and the actionID waiting on it 157 * 158 * @param hcatURI dependency URI 159 * @param actionID ID of action which is waiting for the dependency 160 */ 161 public void addMissingDependency(HCatURI hcatURI, String actionID) { 162 if (purgeEnabled) { 163 registeredCoordActionMap.put(actionID, new Date().getTime()); 164 } 165 dependencyCache.addMissingDependency(hcatURI, actionID); 166 } 167 168 /** 169 * Remove a missing partition dependency associated with a actionID 170 * 171 * @param hcatURI dependency URI 172 * @param actionID ID of action which is waiting for the dependency 173 * @return true if successful, else false 174 */ 175 public boolean removeMissingDependency(HCatURI hcatURI, String actionID) { 176 return dependencyCache.removeMissingDependency(hcatURI, actionID); 177 } 178 179 /** 180 * Get the list of actionIDs waiting for a partition 181 * 182 * @param hcatURI dependency URI 183 * @return list of actionIDs 184 */ 185 public Collection<String> getWaitingActions(HCatURI hcatURI) { 186 return dependencyCache.getWaitingActions(hcatURI); 187 } 188 189 /** 190 * Mark a partition dependency as available 191 * 192 * @param server host:port of the server 193 * @param db name of the database 194 * @param table name of the table 195 * @param partitions list of available partitions 196 * @return list of actionIDs for which the dependency is now available 197 */ 198 public void partitionAvailable(String server, String db, String table, Map<String, String> partitions) { 199 Collection<String> actionsWithAvailableDep = dependencyCache.markDependencyAvailable(server, db, table, 200 partitions); 201 if (actionsWithAvailableDep != null) { 202 for (String actionID : actionsWithAvailableDep) { 203 boolean ret = Services.get().get(CallableQueueService.class) 204 .queue(new CoordActionUpdatePushMissingDependency(actionID), 100); 205 if (ret == false) { 206 XLog.getLog(getClass()).warn( 207 "Unable to queue the callable commands for PartitionDependencyManagerService for actionID " 208 + actionID + ".Most possibly command queue is full. Queue size is :" 209 + Services.get().get(CallableQueueService.class).queueSize()); 210 } 211 } 212 } 213 } 214 215 /** 216 * Get a list of available dependency URIs for a actionID 217 * 218 * @param actionID action id 219 * @return list of available dependency URIs 220 */ 221 public Collection<String> getAvailableDependencyURIs(String actionID) { 222 return dependencyCache.getAvailableDependencyURIs(actionID); 223 } 224 225 /** 226 * Remove the list of available dependency URIs for a actionID once the missing dependencies are processed. 227 * 228 * @param actionID action id 229 * @param dependencyURIs set of dependency URIs 230 * @return true if successful, else false 231 */ 232 public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) { 233 return dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs); 234 } 235 236 /** 237 * Remove a coord action from dependency cache when all push missing dependencies available 238 * 239 * @param actionID action id 240 * @param dependencyURIs set of dependency URIs 241 * @return true if successful, else false 242 */ 243 public void removeCoordActionWithDependenciesAvailable(String actionID) { 244 if (purgeEnabled) { 245 registeredCoordActionMap.remove(actionID); 246 } 247 dependencyCache.removeCoordActionWithDependenciesAvailable(actionID); 248 } 249 250 @VisibleForTesting 251 public void runCachePurgeWorker() { 252 new CachePurgeWorker(dependencyCache).run(); 253 } 254}