This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.util.ArrayList;
021 import java.util.Date;
022 import java.util.HashMap;
023 import java.util.HashSet;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Set;
027 import java.util.Map.Entry;
028 import java.util.concurrent.BlockingQueue;
029 import java.util.concurrent.ConcurrentHashMap;
030 import java.util.concurrent.RejectedExecutionException;
031 import java.util.concurrent.ThreadPoolExecutor;
032 import java.util.concurrent.TimeUnit;
033 import java.util.concurrent.atomic.AtomicInteger;
034 import java.util.concurrent.atomic.AtomicLong;
035
036 import org.apache.hadoop.conf.Configuration;
037 import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
038 import org.apache.oozie.util.Instrumentable;
039 import org.apache.oozie.util.Instrumentation;
040 import org.apache.oozie.util.PollablePriorityDelayQueue;
041 import org.apache.oozie.util.PriorityDelayQueue;
042 import org.apache.oozie.util.XCallable;
043 import org.apache.oozie.util.XLog;
044 import org.apache.oozie.util.PriorityDelayQueue.QueueElement;
045
/**
 * The callable queue service queues {@link XCallable}s for asynchronous execution.
 * <p/>
 * Callables can be queued for immediate execution or for delayed execution (some time in the future).
 * <p/>
 * Callables are consumed from the queue for execution based on their priority.
 * <p/>
 * When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops
 * queuing callables.
 * <p/>
 * A thread-pool is used to execute the callables asynchronously.
 * <p/>
 * The following configuration parameters control the callable queue service:
 * <p/>
 * {@link #CONF_QUEUE_SIZE} size of the immediate execution queue. Default value is 10000.
 * <p/>
 * {@link #CONF_THREADS} number of threads in the thread-pool used for asynchronous command execution. When this number
 * of threads is reached, commands remain in the queue until threads become available. Sets up a priority queue for the
 * execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for execution
 * sometime in the future.
 */
067 public class CallableQueueService implements Service, Instrumentable {
068 private static final String INSTRUMENTATION_GROUP = "callablequeue";
069 private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue";
070 private static final String INSTR_EXECUTED_COUNTER = "executed";
071 private static final String INSTR_FAILED_COUNTER = "failed";
072 private static final String INSTR_QUEUED_COUNTER = "queued";
073 private static final String INSTR_QUEUE_SIZE_SAMPLER = "queue.size";
074 private static final String INSTR_THREADS_ACTIVE_SAMPLER = "threads.active";
075
076 public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallableQueueService.";
077
078 public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
079 public static final String CONF_THREADS = CONF_PREFIX + "threads";
080 public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
081 public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";
082
083 public static final int CONCURRENCY_DELAY = 500;
084
085 public static final int SAFE_MODE_DELAY = 60000;
086
087 private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
088
089 private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
090
091 private int maxCallableConcurrency;
092
093 private boolean callableBegin(XCallable<?> callable) {
094 synchronized (activeCallables) {
095 AtomicInteger counter = activeCallables.get(callable.getType());
096 if (counter == null) {
097 counter = new AtomicInteger(1);
098 activeCallables.put(callable.getType(), counter);
099 return true;
100 }
101 else {
102 int i = counter.incrementAndGet();
103 return i <= maxCallableConcurrency;
104 }
105 }
106 }
107
108 private void callableEnd(XCallable<?> callable) {
109 synchronized (activeCallables) {
110 AtomicInteger counter = activeCallables.get(callable.getType());
111 if (counter == null) {
112 throw new IllegalStateException("It should not happen");
113 }
114 else {
115 counter.decrementAndGet();
116 }
117 }
118 }
119
120 private boolean callableReachMaxConcurrency(XCallable<?> callable) {
121 synchronized (activeCallables) {
122 AtomicInteger counter = activeCallables.get(callable.getType());
123 if (counter == null) {
124 return true;
125 }
126 else {
127 int i = counter.get();
128 return i < maxCallableConcurrency;
129 }
130 }
131 }
132
133 // Callables are wrapped with the this wrapper for execution, for logging
134 // and instrumentation.
135 // The wrapper implements Runnable and Comparable to be able to work with an
136 // executor and a priority queue.
137 class CallableWrapper extends PriorityDelayQueue.QueueElement<XCallable<?>> implements Runnable {
138 private Instrumentation.Cron cron;
139
140 public CallableWrapper(XCallable<?> callable, long delay) {
141 super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS);
142 cron = new Instrumentation.Cron();
143 cron.start();
144 }
145
146 public void run() {
147 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
148 log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay", getElement().getType(),
149 SAFE_MODE_DELAY);
150 setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
151 removeFromUniqueCallables();
152 queue(this, true);
153 return;
154 }
155 XCallable<?> callable = getElement();
156 try {
157 if (callableBegin(callable)) {
158 cron.stop();
159 addInQueueCron(cron);
160 XLog.Info.get().clear();
161 XLog log = XLog.getLog(getClass());
162 log.trace("executing callable [{0}]", callable.getName());
163
164 removeFromUniqueCallables();
165 try {
166 callable.call();
167 incrCounter(INSTR_EXECUTED_COUNTER, 1);
168 log.trace("executed callable [{0}]", callable.getName());
169 }
170 catch (Exception ex) {
171 incrCounter(INSTR_FAILED_COUNTER, 1);
172 log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
173 }
174 finally {
175 XLog.Info.get().clear();
176 }
177 }
178 else {
179 log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay", callable
180 .getType(), CONCURRENCY_DELAY);
181 setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
182 removeFromUniqueCallables();
183 queue(this, true);
184 incrCounter(callable.getType() + "#exceeded.concurrency", 1);
185 }
186 }
187 finally {
188 callableEnd(callable);
189 }
190 }
191
192 /**
193 * @return String the queue dump
194 */
195 @Override
196 public String toString() {
197 return "delay=" + getDelay(TimeUnit.MILLISECONDS) + ", elements=" + getElement().toString();
198 }
199
200 /**
201 * Filter the duplicate callables from the list before queue this.
202 * <p/>
203 * If it is single callable, checking if key is in unique map or not.
204 * <p/>
205 * If it is composite callable, remove duplicates callables from the composite.
206 *
207 * @return true if this callable should be queued
208 */
209 public boolean filterDuplicates() {
210 XCallable<?> callable = getElement();
211 if (callable instanceof CompositeCallable) {
212 return ((CompositeCallable) callable).removeDuplicates();
213 }
214 else {
215 return uniqueCallables.containsKey(callable.getKey()) == false;
216 }
217 }
218
219 /**
220 * Add the keys to the set
221 */
222 public void addToUniqueCallables() {
223 XCallable<?> callable = getElement();
224 if (callable instanceof CompositeCallable) {
225 ((CompositeCallable) callable).addToUniqueCallables();
226 }
227 else {
228 ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
229 }
230 }
231
232 /**
233 * Remove the keys from the set
234 */
235 public void removeFromUniqueCallables() {
236 XCallable<?> callable = getElement();
237 if (callable instanceof CompositeCallable) {
238 ((CompositeCallable) callable).removeFromUniqueCallables();
239 }
240 else {
241 uniqueCallables.remove(callable.getKey());
242 }
243 }
244
245 }
246
247 class CompositeCallable implements XCallable<Void> {
248 private List<XCallable<?>> callables;
249 private String name;
250 private int priority;
251 private long createdTime;
252
253 public CompositeCallable(List<? extends XCallable<?>> callables) {
254 this.callables = new ArrayList<XCallable<?>>(callables);
255 priority = 0;
256 createdTime = Long.MAX_VALUE;
257 StringBuilder sb = new StringBuilder();
258 String separator = "[";
259 for (XCallable<?> callable : callables) {
260 priority = Math.max(priority, callable.getPriority());
261 createdTime = Math.min(createdTime, callable.getCreatedTime());
262 sb.append(separator).append(callable.getName());
263 separator = ",";
264 }
265 sb.append("]");
266 name = sb.toString();
267 }
268
269 @Override
270 public String getName() {
271 return name;
272 }
273
274 @Override
275 public String getType() {
276 return "#composite#" + callables.get(0).getType();
277 }
278
279 @Override
280 public String getKey() {
281 return "#composite#" + callables.get(0).getKey();
282 }
283
284 @Override
285 public int getPriority() {
286 return priority;
287 }
288
289 @Override
290 public long getCreatedTime() {
291 return createdTime;
292 }
293
294 public Void call() throws Exception {
295 XLog log = XLog.getLog(getClass());
296
297 for (XCallable<?> callable : callables) {
298 log.trace("executing callable [{0}]", callable.getName());
299 try {
300 callable.call();
301 incrCounter(INSTR_EXECUTED_COUNTER, 1);
302 log.trace("executed callable [{0}]", callable.getName());
303 }
304 catch (Exception ex) {
305 incrCounter(INSTR_FAILED_COUNTER, 1);
306 log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
307 }
308 }
309
310 // ticking -1 not to count the call to the composite callable
311 incrCounter(INSTR_EXECUTED_COUNTER, -1);
312 return null;
313 }
314
315 /*
316 * (non-Javadoc)
317 *
318 * @see java.lang.Object#toString()
319 */
320 @Override
321 public String toString() {
322 if (callables.size() == 0) {
323 return null;
324 }
325 StringBuilder sb = new StringBuilder();
326 int size = callables.size();
327 for (int i = 0; i < size; i++) {
328 XCallable<?> callable = callables.get(i);
329 sb.append("(");
330 sb.append(callable.toString());
331 if (i + 1 == size) {
332 sb.append(")");
333 }
334 else {
335 sb.append("),");
336 }
337 }
338 return sb.toString();
339 }
340
341 /**
342 * Remove the duplicate callables from the list before queue them
343 *
344 * @return true if callables should be queued
345 */
346 public boolean removeDuplicates() {
347 Set<String> set = new HashSet<String>();
348 List<XCallable<?>> filteredCallables = new ArrayList<XCallable<?>>();
349 if (callables.size() == 0) {
350 return false;
351 }
352 for (XCallable<?> callable : callables) {
353 if (!uniqueCallables.containsKey(callable.getKey()) && !set.contains(callable.getKey())) {
354 filteredCallables.add(callable);
355 set.add(callable.getKey());
356 }
357 }
358 callables = filteredCallables;
359 if (callables.size() == 0) {
360 return false;
361 }
362 return true;
363 }
364
365 /**
366 * Add the keys to the set
367 */
368 public void addToUniqueCallables() {
369 for (XCallable<?> callable : callables) {
370 ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
371 }
372 }
373
374 /**
375 * Remove the keys from the set
376 */
377 public void removeFromUniqueCallables() {
378 for (XCallable<?> callable : callables) {
379 uniqueCallables.remove(callable.getKey());
380 }
381 }
382
383 }
384
385 private XLog log = XLog.getLog(getClass());
386
387 private int queueSize;
388 private PriorityDelayQueue<CallableWrapper> queue;
389 private AtomicLong delayQueueExecCounter = new AtomicLong(0);
390 private ThreadPoolExecutor executor;
391 private Instrumentation instrumentation;
392
393 /**
394 * Convenience method for instrumentation counters.
395 *
396 * @param name counter name.
397 * @param count count to increment the counter.
398 */
399 private void incrCounter(String name, int count) {
400 if (instrumentation != null) {
401 instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
402 }
403 }
404
405 private void addInQueueCron(Instrumentation.Cron cron) {
406 if (instrumentation != null) {
407 instrumentation.addCron(INSTRUMENTATION_GROUP, INSTR_IN_QUEUE_TIME_TIMER, cron);
408 }
409 }
410
411 /**
412 * Initialize the command queue service.
413 *
414 * @param services services instance.
415 */
416 @Override
417 @SuppressWarnings("unchecked")
418 public void init(Services services) {
419 Configuration conf = services.getConf();
420
421 queueSize = conf.getInt(CONF_QUEUE_SIZE, 10000);
422 int threads = conf.getInt(CONF_THREADS, 10);
423 boolean callableNextEligible = conf.getBoolean(CONF_CALLABLE_NEXT_ELIGIBLE, true);
424
425 if (!callableNextEligible) {
426 queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
427 @Override
428 protected void debug(String msgTemplate, Object... msgArgs) {
429 log.trace(msgTemplate, msgArgs);
430 }
431 };
432 }
433 else {
434 // If the head of this queue has already reached max concurrency, continuously find next one
435 // which has not yet reach max concurrency.Overrided method 'eligibleToPoll' to check if the
436 // element of this queue has reached the maximum concurrency.
437 queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS,
438 queueSize) {
439 @Override
440 protected void debug(String msgTemplate, Object... msgArgs) {
441 log.trace(msgTemplate, msgArgs);
442 }
443
444 @Override
445 protected boolean eligibleToPoll(QueueElement<?> element) {
446 if (element != null) {
447 CallableWrapper wrapper = (CallableWrapper) element;
448 if (element.getElement() != null) {
449 return callableReachMaxConcurrency(wrapper.getElement());
450 }
451 }
452 return false;
453 }
454
455 };
456 }
457
458 // IMPORTANT: The ThreadPoolExecutor does not always the execute
459 // commands out of the queue, there are
460 // certain conditions where commands are pushed directly to a thread.
461 // As we are using a queue with DELAYED semantics (i.e. execute the
462 // command in 5 mins) we need to make
463 // sure that the commands are always pushed to the queue.
464 // To achieve this (by looking a the ThreadPoolExecutor.execute()
465 // implementation, we are making the pool
466 // minimum size equals to the maximum size (thus threads are keep always
467 // running) and we are warming up
468 // all those threads (the for loop that runs dummy runnables).
469 executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue);
470
471 for (int i = 0; i < threads; i++) {
472 executor.execute(new Runnable() {
473 public void run() {
474 try {
475 Thread.sleep(100);
476 }
477 catch (InterruptedException ex) {
478 log.warn("Could not warm up threadpool {0}", ex.getMessage(), ex);
479 }
480 }
481 });
482 }
483
484 maxCallableConcurrency = conf.getInt(CONF_CALLABLE_CONCURRENCY, 3);
485 }
486
487 /**
488 * Destroy the command queue service.
489 */
490 @Override
491 public void destroy() {
492 try {
493 long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
494 executor.shutdown();
495 queue.clear();
496 while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
497 log.info("Waiting for executor to shutdown");
498 if (System.currentTimeMillis() > limit) {
499 log.warn("Gave up, continuing without waiting for executor to shutdown");
500 break;
501 }
502 }
503 }
504 catch (InterruptedException ex) {
505 log.warn(ex);
506 }
507 }
508
509 /**
510 * Return the public interface for command queue service.
511 *
512 * @return {@link CallableQueueService}.
513 */
514 @Override
515 public Class<? extends Service> getInterface() {
516 return CallableQueueService.class;
517 }
518
519 /**
520 * @return int size of queue
521 */
522 public synchronized int queueSize() {
523 return queue.size();
524 }
525
526 private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) {
527 if (!ignoreQueueSize && queue.size() >= queueSize) {
528 log.warn("queue if full, ignoring queuing for [{0}]", wrapper.getElement());
529 return false;
530 }
531 if (!executor.isShutdown()) {
532 if (wrapper.filterDuplicates()) {
533 wrapper.addToUniqueCallables();
534 try {
535 executor.execute(wrapper);
536 }
537 catch (RejectedExecutionException ree) {
538 wrapper.removeFromUniqueCallables();
539 throw ree;
540 }
541 }
542 }
543 else {
544 log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement());
545 }
546 return true;
547 }
548
549 /**
550 * Queue a callable for asynchronous execution.
551 *
552 * @param callable callable to queue.
553 * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable
554 * was not queued.
555 */
556 public boolean queue(XCallable<?> callable) {
557 return queue(callable, 0);
558 }
559
560 /**
561 * Queue a list of callables for serial execution.
562 * <p/>
563 * Useful to serialize callables that may compete with each other for resources.
564 * <p/>
565 * All callables will be processed with the priority of the highest priority of all callables.
566 *
567 * @param callables callables to be executed by the composite callable.
568 * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
569 * were not queued.
570 */
571 @SuppressWarnings("unchecked")
572 public boolean queueSerial(List<? extends XCallable<?>> callables) {
573 return queueSerial(callables, 0);
574 }
575
576 /**
577 * Queue a callable for asynchronous execution sometime in the future.
578 *
579 * @param callable callable to queue for delayed execution
580 * @param delay time, in milliseconds, that the callable should be delayed.
581 * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable
582 * was not queued.
583 */
584 public synchronized boolean queue(XCallable<?> callable, long delay) {
585 if (callable == null) {
586 return true;
587 }
588 boolean queued = false;
589 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
590 log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size());
591 }
592 else {
593 queued = queue(new CallableWrapper(callable, delay), false);
594 if (queued) {
595 incrCounter(INSTR_QUEUED_COUNTER, 1);
596 }
597 else {
598 log.warn("Could not queue callable");
599 }
600 }
601 return queued;
602 }
603
604 /**
605 * Queue a list of callables for serial execution sometime in the future.
606 * <p/>
607 * Useful to serialize callables that may compete with each other for resources.
608 * <p/>
609 * All callables will be processed with the priority of the highest priority of all callables.
610 *
611 * @param callables callables to be executed by the composite callable.
612 * @param delay time, in milliseconds, that the callable should be delayed.
613 * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
614 * were not queued.
615 */
616 @SuppressWarnings("unchecked")
617 public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) {
618 boolean queued;
619 if (callables == null || callables.size() == 0) {
620 queued = true;
621 }
622 else if (callables.size() == 1) {
623 queued = queue(callables.get(0), delay);
624 }
625 else {
626 XCallable<?> callable = new CompositeCallable(callables);
627 queued = queue(callable, delay);
628 if (queued) {
629 incrCounter(INSTR_QUEUED_COUNTER, callables.size());
630 }
631 }
632 return queued;
633 }
634
635 /**
636 * Instruments the callable queue service.
637 *
638 * @param instr instance to instrument the callable queue service to.
639 */
640 public void instrument(Instrumentation instr) {
641 instrumentation = instr;
642 instr.addSampler(INSTRUMENTATION_GROUP, INSTR_QUEUE_SIZE_SAMPLER, 60, 1, new Instrumentation.Variable<Long>() {
643 public Long getValue() {
644 return (long) queue.size();
645 }
646 });
647 instr.addSampler(INSTRUMENTATION_GROUP, INSTR_THREADS_ACTIVE_SAMPLER, 60, 1,
648 new Instrumentation.Variable<Long>() {
649 public Long getValue() {
650 return (long) executor.getActiveCount();
651 }
652 });
653 }
654
655 /**
656 * Get the list of strings of queue dump
657 *
658 * @return the list of string that representing each CallableWrapper
659 */
660 public List<String> getQueueDump() {
661 List<String> list = new ArrayList<String>();
662 for (QueueElement<CallableWrapper> qe : queue) {
663 if (qe.toString() == null) {
664 continue;
665 }
666 list.add(qe.toString());
667 }
668 return list;
669 }
670
671 /**
672 * Get the list of strings of uniqueness map dump
673 *
674 * @return the list of string that representing the key of each command in the queue
675 */
676 public List<String> getUniqueDump() {
677 List<String> list = new ArrayList<String>();
678 for (Entry<String, Date> entry : uniqueCallables.entrySet()) {
679 list.add(entry.toString());
680 }
681 return list;
682 }
683
684 }