This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.util.ArrayList;
021 import java.util.Date;
022 import java.util.List;
023
024 import org.apache.hadoop.conf.Configuration;
025 import org.apache.oozie.CoordinatorJobBean;
026 import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
027 import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
028 import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
029 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
030 import org.apache.oozie.executor.jpa.CoordJobsToBeMaterializedJPAExecutor;
031 import org.apache.oozie.executor.jpa.JPAExecutorException;
032 import org.apache.oozie.util.XCallable;
033 import org.apache.oozie.util.XLog;
034
035 /**
036 * The coordinator Materialization Lookup trigger service schedule lookup trigger command for every interval (default is
037 * 5 minutes ). This interval could be configured through oozie configuration defined is either oozie-default.xml or
038 * oozie-site.xml using the property name oozie.service.CoordMaterializeTriggerService.lookup.interval
039 */
040 public class CoordMaterializeTriggerService implements Service {
041 public static final String CONF_PREFIX = Service.CONF_PREFIX + "CoordMaterializeTriggerService.";
042 /**
043 * Time interval, in seconds, at which the Job materialization service will be scheduled to run.
044 */
045 public static final String CONF_LOOKUP_INTERVAL = CONF_PREFIX + "lookup.interval";
046 /**
047 * This configuration defined the duration for which job should be materialized in future
048 */
049 public static final String CONF_MATERIALIZATION_WINDOW = CONF_PREFIX + "materialization.window";
050 /**
051 * The number of callables to be queued in a batch.
052 */
053 public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
054 /**
055 * The number of coordinator jobs to be picked for materialization at a given time.
056 */
057 public static final String CONF_MATERIALIZATION_SYSTEM_LIMIT = CONF_PREFIX + "materialization.system.limit";
058
059 private static final String INSTRUMENTATION_GROUP = "coord_job_mat";
060 private static final String INSTR_MAT_JOBS_COUNTER = "jobs";
061 private static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300;
062 private static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600;
063 private static final int CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT = 50;
064
065 /**
066 * This runnable class will run in every "interval" to queue CoordMaterializeTransitionXCommand.
067 */
068 static class CoordMaterializeTriggerRunnable implements Runnable {
069 private int materializationWindow;
070 private long delay = 0;
071 private List<XCallable<Void>> callables;
072 private List<XCallable<Void>> delayedCallables;
073
074 public CoordMaterializeTriggerRunnable(int materializationWindow) {
075 this.materializationWindow = materializationWindow;
076 }
077
078 @Override
079 public void run() {
080 runCoordJobMatLookup();
081
082 if (null != callables) {
083 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
084 if (ret == false) {
085 XLog.getLog(getClass()).warn(
086 "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
087 + "Most possibly command queue is full. Queue size is :"
088 + Services.get().get(CallableQueueService.class).queueSize());
089 }
090 callables = null;
091 }
092 if (null != delayedCallables) {
093 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
094 if (ret == false) {
095 XLog.getLog(getClass()).warn(
096 "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. "
097 + "Most possibly Callable queue is full. Queue size is :"
098 + Services.get().get(CallableQueueService.class).queueSize());
099 }
100 delayedCallables = null;
101 this.delay = 0;
102 }
103 }
104
105 /**
106 * Recover coordinator jobs that should be materialized
107 */
108 private void runCoordJobMatLookup() {
109 XLog.Info.get().clear();
110 XLog LOG = XLog.getLog(getClass());
111 JPAService jpaService = Services.get().get(JPAService.class);
112 try {
113
114 // get current date
115 Date currDate = new Date(new Date().getTime() + CONF_LOOKUP_INTERVAL_DEFAULT * 1000);
116 // get list of all jobs that have actions that should be materialized.
117 int materializationLimit = Services.get().getConf()
118 .getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT, CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT);
119 CoordJobsToBeMaterializedJPAExecutor cmatcmd = new CoordJobsToBeMaterializedJPAExecutor(currDate,
120 materializationLimit);
121 List<CoordinatorJobBean> materializeJobs = jpaService.execute(cmatcmd);
122 LOG.debug("CoordMaterializeTriggerService - Curr Date= " + currDate + ", Num jobs to materialize = "
123 + materializeJobs.size());
124 for (CoordinatorJobBean coordJob : materializeJobs) {
125 Services.get().get(InstrumentationService.class).get()
126 .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
127 int numWaitingActions = jpaService
128 .execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
129 LOG.debug("Job :" + coordJob.getId() + " numWaitingActions : " + numWaitingActions
130 + " MatThrottle : " + coordJob.getMatThrottling());
131 // update lastModifiedTime so next time others might have higher chance to get pick up
132 coordJob.setLastModifiedTime(new Date());
133 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
134 if (numWaitingActions >= coordJob.getMatThrottling()) {
135 LOG.debug("Materialization skipped for JobID [" + coordJob.getId() + " already waiting "
136 + numWaitingActions + " actions. MatThrottle is : " + coordJob.getMatThrottling());
137 continue;
138 }
139 queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
140
141 }
142
143 }
144 catch (JPAExecutorException jex) {
145 LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex);
146 }
147 }
148
149 /**
150 * Adds callables to a list. If the number of callables in the list reaches {@link
151 * CoordMaterializeTriggerService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list
152 * is reset.
153 *
154 * @param callable the callable to queue.
155 */
156 private void queueCallable(XCallable<Void> callable) {
157 if (callables == null) {
158 callables = new ArrayList<XCallable<Void>>();
159 }
160 callables.add(callable);
161 if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
162 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
163 if (ret == false) {
164 XLog.getLog(getClass()).warn(
165 "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
166 + "Most possibly command queue is full. Queue size is :"
167 + Services.get().get(CallableQueueService.class).queueSize());
168 }
169 callables = new ArrayList<XCallable<Void>>();
170 }
171 }
172
173 }
174
175 @Override
176 public void init(Services services) throws ServiceException {
177 Configuration conf = services.getConf();
178 Runnable lookupTriggerJobsRunnable = new CoordMaterializeTriggerRunnable(conf.getInt(
179 CONF_MATERIALIZATION_WINDOW, CONF_MATERIALIZATION_WINDOW_DEFAULT));// Default is 1 hour
180 services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10,
181 conf.getInt(CONF_LOOKUP_INTERVAL, CONF_LOOKUP_INTERVAL_DEFAULT),// Default is 5 minutes
182 SchedulerService.Unit.SEC);
183 }
184
185 @Override
186 public void destroy() {
187 }
188
189 @Override
190 public Class<? extends Service> getInterface() {
191 return CoordMaterializeTriggerService.class;
192 }
193
194 }