/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.command.bundle.BundlePauseXCommand;
import org.apache.oozie.command.bundle.BundleStartXCommand;
import org.apache.oozie.command.bundle.BundleUnpauseXCommand;
import org.apache.oozie.command.coord.CoordPauseXCommand;
import org.apache.oozie.command.coord.CoordUnpauseXCommand;
import org.apache.oozie.executor.jpa.BundleJobsGetNeedStartJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetPausedJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetUnpausedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetPausedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetUnpausedJPAExecutor;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.MemoryLocks;
import org.apache.oozie.util.XLog;

/**
 * PauseTransitService schedules a {@link PauseTransitRunnable} at the configured interval. The runnable checks
 * bundle jobs to see if they should be paused, un-paused or started, and coordinator jobs to see if they should
 * be paused or un-paused.
 */
public class PauseTransitService implements Service {
    public static final String CONF_PREFIX = Service.CONF_PREFIX + "PauseTransitService.";
    public static final String CONF_BUNDLE_PAUSE_START_INTERVAL = CONF_PREFIX + "PauseTransit.interval";
    private final static XLog LOG = XLog.getLog(PauseTransitService.class);

    /**
     * PauseTransitRunnable is the runnable which is scheduled to run at the configured interval. Each run takes a
     * write lock named after {@link PauseTransitService} and, when the lock is obtained, processes bundle jobs and
     * then coordinator jobs.
     */
    static class PauseTransitRunnable implements Runnable {
        // Resolved once at construction time; may be null when no JPAService is registered.
        private final JPAService jpaService;

        /**
         * Looks up the {@link JPAService} and logs an error when it is not available.
         */
        public PauseTransitRunnable() {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService == null) {
                LOG.error("Missing JPAService");
            }
        }

        /**
         * Executes one pass: acquires the service-wide write lock, updates bundle jobs, updates coordinator jobs
         * and releases the lock. Exceptions are logged and never propagated to the scheduler.
         */
        @Override
        public void run() {
            if (jpaService == null) {
                // Without JPA access none of the queries below can run; skip instead of failing in every step.
                LOG.warn("Missing JPAService, skipping pause/unpause/start checks");
                return;
            }
            // The token is local to this run so that a failure while acquiring the lock can never cause the
            // token of an earlier run to be released a second time.
            MemoryLocks.LockToken lock = null;
            try {
                // first check if there is some other running instance from the same service;
                // NOTE(review): lockTimeout is not declared in this file; presumably inherited from Service - confirm.
                lock = Services.get().get(MemoryLocksService.class).getWriteLock(
                        PauseTransitService.class.getName(), lockTimeout);
                if (lock == null) {
                    LOG.info("This PauseTransitService instance will "
                            + "not run since there is already an instance running");
                }
                else {
                    LOG.info("Acquired lock for [{0}]", PauseTransitService.class.getName());

                    updateBundle();
                    updateCoord();
                }
            }
            catch (Exception ex) {
                LOG.warn("Exception happened when pausing/unpausing/starting bundle/coord jobs", ex);
            }
            finally {
                // release lock;
                if (lock != null) {
                    lock.release();
                    LOG.info("Released lock for [{0}]", PauseTransitService.class.getName());
                }
            }
        }

        /**
         * Pauses, un-pauses and starts bundle jobs as needed. Each of the three phases is guarded by its own
         * try/catch so that a failure in one phase does not prevent the following phases from running.
         */
        private void updateBundle() {
            Date d = new Date(); // records the start time of this service run;
            List<BundleJobBean> jobList = null;
            // pause bundles as needed;
            try {
                jobList = jpaService.execute(new BundleJobsGetUnpausedJPAExecutor(-1));
                if (jobList != null) {
                    for (BundleJobBean bundleJob : jobList) {
                        // pause when a pause time is set and it is not after the start of this run
                        if ((bundleJob.getPauseTime() != null) && !bundleJob.getPauseTime().after(d)) {
                            new BundlePauseXCommand(bundleJob).call();
                            LOG.debug("Calling BundlePauseXCommand for bundle job = " + bundleJob.getId());
                        }
                    }
                }
            }
            catch (Exception ex) {
                LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex);
            }
            // unpause bundles as needed;
            try {
                jobList = jpaService.execute(new BundleJobsGetPausedJPAExecutor(-1));
                if (jobList != null) {
                    for (BundleJobBean bundleJob : jobList) {
                        // un-pause when the pause time was removed or lies after the start of this run
                        if ((bundleJob.getPauseTime() == null || bundleJob.getPauseTime().after(d))) {
                            new BundleUnpauseXCommand(bundleJob).call();
                            LOG.debug("Calling BundleUnpauseXCommand for bundle job = " + bundleJob.getId());
                        }
                    }
                }
            }
            catch (Exception ex) {
                LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex);
            }
            // start bundles as needed;
            try {
                jobList = jpaService.execute(new BundleJobsGetNeedStartJPAExecutor(d));
                if (jobList != null) {
                    for (BundleJobBean bundleJob : jobList) {
                        // NOTE(review): the kickoff time is set on the in-memory bean only, while the start
                        // command receives just the job id - confirm whether this assignment has any effect.
                        bundleJob.setKickoffTime(d);
                        new BundleStartXCommand(bundleJob.getId()).call();
                        LOG.debug("Calling BundleStartXCommand for bundle job = " + bundleJob.getId());
                    }
                }
            }
            catch (Exception ex) {
                LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex);
            }
        }

        /**
         * Pauses and un-pauses coordinator jobs as needed. Jobs for which
         * {@link #isSkippedForBackwardSupport(CoordinatorJobBean, boolean)} returns true are left untouched.
         */
        private void updateCoord() {
            Date d = new Date(); // records the start time of this service run;
            List<CoordinatorJobBean> jobList = null;
            Configuration conf = Services.get().getConf();
            boolean backwardSupportForCoordStatus = conf.getBoolean(
                    StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);

            // pause coordinators as needed;
            try {
                jobList = jpaService.execute(new CoordJobsGetUnpausedJPAExecutor(-1));
                if (jobList != null) {
                    for (CoordinatorJobBean coordJob : jobList) {
                        if (isSkippedForBackwardSupport(coordJob, backwardSupportForCoordStatus)) {
                            continue;
                        }
                        // pause when a pause time is set and it is not after the start of this run
                        if ((coordJob.getPauseTime() != null) && !coordJob.getPauseTime().after(d)) {
                            new CoordPauseXCommand(coordJob).call();
                            LOG.debug("Calling CoordPauseXCommand for coordinator job = " + coordJob.getId());
                        }
                    }
                }
            }
            catch (Exception ex) {
                LOG.warn("Exception happened when pausing/unpausing Coordinator jobs", ex);
            }
            // unpause coordinators as needed;
            try {
                jobList = jpaService.execute(new CoordJobsGetPausedJPAExecutor(-1));
                if (jobList != null) {
                    for (CoordinatorJobBean coordJob : jobList) {
                        if (isSkippedForBackwardSupport(coordJob, backwardSupportForCoordStatus)) {
                            continue;
                        }
                        // un-pause when the pause time was removed or lies after the start of this run
                        if ((coordJob.getPauseTime() == null || coordJob.getPauseTime().after(d))) {
                            new CoordUnpauseXCommand(coordJob).call();
                            LOG.debug("Calling CoordUnpauseXCommand for coordinator job = " + coordJob.getId());
                        }
                    }
                }
            }
            catch (Exception ex) {
                LOG.warn("Exception happened when pausing/unpausing Coordinator jobs", ex);
            }
        }

        /**
         * Tells whether a coordinator job must be ignored by this service: if namespace 0.1 is used and backward
         * support is true, then the coord job is ignored.
         *
         * @param coordJob coordinator job to check.
         * @param backwardSupportForCoordStatus value of the backward support configuration flag.
         * @return true if the job must be skipped, false otherwise.
         */
        private static boolean isSkippedForBackwardSupport(CoordinatorJobBean coordJob,
                boolean backwardSupportForCoordStatus) {
            return backwardSupportForCoordStatus && coordJob.getAppNamespace() != null
                    && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1);
        }
    }

    /**
     * Initializes the {@link PauseTransitService}: schedules a {@link PauseTransitRunnable} with an initial delay
     * of 10 seconds and an interval read from {@link #CONF_BUNDLE_PAUSE_START_INTERVAL} (60 seconds when not set).
     *
     * @param services services instance.
     */
    @Override
    public void init(Services services) {
        Configuration conf = services.getConf();
        Runnable bundlePauseStartRunnable = new PauseTransitRunnable();
        services.get(SchedulerService.class).schedule(bundlePauseStartRunnable, 10,
                conf.getInt(CONF_BUNDLE_PAUSE_START_INTERVAL, 60), SchedulerService.Unit.SEC);
    }

    /**
     * Destroy the PauseTransitService. This implementation does nothing.
     */
    @Override
    public void destroy() {
    }

    /**
     * Return the public interface for the pause transit service.
     *
     * @return {@link PauseTransitService}.
     */
    @Override
    public Class<? extends Service> getInterface() {
        return PauseTransitService.class;
    }
}