This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.util.Collection;
021 import java.util.Map;
022
023 import org.apache.hadoop.conf.Configuration;
024 import org.apache.hadoop.util.ReflectionUtils;
025 import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
026 import org.apache.oozie.dependency.hcat.HCatDependencyCache;
027 import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache;
028 import org.apache.oozie.util.HCatURI;
029 import org.apache.oozie.util.XLog;
030
031 /**
032 * Module that functions like a caching service to maintain partition dependency mappings
033 */
034 public class PartitionDependencyManagerService implements Service {
035
036 public static final String CONF_PREFIX = Service.CONF_PREFIX + "PartitionDependencyManagerService.";
037 public static final String CACHE_MANAGER_IMPL = CONF_PREFIX + "cache.manager.impl";
038
039 private static XLog LOG = XLog.getLog(PartitionDependencyManagerService.class);
040
041 private HCatDependencyCache dependencyCache;
042
043 @Override
044 public void init(Services services) throws ServiceException {
045 init(services.getConf());
046 }
047
048 private void init(Configuration conf) throws ServiceException {
049 Class<?> defaultClass = conf.getClass(CACHE_MANAGER_IMPL, null);
050 dependencyCache = (defaultClass == null) ? new SimpleHCatDependencyCache()
051 : (HCatDependencyCache) ReflectionUtils.newInstance(defaultClass, null);
052 dependencyCache.init(conf);
053 LOG.info("PartitionDependencyManagerService initialized. Dependency cache is {0} ", dependencyCache.getClass()
054 .getName());
055 }
056
057 @Override
058 public void destroy() {
059 dependencyCache.destroy();
060 }
061
062 @Override
063 public Class<? extends Service> getInterface() {
064 return PartitionDependencyManagerService.class;
065 }
066
067 /**
068 * Add a missing partition dependency and the actionID waiting on it
069 *
070 * @param hcatURI dependency URI
071 * @param actionID ID of action which is waiting for the dependency
072 */
073 public void addMissingDependency(HCatURI hcatURI, String actionID) {
074 dependencyCache.addMissingDependency(hcatURI, actionID);
075 }
076
077 /**
078 * Remove a missing partition dependency associated with a actionID
079 *
080 * @param hcatURI dependency URI
081 * @param actionID ID of action which is waiting for the dependency
082 * @return true if successful, else false
083 */
084 public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
085 return dependencyCache.removeMissingDependency(hcatURI, actionID);
086 }
087
088 /**
089 * Get the list of actionIDs waiting for a partition
090 *
091 * @param hcatURI dependency URI
092 * @return list of actionIDs
093 */
094 public Collection<String> getWaitingActions(HCatURI hcatURI) {
095 return dependencyCache.getWaitingActions(hcatURI);
096 }
097
098 /**
099 * Mark a partition dependency as available
100 *
101 * @param server host:port of the server
102 * @param db name of the database
103 * @param table name of the table
104 * @param partitions list of available partitions
105 * @return list of actionIDs for which the dependency is now available
106 */
107 public void partitionAvailable(String server, String db, String table, Map<String, String> partitions) {
108 Collection<String> actionsWithAvailableDep = dependencyCache.markDependencyAvailable(server, db, table,
109 partitions);
110 if (actionsWithAvailableDep != null) {
111 for (String actionID : actionsWithAvailableDep) {
112 boolean ret = Services.get().get(CallableQueueService.class)
113 .queue(new CoordActionUpdatePushMissingDependency(actionID), 100);
114 if (ret == false) {
115 XLog.getLog(getClass()).warn(
116 "Unable to queue the callable commands for PartitionDependencyManagerService for actionID "
117 + actionID + ".Most possibly command queue is full. Queue size is :"
118 + Services.get().get(CallableQueueService.class).queueSize());
119 }
120 }
121 }
122 }
123
124 /**
125 * Get a list of available dependency URIs for a actionID
126 *
127 * @param actionID action id
128 * @return list of available dependency URIs
129 */
130 public Collection<String> getAvailableDependencyURIs(String actionID) {
131 return dependencyCache.getAvailableDependencyURIs(actionID);
132 }
133
134 /**
135 * Remove the list of available dependency URIs for a actionID once the missing dependencies are processed.
136 *
137 * @param actionID action id
138 * @param dependencyURIs set of dependency URIs
139 * @return true if successful, else false
140 */
141 public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
142 return dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs);
143 }
144
145 }