/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import java.util.Collection;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
import org.apache.oozie.dependency.hcat.HCatDependencyCache;
import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XLog;

/**
 * Module that functions like a caching service to maintain partition dependency mappings.
 * <p>
 * All bookkeeping is delegated to an {@link HCatDependencyCache}; the implementation is selected through the
 * {@link #CACHE_MANAGER_IMPL} configuration property and falls back to {@link SimpleHCatDependencyCache} when the
 * property is not set.
 */
public class PartitionDependencyManagerService implements Service {

    /** Prefix shared by all configuration properties of this service. */
    public static final String CONF_PREFIX = Service.CONF_PREFIX + "PartitionDependencyManagerService.";

    /** Configuration property naming the {@link HCatDependencyCache} implementation class to instantiate. */
    public static final String CACHE_MANAGER_IMPL = CONF_PREFIX + "cache.manager.impl";

    private static final XLog LOG = XLog.getLog(PartitionDependencyManagerService.class);

    /** Cache that every public method of this service delegates to; assigned during initialization. */
    private HCatDependencyCache dependencyCache;

    /**
     * Initialize the service using the configuration held by the given services container.
     *
     * @param services services container supplying the configuration
     * @throws ServiceException declared by the {@link Service} contract
     */
    @Override
    public void init(Services services) throws ServiceException {
        init(services.getConf());
    }

    /**
     * Create and initialize the dependency cache.
     * <p>
     * The class configured under {@link #CACHE_MANAGER_IMPL} is instantiated reflectively; when no class is
     * configured a {@link SimpleHCatDependencyCache} is used instead.
     *
     * @param conf configuration to read the cache implementation from and to hand to the cache
     * @throws ServiceException declared for callers; not thrown directly by this method
     */
    private void init(Configuration conf) throws ServiceException {
        Class<?> cacheClass = conf.getClass(CACHE_MANAGER_IMPL, null);
        if (cacheClass == null) {
            dependencyCache = new SimpleHCatDependencyCache();
        }
        else {
            // No Configuration is passed to the factory here; the cache receives it through init(conf) below.
            dependencyCache = (HCatDependencyCache) ReflectionUtils.newInstance(cacheClass, null);
        }
        dependencyCache.init(conf);
        LOG.info("PartitionDependencyManagerService initialized. Dependency cache is {0} ", dependencyCache.getClass()
                .getName());
    }

    /**
     * Destroy the underlying dependency cache, if one was created.
     */
    @Override
    public void destroy() {
        // Guard against destroy() being invoked when initialization never assigned the cache.
        if (dependencyCache != null) {
            dependencyCache.destroy();
        }
    }

    /**
     * Return the interface under which this service is registered.
     *
     * @return this service class
     */
    @Override
    public Class<? extends Service> getInterface() {
        return PartitionDependencyManagerService.class;
    }

    /**
     * Add a missing partition dependency and the actionID waiting on it
     *
     * @param hcatURI dependency URI
     * @param actionID ID of action which is waiting for the dependency
     */
    public void addMissingDependency(HCatURI hcatURI, String actionID) {
        dependencyCache.addMissingDependency(hcatURI, actionID);
    }

    /**
     * Remove a missing partition dependency associated with an actionID
     *
     * @param hcatURI dependency URI
     * @param actionID ID of action which is waiting for the dependency
     * @return true if successful, else false
     */
    public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
        return dependencyCache.removeMissingDependency(hcatURI, actionID);
    }

    /**
     * Get the list of actionIDs waiting for a partition
     *
     * @param hcatURI dependency URI
     * @return list of actionIDs
     */
    public Collection<String> getWaitingActions(HCatURI hcatURI) {
        return dependencyCache.getWaitingActions(hcatURI);
    }

    /**
     * Mark a partition dependency as available and queue a {@link CoordActionUpdatePushMissingDependency} command
     * for every action the cache reports as having that dependency now available.
     * <p>
     * Nothing is returned to the caller; when a command cannot be queued a warning that includes the current queue
     * size is logged and processing continues with the next action.
     *
     * @param server host:port of the server
     * @param db name of the database
     * @param table name of the table
     * @param partitions map describing the available partition
     */
    public void partitionAvailable(String server, String db, String table, Map<String, String> partitions) {
        Collection<String> actionsWithAvailableDep = dependencyCache.markDependencyAvailable(server, db, table,
                partitions);
        if (actionsWithAvailableDep == null) {
            return;
        }
        for (String actionID : actionsWithAvailableDep) {
            // Looked up per iteration (not hoisted) so that no lookup happens for an empty collection.
            CallableQueueService callableQueue = Services.get().get(CallableQueueService.class);
            // NOTE(review): the second argument (100) is presumably a delay in milliseconds - confirm against
            // CallableQueueService.queue.
            boolean queued = callableQueue.queue(new CoordActionUpdatePushMissingDependency(actionID), 100);
            if (!queued) {
                LOG.warn("Unable to queue the callable commands for PartitionDependencyManagerService for actionID "
                        + actionID + ".Most possibly command queue is full. Queue size is :"
                        + callableQueue.queueSize());
            }
        }
    }

    /**
     * Get a list of available dependency URIs for an actionID
     *
     * @param actionID action id
     * @return list of available dependency URIs
     */
    public Collection<String> getAvailableDependencyURIs(String actionID) {
        return dependencyCache.getAvailableDependencyURIs(actionID);
    }

    /**
     * Remove the list of available dependency URIs for an actionID once the missing dependencies are processed.
     *
     * @param actionID action id
     * @param dependencyURIs set of dependency URIs
     * @return true if successful, else false
     */
    public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
        return dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs);
    }

}