001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.util.Date; 021import java.util.List; 022 023import org.apache.hadoop.conf.Configuration; 024import org.apache.oozie.BundleJobBean; 025import org.apache.oozie.CoordinatorJobBean; 026import org.apache.oozie.command.bundle.BundlePauseXCommand; 027import org.apache.oozie.command.bundle.BundleStartXCommand; 028import org.apache.oozie.command.bundle.BundleUnpauseXCommand; 029import org.apache.oozie.command.coord.CoordPauseXCommand; 030import org.apache.oozie.command.coord.CoordUnpauseXCommand; 031import org.apache.oozie.executor.jpa.BundleJobsGetNeedStartJPAExecutor; 032import org.apache.oozie.executor.jpa.BundleJobsGetPausedJPAExecutor; 033import org.apache.oozie.executor.jpa.BundleJobsGetUnpausedJPAExecutor; 034import org.apache.oozie.executor.jpa.CoordJobsGetPausedJPAExecutor; 035import org.apache.oozie.executor.jpa.CoordJobsGetUnpausedJPAExecutor; 036import org.apache.oozie.service.SchedulerService; 037import org.apache.oozie.service.Service; 038import org.apache.oozie.service.Services; 039import org.apache.oozie.lock.LockToken; 040import org.apache.oozie.util.XLog; 041 042import com.google.common.annotations.VisibleForTesting; 043 044/** 045 * PauseTransitService is the runnable which is scheduled to run at the configured interval, it checks all bundles 046 * to see if they should be paused, un-paused or started. 047 */ 048public class PauseTransitService implements Service { 049 public static final String CONF_PREFIX = Service.CONF_PREFIX + "PauseTransitService."; 050 public static final String CONF_BUNDLE_PAUSE_START_INTERVAL = CONF_PREFIX + "PauseTransit.interval"; 051 private final static XLog LOG = XLog.getLog(PauseTransitService.class); 052 053 /** 054 * PauseTransitRunnable is the runnable which is scheduled to run at the configured interval, it checks all 055 * bundles to see if they should be paused, un-paused or started. 056 */ 057 @VisibleForTesting 058 public static class PauseTransitRunnable implements Runnable { 059 private JPAService jpaService = null; 060 private LockToken lock; 061 062 public PauseTransitRunnable() { 063 jpaService = Services.get().get(JPAService.class); 064 if (jpaService == null) { 065 LOG.error("Missing JPAService"); 066 } 067 } 068 069 public void run() { 070 try { 071 // first check if there is some other running instance from the same service; 072 lock = Services.get().get(MemoryLocksService.class).getWriteLock( 073 PauseTransitService.class.getName(), lockTimeout); 074 if (lock == null) { 075 LOG.info("This PauseTransitService instance will" 076 + "not run since there is already an instance running"); 077 } 078 else { 079 LOG.info("Acquired lock for [{0}]", PauseTransitService.class.getName()); 080 081 updateBundle(); 082 updateCoord(); 083 } 084 } 085 catch (Exception ex) { 086 LOG.warn("Exception happened when pausing/unpausing/starting bundle/coord jobs", ex); 087 } 088 finally { 089 // release lock; 090 if (lock != null) { 091 lock.release(); 092 LOG.info("Released lock for [{0}]", PauseTransitService.class.getName()); 093 } 094 } 095 } 096 097 private void updateBundle() { 098 Date d = new Date(); // records the start time of this service run; 099 List<BundleJobBean> jobList = null; 100 // pause bundles as needed; 101 try { 102 jobList = jpaService.execute(new BundleJobsGetUnpausedJPAExecutor(-1)); 103 if (jobList != null) { 104 for (BundleJobBean bundleJob : jobList) { 105 if ((bundleJob.getPauseTime() != null) && !bundleJob.getPauseTime().after(d)) { 106 new BundlePauseXCommand(bundleJob).call(); 107 LOG.debug("Calling BundlePauseXCommand for bundle job = " + bundleJob.getId()); 108 } 109 } 110 } 111 } 112 catch (Exception ex) { 113 LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex); 114 } 115 // unpause bundles as needed; 116 try { 117 jobList = jpaService.execute(new BundleJobsGetPausedJPAExecutor(-1)); 118 if (jobList != null) { 119 for (BundleJobBean bundleJob : jobList) { 120 if ((bundleJob.getPauseTime() == null || bundleJob.getPauseTime().after(d))) { 121 new BundleUnpauseXCommand(bundleJob).call(); 122 LOG.debug("Calling BundleUnpauseXCommand for bundle job = " + bundleJob.getId()); 123 } 124 } 125 } 126 } 127 catch (Exception ex) { 128 LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex); 129 } 130 // start bundles as needed; 131 try { 132 jobList = jpaService.execute(new BundleJobsGetNeedStartJPAExecutor(d)); 133 if (jobList != null) { 134 for (BundleJobBean bundleJob : jobList) { 135 bundleJob.setKickoffTime(d); 136 new BundleStartXCommand(bundleJob.getId()).call(); 137 LOG.debug("Calling BundleStartXCommand for bundle job = " + bundleJob.getId()); 138 } 139 } 140 } 141 catch (Exception ex) { 142 LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex); 143 } 144 } 145 146 private void updateCoord() { 147 Date d = new Date(); // records the start time of this service run; 148 List<CoordinatorJobBean> jobList = null; 149 Configuration conf = Services.get().getConf(); 150 boolean backwardSupportForCoordStatus = conf.getBoolean(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false); 151 152 // pause coordinators as needed; 153 try { 154 jobList = jpaService.execute(new CoordJobsGetUnpausedJPAExecutor(-1)); 155 if (jobList != null) { 156 for (CoordinatorJobBean coordJob : jobList) { 157 // if namespace 0.1 is used and backward support is true, then ignore this coord job 158 if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null 159 && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) { 160 continue; 161 } 162 if ((coordJob.getPauseTime() != null) && !coordJob.getPauseTime().after(d)) { 163 new CoordPauseXCommand(coordJob).call(); 164 LOG.debug("Calling CoordPauseXCommand for coordinator job = " + coordJob.getId()); 165 } 166 } 167 } 168 } 169 catch (Exception ex) { 170 LOG.warn("Exception happened when pausing/unpausing Coordinator jobs", ex); 171 } 172 // unpause coordinators as needed; 173 try { 174 jobList = jpaService.execute(new CoordJobsGetPausedJPAExecutor(-1)); 175 if (jobList != null) { 176 for (CoordinatorJobBean coordJob : jobList) { 177 // if namespace 0.1 is used and backward support is true, then ignore this coord job 178 if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null 179 && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) { 180 continue; 181 } 182 if ((coordJob.getPauseTime() == null || coordJob.getPauseTime().after(d))) { 183 new CoordUnpauseXCommand(coordJob).call(); 184 LOG.debug("Calling CoordUnpauseXCommand for coordinator job = " + coordJob.getId()); 185 } 186 } 187 } 188 } 189 catch (Exception ex) { 190 LOG.warn("Exception happened when pausing/unpausing Coordinator jobs", ex); 191 } 192 } 193 } 194 195 /** 196 * Initializes the {@link PauseTransitService}. 197 * 198 * @param services services instance. 199 */ 200 @Override 201 public void init(Services services) { 202 Configuration conf = services.getConf(); 203 Runnable bundlePauseStartRunnable = new PauseTransitRunnable(); 204 services.get(SchedulerService.class).schedule(bundlePauseStartRunnable, 10, 205 conf.getInt(CONF_BUNDLE_PAUSE_START_INTERVAL, 60), SchedulerService.Unit.SEC); 206 } 207 208 /** 209 * Destroy the StateTransit Jobs Service. 210 */ 211 @Override 212 public void destroy() { 213 } 214 215 /** 216 * Return the public interface for the purge jobs service. 217 * 218 * @return {@link PauseTransitService}. 219 */ 220 @Override 221 public Class<? extends Service> getInterface() { 222 return PauseTransitService.class; 223 } 224}