001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import java.util.ArrayList; 021 import java.util.Date; 022 import java.util.List; 023 024 import org.apache.hadoop.conf.Configuration; 025 import org.apache.oozie.CoordinatorJobBean; 026 import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand; 027 import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor; 028 import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor; 029 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor; 030 import org.apache.oozie.executor.jpa.CoordJobsToBeMaterializedJPAExecutor; 031 import org.apache.oozie.executor.jpa.JPAExecutorException; 032 import org.apache.oozie.util.XCallable; 033 import org.apache.oozie.util.XLog; 034 035 /** 036 * The coordinator Materialization Lookup trigger service schedule lookup trigger command for every interval (default is 037 * 5 minutes ). This interval could be configured through oozie configuration defined is either oozie-default.xml or 038 * oozie-site.xml using the property name oozie.service.CoordMaterializeTriggerService.lookup.interval 039 */ 040 public class CoordMaterializeTriggerService implements Service { 041 public static final String CONF_PREFIX = Service.CONF_PREFIX + "CoordMaterializeTriggerService."; 042 /** 043 * Time interval, in seconds, at which the Job materialization service will be scheduled to run. 044 */ 045 public static final String CONF_LOOKUP_INTERVAL = CONF_PREFIX + "lookup.interval"; 046 /** 047 * This configuration defined the duration for which job should be materialized in future 048 */ 049 public static final String CONF_MATERIALIZATION_WINDOW = CONF_PREFIX + "materialization.window"; 050 /** 051 * The number of callables to be queued in a batch. 052 */ 053 public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size"; 054 /** 055 * The number of coordinator jobs to be picked for materialization at a given time. 056 */ 057 public static final String CONF_MATERIALIZATION_SYSTEM_LIMIT = CONF_PREFIX + "materialization.system.limit"; 058 059 private static final String INSTRUMENTATION_GROUP = "coord_job_mat"; 060 private static final String INSTR_MAT_JOBS_COUNTER = "jobs"; 061 private static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300; 062 private static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600; 063 private static final int CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT = 50; 064 065 /** 066 * This runnable class will run in every "interval" to queue CoordMaterializeTransitionXCommand. 067 */ 068 static class CoordMaterializeTriggerRunnable implements Runnable { 069 private int materializationWindow; 070 private long delay = 0; 071 private List<XCallable<Void>> callables; 072 private List<XCallable<Void>> delayedCallables; 073 074 public CoordMaterializeTriggerRunnable(int materializationWindow) { 075 this.materializationWindow = materializationWindow; 076 } 077 078 @Override 079 public void run() { 080 runCoordJobMatLookup(); 081 082 if (null != callables) { 083 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 084 if (ret == false) { 085 XLog.getLog(getClass()).warn( 086 "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. " 087 + "Most possibly command queue is full. Queue size is :" 088 + Services.get().get(CallableQueueService.class).queueSize()); 089 } 090 callables = null; 091 } 092 if (null != delayedCallables) { 093 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay); 094 if (ret == false) { 095 XLog.getLog(getClass()).warn( 096 "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. " 097 + "Most possibly Callable queue is full. Queue size is :" 098 + Services.get().get(CallableQueueService.class).queueSize()); 099 } 100 delayedCallables = null; 101 this.delay = 0; 102 } 103 } 104 105 /** 106 * Recover coordinator jobs that should be materialized 107 */ 108 private void runCoordJobMatLookup() { 109 XLog.Info.get().clear(); 110 XLog LOG = XLog.getLog(getClass()); 111 JPAService jpaService = Services.get().get(JPAService.class); 112 try { 113 114 // get current date 115 Date currDate = new Date(new Date().getTime() + CONF_LOOKUP_INTERVAL_DEFAULT * 1000); 116 // get list of all jobs that have actions that should be materialized. 117 int materializationLimit = Services.get().getConf() 118 .getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT, CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT); 119 CoordJobsToBeMaterializedJPAExecutor cmatcmd = new CoordJobsToBeMaterializedJPAExecutor(currDate, 120 materializationLimit); 121 List<CoordinatorJobBean> materializeJobs = jpaService.execute(cmatcmd); 122 LOG.debug("CoordMaterializeTriggerService - Curr Date= " + currDate + ", Num jobs to materialize = " 123 + materializeJobs.size()); 124 for (CoordinatorJobBean coordJob : materializeJobs) { 125 Services.get().get(InstrumentationService.class).get() 126 .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1); 127 int numWaitingActions = jpaService 128 .execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId())); 129 LOG.debug("Job :" + coordJob.getId() + " numWaitingActions : " + numWaitingActions 130 + " MatThrottle : " + coordJob.getMatThrottling()); 131 // update lastModifiedTime so next time others might have higher chance to get pick up 132 coordJob.setLastModifiedTime(new Date()); 133 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 134 if (numWaitingActions >= coordJob.getMatThrottling()) { 135 LOG.debug("Materialization skipped for JobID [" + coordJob.getId() + " already waiting " 136 + numWaitingActions + " actions. MatThrottle is : " + coordJob.getMatThrottling()); 137 continue; 138 } 139 queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow)); 140 141 } 142 143 } 144 catch (JPAExecutorException jex) { 145 LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex); 146 } 147 } 148 149 /** 150 * Adds callables to a list. If the number of callables in the list reaches {@link 151 * CoordMaterializeTriggerService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list 152 * is reset. 153 * 154 * @param callable the callable to queue. 155 */ 156 private void queueCallable(XCallable<Void> callable) { 157 if (callables == null) { 158 callables = new ArrayList<XCallable<Void>>(); 159 } 160 callables.add(callable); 161 if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) { 162 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 163 if (ret == false) { 164 XLog.getLog(getClass()).warn( 165 "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. " 166 + "Most possibly command queue is full. Queue size is :" 167 + Services.get().get(CallableQueueService.class).queueSize()); 168 } 169 callables = new ArrayList<XCallable<Void>>(); 170 } 171 } 172 173 } 174 175 @Override 176 public void init(Services services) throws ServiceException { 177 Configuration conf = services.getConf(); 178 Runnable lookupTriggerJobsRunnable = new CoordMaterializeTriggerRunnable(conf.getInt( 179 CONF_MATERIALIZATION_WINDOW, CONF_MATERIALIZATION_WINDOW_DEFAULT));// Default is 1 hour 180 services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, 181 conf.getInt(CONF_LOOKUP_INTERVAL, CONF_LOOKUP_INTERVAL_DEFAULT),// Default is 5 minutes 182 SchedulerService.Unit.SEC); 183 } 184 185 @Override 186 public void destroy() { 187 } 188 189 @Override 190 public Class<? extends Service> getInterface() { 191 return CoordMaterializeTriggerService.class; 192 } 193 194 }