/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.service;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.command.bundle.BundlePauseXCommand;
import org.apache.oozie.command.bundle.BundleStartXCommand;
import org.apache.oozie.command.bundle.BundleUnpauseXCommand;
import org.apache.oozie.command.coord.CoordPauseXCommand;
import org.apache.oozie.command.coord.CoordUnpauseXCommand;
import org.apache.oozie.executor.jpa.BundleJobsGetNeedStartJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetPausedJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetUnpausedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetPausedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetUnpausedJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.lock.LockToken;
042import org.apache.oozie.util.ConfigUtils; 043import org.apache.oozie.util.XCallable; 044import org.apache.oozie.util.XLog; 045 046import com.google.common.annotations.VisibleForTesting; 047 048/** 049 * PauseTransitService is the runnable which is scheduled to run at the configured interval, it checks all bundles to 050 * see if they should be paused, un-paused or started. 051 */ 052public class PauseTransitService implements Service { 053 public static final String CONF_PREFIX = Service.CONF_PREFIX + "PauseTransitService."; 054 public static final String CONF_BUNDLE_PAUSE_START_INTERVAL = CONF_PREFIX + "PauseTransit.interval"; 055 private final static XLog LOG = XLog.getLog(PauseTransitService.class); 056 057 public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size"; 058 059 /** 060 * PauseTransitRunnable is the runnable which is scheduled to run at the configured interval, it checks all bundles 061 * to see if they should be paused, un-paused or started. 
062 */ 063 @VisibleForTesting 064 public static class PauseTransitRunnable implements Runnable { 065 private JPAService jpaService = null; 066 private LockToken lock; 067 private List<XCallable<Void>> callables; 068 069 public PauseTransitRunnable() { 070 jpaService = Services.get().get(JPAService.class); 071 if (jpaService == null) { 072 LOG.error("Missing JPAService"); 073 } 074 } 075 076 public void run() { 077 try { 078 // first check if there is some other running instance from the same service; 079 lock = Services.get().get(MemoryLocksService.class) 080 .getWriteLock(PauseTransitService.class.getName(), lockTimeout); 081 if (lock == null) { 082 LOG.info("This PauseTransitService instance will" 083 + "not run since there is already an instance running"); 084 } 085 else { 086 LOG.info("Acquired lock for [{0}]", PauseTransitService.class.getName()); 087 088 updateBundle(); 089 updateCoord(); 090 if (null != callables) { 091 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 092 if (ret == false) { 093 XLog.getLog(getClass()).warn( 094 "Unable to queue the callables commands for PauseTransitService. " 095 + "Most possibly command queue is full. 
Queue size is :" 096 + Services.get().get(CallableQueueService.class).queueSize()); 097 } 098 callables = null; 099 } 100 101 } 102 } 103 catch (Exception ex) { 104 LOG.warn("Exception happened when pausing/unpausing/starting bundle/coord jobs", ex); 105 } 106 finally { 107 // release lock; 108 if (lock != null) { 109 lock.release(); 110 LOG.info("Released lock for [{0}]", PauseTransitService.class.getName()); 111 } 112 } 113 } 114 115 private void updateBundle() { 116 Date d = new Date(); // records the start time of this service run; 117 List<BundleJobBean> jobList = null; 118 // pause bundles as needed; 119 try { 120 jobList = jpaService.execute(new BundleJobsGetUnpausedJPAExecutor(-1)); 121 if (jobList != null) { 122 for (BundleJobBean bundleJob : jobList) { 123 if ((bundleJob.getPauseTime() != null) && !bundleJob.getPauseTime().after(d)) { 124 queueCallable(new BundlePauseXCommand(bundleJob)); 125 LOG.debug("Queuing BundlePauseXCommand for bundle job = " + bundleJob.getId()); 126 } 127 } 128 } 129 } 130 catch (JPAExecutorException ex) { 131 LOG.warn("JPAExecutorException happened when pausing/unpausing/starting Bundle jobs", ex); 132 } 133 // unpause bundles as needed; 134 try { 135 jobList = jpaService.execute(new BundleJobsGetPausedJPAExecutor(-1)); 136 if (jobList != null) { 137 for (BundleJobBean bundleJob : jobList) { 138 if ((bundleJob.getPauseTime() == null || bundleJob.getPauseTime().after(d))) { 139 queueCallable(new BundleUnpauseXCommand(bundleJob)); 140 LOG.debug("Queuing BundleUnpauseXCommand for bundle job = " + bundleJob.getId()); 141 } 142 } 143 } 144 } 145 catch (JPAExecutorException ex) { 146 LOG.warn("JPAExecutorException happened when pausing/unpausing/starting Bundle jobs", ex); 147 } 148 // start bundles as needed; 149 try { 150 jobList = jpaService.execute(new BundleJobsGetNeedStartJPAExecutor(d)); 151 if (jobList != null) { 152 for (BundleJobBean bundleJob : jobList) { 153 queueCallable(new BundleStartXCommand(bundleJob.getId())); 154 
LOG.debug("Queuing BundleStartXCommand for bundle job = " + bundleJob.getId()); 155 } 156 } 157 } 158 catch (JPAExecutorException ex) { 159 LOG.warn("JPAExecutorException happened when pausing/unpausing/starting Bundle jobs", ex); 160 } 161 } 162 163 private void updateCoord() { 164 Date d = new Date(); // records the start time of this service run; 165 List<CoordinatorJobBean> jobList = null; 166 boolean backwardSupportForCoordStatus = ConfigUtils.isBackwardSupportForCoordStatus(); 167 168 // pause coordinators as needed; 169 try { 170 jobList = jpaService.execute(new CoordJobsGetUnpausedJPAExecutor(-1)); 171 if (jobList != null) { 172 for (CoordinatorJobBean coordJob : jobList) { 173 // if namespace 0.1 is used and backward support is true, then ignore this coord job 174 if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null 175 && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) { 176 continue; 177 } 178 if ((coordJob.getPauseTime() != null) && !coordJob.getPauseTime().after(d)) { 179 queueCallable(new CoordPauseXCommand(coordJob)); 180 LOG.debug("Queuing CoordPauseXCommand for coordinator job = " + coordJob.getId()); 181 } 182 } 183 } 184 } 185 catch (JPAExecutorException ex) { 186 LOG.warn("JPAExecutorException happened when pausing/unpausing Coordinator jobs", ex); 187 } 188 // unpause coordinators as needed; 189 try { 190 jobList = jpaService.execute(new CoordJobsGetPausedJPAExecutor(-1)); 191 if (jobList != null) { 192 for (CoordinatorJobBean coordJob : jobList) { 193 // if namespace 0.1 is used and backward support is true, then ignore this coord job 194 if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null 195 && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) { 196 continue; 197 } 198 if ((coordJob.getPauseTime() == null || coordJob.getPauseTime().after(d))) { 199 queueCallable(new CoordUnpauseXCommand(coordJob)); 200 LOG.debug("Queuing 
CoordUnpauseXCommand for coordinator job = " + coordJob.getId()); 201 } 202 } 203 } 204 } 205 catch (JPAExecutorException ex) { 206 LOG.warn("JPAExecutorException happened when pausing/unpausing Coordinator jobs", ex); 207 } 208 } 209 210 private void queueCallable(XCallable<Void> callable) { 211 if (callables == null) { 212 callables = new ArrayList<XCallable<Void>>(); 213 } 214 callables.add(callable); 215 if (callables.size() == ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)) { 216 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 217 if (ret == false) { 218 XLog.getLog(getClass()).warn( 219 "Unable to queue the callables commands for PauseTransitService. " 220 + "Most possibly command queue is full. Queue size is :" 221 + Services.get().get(CallableQueueService.class).queueSize()); 222 } 223 callables = new ArrayList<XCallable<Void>>(); 224 } 225 } 226 } 227 228 /** 229 * Initializes the {@link PauseTransitService}. 230 * 231 * @param services services instance. 232 */ 233 @Override 234 public void init(Services services) { 235 Runnable bundlePauseStartRunnable = new PauseTransitRunnable(); 236 services.get(SchedulerService.class).schedule(bundlePauseStartRunnable, 10, 237 ConfigurationService.getInt(services.getConf(), CONF_BUNDLE_PAUSE_START_INTERVAL), 238 SchedulerService.Unit.SEC); 239 } 240 241 /** 242 * Destroy the StateTransit Jobs Service. 243 */ 244 @Override 245 public void destroy() { 246 } 247 248 /** 249 * Return the public interface for the purge jobs service. 250 * 251 * @return {@link PauseTransitService}. 252 */ 253 @Override 254 public Class<? extends Service> getInterface() { 255 return PauseTransitService.class; 256 } 257}