001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.util.Collection;
021    import java.util.Map;
022    
023    import org.apache.hadoop.conf.Configuration;
024    import org.apache.hadoop.util.ReflectionUtils;
025    import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
026    import org.apache.oozie.dependency.hcat.HCatDependencyCache;
027    import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache;
028    import org.apache.oozie.util.HCatURI;
029    import org.apache.oozie.util.XLog;
030    
031    /**
032     * Module that functions like a caching service to maintain partition dependency mappings
033     */
034    public class PartitionDependencyManagerService implements Service {
035    
036        public static final String CONF_PREFIX = Service.CONF_PREFIX + "PartitionDependencyManagerService.";
037        public static final String CACHE_MANAGER_IMPL = CONF_PREFIX + "cache.manager.impl";
038    
039        private static XLog LOG = XLog.getLog(PartitionDependencyManagerService.class);
040    
041        private HCatDependencyCache dependencyCache;
042    
043        @Override
044        public void init(Services services) throws ServiceException {
045            init(services.getConf());
046        }
047    
048        private void init(Configuration conf) throws ServiceException {
049            Class<?> defaultClass = conf.getClass(CACHE_MANAGER_IMPL, null);
050            dependencyCache = (defaultClass == null) ? new SimpleHCatDependencyCache()
051                    : (HCatDependencyCache) ReflectionUtils.newInstance(defaultClass, null);
052            dependencyCache.init(conf);
053            LOG.info("PartitionDependencyManagerService initialized. Dependency cache is {0} ", dependencyCache.getClass()
054                    .getName());
055        }
056    
057        @Override
058        public void destroy() {
059            dependencyCache.destroy();
060        }
061    
062        @Override
063        public Class<? extends Service> getInterface() {
064            return PartitionDependencyManagerService.class;
065        }
066    
067        /**
068         * Add a missing partition dependency and the actionID waiting on it
069         *
070         * @param hcatURI dependency URI
071         * @param actionID ID of action which is waiting for the dependency
072         */
073        public void addMissingDependency(HCatURI hcatURI, String actionID) {
074            dependencyCache.addMissingDependency(hcatURI, actionID);
075        }
076    
077        /**
078         * Remove a missing partition dependency associated with a actionID
079         *
080         * @param hcatURI dependency URI
081         * @param actionID ID of action which is waiting for the dependency
082         * @return true if successful, else false
083         */
084        public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
085            return dependencyCache.removeMissingDependency(hcatURI, actionID);
086        }
087    
088        /**
089         * Get the list of actionIDs waiting for a partition
090         *
091         * @param hcatURI dependency URI
092         * @return list of actionIDs
093         */
094        public Collection<String> getWaitingActions(HCatURI hcatURI) {
095            return dependencyCache.getWaitingActions(hcatURI);
096        }
097    
098        /**
099         * Mark a partition dependency as available
100         *
101         * @param server host:port of the server
102         * @param db name of the database
103         * @param table name of the table
104         * @param partitions list of available partitions
105         * @return list of actionIDs for which the dependency is now available
106         */
107        public void partitionAvailable(String server, String db, String table, Map<String, String> partitions) {
108            Collection<String> actionsWithAvailableDep = dependencyCache.markDependencyAvailable(server, db, table,
109                    partitions);
110            if (actionsWithAvailableDep != null) {
111                for (String actionID : actionsWithAvailableDep) {
112                    boolean ret = Services.get().get(CallableQueueService.class)
113                            .queue(new CoordActionUpdatePushMissingDependency(actionID), 100);
114                    if (ret == false) {
115                        XLog.getLog(getClass()).warn(
116                                "Unable to queue the callable commands for PartitionDependencyManagerService for actionID "
117                                        + actionID + ".Most possibly command queue is full. Queue size is :"
118                                        + Services.get().get(CallableQueueService.class).queueSize());
119                    }
120                }
121            }
122        }
123    
124        /**
125         * Get a list of available dependency URIs for a actionID
126         *
127         * @param actionID action id
128         * @return list of available dependency URIs
129         */
130        public Collection<String> getAvailableDependencyURIs(String actionID) {
131            return dependencyCache.getAvailableDependencyURIs(actionID);
132        }
133    
134        /**
135         * Remove the list of available dependency URIs for a actionID once the missing dependencies are processed.
136         *
137         * @param actionID action id
138         * @param dependencyURIs set of dependency URIs
139         * @return true if successful, else false
140         */
141        public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
142            return dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs);
143        }
144    
145    }