001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.text.MessageFormat;
022import java.util.Collection;
023import java.util.Date;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030
031import org.apache.commons.cli.ParseException;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.util.ReflectionUtils;
034import org.apache.oozie.CoordinatorActionBean;
035import org.apache.oozie.ErrorCode;
036import org.apache.oozie.client.CoordinatorAction;
037import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
038import org.apache.oozie.dependency.hcat.HCatDependencyCache;
039import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache;
040import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
041import org.apache.oozie.executor.jpa.JPAExecutorException;
042import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
043import org.apache.oozie.util.HCatURI;
044import org.apache.oozie.util.XLog;
045
046import com.google.common.annotations.VisibleForTesting;
047
048/**
049 * Module that functions like a caching service to maintain partition dependency mappings
050 */
051public class PartitionDependencyManagerService implements Service {
052
053    public static final String CONF_PREFIX = Service.CONF_PREFIX + "PartitionDependencyManagerService.";
054    public static final String CACHE_MANAGER_IMPL = CONF_PREFIX + "cache.manager.impl";
055    public static final String CACHE_PURGE_INTERVAL = CONF_PREFIX + "cache.purge.interval";
056    public static final String CACHE_PURGE_TTL = CONF_PREFIX + "cache.purge.ttl";
057
058    private static XLog LOG = XLog.getLog(PartitionDependencyManagerService.class);
059
060    private HCatDependencyCache dependencyCache;
061
062    /**
063     * Keep timestamp when missing dependencies of a coord action are registered
064     */
065    private ConcurrentMap<String, Long> registeredCoordActionMap;
066
067    private boolean purgeEnabled = false;
068
069    @Override
070    public void init(Services services) throws ServiceException {
071        init(services.getConf());
072    }
073
074    private void init(Configuration conf) throws ServiceException {
075        Class<?> defaultClass = conf.getClass(CACHE_MANAGER_IMPL, null);
076        dependencyCache = (defaultClass == null) ? new SimpleHCatDependencyCache()
077                : (HCatDependencyCache) ReflectionUtils.newInstance(defaultClass, null);
078        dependencyCache.init(conf);
079        LOG.info("PartitionDependencyManagerService initialized. Dependency cache is {0} ", dependencyCache.getClass()
080                .getName());
081        purgeEnabled = Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode();
082        if (purgeEnabled) {
083            Runnable purgeThread = new CachePurgeWorker(dependencyCache);
084            // schedule runnable by default every 10 min
085            Services.get()
086                    .get(SchedulerService.class)
087                    .schedule(purgeThread, 10, Services.get().getConf().getInt(CACHE_PURGE_INTERVAL, 600),
088                            SchedulerService.Unit.SEC);
089            registeredCoordActionMap = new ConcurrentHashMap<String, Long>();
090        }
091    }
092
093    private class CachePurgeWorker implements Runnable {
094        HCatDependencyCache cache;
095        public CachePurgeWorker(HCatDependencyCache cache) {
096            this.cache = cache;
097        }
098
099        @Override
100        public void run() {
101            if (Thread.currentThread().isInterrupted()) {
102                return;
103            }
104            try {
105                purgeMissingDependency(Services.get().getConf().getInt(CACHE_PURGE_TTL, 1800));
106            }
107            catch (Throwable error) {
108                XLog.getLog(PartitionDependencyManagerService.class).debug("Throwable in CachePurgeWorker thread run : ", error);
109            }
110        }
111
112        private void purgeMissingDependency(int timeToLive) {
113            long currentTime = new Date().getTime();
114            Set<String> staleActions = new HashSet<String>();
115            Iterator<String> actionItr = registeredCoordActionMap.keySet().iterator();
116            while(actionItr.hasNext()){
117                String actionId = actionItr.next();
118                Long regTime = registeredCoordActionMap.get(actionId);
119                if(regTime < (currentTime - timeToLive * 1000)){
120                    CoordinatorActionBean caBean = null;
121                    try {
122                        caBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_STATUS, actionId);
123                    }
124                    catch (JPAExecutorException e) {
125                        if (e.getErrorCode() == ErrorCode.E0605) {
126                            LOG.info(MessageFormat.format(
127                                    "Coord action {0} is not in database, deleting it from cache", actionId));
128                            staleActions.add(actionId);
129                            actionItr.remove();
130                        }
131                        else {
132                            LOG.warn("Error in checking coord action:" + actionId + "to purge, skipping", e);
133                        }
134                    }
135                    if(caBean != null && !caBean.getStatus().equals(CoordinatorAction.Status.WAITING)){
136                        staleActions.add(actionId);
137                        actionItr.remove();
138                    }
139                }
140            }
141            dependencyCache.removeNonWaitingCoordActions(staleActions);
142        }
143    }
144
145    @Override
146    public void destroy() {
147        dependencyCache.destroy();
148    }
149
150    @Override
151    public Class<? extends Service> getInterface() {
152        return PartitionDependencyManagerService.class;
153    }
154
155    /**
156     * Add a missing partition dependency and the actionID waiting on it
157     *
158     * @param hcatURI dependency URI
159     * @param actionID ID of action which is waiting for the dependency
160     */
161    public void addMissingDependency(HCatURI hcatURI, String actionID) {
162        if (purgeEnabled) {
163            registeredCoordActionMap.put(actionID, new Date().getTime());
164        }
165        dependencyCache.addMissingDependency(hcatURI, actionID);
166    }
167
168    /**
169     * Remove a missing partition dependency associated with a actionID
170     *
171     * @param hcatURI dependency URI
172     * @param actionID ID of action which is waiting for the dependency
173     * @return true if successful, else false
174     */
175    public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
176        return dependencyCache.removeMissingDependency(hcatURI, actionID);
177    }
178
179    /**
180     * Get the list of actionIDs waiting for a partition
181     *
182     * @param hcatURI dependency URI
183     * @return list of actionIDs
184     */
185    public Collection<String> getWaitingActions(HCatURI hcatURI) {
186        return dependencyCache.getWaitingActions(hcatURI);
187    }
188
189    /**
190     * Mark a partition dependency as available
191     *
192     * @param server host:port of the server
193     * @param db name of the database
194     * @param table name of the table
195     * @param partitions list of available partitions
196     */
197    public void partitionAvailable(String server, String db, String table, Map<String, String> partitions) {
198        Collection<String> actionsWithAvailableDep = dependencyCache.markDependencyAvailable(server, db, table,
199                partitions);
200        if (actionsWithAvailableDep != null) {
201            for (String actionID : actionsWithAvailableDep) {
202                boolean ret = Services.get().get(CallableQueueService.class)
203                        .queue(new CoordActionUpdatePushMissingDependency(actionID), 100);
204                if (ret == false) {
205                    XLog.getLog(getClass()).warn(
206                            "Unable to queue the callable commands for PartitionDependencyManagerService for actionID "
207                                    + actionID + ".Most possibly command queue is full. Queue size is :"
208                                    + Services.get().get(CallableQueueService.class).queueSize());
209                }
210            }
211        }
212    }
213
214    /**
215     * Get a list of available dependency URIs for a actionID
216     *
217     * @param actionID action id
218     * @return list of available dependency URIs
219     */
220    public Collection<String> getAvailableDependencyURIs(String actionID) {
221        return dependencyCache.getAvailableDependencyURIs(actionID);
222    }
223
224    /**
225     * Remove the list of available dependency URIs for a actionID once the missing dependencies are processed.
226     *
227     * @param actionID action id
228     * @param dependencyURIs set of dependency URIs
229     * @return true if successful, else false
230     */
231    public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
232        return dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs);
233    }
234
235    /**
236     * Remove a coord action from dependency cache when all push missing dependencies available
237     *
238     * @param actionID action id
239     */
240    public void removeCoordActionWithDependenciesAvailable(String actionID) {
241        if (purgeEnabled) {
242            registeredCoordActionMap.remove(actionID);
243        }
244        dependencyCache.removeCoordActionWithDependenciesAvailable(actionID);
245    }
246
247    @VisibleForTesting
248    public void runCachePurgeWorker() {
249        new CachePurgeWorker(dependencyCache).run();
250    }
251}