001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.util.Date;
021    import java.util.List;
022    
023    import org.apache.hadoop.conf.Configuration;
024    import org.apache.oozie.BundleJobBean;
025    import org.apache.oozie.CoordinatorJobBean;
026    import org.apache.oozie.command.bundle.BundlePauseXCommand;
027    import org.apache.oozie.command.bundle.BundleStartXCommand;
028    import org.apache.oozie.command.bundle.BundleUnpauseXCommand;
029    import org.apache.oozie.command.coord.CoordPauseXCommand;
030    import org.apache.oozie.command.coord.CoordUnpauseXCommand;
031    import org.apache.oozie.executor.jpa.BundleJobsGetNeedStartJPAExecutor;
032    import org.apache.oozie.executor.jpa.BundleJobsGetPausedJPAExecutor;
033    import org.apache.oozie.executor.jpa.BundleJobsGetUnpausedJPAExecutor;
034    import org.apache.oozie.executor.jpa.CoordJobsGetPausedJPAExecutor;
035    import org.apache.oozie.executor.jpa.CoordJobsGetUnpausedJPAExecutor;
036    import org.apache.oozie.service.SchedulerService;
037    import org.apache.oozie.service.Service;
038    import org.apache.oozie.service.Services;
039    import org.apache.oozie.util.MemoryLocks;
040    import org.apache.oozie.util.XLog;
041    
042    /**
043     * PauseTransitService is the runnable which is scheduled to run at the configured interval, it checks all bundles
044     * to see if they should be paused, un-paused or started.
045     */
046    public class PauseTransitService implements Service {
047        public static final String CONF_PREFIX = Service.CONF_PREFIX + "PauseTransitService.";
048        public static final String CONF_BUNDLE_PAUSE_START_INTERVAL = CONF_PREFIX + "PauseTransit.interval";
049        private final static XLog LOG = XLog.getLog(PauseTransitService.class);
050    
051        /**
052         * PauseTransitRunnable is the runnable which is scheduled to run at the configured interval, it checks all
053         * bundles to see if they should be paused, un-paused or started.
054         */
055        static class PauseTransitRunnable implements Runnable {
056            private JPAService jpaService = null;
057            private MemoryLocks.LockToken lock;
058    
059            public PauseTransitRunnable() {
060                jpaService = Services.get().get(JPAService.class);
061                if (jpaService == null) {
062                    LOG.error("Missing JPAService");
063                }
064            }
065    
066            public void run() {
067                try {
068                    // first check if there is some other running instance from the same service;
069                    lock = Services.get().get(MemoryLocksService.class).getWriteLock(
070                            PauseTransitService.class.getName(), lockTimeout);
071                    if (lock == null) {
072                        LOG.info("This PauseTransitService instance will"
073                                + "not run since there is already an instance running");
074                    }
075                    else {
076                        LOG.info("Acquired lock for [{0}]", PauseTransitService.class.getName());
077    
078                        updateBundle();
079                        updateCoord();
080                    }
081                }
082                catch (Exception ex) {
083                    LOG.warn("Exception happened when pausing/unpausing/starting bundle/coord jobs", ex);
084                }
085                finally {
086                    // release lock;
087                    if (lock != null) {
088                        lock.release();
089                        LOG.info("Released lock for [{0}]", PauseTransitService.class.getName());
090                    }
091                }
092            }
093    
094            private void updateBundle() {
095                Date d = new Date(); // records the start time of this service run;
096                List<BundleJobBean> jobList = null;
097                // pause bundles as needed;
098                try {
099                    jobList = jpaService.execute(new BundleJobsGetUnpausedJPAExecutor(-1));
100                    if (jobList != null) {
101                        for (BundleJobBean bundleJob : jobList) {
102                            if ((bundleJob.getPauseTime() != null) && !bundleJob.getPauseTime().after(d)) {
103                                new BundlePauseXCommand(bundleJob).call();
104                                LOG.debug("Calling BundlePauseXCommand for bundle job = " + bundleJob.getId());
105                            }
106                        }
107                    }
108                }
109                catch (Exception ex) {
110                    LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex);
111                }
112                // unpause bundles as needed;
113                try {
114                    jobList = jpaService.execute(new BundleJobsGetPausedJPAExecutor(-1));
115                    if (jobList != null) {
116                        for (BundleJobBean bundleJob : jobList) {
117                            if ((bundleJob.getPauseTime() == null || bundleJob.getPauseTime().after(d))) {
118                                new BundleUnpauseXCommand(bundleJob).call();
119                                LOG.debug("Calling BundleUnpauseXCommand for bundle job = " + bundleJob.getId());
120                            }
121                        }
122                    }
123                }
124                catch (Exception ex) {
125                    LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex);
126                }
127                // start bundles as needed;
128                try {
129                    jobList = jpaService.execute(new BundleJobsGetNeedStartJPAExecutor(d));
130                    if (jobList != null) {
131                        for (BundleJobBean bundleJob : jobList) {
132                            bundleJob.setKickoffTime(d);
133                            new BundleStartXCommand(bundleJob.getId()).call();
134                            LOG.debug("Calling BundleStartXCommand for bundle job = " + bundleJob.getId());
135                        }
136                    }
137                }
138                catch (Exception ex) {
139                    LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex);
140                }
141            }
142    
143            private void updateCoord() {
144                Date d = new Date(); // records the start time of this service run;
145                List<CoordinatorJobBean> jobList = null;
146                Configuration conf = Services.get().getConf();
147                boolean backwardSupportForCoordStatus = conf.getBoolean(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
148    
149                // pause coordinators as needed;
150                try {
151                    jobList = jpaService.execute(new CoordJobsGetUnpausedJPAExecutor(-1));
152                    if (jobList != null) {
153                        for (CoordinatorJobBean coordJob : jobList) {
154                            // if namespace 0.1 is used and backward support is true, then ignore this coord job
155                            if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
156                                    && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
157                                continue;
158                            }
159                            if ((coordJob.getPauseTime() != null) && !coordJob.getPauseTime().after(d)) {
160                                new CoordPauseXCommand(coordJob).call();
161                                LOG.debug("Calling CoordPauseXCommand for coordinator job = " + coordJob.getId());
162                            }
163                        }
164                    }
165                }
166                catch (Exception ex) {
167                    LOG.warn("Exception happened when pausing/unpausing Coordinator jobs", ex);
168                }
169                // unpause coordinators as needed;
170                try {
171                    jobList = jpaService.execute(new CoordJobsGetPausedJPAExecutor(-1));
172                    if (jobList != null) {
173                        for (CoordinatorJobBean coordJob : jobList) {
174                            // if namespace 0.1 is used and backward support is true, then ignore this coord job
175                            if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
176                                    && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
177                                continue;
178                            }
179                            if ((coordJob.getPauseTime() == null || coordJob.getPauseTime().after(d))) {
180                                new CoordUnpauseXCommand(coordJob).call();
181                                LOG.debug("Calling CoordUnpauseXCommand for coordinator job = " + coordJob.getId());
182                            }
183                        }
184                    }
185                }
186                catch (Exception ex) {
187                    LOG.warn("Exception happened when pausing/unpausing Coordinator jobs", ex);
188                }
189            }
190        }
191    
192        /**
193         * Initializes the {@link PauseTransitService}.
194         *
195         * @param services services instance.
196         */
197        @Override
198        public void init(Services services) {
199            Configuration conf = services.getConf();
200            Runnable bundlePauseStartRunnable = new PauseTransitRunnable();
201            services.get(SchedulerService.class).schedule(bundlePauseStartRunnable, 10,
202                    conf.getInt(CONF_BUNDLE_PAUSE_START_INTERVAL, 60), SchedulerService.Unit.SEC);
203        }
204    
205        /**
206         * Destroy the StateTransit Jobs Service.
207         */
208        @Override
209        public void destroy() {
210        }
211    
212        /**
213         * Return the public interface for the purge jobs service.
214         *
215         * @return {@link PauseTransitService}.
216         */
217        @Override
218        public Class<? extends Service> getInterface() {
219            return PauseTransitService.class;
220        }
221    }