001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.oozie.command.bundle.BundlePurgeXCommand;
022    import org.apache.oozie.command.coord.CoordPurgeXCommand;
023    import org.apache.oozie.command.wf.PurgeXCommand;
024    import org.apache.oozie.service.CallableQueueService;
025    import org.apache.oozie.service.SchedulerService;
026    import org.apache.oozie.service.Service;
027    import org.apache.oozie.service.Services;
028    
029    /**
030     * The PurgeService schedules purging of completed jobs and associated action older than a specified age for workflow, coordinator and bundle.
031     */
032    public class PurgeService implements Service {
033    
034        public static final String CONF_PREFIX = Service.CONF_PREFIX + "PurgeService.";
035        /**
036         * Age of completed jobs to be deleted, in days.
037         */
038        public static final String CONF_OLDER_THAN = CONF_PREFIX + "older.than";
039        public static final String COORD_CONF_OLDER_THAN = CONF_PREFIX + "coord.older.than";
040        public static final String BUNDLE_CONF_OLDER_THAN = CONF_PREFIX + "bundle.older.than";
041        /**
042         * Time interval, in seconds, at which the purge jobs service will be scheduled to run.
043         */
044        public static final String CONF_PURGE_INTERVAL = CONF_PREFIX + "purge.interval";
045        public static final String PURGE_LIMIT = CONF_PREFIX + "purge.limit";
046    
047        /**
048         * PurgeRunnable is the runnable which is scheduled to run at the configured interval. PurgeCommand is queued to
049         * remove completed jobs and associated actions older than the configured age for workflow, coordinator and bundle.
050         */
051        static class PurgeRunnable implements Runnable {
052            private int olderThan;
053            private int coordOlderThan;
054            private int bundleOlderThan;
055            private int limit;
056    
057            public PurgeRunnable(int olderThan, int coordOlderThan, int bundleOlderThan, int limit) {
058                this.olderThan = olderThan;
059                this.coordOlderThan = coordOlderThan;
060                this.bundleOlderThan = bundleOlderThan;
061                this.limit = limit;
062            }
063    
064            public void run() {
065                    Services.get().get(CallableQueueService.class).queue(new PurgeXCommand(olderThan, limit));
066                    Services.get().get(CallableQueueService.class).queue(new CoordPurgeXCommand(coordOlderThan, limit));
067                    Services.get().get(CallableQueueService.class).queue(new BundlePurgeXCommand(bundleOlderThan, limit));
068            }
069    
070        }
071    
072        /**
073         * Initializes the {@link PurgeService}.
074         *
075         * @param services services instance.
076         */
077        @Override
078        public void init(Services services) {
079            Configuration conf = services.getConf();
080            Runnable purgeJobsRunnable = new PurgeRunnable(conf.getInt(
081                    CONF_OLDER_THAN, 30), conf.getInt(COORD_CONF_OLDER_THAN, 7), conf.getInt(BUNDLE_CONF_OLDER_THAN, 7),
082                                          conf.getInt(PURGE_LIMIT, 100));
083            services.get(SchedulerService.class).schedule(purgeJobsRunnable, 10, conf.getInt(CONF_PURGE_INTERVAL, 3600),
084                                                          SchedulerService.Unit.SEC);
085        }
086    
087        /**
088         * Destroy the Purge Jobs Service.
089         */
090        @Override
091        public void destroy() {
092        }
093    
094        /**
095         * Return the public interface for the purge jobs service.
096         *
097         * @return {@link PurgeService}.
098         */
099        @Override
100        public Class<? extends Service> getInterface() {
101            return PurgeService.class;
102        }
103    }