001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.text.MessageFormat; 022import java.util.Collection; 023import java.util.Date; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.Map; 027import java.util.Set; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030 031import org.apache.commons.cli.ParseException; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.util.ReflectionUtils; 034import org.apache.oozie.CoordinatorActionBean; 035import org.apache.oozie.ErrorCode; 036import org.apache.oozie.client.CoordinatorAction; 037import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency; 038import org.apache.oozie.dependency.hcat.HCatDependencyCache; 039import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache; 040import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 041import org.apache.oozie.executor.jpa.JPAExecutorException; 042import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 043import org.apache.oozie.util.HCatURI; 044import org.apache.oozie.util.XLog; 045 046import com.google.common.annotations.VisibleForTesting; 047 048/** 049 * Module that functions like a caching service to maintain partition dependency mappings 050 */ 051public class PartitionDependencyManagerService implements Service { 052 053 public static final String CONF_PREFIX = Service.CONF_PREFIX + "PartitionDependencyManagerService."; 054 public static final String CACHE_MANAGER_IMPL = CONF_PREFIX + "cache.manager.impl"; 055 public static final String CACHE_PURGE_INTERVAL = CONF_PREFIX + "cache.purge.interval"; 056 public static final String CACHE_PURGE_TTL = CONF_PREFIX + "cache.purge.ttl"; 057 058 private static XLog LOG = XLog.getLog(PartitionDependencyManagerService.class); 059 060 private HCatDependencyCache dependencyCache; 061 062 /** 063 * Keep timestamp when missing dependencies of a coord action are registered 064 */ 065 private ConcurrentMap<String, Long> registeredCoordActionMap; 066 067 private boolean purgeEnabled = false; 068 069 @Override 070 public void init(Services services) throws ServiceException { 071 init(services.getConf()); 072 } 073 074 private void init(Configuration conf) throws ServiceException { 075 Class<?> defaultClass = conf.getClass(CACHE_MANAGER_IMPL, null); 076 dependencyCache = (defaultClass == null) ? new SimpleHCatDependencyCache() 077 : (HCatDependencyCache) ReflectionUtils.newInstance(defaultClass, null); 078 dependencyCache.init(conf); 079 LOG.info("PartitionDependencyManagerService initialized. Dependency cache is {0} ", dependencyCache.getClass() 080 .getName()); 081 purgeEnabled = Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode(); 082 if (purgeEnabled) { 083 Runnable purgeThread = new CachePurgeWorker(dependencyCache); 084 // schedule runnable by default every 10 min 085 Services.get() 086 .get(SchedulerService.class) 087 .schedule(purgeThread, 10, Services.get().getConf().getInt(CACHE_PURGE_INTERVAL, 600), 088 SchedulerService.Unit.SEC); 089 registeredCoordActionMap = new ConcurrentHashMap<String, Long>(); 090 } 091 } 092 093 private class CachePurgeWorker implements Runnable { 094 HCatDependencyCache cache; 095 public CachePurgeWorker(HCatDependencyCache cache) { 096 this.cache = cache; 097 } 098 099 @Override 100 public void run() { 101 if (Thread.currentThread().isInterrupted()) { 102 return; 103 } 104 try { 105 purgeMissingDependency(Services.get().getConf().getInt(CACHE_PURGE_TTL, 1800)); 106 } 107 catch (Throwable error) { 108 XLog.getLog(PartitionDependencyManagerService.class).debug("Throwable in CachePurgeWorker thread run : ", error); 109 } 110 } 111 112 private void purgeMissingDependency(int timeToLive) { 113 long currentTime = new Date().getTime(); 114 Set<String> staleActions = new HashSet<String>(); 115 Iterator<String> actionItr = registeredCoordActionMap.keySet().iterator(); 116 while(actionItr.hasNext()){ 117 String actionId = actionItr.next(); 118 Long regTime = registeredCoordActionMap.get(actionId); 119 if(regTime < (currentTime - timeToLive * 1000)){ 120 CoordinatorActionBean caBean = null; 121 try { 122 caBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_STATUS, actionId); 123 } 124 catch (JPAExecutorException e) { 125 if (e.getErrorCode() == ErrorCode.E0605) { 126 LOG.info(MessageFormat.format( 127 "Coord action {0} is not in database, deleting it from cache", actionId)); 128 staleActions.add(actionId); 129 actionItr.remove(); 130 } 131 else { 132 LOG.warn("Error in checking coord action:" + actionId + "to purge, skipping", e); 133 } 134 } 135 if(caBean != null && !caBean.getStatus().equals(CoordinatorAction.Status.WAITING)){ 136 staleActions.add(actionId); 137 actionItr.remove(); 138 } 139 } 140 } 141 dependencyCache.removeNonWaitingCoordActions(staleActions); 142 } 143 } 144 145 @Override 146 public void destroy() { 147 dependencyCache.destroy(); 148 } 149 150 @Override 151 public Class<? extends Service> getInterface() { 152 return PartitionDependencyManagerService.class; 153 } 154 155 /** 156 * Add a missing partition dependency and the actionID waiting on it 157 * 158 * @param hcatURI dependency URI 159 * @param actionID ID of action which is waiting for the dependency 160 */ 161 public void addMissingDependency(HCatURI hcatURI, String actionID) { 162 if (purgeEnabled) { 163 registeredCoordActionMap.put(actionID, new Date().getTime()); 164 } 165 dependencyCache.addMissingDependency(hcatURI, actionID); 166 } 167 168 /** 169 * Remove a missing partition dependency associated with a actionID 170 * 171 * @param hcatURI dependency URI 172 * @param actionID ID of action which is waiting for the dependency 173 * @return true if successful, else false 174 */ 175 public boolean removeMissingDependency(HCatURI hcatURI, String actionID) { 176 return dependencyCache.removeMissingDependency(hcatURI, actionID); 177 } 178 179 /** 180 * Get the list of actionIDs waiting for a partition 181 * 182 * @param hcatURI dependency URI 183 * @return list of actionIDs 184 */ 185 public Collection<String> getWaitingActions(HCatURI hcatURI) { 186 return dependencyCache.getWaitingActions(hcatURI); 187 } 188 189 /** 190 * Mark a partition dependency as available 191 * 192 * @param server host:port of the server 193 * @param db name of the database 194 * @param table name of the table 195 * @param partitions list of available partitions 196 */ 197 public void partitionAvailable(String server, String db, String table, Map<String, String> partitions) { 198 Collection<String> actionsWithAvailableDep = dependencyCache.markDependencyAvailable(server, db, table, 199 partitions); 200 if (actionsWithAvailableDep != null) { 201 for (String actionID : actionsWithAvailableDep) { 202 boolean ret = Services.get().get(CallableQueueService.class) 203 .queue(new CoordActionUpdatePushMissingDependency(actionID), 100); 204 if (ret == false) { 205 XLog.getLog(getClass()).warn( 206 "Unable to queue the callable commands for PartitionDependencyManagerService for actionID " 207 + actionID + ".Most possibly command queue is full. Queue size is :" 208 + Services.get().get(CallableQueueService.class).queueSize()); 209 } 210 } 211 } 212 } 213 214 /** 215 * Get a list of available dependency URIs for a actionID 216 * 217 * @param actionID action id 218 * @return list of available dependency URIs 219 */ 220 public Collection<String> getAvailableDependencyURIs(String actionID) { 221 return dependencyCache.getAvailableDependencyURIs(actionID); 222 } 223 224 /** 225 * Remove the list of available dependency URIs for a actionID once the missing dependencies are processed. 226 * 227 * @param actionID action id 228 * @param dependencyURIs set of dependency URIs 229 * @return true if successful, else false 230 */ 231 public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) { 232 return dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs); 233 } 234 235 /** 236 * Remove a coord action from dependency cache when all push missing dependencies available 237 * 238 * @param actionID action id 239 */ 240 public void removeCoordActionWithDependenciesAvailable(String actionID) { 241 if (purgeEnabled) { 242 registeredCoordActionMap.remove(actionID); 243 } 244 dependencyCache.removeCoordActionWithDependenciesAvailable(actionID); 245 } 246 247 @VisibleForTesting 248 public void runCachePurgeWorker() { 249 new CachePurgeWorker(dependencyCache).run(); 250 } 251}