001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.util.Date; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Set; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.oozie.BundleJobBean; 028import org.apache.oozie.CoordinatorJobBean; 029import org.apache.oozie.ErrorCode; 030import org.apache.oozie.command.CommandException; 031import org.apache.oozie.command.bundle.BundleStatusTransitXCommand; 032import org.apache.oozie.command.coord.CoordStatusTransitXCommand; 033import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; 034import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; 035import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor; 036import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 037import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 038import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor; 039import org.apache.oozie.executor.jpa.JPAExecutorException; 040import org.apache.oozie.util.DateUtils; 041import org.apache.oozie.lock.LockToken; 042import org.apache.oozie.util.XLog; 043 044/** 045 * StateTransitService is scheduled to run at the configured interval. 046 * <p> 047 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0 (job 048 * done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to 049 * SUCCEEDED. 050 */ 051public class StatusTransitService implements Service { 052 private static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService."; 053 private static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval"; 054 public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX 055 + "backward.support.for.coord.status"; 056 public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX 057 + "backward.support.for.states.without.error"; 058 public static int limit = -1; 059 public static Date lastInstanceStartTime = null; 060 public final static XLog LOG = XLog.getLog(StatusTransitRunnable.class); 061 062 /** 063 * StateTransitRunnable is the runnable which is scheduled to run at the configured interval. 064 * <p> 065 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0 066 * (job done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to 067 * SUCCEEDED. 068 */ 069 public static class StatusTransitRunnable implements Runnable { 070 private JPAService jpaService = null; 071 private LockToken lock; 072 073 private Set<String> coordFailedIds = new HashSet<String>(); 074 private Set<String> bundleFailedIds = new HashSet<String>(); 075 076 public StatusTransitRunnable() { 077 jpaService = Services.get().get(JPAService.class); 078 if (jpaService == null) { 079 LOG.error("Missing JPAService"); 080 } 081 } 082 083 @Override 084 public void run() { 085 try { 086 final Date curDate = new Date(); // records the start time of this service run; 087 088 // first check if there is some other instance running; 089 lock = Services.get().get(MemoryLocksService.class) 090 .getWriteLock(StatusTransitService.class.getName(), lockTimeout); 091 if (lock == null) { 092 LOG.info("This StatusTransitService instance" 093 + " will not run since there is already an instance running"); 094 } 095 else { 096 LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName()); 097 coordTransit(); 098 bundleTransit(); 099 lastInstanceStartTime = curDate; 100 } 101 } 102 catch (Exception ex) { 103 LOG.warn("Exception happened during StatusTransitRunnable ", ex); 104 } 105 finally { 106 if (lock != null) { 107 lock.release(); 108 LOG.info("Released lock for [{0}]", StatusTransitService.class.getName()); 109 } 110 } 111 } 112 113 /** 114 * Aggregate bundle actions' status to bundle jobs 115 * 116 * @throws JPAExecutorException thrown if failed in db updates or retrievals 117 * @throws CommandException thrown if failed to run commands 118 */ 119 private void bundleTransit() throws JPAExecutorException, CommandException { 120 List<BundleJobBean> pendingJobCheckList; 121 final Set<String> bundleIds = new HashSet<String>(); 122 123 if (lastInstanceStartTime == null) { 124 LOG.info("Running bundle status service first instance"); 125 // This is the first instance, we need to check for all pending or running jobs; 126 // TODO currently limit is = -1. Need to do actual batching 127 pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit)); 128 } 129 else { 130 LOG.info("Running bundle status service from last instance time = " 131 + DateUtils.formatDateOozieTZ(lastInstanceStartTime)); 132 // this is not the first instance, we should only check jobs that have actions been 133 // updated >= start time of last service run; 134 pendingJobCheckList = BundleJobQueryExecutor.getInstance().getList( 135 BundleJobQuery.GET_BUNDLE_IDS_FOR_STATUS_TRANSIT, lastInstanceStartTime); 136 } 137 for (BundleJobBean job : pendingJobCheckList) { 138 bundleIds.add(job.getId()); 139 } 140 bundleIds.addAll(bundleFailedIds); 141 bundleFailedIds.clear(); 142 for (final String jobId : bundleIds) { 143 try { 144 new BundleStatusTransitXCommand(jobId).call(); 145 } 146 catch (CommandException e) { 147 // Unable to acquire lock. Will try next time 148 if (e.getErrorCode() == ErrorCode.E0606) { 149 bundleFailedIds.add(jobId); 150 LOG.info("Unable to acquire lock for " + jobId + ". Will try next time"); 151 } 152 else { 153 LOG.error("Error running BundleStatusTransitXCommand for job " + jobId, e); 154 } 155 156 } 157 } 158 } 159 160 /** 161 * Aggregate coordinator actions' status to coordinator jobs 162 * 163 * @throws JPAExecutorException thrown if failed in db updates or retrievals 164 * @throws CommandException thrown if failed to run commands 165 */ 166 private void coordTransit() throws JPAExecutorException, CommandException { 167 List<CoordinatorJobBean> pendingJobCheckList = null; 168 final Set<String> coordIds = new HashSet<String>(); 169 if (lastInstanceStartTime == null) { 170 LOG.info("Running coordinator status service first instance"); 171 // this is the first instance, we need to check for all pending jobs; 172 pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit)); 173 } 174 else { 175 LOG.info("Running coordinator status service from last instance time = " 176 + DateUtils.formatDateOozieTZ(lastInstanceStartTime)); 177 // this is not the first instance, we should only check jobs. 178 // that have actions or jobs been updated >= start time of last service run; 179 pendingJobCheckList = CoordJobQueryExecutor.getInstance().getList( 180 CoordJobQuery.GET_COORD_IDS_FOR_STATUS_TRANSIT, lastInstanceStartTime); 181 182 pendingJobCheckList.addAll(CoordJobQueryExecutor.getInstance().getList( 183 CoordJobQuery.GET_COORD_JOBS_CHANGED, lastInstanceStartTime)); 184 } 185 for (final CoordinatorJobBean job : pendingJobCheckList) { 186 coordIds.add(job.getId()); 187 } 188 coordIds.addAll(coordFailedIds); 189 coordFailedIds.clear(); 190 for (final String coordId : coordIds) { 191 try { 192 new CoordStatusTransitXCommand(coordId).call(); 193 } 194 catch (CommandException e) { 195 // Unable to acquire lock. Will try next time 196 if (e.getErrorCode() == ErrorCode.E0606) { 197 coordFailedIds.add(coordId); 198 LOG.info("Unable to acquire lock for " + coordId + ". Will try next time"); 199 200 } 201 else { 202 LOG.error("Error running CoordStatusTransitXCommand for job " + coordId, e); 203 } 204 205 } 206 } 207 } 208 } 209 210 /** 211 * Initializes the {@link StatusTransitService}. 212 * 213 * @param services services instance. 214 */ 215 @Override 216 public void init(Services services) { 217 final Configuration conf = services.getConf(); 218 Runnable stateTransitRunnable = new StatusTransitRunnable(); 219 services.get(SchedulerService.class).schedule(stateTransitRunnable, 10, 220 ConfigurationService.getInt(conf, CONF_STATUSTRANSIT_INTERVAL), SchedulerService.Unit.SEC); 221 } 222 223 /** 224 * Destroy the StateTransit Jobs Service. 225 */ 226 @Override 227 public void destroy() { 228 } 229 230 /** 231 * Return the public interface for the purge jobs service. 232 * 233 * @return {@link StatusTransitService}. 234 */ 235 @Override 236 public Class<? extends Service> getInterface() { 237 return StatusTransitService.class; 238 } 239 240}