This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.util.ArrayList;
021 import java.util.Date;
022 import java.util.List;
023
024 import org.apache.hadoop.conf.Configuration;
025 import org.apache.oozie.CoordinatorJobBean;
026 import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
027 import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
028 import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
029 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
030 import org.apache.oozie.executor.jpa.CoordJobsToBeMaterializedJPAExecutor;
031 import org.apache.oozie.executor.jpa.JPAExecutorException;
032 import org.apache.oozie.util.XCallable;
033 import org.apache.oozie.util.XLog;
034
035 /**
036 * The coordinator Materialization Lookup trigger service schedule lookup trigger command for every interval (default is
037 * 5 minutes ). This interval could be configured through oozie configuration defined is either oozie-default.xml or
038 * oozie-site.xml using the property name oozie.service.CoordMaterializeTriggerService.lookup.interval
039 */
040 public class CoordMaterializeTriggerService implements Service {
041 public static final String CONF_PREFIX = Service.CONF_PREFIX + "CoordMaterializeTriggerService.";
042 /**
043 * Time interval, in seconds, at which the Job materialization service will be scheduled to run.
044 */
045 public static final String CONF_LOOKUP_INTERVAL = CONF_PREFIX + "lookup.interval";
046 /**
047 * This configuration defined the duration for which job should be materialized in future
048 */
049 public static final String CONF_MATERIALIZATION_WINDOW = CONF_PREFIX + "materialization.window";
050 /**
051 * The number of callables to be queued in a batch.
052 */
053 public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
054 /**
055 * The number of coordinator jobs to be picked for materialization at a given time.
056 */
057 public static final String CONF_MATERIALIZATION_SYSTEM_LIMIT = CONF_PREFIX + "materialization.system.limit";
058
059 private static final String INSTRUMENTATION_GROUP = "coord_job_mat";
060 private static final String INSTR_MAT_JOBS_COUNTER = "jobs";
061 private static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300;
062 private static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600;
063 private static final int CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT = 50;
064
065 /**
066 * This runnable class will run in every "interval" to queue CoordMaterializeTransitionXCommand.
067 */
068 static class CoordMaterializeTriggerRunnable implements Runnable {
069 private int materializationWindow;
070 private int lookupInterval;
071 private long delay = 0;
072 private List<XCallable<Void>> callables;
073 private List<XCallable<Void>> delayedCallables;
074
075 public CoordMaterializeTriggerRunnable(int materializationWindow, int lookupInterval) {
076 this.materializationWindow = materializationWindow;
077 this.lookupInterval = lookupInterval;
078 }
079
080 @Override
081 public void run() {
082 runCoordJobMatLookup();
083
084 if (null != callables) {
085 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
086 if (ret == false) {
087 XLog.getLog(getClass()).warn(
088 "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
089 + "Most possibly command queue is full. Queue size is :"
090 + Services.get().get(CallableQueueService.class).queueSize());
091 }
092 callables = null;
093 }
094 if (null != delayedCallables) {
095 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
096 if (ret == false) {
097 XLog.getLog(getClass()).warn(
098 "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. "
099 + "Most possibly Callable queue is full. Queue size is :"
100 + Services.get().get(CallableQueueService.class).queueSize());
101 }
102 delayedCallables = null;
103 this.delay = 0;
104 }
105 }
106
107 /**
108 * Recover coordinator jobs that should be materialized
109 */
110 private void runCoordJobMatLookup() {
111 XLog.Info.get().clear();
112 XLog LOG = XLog.getLog(getClass());
113 JPAService jpaService = Services.get().get(JPAService.class);
114 try {
115
116 // get current date
117 Date currDate = new Date(new Date().getTime() + lookupInterval * 1000);
118 // get list of all jobs that have actions that should be materialized.
119 int materializationLimit = Services.get().getConf()
120 .getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT, CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT);
121 CoordJobsToBeMaterializedJPAExecutor cmatcmd = new CoordJobsToBeMaterializedJPAExecutor(currDate,
122 materializationLimit);
123 List<CoordinatorJobBean> materializeJobs = jpaService.execute(cmatcmd);
124 LOG.info("CoordMaterializeTriggerService - Curr Date= " + currDate + ", Num jobs to materialize = "
125 + materializeJobs.size());
126 for (CoordinatorJobBean coordJob : materializeJobs) {
127 Services.get().get(InstrumentationService.class).get()
128 .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
129 int numWaitingActions = jpaService
130 .execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
131 LOG.info("Job :" + coordJob.getId() + " numWaitingActions : " + numWaitingActions
132 + " MatThrottle : " + coordJob.getMatThrottling());
133 // update lastModifiedTime so next time others might have higher chance to get pick up
134 coordJob.setLastModifiedTime(new Date());
135 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
136 if (numWaitingActions >= coordJob.getMatThrottling()) {
137 LOG.info("info for JobID [" + coordJob.getId() + " already waiting "
138 + numWaitingActions + " actions. MatThrottle is : " + coordJob.getMatThrottling());
139 continue;
140 }
141 queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
142
143 }
144
145 }
146 catch (JPAExecutorException jex) {
147 LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex);
148 }
149 }
150
151 /**
152 * Adds callables to a list. If the number of callables in the list reaches {@link
153 * CoordMaterializeTriggerService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list
154 * is reset.
155 *
156 * @param callable the callable to queue.
157 */
158 private void queueCallable(XCallable<Void> callable) {
159 if (callables == null) {
160 callables = new ArrayList<XCallable<Void>>();
161 }
162 callables.add(callable);
163 if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
164 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
165 if (ret == false) {
166 XLog.getLog(getClass()).warn(
167 "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
168 + "Most possibly command queue is full. Queue size is :"
169 + Services.get().get(CallableQueueService.class).queueSize());
170 }
171 callables = new ArrayList<XCallable<Void>>();
172 }
173 }
174
175 }
176
177 @Override
178 public void init(Services services) throws ServiceException {
179 Configuration conf = services.getConf();
180 // default is 3600sec (1hr)
181 int materializationWindow = conf.getInt(CONF_MATERIALIZATION_WINDOW, CONF_MATERIALIZATION_WINDOW_DEFAULT);
182 // default is 300sec (5min)
183 int lookupInterval = Services.get().getConf().getInt(CONF_LOOKUP_INTERVAL, CONF_LOOKUP_INTERVAL_DEFAULT);
184
185 Runnable lookupTriggerJobsRunnable = new CoordMaterializeTriggerRunnable(materializationWindow, lookupInterval);
186
187 services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, lookupInterval,
188 SchedulerService.Unit.SEC);
189 }
190
191 @Override
192 public void destroy() {
193 }
194
195 @Override
196 public Class<? extends Service> getInterface() {
197 return CoordMaterializeTriggerService.class;
198 }
199
200 }