001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.Date;
021import java.util.List;
022
023import org.apache.hadoop.conf.Configuration;
024import org.apache.oozie.BundleJobBean;
025import org.apache.oozie.CoordinatorJobBean;
026import org.apache.oozie.command.bundle.BundlePauseXCommand;
027import org.apache.oozie.command.bundle.BundleStartXCommand;
028import org.apache.oozie.command.bundle.BundleUnpauseXCommand;
029import org.apache.oozie.command.coord.CoordPauseXCommand;
030import org.apache.oozie.command.coord.CoordUnpauseXCommand;
031import org.apache.oozie.executor.jpa.BundleJobsGetNeedStartJPAExecutor;
032import org.apache.oozie.executor.jpa.BundleJobsGetPausedJPAExecutor;
033import org.apache.oozie.executor.jpa.BundleJobsGetUnpausedJPAExecutor;
034import org.apache.oozie.executor.jpa.CoordJobsGetPausedJPAExecutor;
035import org.apache.oozie.executor.jpa.CoordJobsGetUnpausedJPAExecutor;
036import org.apache.oozie.service.SchedulerService;
037import org.apache.oozie.service.Service;
038import org.apache.oozie.service.Services;
039import org.apache.oozie.lock.LockToken;
040import org.apache.oozie.util.XLog;
041
042import com.google.common.annotations.VisibleForTesting;
043
044/**
045 * PauseTransitService is the runnable which is scheduled to run at the configured interval, it checks all bundles
046 * to see if they should be paused, un-paused or started.
047 */
048public class PauseTransitService implements Service {
049    public static final String CONF_PREFIX = Service.CONF_PREFIX + "PauseTransitService.";
050    public static final String CONF_BUNDLE_PAUSE_START_INTERVAL = CONF_PREFIX + "PauseTransit.interval";
051    private final static XLog LOG = XLog.getLog(PauseTransitService.class);
052
053    /**
054     * PauseTransitRunnable is the runnable which is scheduled to run at the configured interval, it checks all
055     * bundles to see if they should be paused, un-paused or started.
056     */
057    @VisibleForTesting
058    public static class PauseTransitRunnable implements Runnable {
059        private JPAService jpaService = null;
060        private LockToken lock;
061
062        public PauseTransitRunnable() {
063            jpaService = Services.get().get(JPAService.class);
064            if (jpaService == null) {
065                LOG.error("Missing JPAService");
066            }
067        }
068
069        public void run() {
070            try {
071                // first check if there is some other running instance from the same service;
072                lock = Services.get().get(MemoryLocksService.class).getWriteLock(
073                        PauseTransitService.class.getName(), lockTimeout);
074                if (lock == null) {
075                    LOG.info("This PauseTransitService instance will"
076                            + "not run since there is already an instance running");
077                }
078                else {
079                    LOG.info("Acquired lock for [{0}]", PauseTransitService.class.getName());
080
081                    updateBundle();
082                    updateCoord();
083                }
084            }
085            catch (Exception ex) {
086                LOG.warn("Exception happened when pausing/unpausing/starting bundle/coord jobs", ex);
087            }
088            finally {
089                // release lock;
090                if (lock != null) {
091                    lock.release();
092                    LOG.info("Released lock for [{0}]", PauseTransitService.class.getName());
093                }
094            }
095        }
096
097        private void updateBundle() {
098            Date d = new Date(); // records the start time of this service run;
099            List<BundleJobBean> jobList = null;
100            // pause bundles as needed;
101            try {
102                jobList = jpaService.execute(new BundleJobsGetUnpausedJPAExecutor(-1));
103                if (jobList != null) {
104                    for (BundleJobBean bundleJob : jobList) {
105                        if ((bundleJob.getPauseTime() != null) && !bundleJob.getPauseTime().after(d)) {
106                            new BundlePauseXCommand(bundleJob).call();
107                            LOG.debug("Calling BundlePauseXCommand for bundle job = " + bundleJob.getId());
108                        }
109                    }
110                }
111            }
112            catch (Exception ex) {
113                LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex);
114            }
115            // unpause bundles as needed;
116            try {
117                jobList = jpaService.execute(new BundleJobsGetPausedJPAExecutor(-1));
118                if (jobList != null) {
119                    for (BundleJobBean bundleJob : jobList) {
120                        if ((bundleJob.getPauseTime() == null || bundleJob.getPauseTime().after(d))) {
121                            new BundleUnpauseXCommand(bundleJob).call();
122                            LOG.debug("Calling BundleUnpauseXCommand for bundle job = " + bundleJob.getId());
123                        }
124                    }
125                }
126            }
127            catch (Exception ex) {
128                LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex);
129            }
130            // start bundles as needed;
131            try {
132                jobList = jpaService.execute(new BundleJobsGetNeedStartJPAExecutor(d));
133                if (jobList != null) {
134                    for (BundleJobBean bundleJob : jobList) {
135                        bundleJob.setKickoffTime(d);
136                        new BundleStartXCommand(bundleJob.getId()).call();
137                        LOG.debug("Calling BundleStartXCommand for bundle job = " + bundleJob.getId());
138                    }
139                }
140            }
141            catch (Exception ex) {
142                LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex);
143            }
144        }
145
146        private void updateCoord() {
147            Date d = new Date(); // records the start time of this service run;
148            List<CoordinatorJobBean> jobList = null;
149            Configuration conf = Services.get().getConf();
150            boolean backwardSupportForCoordStatus = conf.getBoolean(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
151
152            // pause coordinators as needed;
153            try {
154                jobList = jpaService.execute(new CoordJobsGetUnpausedJPAExecutor(-1));
155                if (jobList != null) {
156                    for (CoordinatorJobBean coordJob : jobList) {
157                        // if namespace 0.1 is used and backward support is true, then ignore this coord job
158                        if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
159                                && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
160                            continue;
161                        }
162                        if ((coordJob.getPauseTime() != null) && !coordJob.getPauseTime().after(d)) {
163                            new CoordPauseXCommand(coordJob).call();
164                            LOG.debug("Calling CoordPauseXCommand for coordinator job = " + coordJob.getId());
165                        }
166                    }
167                }
168            }
169            catch (Exception ex) {
170                LOG.warn("Exception happened when pausing/unpausing Coordinator jobs", ex);
171            }
172            // unpause coordinators as needed;
173            try {
174                jobList = jpaService.execute(new CoordJobsGetPausedJPAExecutor(-1));
175                if (jobList != null) {
176                    for (CoordinatorJobBean coordJob : jobList) {
177                        // if namespace 0.1 is used and backward support is true, then ignore this coord job
178                        if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
179                                && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
180                            continue;
181                        }
182                        if ((coordJob.getPauseTime() == null || coordJob.getPauseTime().after(d))) {
183                            new CoordUnpauseXCommand(coordJob).call();
184                            LOG.debug("Calling CoordUnpauseXCommand for coordinator job = " + coordJob.getId());
185                        }
186                    }
187                }
188            }
189            catch (Exception ex) {
190                LOG.warn("Exception happened when pausing/unpausing Coordinator jobs", ex);
191            }
192        }
193    }
194
195    /**
196     * Initializes the {@link PauseTransitService}.
197     *
198     * @param services services instance.
199     */
200    @Override
201    public void init(Services services) {
202        Configuration conf = services.getConf();
203        Runnable bundlePauseStartRunnable = new PauseTransitRunnable();
204        services.get(SchedulerService.class).schedule(bundlePauseStartRunnable, 10,
205                conf.getInt(CONF_BUNDLE_PAUSE_START_INTERVAL, 60), SchedulerService.Unit.SEC);
206    }
207
208    /**
209     * Destroy the StateTransit Jobs Service.
210     */
211    @Override
212    public void destroy() {
213    }
214
215    /**
216     * Return the public interface for the purge jobs service.
217     *
218     * @return {@link PauseTransitService}.
219     */
220    @Override
221    public Class<? extends Service> getInterface() {
222        return PauseTransitService.class;
223    }
224}