This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.util.ArrayList;
021 import java.util.Collections;
022 import java.util.Date;
023 import java.util.HashMap;
024 import java.util.HashSet;
025 import java.util.LinkedHashSet;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.Set;
029 import java.util.Map.Entry;
030 import java.util.concurrent.BlockingQueue;
031 import java.util.concurrent.ConcurrentHashMap;
032 import java.util.concurrent.RejectedExecutionException;
033 import java.util.concurrent.ThreadPoolExecutor;
034 import java.util.concurrent.TimeUnit;
035 import java.util.concurrent.atomic.AtomicInteger;
036
037 import org.apache.hadoop.conf.Configuration;
038 import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
039 import org.apache.oozie.util.Instrumentable;
040 import org.apache.oozie.util.Instrumentation;
041 import org.apache.oozie.util.PollablePriorityDelayQueue;
042 import org.apache.oozie.util.PriorityDelayQueue;
043 import org.apache.oozie.util.XCallable;
044 import org.apache.oozie.util.XLog;
045 import org.apache.oozie.util.PriorityDelayQueue.QueueElement;
046
047 /**
048 * The callable queue service queues {@link XCallable}s for asynchronous execution.
049 * <p/>
050 * Callables can be queued for immediate execution or for delayed execution (some time in the future).
051 * <p/>
052 * Callables are consumed from the queue for execution based on their priority.
053 * <p/>
054 * When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops
055 * queuing callables.
056 * <p/>
057 * A thread-pool is used to execute the callables asynchronously.
058 * <p/>
059 * The following configuration parameters control the callable queue service:
060 * <p/>
061 * {@link #CONF_QUEUE_SIZE} size of the immediate execution queue. Defaulf value is 10000.
062 * <p/>
063 * {@link #CONF_THREADS} number of threads in the thread-pool used for asynchronous command execution. When this number
064 * of threads is reached, commands remain the queue until threads become available. Sets up a priority queue for the
065 * execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for execution
066 * sometime in the future.
067 */
068 public class CallableQueueService implements Service, Instrumentable {
069 private static final String INSTRUMENTATION_GROUP = "callablequeue";
070 private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue";
071 private static final String INSTR_EXECUTED_COUNTER = "executed";
072 private static final String INSTR_FAILED_COUNTER = "failed";
073 private static final String INSTR_QUEUED_COUNTER = "queued";
074 private static final String INSTR_QUEUE_SIZE_SAMPLER = "queue.size";
075 private static final String INSTR_THREADS_ACTIVE_SAMPLER = "threads.active";
076
077 public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallableQueueService.";
078
079 public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
080 public static final String CONF_THREADS = CONF_PREFIX + "threads";
081 public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
082 public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";
083 public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes";
084 public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize";
085
086 public static final int CONCURRENCY_DELAY = 500;
087
088 public static final int SAFE_MODE_DELAY = 60000;
089
090 private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
091
092 private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
093
094 private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<String, Set<XCallable<?>>>();
095
096 public static final HashSet<String> INTERRUPT_TYPES = new HashSet<String>();
097
098 private int interruptMapMaxSize;
099
100 private int maxCallableConcurrency;
101
102 private boolean callableBegin(XCallable<?> callable) {
103 synchronized (activeCallables) {
104 AtomicInteger counter = activeCallables.get(callable.getType());
105 if (counter == null) {
106 counter = new AtomicInteger(1);
107 activeCallables.put(callable.getType(), counter);
108 return true;
109 }
110 else {
111 int i = counter.incrementAndGet();
112 return i <= maxCallableConcurrency;
113 }
114 }
115 }
116
117 private void callableEnd(XCallable<?> callable) {
118 synchronized (activeCallables) {
119 AtomicInteger counter = activeCallables.get(callable.getType());
120 if (counter == null) {
121 throw new IllegalStateException("It should not happen");
122 }
123 else {
124 counter.decrementAndGet();
125 }
126 }
127 }
128
129 private boolean callableReachMaxConcurrency(XCallable<?> callable) {
130 synchronized (activeCallables) {
131 AtomicInteger counter = activeCallables.get(callable.getType());
132 if (counter == null) {
133 return true;
134 }
135 else {
136 int i = counter.get();
137 return i < maxCallableConcurrency;
138 }
139 }
140 }
141
142 // Callables are wrapped with the this wrapper for execution, for logging
143 // and instrumentation.
144 // The wrapper implements Runnable and Comparable to be able to work with an
145 // executor and a priority queue.
146 class CallableWrapper extends PriorityDelayQueue.QueueElement<XCallable<?>> implements Runnable {
147 private Instrumentation.Cron cron;
148
149 public CallableWrapper(XCallable<?> callable, long delay) {
150 super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS);
151 cron = new Instrumentation.Cron();
152 cron.start();
153 }
154
155 public void run() {
156 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
157 log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay", getElement().getType(),
158 SAFE_MODE_DELAY);
159 setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
160 removeFromUniqueCallables();
161 queue(this, true);
162 return;
163 }
164 XCallable<?> callable = getElement();
165 try {
166 if (callableBegin(callable)) {
167 cron.stop();
168 addInQueueCron(cron);
169 XLog.Info.get().clear();
170 XLog log = XLog.getLog(getClass());
171 log.trace("executing callable [{0}]", callable.getName());
172
173 removeFromUniqueCallables();
174 try {
175 callable.call();
176 incrCounter(INSTR_EXECUTED_COUNTER, 1);
177 log.trace("executed callable [{0}]", callable.getName());
178 }
179 catch (Exception ex) {
180 incrCounter(INSTR_FAILED_COUNTER, 1);
181 log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
182 }
183 finally {
184 XLog.Info.get().clear();
185 }
186 }
187 else {
188 log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay", callable
189 .getType(), CONCURRENCY_DELAY);
190 setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
191 removeFromUniqueCallables();
192 queue(this, true);
193 incrCounter(callable.getType() + "#exceeded.concurrency", 1);
194 }
195 }
196 finally {
197 callableEnd(callable);
198 }
199 }
200
201 /**
202 * @return String the queue dump
203 */
204 @Override
205 public String toString() {
206 return "delay=" + getDelay(TimeUnit.MILLISECONDS) + ", elements=" + getElement().toString();
207 }
208
209 /**
210 * Filter the duplicate callables from the list before queue this.
211 * <p/>
212 * If it is single callable, checking if key is in unique map or not.
213 * <p/>
214 * If it is composite callable, remove duplicates callables from the composite.
215 *
216 * @return true if this callable should be queued
217 */
218 public boolean filterDuplicates() {
219 XCallable<?> callable = getElement();
220 if (callable instanceof CompositeCallable) {
221 return ((CompositeCallable) callable).removeDuplicates();
222 }
223 else {
224 return uniqueCallables.containsKey(callable.getKey()) == false;
225 }
226 }
227
228 /**
229 * Add the keys to the set
230 */
231 public void addToUniqueCallables() {
232 XCallable<?> callable = getElement();
233 if (callable instanceof CompositeCallable) {
234 ((CompositeCallable) callable).addToUniqueCallables();
235 }
236 else {
237 ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
238 }
239 }
240
241 /**
242 * Remove the keys from the set
243 */
244 public void removeFromUniqueCallables() {
245 XCallable<?> callable = getElement();
246 if (callable instanceof CompositeCallable) {
247 ((CompositeCallable) callable).removeFromUniqueCallables();
248 }
249 else {
250 uniqueCallables.remove(callable.getKey());
251 }
252 }
253 }
254
255 class CompositeCallable implements XCallable<Void> {
256 private List<XCallable<?>> callables;
257 private String name;
258 private int priority;
259 private long createdTime;
260
261 public CompositeCallable(List<? extends XCallable<?>> callables) {
262 this.callables = new ArrayList<XCallable<?>>(callables);
263 priority = 0;
264 createdTime = Long.MAX_VALUE;
265 StringBuilder sb = new StringBuilder();
266 String separator = "[";
267 for (XCallable<?> callable : callables) {
268 priority = Math.max(priority, callable.getPriority());
269 createdTime = Math.min(createdTime, callable.getCreatedTime());
270 sb.append(separator).append(callable.getName());
271 separator = ",";
272 }
273 sb.append("]");
274 name = sb.toString();
275 }
276
277 @Override
278 public String getName() {
279 return name;
280 }
281
282 @Override
283 public String getType() {
284 return "#composite#" + callables.get(0).getType();
285 }
286
287 @Override
288 public String getKey() {
289 return "#composite#" + callables.get(0).getKey();
290 }
291
292 @Override
293 public String getEntityKey() {
294 return "#composite#" + callables.get(0).getEntityKey();
295 }
296
297 @Override
298 public int getPriority() {
299 return priority;
300 }
301
302 @Override
303 public long getCreatedTime() {
304 return createdTime;
305 }
306
307 @Override
308 public void setInterruptMode(boolean mode) {
309 }
310
311 @Override
312 public boolean inInterruptMode() {
313 return false;
314 }
315
316 public List<XCallable<?>> getCallables() {
317 return this.callables;
318 }
319
320 public Void call() throws Exception {
321 XLog log = XLog.getLog(getClass());
322
323 for (XCallable<?> callable : callables) {
324 log.trace("executing callable [{0}]", callable.getName());
325 try {
326 callable.call();
327 incrCounter(INSTR_EXECUTED_COUNTER, 1);
328 log.trace("executed callable [{0}]", callable.getName());
329 }
330 catch (Exception ex) {
331 incrCounter(INSTR_FAILED_COUNTER, 1);
332 log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
333 }
334 }
335
336 // ticking -1 not to count the call to the composite callable
337 incrCounter(INSTR_EXECUTED_COUNTER, -1);
338 return null;
339 }
340
341 /*
342 * (non-Javadoc)
343 *
344 * @see java.lang.Object#toString()
345 */
346 @Override
347 public String toString() {
348 if (callables.size() == 0) {
349 return null;
350 }
351 StringBuilder sb = new StringBuilder();
352 int size = callables.size();
353 for (int i = 0; i < size; i++) {
354 XCallable<?> callable = callables.get(i);
355 sb.append("(");
356 sb.append(callable.toString());
357 if (i + 1 == size) {
358 sb.append(")");
359 }
360 else {
361 sb.append("),");
362 }
363 }
364 return sb.toString();
365 }
366
367 /**
368 * Remove the duplicate callables from the list before queue them
369 *
370 * @return true if callables should be queued
371 */
372 public boolean removeDuplicates() {
373 Set<String> set = new HashSet<String>();
374 List<XCallable<?>> filteredCallables = new ArrayList<XCallable<?>>();
375 if (callables.size() == 0) {
376 return false;
377 }
378 for (XCallable<?> callable : callables) {
379 if (!uniqueCallables.containsKey(callable.getKey()) && !set.contains(callable.getKey())) {
380 filteredCallables.add(callable);
381 set.add(callable.getKey());
382 }
383 }
384 callables = filteredCallables;
385 if (callables.size() == 0) {
386 return false;
387 }
388 return true;
389 }
390
391 /**
392 * Add the keys to the set
393 */
394 public void addToUniqueCallables() {
395 for (XCallable<?> callable : callables) {
396 ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
397 }
398 }
399
400 /**
401 * Remove the keys from the set
402 */
403 public void removeFromUniqueCallables() {
404 for (XCallable<?> callable : callables) {
405 uniqueCallables.remove(callable.getKey());
406 }
407 }
408 }
409
410 private XLog log = XLog.getLog(getClass());
411
412 private int queueSize;
413 private PriorityDelayQueue<CallableWrapper> queue;
414 private ThreadPoolExecutor executor;
415 private Instrumentation instrumentation;
416
417 /**
418 * Convenience method for instrumentation counters.
419 *
420 * @param name counter name.
421 * @param count count to increment the counter.
422 */
423 private void incrCounter(String name, int count) {
424 if (instrumentation != null) {
425 instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
426 }
427 }
428
429 private void addInQueueCron(Instrumentation.Cron cron) {
430 if (instrumentation != null) {
431 instrumentation.addCron(INSTRUMENTATION_GROUP, INSTR_IN_QUEUE_TIME_TIMER, cron);
432 }
433 }
434
435 /**
436 * Initialize the command queue service.
437 *
438 * @param services services instance.
439 */
440 @Override
441 @SuppressWarnings({ "unchecked", "rawtypes" })
442 public void init(Services services) {
443 Configuration conf = services.getConf();
444
445 queueSize = conf.getInt(CONF_QUEUE_SIZE, 10000);
446 int threads = conf.getInt(CONF_THREADS, 10);
447 boolean callableNextEligible = conf.getBoolean(CONF_CALLABLE_NEXT_ELIGIBLE, true);
448
449 for (String type : conf.getStringCollection(CONF_CALLABLE_INTERRUPT_TYPES)) {
450 log.debug("Adding interrupt type [{0}]", type);
451 INTERRUPT_TYPES.add(type);
452 }
453
454 if (!callableNextEligible) {
455 queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
456 @Override
457 protected void debug(String msgTemplate, Object... msgArgs) {
458 log.trace(msgTemplate, msgArgs);
459 }
460 };
461 }
462 else {
463 // If the head of this queue has already reached max concurrency,
464 // continuously find next one
465 // which has not yet reach max concurrency.Overrided method
466 // 'eligibleToPoll' to check if the
467 // element of this queue has reached the maximum concurrency.
468 queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
469 @Override
470 protected void debug(String msgTemplate, Object... msgArgs) {
471 log.trace(msgTemplate, msgArgs);
472 }
473
474 @Override
475 protected boolean eligibleToPoll(QueueElement<?> element) {
476 if (element != null) {
477 CallableWrapper wrapper = (CallableWrapper) element;
478 if (element.getElement() != null) {
479 return callableReachMaxConcurrency(wrapper.getElement());
480 }
481 }
482 return false;
483 }
484
485 };
486 }
487
488 interruptMapMaxSize = conf.getInt(CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE, 100);
489
490 // IMPORTANT: The ThreadPoolExecutor does not always the execute
491 // commands out of the queue, there are
492 // certain conditions where commands are pushed directly to a thread.
493 // As we are using a queue with DELAYED semantics (i.e. execute the
494 // command in 5 mins) we need to make
495 // sure that the commands are always pushed to the queue.
496 // To achieve this (by looking a the ThreadPoolExecutor.execute()
497 // implementation, we are making the pool
498 // minimum size equals to the maximum size (thus threads are keep always
499 // running) and we are warming up
500 // all those threads (the for loop that runs dummy runnables).
501 executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue);
502
503 for (int i = 0; i < threads; i++) {
504 executor.execute(new Runnable() {
505 public void run() {
506 try {
507 Thread.sleep(100);
508 }
509 catch (InterruptedException ex) {
510 log.warn("Could not warm up threadpool {0}", ex.getMessage(), ex);
511 }
512 }
513 });
514 }
515
516 maxCallableConcurrency = conf.getInt(CONF_CALLABLE_CONCURRENCY, 3);
517 }
518
519 /**
520 * Destroy the command queue service.
521 */
522 @Override
523 public void destroy() {
524 try {
525 long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
526 executor.shutdown();
527 queue.clear();
528 while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
529 log.info("Waiting for executor to shutdown");
530 if (System.currentTimeMillis() > limit) {
531 log.warn("Gave up, continuing without waiting for executor to shutdown");
532 break;
533 }
534 }
535 }
536 catch (InterruptedException ex) {
537 log.warn(ex);
538 }
539 }
540
541 /**
542 * Return the public interface for command queue service.
543 *
544 * @return {@link CallableQueueService}.
545 */
546 @Override
547 public Class<? extends Service> getInterface() {
548 return CallableQueueService.class;
549 }
550
551 /**
552 * @return int size of queue
553 */
554 public synchronized int queueSize() {
555 return queue.size();
556 }
557
558 private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) {
559 if (!ignoreQueueSize && queue.size() >= queueSize) {
560 log.warn("queue if full, ignoring queuing for [{0}]", wrapper.getElement());
561 return false;
562 }
563 if (!executor.isShutdown()) {
564 if (wrapper.filterDuplicates()) {
565 wrapper.addToUniqueCallables();
566 try {
567 executor.execute(wrapper);
568 }
569 catch (RejectedExecutionException ree) {
570 wrapper.removeFromUniqueCallables();
571 throw ree;
572 }
573 }
574 }
575 else {
576 log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement());
577 }
578 return true;
579 }
580
581 /**
582 * Queue a callable for asynchronous execution.
583 *
584 * @param callable callable to queue.
585 * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable
586 * was not queued.
587 */
588 public boolean queue(XCallable<?> callable) {
589 return queue(callable, 0);
590 }
591
592 /**
593 * Queue a list of callables for serial execution.
594 * <p/>
595 * Useful to serialize callables that may compete with each other for resources.
596 * <p/>
597 * All callables will be processed with the priority of the highest priority of all callables.
598 *
599 * @param callables callables to be executed by the composite callable.
600 * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
601 * were not queued.
602 */
603 public boolean queueSerial(List<? extends XCallable<?>> callables) {
604 return queueSerial(callables, 0);
605 }
606
607 /**
608 * Queue a callable for asynchronous execution sometime in the future.
609 *
610 * @param callable callable to queue for delayed execution
611 * @param delay time, in milliseconds, that the callable should be delayed.
612 * @return <code>true</code> if the callable was queued, <code>false</code>
613 * if the queue is full and the callable was not queued.
614 */
615 public synchronized boolean queue(XCallable<?> callable, long delay) {
616 if (callable == null) {
617 return true;
618 }
619 boolean queued = false;
620 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
621 log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size());
622 }
623 else {
624 checkInterruptTypes(callable);
625 queued = queue(new CallableWrapper(callable, delay), false);
626 if (queued) {
627 incrCounter(INSTR_QUEUED_COUNTER, 1);
628 }
629 else {
630 log.warn("Could not queue callable");
631 }
632 }
633 return queued;
634 }
635
636 /**
637 * Queue a list of callables for serial execution sometime in the future.
638 * <p/>
639 * Useful to serialize callables that may compete with each other for resources.
640 * <p/>
641 * All callables will be processed with the priority of the highest priority of all callables.
642 *
643 * @param callables callables to be executed by the composite callable.
644 * @param delay time, in milliseconds, that the callable should be delayed.
645 * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
646 * were not queued.
647 */
648 public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) {
649 boolean queued;
650 if (callables == null || callables.size() == 0) {
651 queued = true;
652 }
653 else if (callables.size() == 1) {
654 queued = queue(callables.get(0), delay);
655 }
656 else {
657 XCallable<?> callable = new CompositeCallable(callables);
658 queued = queue(callable, delay);
659 if (queued) {
660 incrCounter(INSTR_QUEUED_COUNTER, callables.size());
661 }
662 }
663 return queued;
664 }
665
666 /**
667 * Instruments the callable queue service.
668 *
669 * @param instr instance to instrument the callable queue service to.
670 */
671 public void instrument(Instrumentation instr) {
672 instrumentation = instr;
673 instr.addSampler(INSTRUMENTATION_GROUP, INSTR_QUEUE_SIZE_SAMPLER, 60, 1, new Instrumentation.Variable<Long>() {
674 public Long getValue() {
675 return (long) queue.size();
676 }
677 });
678 instr.addSampler(INSTRUMENTATION_GROUP, INSTR_THREADS_ACTIVE_SAMPLER, 60, 1,
679 new Instrumentation.Variable<Long>() {
680 public Long getValue() {
681 return (long) executor.getActiveCount();
682 }
683 });
684 }
685
686 /**
687 * check the interrupt map for the existence of an interrupt commands if
688 * exist a List of Interrupt Callable for the same lock key will bereturned,
689 * otherwise it will return null
690 */
691 public Set<XCallable<?>> checkInterrupts(String lockKey) {
692
693 if (lockKey != null) {
694 return interruptCommandsMap.remove(lockKey);
695 }
696 return null;
697 }
698
699 /**
700 * check if the callable is of an interrupt type and insert it into the map
701 * accordingly
702 *
703 * @param callable
704 */
705 public void checkInterruptTypes(XCallable<?> callable) {
706 if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) {
707 for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) {
708 if (INTERRUPT_TYPES.contains(singleCallable.getType())) {
709 insertCallableIntoInterruptMap(singleCallable);
710 }
711 }
712 }
713 else if (INTERRUPT_TYPES.contains(callable.getType())) {
714 insertCallableIntoInterruptMap(callable);
715 }
716 }
717
718 /**
719 * insert a new callable in the Interrupt Command Map add a new element to
720 * the list or create a new list accordingly
721 *
722 * @param callable
723 */
724 public void insertCallableIntoInterruptMap(XCallable<?> callable) {
725 if (interruptCommandsMap.size() < interruptMapMaxSize) {
726 Set<XCallable<?>> newSet = Collections.synchronizedSet(new LinkedHashSet<XCallable<?>>());
727 Set<XCallable<?>> interruptSet = interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newSet);
728 if (interruptSet == null) {
729 interruptSet = newSet;
730 }
731 if (interruptSet.add(callable)) {
732 log.trace("Inserting an interrupt element [{0}] to the interrupt map", callable.toString());
733 } else {
734 log.trace("Interrupt element [{0}] already present", callable.toString());
735 }
736 }
737 else {
738 log.warn(
739 "The interrupt map reached max size of [{0}], an interrupt element [{1}] will not added to the map [{1}]",
740 interruptCommandsMap.size(), callable.toString());
741 }
742 }
743
744 /**
745 * Get the list of strings of queue dump
746 *
747 * @return the list of string that representing each CallableWrapper
748 */
749 public List<String> getQueueDump() {
750 List<String> list = new ArrayList<String>();
751 for (QueueElement<CallableWrapper> qe : queue) {
752 if (qe.toString() == null) {
753 continue;
754 }
755 list.add(qe.toString());
756 }
757 return list;
758 }
759
760 /**
761 * Get the list of strings of uniqueness map dump
762 *
763 * @return the list of string that representing the key of each command in the queue
764 */
765 public List<String> getUniqueDump() {
766 List<String> list = new ArrayList<String>();
767 for (Entry<String, Date> entry : uniqueCallables.entrySet()) {
768 list.add(entry.toString());
769 }
770 return list;
771 }
772
773 }