001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.List; 024 025import org.apache.oozie.CoordinatorJobBean; 026import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand; 027import org.apache.oozie.executor.jpa.BatchQueryExecutor; 028import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 029import org.apache.oozie.executor.jpa.JPAExecutorException; 030import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 031import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 032import org.apache.oozie.lock.LockToken; 033import org.apache.oozie.util.XCallable; 034import org.apache.oozie.util.XLog; 035import org.apache.oozie.util.DateUtils; 036 037/** 038 * The coordinator Materialization Lookup trigger service schedule lookup trigger command for every interval (default is 039 * 5 minutes ). This interval could be configured through oozie configuration defined is either oozie-default.xml or 040 * oozie-site.xml using the property name oozie.service.CoordMaterializeTriggerService.lookup.interval 041 */ 042public class CoordMaterializeTriggerService implements Service { 043 public static final String CONF_PREFIX = Service.CONF_PREFIX + "CoordMaterializeTriggerService."; 044 /** 045 * Time interval, in seconds, at which the Job materialization service will be scheduled to run. 046 */ 047 public static final String CONF_LOOKUP_INTERVAL = CONF_PREFIX + "lookup.interval"; 048 049 public static final String CONF_SCHEDULING_INTERVAL = CONF_PREFIX + "scheduling.interval"; 050 /** 051 * This configuration defined the duration for which job should be materialized in future 052 */ 053 public static final String CONF_MATERIALIZATION_WINDOW = CONF_PREFIX + "materialization.window"; 054 /** 055 * The number of callables to be queued in a batch. 056 */ 057 public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size"; 058 /** 059 * The number of coordinator jobs to be picked for materialization at a given time. 060 */ 061 public static final String CONF_MATERIALIZATION_SYSTEM_LIMIT = CONF_PREFIX + "materialization.system.limit"; 062 063 private static final String INSTRUMENTATION_GROUP = "coord_job_mat"; 064 private static final String INSTR_MAT_JOBS_COUNTER = "jobs"; 065 066 /** 067 * This runnable class will run in every "interval" to queue CoordMaterializeTransitionXCommand. 068 */ 069 static class CoordMaterializeTriggerRunnable implements Runnable { 070 private int materializationWindow; 071 private int lookupInterval; 072 private long delay = 0; 073 private List<XCallable<Void>> callables; 074 private List<XCallable<Void>> delayedCallables; 075 private XLog LOG = XLog.getLog(getClass()); 076 077 078 public CoordMaterializeTriggerRunnable(int materializationWindow, int lookupInterval) { 079 this.materializationWindow = materializationWindow; 080 this.lookupInterval = lookupInterval; 081 } 082 083 @Override 084 public void run() { 085 LockToken lock = null; 086 087 // first check if there is some other running instance from the same service; 088 try { 089 lock = Services.get().get(MemoryLocksService.class) 090 .getWriteLock(CoordMaterializeTriggerService.class.getName(), lockTimeout); 091 092 if (lock != null) { 093 runCoordJobMatLookup(); 094 if (null != callables) { 095 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 096 if (ret == false) { 097 XLog.getLog(getClass()).warn( 098 "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. " 099 + "Most possibly command queue is full. Queue size is :" 100 + Services.get().get(CallableQueueService.class).queueSize()); 101 } 102 callables = null; 103 } 104 if (null != delayedCallables) { 105 boolean ret = Services.get().get(CallableQueueService.class) 106 .queueSerial(delayedCallables, this.delay); 107 if (ret == false) { 108 XLog.getLog(getClass()).warn( 109 "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. " 110 + "Most possibly Callable queue is full. Queue size is :" 111 + Services.get().get(CallableQueueService.class).queueSize()); 112 } 113 delayedCallables = null; 114 this.delay = 0; 115 } 116 } 117 118 else { 119 LOG.debug("Can't obtain lock, skipping"); 120 } 121 } 122 catch (Exception e) { 123 LOG.error("Exception", e); 124 } 125 finally { 126 if (lock != null) { 127 lock.release(); 128 LOG.info("Released lock for [{0}]", CoordMaterializeTriggerService.class.getName()); 129 } 130 131 } 132 133 } 134 135 /** 136 * Recover coordinator jobs that should be materialized 137 * @throws JPAExecutorException 138 */ 139 private void runCoordJobMatLookup() throws JPAExecutorException { 140 List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 141 XLog.Info.get().clear(); 142 XLog LOG = XLog.getLog(getClass()); 143 try { 144 // get current date 145 Date currDate = new Date(new Date().getTime() + lookupInterval * 1000); 146 // get list of all jobs that have actions that should be materialized. 147 int materializationLimit = ConfigurationService.getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT); 148 materializeCoordJobs(currDate, materializationLimit, LOG, updateList); 149 } 150 151 catch (Exception ex) { 152 LOG.error("Exception while attempting to materialize coordinator jobs, {0}", ex.getMessage(), ex); 153 } 154 finally { 155 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 156 } 157 } 158 159 private void materializeCoordJobs(Date currDate, int limit, XLog LOG, List<UpdateEntry> updateList) 160 throws JPAExecutorException { 161 try { 162 List<CoordinatorJobBean> materializeJobs = CoordJobQueryExecutor.getInstance().getList( 163 CoordJobQuery.GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION, currDate, limit); 164 LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate) 165 + ", Num jobs to materialize = " + materializeJobs.size()); 166 for (CoordinatorJobBean coordJob : materializeJobs) { 167 Services.get().get(InstrumentationService.class).get() 168 .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1); 169 queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow)); 170 coordJob.setLastModifiedTime(new Date()); 171 updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, 172 coordJob)); 173 } 174 } 175 catch (JPAExecutorException jex) { 176 LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex); 177 } 178 } 179 180 /** 181 * Adds callables to a list. If the number of callables in the list reaches {@link 182 * CoordMaterializeTriggerService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list 183 * is reset. 184 * 185 * @param callable the callable to queue. 186 */ 187 private void queueCallable(XCallable<Void> callable) { 188 if (callables == null) { 189 callables = new ArrayList<XCallable<Void>>(); 190 } 191 callables.add(callable); 192 if (callables.size() == ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)) { 193 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 194 if (ret == false) { 195 XLog.getLog(getClass()).warn( 196 "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. " 197 + "Most possibly command queue is full. Queue size is :" 198 + Services.get().get(CallableQueueService.class).queueSize()); 199 } 200 callables = new ArrayList<XCallable<Void>>(); 201 } 202 } 203 204 } 205 206 @Override 207 public void init(Services services) throws ServiceException { 208 // default is 3600sec (1hr) 209 int materializationWindow = ConfigurationService.getInt(services.getConf(), CONF_MATERIALIZATION_WINDOW); 210 // default is 300sec (5min) 211 int lookupInterval = ConfigurationService.getInt(services.getConf(), CONF_LOOKUP_INTERVAL); 212 // default is 300sec (5min) 213 int schedulingInterval = Services.get().getConf().getInt(CONF_SCHEDULING_INTERVAL, lookupInterval); 214 215 Runnable lookupTriggerJobsRunnable = new CoordMaterializeTriggerRunnable(materializationWindow, lookupInterval); 216 217 services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, schedulingInterval, 218 SchedulerService.Unit.SEC); 219 } 220 221 @Override 222 public void destroy() { 223 } 224 225 @Override 226 public Class<? extends Service> getInterface() { 227 return CoordMaterializeTriggerService.class; 228 } 229 230}