001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.Collection;
021import java.util.Date;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.Set;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.util.ReflectionUtils;
031import org.apache.oozie.CoordinatorActionBean;
032import org.apache.oozie.client.CoordinatorAction;
033import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
034import org.apache.oozie.dependency.hcat.HCatDependencyCache;
035import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache;
036import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
037import org.apache.oozie.executor.jpa.JPAExecutorException;
038import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
039import org.apache.oozie.util.HCatURI;
040import org.apache.oozie.util.XLog;
041
042import com.google.common.annotations.VisibleForTesting;
043
044/**
045 * Module that functions like a caching service to maintain partition dependency mappings
046 */
047public class PartitionDependencyManagerService implements Service {
048
049    public static final String CONF_PREFIX = Service.CONF_PREFIX + "PartitionDependencyManagerService.";
050    public static final String CACHE_MANAGER_IMPL = CONF_PREFIX + "cache.manager.impl";
051    public static final String CACHE_PURGE_INTERVAL = CONF_PREFIX + "cache.purge.interval";
052    public static final String CACHE_PURGE_TTL = CONF_PREFIX + "cache.purge.ttl";
053
054    private static XLog LOG = XLog.getLog(PartitionDependencyManagerService.class);
055
056    private HCatDependencyCache dependencyCache;
057
058    /**
059     * Keep timestamp when missing dependencies of a coord action are registered
060     */
061    private ConcurrentMap<String, Long> registeredCoordActionMap;
062
063    private boolean purgeEnabled = false;
064
065    @Override
066    public void init(Services services) throws ServiceException {
067        init(services.getConf());
068    }
069
070    private void init(Configuration conf) throws ServiceException {
071        Class<?> defaultClass = conf.getClass(CACHE_MANAGER_IMPL, null);
072        dependencyCache = (defaultClass == null) ? new SimpleHCatDependencyCache()
073                : (HCatDependencyCache) ReflectionUtils.newInstance(defaultClass, null);
074        dependencyCache.init(conf);
075        LOG.info("PartitionDependencyManagerService initialized. Dependency cache is {0} ", dependencyCache.getClass()
076                .getName());
077        purgeEnabled = Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode();
078        if (purgeEnabled) {
079            Runnable purgeThread = new CachePurgeWorker(dependencyCache);
080            // schedule runnable by default every 10 min
081            Services.get()
082                    .get(SchedulerService.class)
083                    .schedule(purgeThread, 10, Services.get().getConf().getInt(CACHE_PURGE_INTERVAL, 600),
084                            SchedulerService.Unit.SEC);
085            registeredCoordActionMap = new ConcurrentHashMap<String, Long>();
086        }
087    }
088
089    private class CachePurgeWorker implements Runnable {
090        HCatDependencyCache cache;
091        public CachePurgeWorker(HCatDependencyCache cache) {
092            this.cache = cache;
093        }
094
095        @Override
096        public void run() {
097            if (Thread.currentThread().isInterrupted()) {
098                return;
099            }
100            try {
101                purgeMissingDependency(Services.get().getConf().getInt(CACHE_PURGE_TTL, 1800));
102            }
103            catch (Throwable error) {
104                XLog.getLog(PartitionDependencyManagerService.class).debug("Throwable in CachePurgeWorker thread run : ", error);
105            }
106        }
107
108        private void purgeMissingDependency(int timeToLive) {
109            long currentTime = new Date().getTime();
110            Set<String> staleActions = new HashSet<String>();
111            Iterator<String> actionItr = registeredCoordActionMap.keySet().iterator();
112            while(actionItr.hasNext()){
113                String actionId = actionItr.next();
114                Long regTime = registeredCoordActionMap.get(actionId);
115                if(regTime < (currentTime - timeToLive * 1000)){
116                    CoordinatorActionBean caBean = null;
117                    try {
118                        caBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_STATUS, actionId);
119                    }
120                    catch (JPAExecutorException e) {
121                        LOG.warn("Error in checking coord action:" + actionId + "to purge, skipping", e);
122                    }
123                    if(caBean != null && !caBean.getStatus().equals(CoordinatorAction.Status.WAITING)){
124                        staleActions.add(actionId);
125                        actionItr.remove();
126                    }
127                }
128            }
129            dependencyCache.removeNonWaitingCoordActions(staleActions);
130        }
131    }
132
133    @Override
134    public void destroy() {
135        dependencyCache.destroy();
136    }
137
138    @Override
139    public Class<? extends Service> getInterface() {
140        return PartitionDependencyManagerService.class;
141    }
142
143    /**
144     * Add a missing partition dependency and the actionID waiting on it
145     *
146     * @param hcatURI dependency URI
147     * @param actionID ID of action which is waiting for the dependency
148     */
149    public void addMissingDependency(HCatURI hcatURI, String actionID) {
150        if (purgeEnabled) {
151            registeredCoordActionMap.put(actionID, new Date().getTime());
152        }
153        dependencyCache.addMissingDependency(hcatURI, actionID);
154    }
155
156    /**
157     * Remove a missing partition dependency associated with a actionID
158     *
159     * @param hcatURI dependency URI
160     * @param actionID ID of action which is waiting for the dependency
161     * @return true if successful, else false
162     */
163    public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
164        return dependencyCache.removeMissingDependency(hcatURI, actionID);
165    }
166
167    /**
168     * Get the list of actionIDs waiting for a partition
169     *
170     * @param hcatURI dependency URI
171     * @return list of actionIDs
172     */
173    public Collection<String> getWaitingActions(HCatURI hcatURI) {
174        return dependencyCache.getWaitingActions(hcatURI);
175    }
176
177    /**
178     * Mark a partition dependency as available
179     *
180     * @param server host:port of the server
181     * @param db name of the database
182     * @param table name of the table
183     * @param partitions list of available partitions
184     * @return list of actionIDs for which the dependency is now available
185     */
186    public void partitionAvailable(String server, String db, String table, Map<String, String> partitions) {
187        Collection<String> actionsWithAvailableDep = dependencyCache.markDependencyAvailable(server, db, table,
188                partitions);
189        if (actionsWithAvailableDep != null) {
190            for (String actionID : actionsWithAvailableDep) {
191                boolean ret = Services.get().get(CallableQueueService.class)
192                        .queue(new CoordActionUpdatePushMissingDependency(actionID), 100);
193                if (ret == false) {
194                    XLog.getLog(getClass()).warn(
195                            "Unable to queue the callable commands for PartitionDependencyManagerService for actionID "
196                                    + actionID + ".Most possibly command queue is full. Queue size is :"
197                                    + Services.get().get(CallableQueueService.class).queueSize());
198                }
199            }
200        }
201    }
202
203    /**
204     * Get a list of available dependency URIs for a actionID
205     *
206     * @param actionID action id
207     * @return list of available dependency URIs
208     */
209    public Collection<String> getAvailableDependencyURIs(String actionID) {
210        return dependencyCache.getAvailableDependencyURIs(actionID);
211    }
212
213    /**
214     * Remove the list of available dependency URIs for a actionID once the missing dependencies are processed.
215     *
216     * @param actionID action id
217     * @param dependencyURIs set of dependency URIs
218     * @return true if successful, else false
219     */
220    public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
221        return dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs);
222    }
223
224    /**
225     * Remove a coord action from dependency cache when all push missing dependencies available
226     *
227     * @param actionID action id
228     * @param dependencyURIs set of dependency URIs
229     * @return true if successful, else false
230     */
231    public void removeCoordActionWithDependenciesAvailable(String actionID) {
232        if (purgeEnabled) {
233            registeredCoordActionMap.remove(actionID);
234        }
235        dependencyCache.removeCoordActionWithDependenciesAvailable(actionID);
236    }
237
238    @VisibleForTesting
239    public void runCachePurgeWorker() {
240        new CachePurgeWorker(dependencyCache).run();
241    }
242}