001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.List;
024
025import org.apache.oozie.BundleJobBean;
026import org.apache.oozie.CoordinatorJobBean;
027import org.apache.oozie.command.bundle.BundlePauseXCommand;
028import org.apache.oozie.command.bundle.BundleStartXCommand;
029import org.apache.oozie.command.bundle.BundleUnpauseXCommand;
030import org.apache.oozie.command.coord.CoordPauseXCommand;
031import org.apache.oozie.command.coord.CoordUnpauseXCommand;
032import org.apache.oozie.executor.jpa.BundleJobsGetNeedStartJPAExecutor;
033import org.apache.oozie.executor.jpa.BundleJobsGetPausedJPAExecutor;
034import org.apache.oozie.executor.jpa.BundleJobsGetUnpausedJPAExecutor;
035import org.apache.oozie.executor.jpa.CoordJobsGetPausedJPAExecutor;
036import org.apache.oozie.executor.jpa.CoordJobsGetUnpausedJPAExecutor;
037import org.apache.oozie.executor.jpa.JPAExecutorException;
038import org.apache.oozie.service.SchedulerService;
039import org.apache.oozie.service.Service;
040import org.apache.oozie.service.Services;
041import org.apache.oozie.lock.LockToken;
042import org.apache.oozie.util.ConfigUtils;
043import org.apache.oozie.util.XCallable;
044import org.apache.oozie.util.XLog;
045
046import com.google.common.annotations.VisibleForTesting;
047
048/**
049 * PauseTransitService is the runnable which is scheduled to run at the configured interval, it checks all bundles to
050 * see if they should be paused, un-paused or started.
051 */
052public class PauseTransitService implements Service {
053    public static final String CONF_PREFIX = Service.CONF_PREFIX + "PauseTransitService.";
054    public static final String CONF_BUNDLE_PAUSE_START_INTERVAL = CONF_PREFIX + "PauseTransit.interval";
055    private final static XLog LOG = XLog.getLog(PauseTransitService.class);
056
057    public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
058
059    /**
060     * PauseTransitRunnable is the runnable which is scheduled to run at the configured interval, it checks all bundles
061     * to see if they should be paused, un-paused or started.
062     */
063    @VisibleForTesting
064    public static class PauseTransitRunnable implements Runnable {
065        private JPAService jpaService = null;
066        private LockToken lock;
067        private List<XCallable<Void>> callables;
068
069        public PauseTransitRunnable() {
070            jpaService = Services.get().get(JPAService.class);
071            if (jpaService == null) {
072                LOG.error("Missing JPAService");
073            }
074        }
075
076        public void run() {
077            try {
078                // first check if there is some other running instance from the same service;
079                lock = Services.get().get(MemoryLocksService.class)
080                        .getWriteLock(PauseTransitService.class.getName(), lockTimeout);
081                if (lock == null) {
082                    LOG.info("This PauseTransitService instance will"
083                            + "not run since there is already an instance running");
084                }
085                else {
086                    LOG.info("Acquired lock for [{0}]", PauseTransitService.class.getName());
087
088                    updateBundle();
089                    updateCoord();
090                    if (null != callables) {
091                        boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
092                        if (ret == false) {
093                            XLog.getLog(getClass()).warn(
094                                    "Unable to queue the callables commands for PauseTransitService. "
095                                            + "Most possibly command queue is full. Queue size is :"
096                                            + Services.get().get(CallableQueueService.class).queueSize());
097                        }
098                        callables = null;
099                    }
100
101                }
102            }
103            catch (Exception ex) {
104                LOG.warn("Exception happened when pausing/unpausing/starting bundle/coord jobs", ex);
105            }
106            finally {
107                // release lock;
108                if (lock != null) {
109                    lock.release();
110                    LOG.info("Released lock for [{0}]", PauseTransitService.class.getName());
111                }
112            }
113        }
114
115        private void updateBundle() {
116            Date d = new Date(); // records the start time of this service run;
117            List<BundleJobBean> jobList = null;
118            // pause bundles as needed;
119            try {
120                jobList = jpaService.execute(new BundleJobsGetUnpausedJPAExecutor(-1));
121                if (jobList != null) {
122                    for (BundleJobBean bundleJob : jobList) {
123                        if ((bundleJob.getPauseTime() != null) && !bundleJob.getPauseTime().after(d)) {
124                            queueCallable(new BundlePauseXCommand(bundleJob));
125                            LOG.debug("Queuing BundlePauseXCommand for bundle job = " + bundleJob.getId());
126                        }
127                    }
128                }
129            }
130            catch (JPAExecutorException ex) {
131                LOG.warn("JPAExecutorException happened when pausing/unpausing/starting Bundle jobs", ex);
132            }
133            // unpause bundles as needed;
134            try {
135                jobList = jpaService.execute(new BundleJobsGetPausedJPAExecutor(-1));
136                if (jobList != null) {
137                    for (BundleJobBean bundleJob : jobList) {
138                        if ((bundleJob.getPauseTime() == null || bundleJob.getPauseTime().after(d))) {
139                            queueCallable(new BundleUnpauseXCommand(bundleJob));
140                            LOG.debug("Queuing BundleUnpauseXCommand for bundle job = " + bundleJob.getId());
141                        }
142                    }
143                }
144            }
145            catch (JPAExecutorException ex) {
146                LOG.warn("JPAExecutorException happened when pausing/unpausing/starting Bundle jobs", ex);
147            }
148            // start bundles as needed;
149            try {
150                jobList = jpaService.execute(new BundleJobsGetNeedStartJPAExecutor(d));
151                if (jobList != null) {
152                    for (BundleJobBean bundleJob : jobList) {
153                        queueCallable(new BundleStartXCommand(bundleJob.getId()));
154                        LOG.debug("Queuing BundleStartXCommand for bundle job = " + bundleJob.getId());
155                    }
156                }
157            }
158            catch (JPAExecutorException ex) {
159                LOG.warn("JPAExecutorException happened when pausing/unpausing/starting Bundle jobs", ex);
160            }
161        }
162
163        private void updateCoord() {
164            Date d = new Date(); // records the start time of this service run;
165            List<CoordinatorJobBean> jobList = null;
166            boolean backwardSupportForCoordStatus = ConfigUtils.isBackwardSupportForCoordStatus();
167
168            // pause coordinators as needed;
169            try {
170                jobList = jpaService.execute(new CoordJobsGetUnpausedJPAExecutor(-1));
171                if (jobList != null) {
172                    for (CoordinatorJobBean coordJob : jobList) {
173                        // if namespace 0.1 is used and backward support is true, then ignore this coord job
174                        if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
175                                && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
176                            continue;
177                        }
178                        if ((coordJob.getPauseTime() != null) && !coordJob.getPauseTime().after(d)) {
179                            queueCallable(new CoordPauseXCommand(coordJob));
180                            LOG.debug("Queuing CoordPauseXCommand for coordinator job = " + coordJob.getId());
181                        }
182                    }
183                }
184            }
185            catch (JPAExecutorException ex) {
186                LOG.warn("JPAExecutorException happened when pausing/unpausing Coordinator jobs", ex);
187            }
188            // unpause coordinators as needed;
189            try {
190                jobList = jpaService.execute(new CoordJobsGetPausedJPAExecutor(-1));
191                if (jobList != null) {
192                    for (CoordinatorJobBean coordJob : jobList) {
193                        // if namespace 0.1 is used and backward support is true, then ignore this coord job
194                        if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
195                                && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
196                            continue;
197                        }
198                        if ((coordJob.getPauseTime() == null || coordJob.getPauseTime().after(d))) {
199                            queueCallable(new CoordUnpauseXCommand(coordJob));
200                            LOG.debug("Queuing CoordUnpauseXCommand for coordinator job = " + coordJob.getId());
201                        }
202                    }
203                }
204            }
205            catch (JPAExecutorException ex) {
206                LOG.warn("JPAExecutorException happened when pausing/unpausing Coordinator jobs", ex);
207            }
208        }
209
210        private void queueCallable(XCallable<Void> callable) {
211            if (callables == null) {
212                callables = new ArrayList<XCallable<Void>>();
213            }
214            callables.add(callable);
215            if (callables.size() == ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)) {
216                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
217                if (ret == false) {
218                    XLog.getLog(getClass()).warn(
219                            "Unable to queue the callables commands for PauseTransitService. "
220                                    + "Most possibly command queue is full. Queue size is :"
221                                    + Services.get().get(CallableQueueService.class).queueSize());
222                }
223                callables = new ArrayList<XCallable<Void>>();
224            }
225        }
226    }
227
228    /**
229     * Initializes the {@link PauseTransitService}.
230     *
231     * @param services services instance.
232     */
233    @Override
234    public void init(Services services) {
235        Runnable bundlePauseStartRunnable = new PauseTransitRunnable();
236        services.get(SchedulerService.class).schedule(bundlePauseStartRunnable, 10,
237                ConfigurationService.getInt(services.getConf(), CONF_BUNDLE_PAUSE_START_INTERVAL),
238                SchedulerService.Unit.SEC);
239    }
240
241    /**
242     * Destroy the StateTransit Jobs Service.
243     */
244    @Override
245    public void destroy() {
246    }
247
248    /**
249     * Return the public interface for the purge jobs service.
250     *
251     * @return {@link PauseTransitService}.
252     */
253    @Override
254    public Class<? extends Service> getInterface() {
255        return PauseTransitService.class;
256    }
257}