001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import java.util.ArrayList; 021 import java.util.Date; 022 import java.util.List; 023 024 import org.apache.hadoop.conf.Configuration; 025 import org.apache.oozie.CoordinatorJobBean; 026 import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand; 027 import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor; 028 import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor; 029 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor; 030 import org.apache.oozie.executor.jpa.CoordJobsToBeMaterializedJPAExecutor; 031 import org.apache.oozie.executor.jpa.JPAExecutorException; 032 import org.apache.oozie.util.XCallable; 033 import org.apache.oozie.util.XLog; 034 035 /** 036 * The coordinator Materialization Lookup trigger service schedule lookup trigger command for every interval (default is 037 * 5 minutes ). This interval could be configured through oozie configuration defined is either oozie-default.xml or 038 * oozie-site.xml using the property name oozie.service.CoordMaterializeTriggerService.lookup.interval 039 */ 040 public class CoordMaterializeTriggerService implements Service { 041 public static final String CONF_PREFIX = Service.CONF_PREFIX + "CoordMaterializeTriggerService."; 042 /** 043 * Time interval, in seconds, at which the Job materialization service will be scheduled to run. 044 */ 045 public static final String CONF_LOOKUP_INTERVAL = CONF_PREFIX + "lookup.interval"; 046 /** 047 * This configuration defined the duration for which job should be materialized in future 048 */ 049 public static final String CONF_MATERIALIZATION_WINDOW = CONF_PREFIX + "materialization.window"; 050 /** 051 * The number of callables to be queued in a batch. 052 */ 053 public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size"; 054 /** 055 * The number of coordinator jobs to be picked for materialization at a given time. 056 */ 057 public static final String CONF_MATERIALIZATION_SYSTEM_LIMIT = CONF_PREFIX + "materialization.system.limit"; 058 059 private static final String INSTRUMENTATION_GROUP = "coord_job_mat"; 060 private static final String INSTR_MAT_JOBS_COUNTER = "jobs"; 061 private static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300; 062 private static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600; 063 private static final int CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT = 50; 064 065 /** 066 * This runnable class will run in every "interval" to queue CoordMaterializeTransitionXCommand. 067 */ 068 static class CoordMaterializeTriggerRunnable implements Runnable { 069 private int materializationWindow; 070 private int lookupInterval; 071 private long delay = 0; 072 private List<XCallable<Void>> callables; 073 private List<XCallable<Void>> delayedCallables; 074 075 public CoordMaterializeTriggerRunnable(int materializationWindow, int lookupInterval) { 076 this.materializationWindow = materializationWindow; 077 this.lookupInterval = lookupInterval; 078 } 079 080 @Override 081 public void run() { 082 runCoordJobMatLookup(); 083 084 if (null != callables) { 085 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 086 if (ret == false) { 087 XLog.getLog(getClass()).warn( 088 "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. " 089 + "Most possibly command queue is full. Queue size is :" 090 + Services.get().get(CallableQueueService.class).queueSize()); 091 } 092 callables = null; 093 } 094 if (null != delayedCallables) { 095 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay); 096 if (ret == false) { 097 XLog.getLog(getClass()).warn( 098 "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. " 099 + "Most possibly Callable queue is full. Queue size is :" 100 + Services.get().get(CallableQueueService.class).queueSize()); 101 } 102 delayedCallables = null; 103 this.delay = 0; 104 } 105 } 106 107 /** 108 * Recover coordinator jobs that should be materialized 109 */ 110 private void runCoordJobMatLookup() { 111 XLog.Info.get().clear(); 112 XLog LOG = XLog.getLog(getClass()); 113 JPAService jpaService = Services.get().get(JPAService.class); 114 try { 115 116 // get current date 117 Date currDate = new Date(new Date().getTime() + lookupInterval * 1000); 118 // get list of all jobs that have actions that should be materialized. 119 int materializationLimit = Services.get().getConf() 120 .getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT, CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT); 121 CoordJobsToBeMaterializedJPAExecutor cmatcmd = new CoordJobsToBeMaterializedJPAExecutor(currDate, 122 materializationLimit); 123 List<CoordinatorJobBean> materializeJobs = jpaService.execute(cmatcmd); 124 LOG.info("CoordMaterializeTriggerService - Curr Date= " + currDate + ", Num jobs to materialize = " 125 + materializeJobs.size()); 126 for (CoordinatorJobBean coordJob : materializeJobs) { 127 Services.get().get(InstrumentationService.class).get() 128 .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1); 129 int numWaitingActions = jpaService 130 .execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId())); 131 LOG.info("Job :" + coordJob.getId() + " numWaitingActions : " + numWaitingActions 132 + " MatThrottle : " + coordJob.getMatThrottling()); 133 // update lastModifiedTime so next time others might have higher chance to get pick up 134 coordJob.setLastModifiedTime(new Date()); 135 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 136 if (numWaitingActions >= coordJob.getMatThrottling()) { 137 LOG.info("info for JobID [" + coordJob.getId() + " already waiting " 138 + numWaitingActions + " actions. MatThrottle is : " + coordJob.getMatThrottling()); 139 continue; 140 } 141 queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow)); 142 143 } 144 145 } 146 catch (JPAExecutorException jex) { 147 LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex); 148 } 149 } 150 151 /** 152 * Adds callables to a list. If the number of callables in the list reaches {@link 153 * CoordMaterializeTriggerService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list 154 * is reset. 155 * 156 * @param callable the callable to queue. 157 */ 158 private void queueCallable(XCallable<Void> callable) { 159 if (callables == null) { 160 callables = new ArrayList<XCallable<Void>>(); 161 } 162 callables.add(callable); 163 if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) { 164 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 165 if (ret == false) { 166 XLog.getLog(getClass()).warn( 167 "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. " 168 + "Most possibly command queue is full. Queue size is :" 169 + Services.get().get(CallableQueueService.class).queueSize()); 170 } 171 callables = new ArrayList<XCallable<Void>>(); 172 } 173 } 174 175 } 176 177 @Override 178 public void init(Services services) throws ServiceException { 179 Configuration conf = services.getConf(); 180 // default is 3600sec (1hr) 181 int materializationWindow = conf.getInt(CONF_MATERIALIZATION_WINDOW, CONF_MATERIALIZATION_WINDOW_DEFAULT); 182 // default is 300sec (5min) 183 int lookupInterval = Services.get().getConf().getInt(CONF_LOOKUP_INTERVAL, CONF_LOOKUP_INTERVAL_DEFAULT); 184 185 Runnable lookupTriggerJobsRunnable = new CoordMaterializeTriggerRunnable(materializationWindow, lookupInterval); 186 187 services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, lookupInterval, 188 SchedulerService.Unit.SEC); 189 } 190 191 @Override 192 public void destroy() { 193 } 194 195 @Override 196 public Class<? extends Service> getInterface() { 197 return CoordMaterializeTriggerService.class; 198 } 199 200 }