This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.util.Date;
021 import java.util.List;
022
023 import org.apache.hadoop.conf.Configuration;
024 import org.apache.oozie.BundleJobBean;
025 import org.apache.oozie.CoordinatorJobBean;
026 import org.apache.oozie.command.bundle.BundlePauseXCommand;
027 import org.apache.oozie.command.bundle.BundleStartXCommand;
028 import org.apache.oozie.command.bundle.BundleUnpauseXCommand;
029 import org.apache.oozie.command.coord.CoordPauseXCommand;
030 import org.apache.oozie.command.coord.CoordUnpauseXCommand;
031 import org.apache.oozie.executor.jpa.BundleJobsGetNeedStartJPAExecutor;
032 import org.apache.oozie.executor.jpa.BundleJobsGetPausedJPAExecutor;
033 import org.apache.oozie.executor.jpa.BundleJobsGetUnpausedJPAExecutor;
034 import org.apache.oozie.executor.jpa.CoordJobsGetPausedJPAExecutor;
035 import org.apache.oozie.executor.jpa.CoordJobsGetUnpausedJPAExecutor;
036 import org.apache.oozie.service.SchedulerService;
037 import org.apache.oozie.service.Service;
038 import org.apache.oozie.service.Services;
039 import org.apache.oozie.util.MemoryLocks;
040 import org.apache.oozie.util.XLog;
041
042 /**
043 * PauseTransitService is the runnable which is scheduled to run at the configured interval, it checks all bundles
044 * to see if they should be paused, un-paused or started.
045 */
046 public class PauseTransitService implements Service {
047 public static final String CONF_PREFIX = Service.CONF_PREFIX + "PauseTransitService.";
048 public static final String CONF_BUNDLE_PAUSE_START_INTERVAL = CONF_PREFIX + "PauseTransit.interval";
049 private final static XLog LOG = XLog.getLog(PauseTransitService.class);
050
051 /**
052 * PauseTransitRunnable is the runnable which is scheduled to run at the configured interval, it checks all
053 * bundles to see if they should be paused, un-paused or started.
054 */
055 static class PauseTransitRunnable implements Runnable {
056 private JPAService jpaService = null;
057 private MemoryLocks.LockToken lock;
058
059 public PauseTransitRunnable() {
060 jpaService = Services.get().get(JPAService.class);
061 if (jpaService == null) {
062 LOG.error("Missing JPAService");
063 }
064 }
065
066 public void run() {
067 try {
068 // first check if there is some other running instance from the same service;
069 lock = Services.get().get(MemoryLocksService.class).getWriteLock(
070 PauseTransitService.class.getName(), lockTimeout);
071 if (lock == null) {
072 LOG.info("This PauseTransitService instance will"
073 + "not run since there is already an instance running");
074 }
075 else {
076 LOG.info("Acquired lock for [{0}]", PauseTransitService.class.getName());
077
078 updateBundle();
079 updateCoord();
080 }
081 }
082 catch (Exception ex) {
083 LOG.warn("Exception happened when pausing/unpausing/starting bundle/coord jobs", ex);
084 }
085 finally {
086 // release lock;
087 if (lock != null) {
088 lock.release();
089 LOG.info("Released lock for [{0}]", PauseTransitService.class.getName());
090 }
091 }
092 }
093
094 private void updateBundle() {
095 Date d = new Date(); // records the start time of this service run;
096 List<BundleJobBean> jobList = null;
097 // pause bundles as needed;
098 try {
099 jobList = jpaService.execute(new BundleJobsGetUnpausedJPAExecutor(-1));
100 if (jobList != null) {
101 for (BundleJobBean bundleJob : jobList) {
102 if ((bundleJob.getPauseTime() != null) && !bundleJob.getPauseTime().after(d)) {
103 new BundlePauseXCommand(bundleJob).call();
104 LOG.debug("Calling BundlePauseXCommand for bundle job = " + bundleJob.getId());
105 }
106 }
107 }
108 }
109 catch (Exception ex) {
110 LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex);
111 }
112 // unpause bundles as needed;
113 try {
114 jobList = jpaService.execute(new BundleJobsGetPausedJPAExecutor(-1));
115 if (jobList != null) {
116 for (BundleJobBean bundleJob : jobList) {
117 if ((bundleJob.getPauseTime() == null || bundleJob.getPauseTime().after(d))) {
118 new BundleUnpauseXCommand(bundleJob).call();
119 LOG.debug("Calling BundleUnpauseXCommand for bundle job = " + bundleJob.getId());
120 }
121 }
122 }
123 }
124 catch (Exception ex) {
125 LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex);
126 }
127 // start bundles as needed;
128 try {
129 jobList = jpaService.execute(new BundleJobsGetNeedStartJPAExecutor(d));
130 if (jobList != null) {
131 for (BundleJobBean bundleJob : jobList) {
132 bundleJob.setKickoffTime(d);
133 new BundleStartXCommand(bundleJob.getId()).call();
134 LOG.debug("Calling BundleStartXCommand for bundle job = " + bundleJob.getId());
135 }
136 }
137 }
138 catch (Exception ex) {
139 LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex);
140 }
141 }
142
143 private void updateCoord() {
144 Date d = new Date(); // records the start time of this service run;
145 List<CoordinatorJobBean> jobList = null;
146 Configuration conf = Services.get().getConf();
147 boolean backwardSupportForCoordStatus = conf.getBoolean(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
148
149 // pause coordinators as needed;
150 try {
151 jobList = jpaService.execute(new CoordJobsGetUnpausedJPAExecutor(-1));
152 if (jobList != null) {
153 for (CoordinatorJobBean coordJob : jobList) {
154 // if namespace 0.1 is used and backward support is true, then ignore this coord job
155 if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
156 && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
157 continue;
158 }
159 if ((coordJob.getPauseTime() != null) && !coordJob.getPauseTime().after(d)) {
160 new CoordPauseXCommand(coordJob).call();
161 LOG.debug("Calling CoordPauseXCommand for coordinator job = " + coordJob.getId());
162 }
163 }
164 }
165 }
166 catch (Exception ex) {
167 LOG.warn("Exception happened when pausing/unpausing Coordinator jobs", ex);
168 }
169 // unpause coordinators as needed;
170 try {
171 jobList = jpaService.execute(new CoordJobsGetPausedJPAExecutor(-1));
172 if (jobList != null) {
173 for (CoordinatorJobBean coordJob : jobList) {
174 // if namespace 0.1 is used and backward support is true, then ignore this coord job
175 if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
176 && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
177 continue;
178 }
179 if ((coordJob.getPauseTime() == null || coordJob.getPauseTime().after(d))) {
180 new CoordUnpauseXCommand(coordJob).call();
181 LOG.debug("Calling CoordUnpauseXCommand for coordinator job = " + coordJob.getId());
182 }
183 }
184 }
185 }
186 catch (Exception ex) {
187 LOG.warn("Exception happened when pausing/unpausing Coordinator jobs", ex);
188 }
189 }
190 }
191
192 /**
193 * Initializes the {@link PauseTransitService}.
194 *
195 * @param services services instance.
196 */
197 @Override
198 public void init(Services services) {
199 Configuration conf = services.getConf();
200 Runnable bundlePauseStartRunnable = new PauseTransitRunnable();
201 services.get(SchedulerService.class).schedule(bundlePauseStartRunnable, 10,
202 conf.getInt(CONF_BUNDLE_PAUSE_START_INTERVAL, 60), SchedulerService.Unit.SEC);
203 }
204
205 /**
206 * Destroy the StateTransit Jobs Service.
207 */
208 @Override
209 public void destroy() {
210 }
211
212 /**
213 * Return the public interface for the purge jobs service.
214 *
215 * @return {@link PauseTransitService}.
216 */
217 @Override
218 public Class<? extends Service> getInterface() {
219 return PauseTransitService.class;
220 }
221 }