001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.oozie.command.bundle.BundlePurgeXCommand;
022    import org.apache.oozie.command.coord.CoordPurgeCommand;
023    import org.apache.oozie.command.coord.CoordPurgeXCommand;
024    import org.apache.oozie.command.wf.PurgeCommand;
025    import org.apache.oozie.command.wf.PurgeXCommand;
026    import org.apache.oozie.service.CallableQueueService;
027    import org.apache.oozie.service.SchedulerService;
028    import org.apache.oozie.service.Service;
029    import org.apache.oozie.service.Services;
030    
031    /**
032     * The PurgeService schedules purging of completed jobs and associated action older than a specified age for workflow, coordinator and bundle.
033     */
034    public class PurgeService implements Service {
035    
036        public static final String CONF_PREFIX = Service.CONF_PREFIX + "PurgeService.";
037        /**
038         * Age of completed jobs to be deleted, in days.
039         */
040        public static final String CONF_OLDER_THAN = CONF_PREFIX + "older.than";
041        public static final String COORD_CONF_OLDER_THAN = CONF_PREFIX + "coord.older.than";
042        public static final String BUNDLE_CONF_OLDER_THAN = CONF_PREFIX + "bundle.older.than";
043        /**
044         * Time interval, in seconds, at which the purge jobs service will be scheduled to run.
045         */
046        public static final String CONF_PURGE_INTERVAL = CONF_PREFIX + "purge.interval";
047        public static final String PURGE_LIMIT = CONF_PREFIX + "purge.limit";
048    
049        private static boolean useXCommand = true;
050    
051        /**
052         * PurgeRunnable is the runnable which is scheduled to run at the configured interval. PurgeCommand is queued to
053         * remove completed jobs and associated actions older than the configured age for workflow, coordinator and bundle.
054         */
055        static class PurgeRunnable implements Runnable {
056            private int olderThan;
057            private int coordOlderThan;
058            private int bundleOlderThan;
059            private int limit;
060    
061            public PurgeRunnable(int olderThan, int coordOlderThan, int bundleOlderThan, int limit) {
062                this.olderThan = olderThan;
063                this.coordOlderThan = coordOlderThan;
064                this.bundleOlderThan = bundleOlderThan;
065                this.limit = limit;
066            }
067    
068            public void run() {
069                if (useXCommand) {
070                    Services.get().get(CallableQueueService.class).queue(new PurgeXCommand(olderThan, limit));
071                    Services.get().get(CallableQueueService.class).queue(new CoordPurgeXCommand(coordOlderThan, limit));
072                    Services.get().get(CallableQueueService.class).queue(new BundlePurgeXCommand(bundleOlderThan, limit));
073                } else {
074                    Services.get().get(CallableQueueService.class).queue(new PurgeCommand(olderThan, limit));
075                    Services.get().get(CallableQueueService.class).queue(new CoordPurgeCommand(coordOlderThan, limit));
076                }
077            }
078    
079        }
080    
081        /**
082         * Initializes the {@link PurgeService}.
083         *
084         * @param services services instance.
085         */
086        @Override
087        public void init(Services services) {
088            Configuration conf = services.getConf();
089            Runnable purgeJobsRunnable = new PurgeRunnable(conf.getInt(
090                    CONF_OLDER_THAN, 30), conf.getInt(COORD_CONF_OLDER_THAN, 7), conf.getInt(BUNDLE_CONF_OLDER_THAN, 7),
091                                          conf.getInt(PURGE_LIMIT, 100));
092            services.get(SchedulerService.class).schedule(purgeJobsRunnable, 10, conf.getInt(CONF_PURGE_INTERVAL, 3600),
093                                                          SchedulerService.Unit.SEC);
094    
095            if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
096                useXCommand = false;
097            }
098        }
099    
100        /**
101         * Destroy the Purge Jobs Service.
102         */
103        @Override
104        public void destroy() {
105        }
106    
107        /**
108         * Return the public interface for the purge jobs service.
109         *
110         * @return {@link PurgeService}.
111         */
112        @Override
113        public Class<? extends Service> getInterface() {
114            return PurgeService.class;
115        }
116    }