/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.lock.LockToken;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.DateUtils;

/**
 * The coordinator Materialization Lookup trigger service schedules a lookup trigger command at every interval
 * (default is 5 minutes). This interval can be configured through the oozie configuration defined in either
 * oozie-default.xml or oozie-site.xml using the property name
 * oozie.service.CoordMaterializeTriggerService.lookup.interval
 */
public class CoordMaterializeTriggerService implements Service {
    public static final String CONF_PREFIX = Service.CONF_PREFIX + "CoordMaterializeTriggerService.";
    /**
     * Time interval, in seconds, at which the Job materialization service will be scheduled to run. It is also
     * the look-ahead added to the current time when selecting jobs to materialize.
     */
    public static final String CONF_LOOKUP_INTERVAL = CONF_PREFIX + "lookup.interval";
    /**
     * Period, in seconds, passed to the scheduler for the lookup runnable. When not configured, the value of
     * {@link #CONF_LOOKUP_INTERVAL} is used.
     */
    public static final String CONF_SCHEDULING_INTERVAL = CONF_PREFIX + "scheduling.interval";
    /**
     * This configuration defines the duration for which a job should be materialized in the future.
     */
    public static final String CONF_MATERIALIZATION_WINDOW = CONF_PREFIX + "materialization.window";
    /**
     * The number of callables to be queued in a batch.
     */
    public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
    /**
     * The number of coordinator jobs to be picked for materialization at a given time.
     */
    public static final String CONF_MATERIALIZATION_SYSTEM_LIMIT = CONF_PREFIX + "materialization.system.limit";

    private static final String INSTRUMENTATION_GROUP = "coord_job_mat";
    private static final String INSTR_MAT_JOBS_COUNTER = "jobs";
    public static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300;
    private static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600;
    private static final int CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT = 50;
    /** Batch size used when {@link #CONF_CALLABLE_BATCH_SIZE} is not configured. */
    private static final int CONF_CALLABLE_BATCH_SIZE_DEFAULT = 10;

    /**
     * This runnable class will run in every "interval" to queue CoordMaterializeTransitionXCommand.
     */
    static class CoordMaterializeTriggerRunnable implements Runnable {
        private final int materializationWindow;
        private final int lookupInterval;
        private long delay = 0;
        /** Commands collected by {@link #queueCallable(XCallable)} that have not been queued yet. */
        private List<XCallable<Void>> callables;
        /** Commands to queue with {@link #delay}; nothing in this class as shown assigns this list. */
        private List<XCallable<Void>> delayedCallables;
        private final XLog LOG = XLog.getLog(getClass());

        /**
         * @param materializationWindow window handed to every CoordMaterializeTransitionXCommand created
         * @param lookupInterval look-ahead, in seconds, added to the current time when selecting jobs
         */
        public CoordMaterializeTriggerRunnable(int materializationWindow, int lookupInterval) {
            this.materializationWindow = materializationWindow;
            this.lookupInterval = lookupInterval;
        }

        /**
         * Acquires the service-wide write lock, looks up the coordinator jobs to materialize and queues the
         * collected commands. When the lock cannot be obtained the run is skipped. The lock, if acquired, is
         * always released.
         */
        @Override
        public void run() {
            LockToken lock = null;

            // first check if there is some other running instance from the same service;
            try {
                // NOTE(review): lockTimeout is not declared in this file; presumably inherited from
                // Service - confirm.
                lock = Services.get().get(MemoryLocksService.class)
                        .getWriteLock(CoordMaterializeTriggerService.class.getName(), lockTimeout);

                if (lock != null) {
                    runCoordJobMatLookup();
                    // Flush whatever is left over from the last (partial) batch.
                    if (null != callables) {
                        boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
                        if (!ret) {
                            XLog.getLog(getClass()).warn(
                                    "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
                                            + "Most possibly command queue is full. Queue size is :"
                                            + Services.get().get(CallableQueueService.class).queueSize());
                        }
                        callables = null;
                    }
                    if (null != delayedCallables) {
                        boolean ret = Services.get().get(CallableQueueService.class)
                                .queueSerial(delayedCallables, this.delay);
                        if (!ret) {
                            XLog.getLog(getClass()).warn(
                                    "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. "
                                            + "Most possibly Callable queue is full. Queue size is :"
                                            + Services.get().get(CallableQueueService.class).queueSize());
                        }
                        delayedCallables = null;
                        this.delay = 0;
                    }
                }
                else {
                    LOG.debug("Can't obtain lock, skipping");
                }
            }
            catch (Exception e) {
                LOG.error("Exception", e);
            }
            finally {
                if (lock != null) {
                    lock.release();
                    LOG.info("Released lock for [{0}]", CoordMaterializeTriggerService.class.getName());
                }
            }
        }

        /**
         * Recover coordinator jobs that should be materialized. Any exception is logged and not propagated.
         */
        private void runCoordJobMatLookup() {
            // Clear the thread's log info first, then obtain a fresh logger.
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());
            try {
                // Look ahead by one lookup interval. The multiplication is done in long arithmetic so that a
                // large interval (more than ~24 days expressed in seconds) cannot overflow int.
                Date currDate = new Date(new Date().getTime() + lookupInterval * 1000L);
                // get list of all jobs that have actions that should be materialized.
                int materializationLimit = Services.get().getConf()
                        .getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT, CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT);
                materializeCoordJobs(currDate, materializationLimit, log);
            }
            catch (Exception ex) {
                log.error("Exception while attempting to materialize coordinator jobs, {0}", ex.getMessage(), ex);
            }
        }

        /**
         * Fetches up to <code>limit</code> coordinator jobs for the given date and, for each of them, bumps the
         * instrumentation counter, queues a CoordMaterializeTransitionXCommand and persists a new last-modified
         * time. A JPAExecutorException raised by the query or an update is logged at WARN level and ends the
         * loop; it is not propagated.
         *
         * @param currDate date passed to the job lookup query
         * @param limit maximum number of jobs to fetch
         * @param log logger to report to
         */
        private void materializeCoordJobs(Date currDate, int limit, XLog log) throws JPAExecutorException {
            try {
                List<CoordinatorJobBean> materializeJobs = CoordJobQueryExecutor.getInstance().getList(
                        CoordJobQuery.GET_COORD_JOBS_OLDER_FOR_MATERILZATION, currDate, limit);
                log.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate)
                        + ", Num jobs to materialize = " + materializeJobs.size());
                for (CoordinatorJobBean coordJob : materializeJobs) {
                    Services.get().get(InstrumentationService.class).get()
                            .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
                    queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
                    coordJob.setLastModifiedTime(new Date());
                    // TODO In place of calling single query, we should call bulk update.
                    CoordJobQueryExecutor.getInstance().executeUpdate(
                            CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, coordJob);
                }
            }
            catch (JPAExecutorException jex) {
                log.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex);
            }
        }

        /**
         * Adds callables to a list. If the number of callables in the list reaches {@link
         * CoordMaterializeTriggerService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables
         * list is reset.
         *
         * @param callable the callable to queue.
         */
        private void queueCallable(XCallable<Void> callable) {
            if (callables == null) {
                callables = new ArrayList<XCallable<Void>>();
            }
            callables.add(callable);
            int batchSize = Services.get().getConf()
                    .getInt(CONF_CALLABLE_BATCH_SIZE, CONF_CALLABLE_BATCH_SIZE_DEFAULT);
            if (callables.size() == batchSize) {
                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
                if (!ret) {
                    XLog.getLog(getClass()).warn(
                            "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
                                    + "Most possibly command queue is full. Queue size is :"
                                    + Services.get().get(CallableQueueService.class).queueSize());
                }
                // Start a new batch whether or not queueing succeeded.
                callables = new ArrayList<XCallable<Void>>();
            }
        }

    }

    /**
     * Reads the materialization window, lookup interval and scheduling interval from the configuration of the
     * given services and schedules a {@link CoordMaterializeTriggerRunnable} with the scheduler service.
     *
     * @param services services instance providing the configuration and the scheduler
     * @throws ServiceException declared by the {@link Service} contract
     */
    @Override
    public void init(Services services) throws ServiceException {
        Configuration conf = services.getConf();
        // default is 3600sec (1hr)
        int materializationWindow = conf.getInt(CONF_MATERIALIZATION_WINDOW, CONF_MATERIALIZATION_WINDOW_DEFAULT);
        // default is 300sec (5min)
        int lookupInterval = conf.getInt(CONF_LOOKUP_INTERVAL, CONF_LOOKUP_INTERVAL_DEFAULT);
        // defaults to the lookup interval when not configured explicitly
        int schedulingInterval = conf.getInt(CONF_SCHEDULING_INTERVAL, lookupInterval);

        Runnable lookupTriggerJobsRunnable = new CoordMaterializeTriggerRunnable(materializationWindow, lookupInterval);

        // NOTE(review): the literal 10 is presumably the initial delay in seconds - confirm against
        // SchedulerService.schedule.
        services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, schedulingInterval,
                SchedulerService.Unit.SEC);
    }

    /**
     * No resources to release.
     */
    @Override
    public void destroy() {
    }

    /**
     * @return the class under which this service is registered
     */
    @Override
    public Class<? extends Service> getInterface() {
        return CoordMaterializeTriggerService.class;
    }

}