001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.Date;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.LinkedHashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.Map.Entry;
030import java.util.concurrent.BlockingQueue;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicInteger;
035
036import org.apache.hadoop.conf.Configuration;
037import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
038import org.apache.oozie.util.Instrumentable;
039import org.apache.oozie.util.Instrumentation;
040import org.apache.oozie.util.PollablePriorityDelayQueue;
041import org.apache.oozie.util.PriorityDelayQueue;
042import org.apache.oozie.util.XCallable;
043import org.apache.oozie.util.XLog;
044import org.apache.oozie.util.PriorityDelayQueue.QueueElement;
045
046/**
047 * The callable queue service queues {@link XCallable}s for asynchronous execution.
048 * <p/>
049 * Callables can be queued for immediate execution or for delayed execution (some time in the future).
050 * <p/>
051 * Callables are consumed from the queue for execution based on their priority.
052 * <p/>
053 * When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops
054 * queuing callables.
055 * <p/>
056 * A thread-pool is used to execute the callables asynchronously.
057 * <p/>
058 * The following configuration parameters control the callable queue service:
059 * <p/>
 * {@link #CONF_QUEUE_SIZE} size of the immediate execution queue. Default value is 10000.
061 * <p/>
062 * {@link #CONF_THREADS} number of threads in the thread-pool used for asynchronous command execution. When this number
 * of threads is reached, commands remain in the queue until threads become available. Sets up a priority queue for the
064 * execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for execution
065 * sometime in the future.
066 */
067public class CallableQueueService implements Service, Instrumentable {
068    private static final String INSTRUMENTATION_GROUP = "callablequeue";
069    private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue";
070    private static final String INSTR_EXECUTED_COUNTER = "executed";
071    private static final String INSTR_FAILED_COUNTER = "failed";
072    private static final String INSTR_QUEUED_COUNTER = "queued";
073    private static final String INSTR_QUEUE_SIZE_SAMPLER = "queue.size";
074    private static final String INSTR_THREADS_ACTIVE_SAMPLER = "threads.active";
075
076    public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallableQueueService.";
077
078    public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
079    public static final String CONF_THREADS = CONF_PREFIX + "threads";
080    public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
081    public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";
082    public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes";
083    public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize";
084
085    public static final int CONCURRENCY_DELAY = 500;
086
087    public static final int SAFE_MODE_DELAY = 60000;
088
089    private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
090
091    private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
092
093    private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<String, Set<XCallable<?>>>();
094
095    public static final HashSet<String> INTERRUPT_TYPES = new HashSet<String>();
096
097    private int interruptMapMaxSize;
098
099    private int maxCallableConcurrency;
100
101    private boolean callableBegin(XCallable<?> callable) {
102        synchronized (activeCallables) {
103            AtomicInteger counter = activeCallables.get(callable.getType());
104            if (counter == null) {
105                counter = new AtomicInteger(1);
106                activeCallables.put(callable.getType(), counter);
107                return true;
108            }
109            else {
110                int i = counter.incrementAndGet();
111                return i <= maxCallableConcurrency;
112            }
113        }
114    }
115
116    private void callableEnd(XCallable<?> callable) {
117        synchronized (activeCallables) {
118            AtomicInteger counter = activeCallables.get(callable.getType());
119            if (counter == null) {
120                throw new IllegalStateException("It should not happen");
121            }
122            else {
123                counter.decrementAndGet();
124            }
125        }
126    }
127
128    private boolean callableReachMaxConcurrency(XCallable<?> callable) {
129        synchronized (activeCallables) {
130            AtomicInteger counter = activeCallables.get(callable.getType());
131            if (counter == null) {
132                return true;
133            }
134            else {
135                int i = counter.get();
136                return i < maxCallableConcurrency;
137            }
138        }
139    }
140
141    // Callables are wrapped with the this wrapper for execution, for logging
142    // and instrumentation.
143    // The wrapper implements Runnable and Comparable to be able to work with an
144    // executor and a priority queue.
145    class CallableWrapper extends PriorityDelayQueue.QueueElement<XCallable<?>> implements Runnable {
146        private Instrumentation.Cron cron;
147
148        public CallableWrapper(XCallable<?> callable, long delay) {
149            super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS);
150            cron = new Instrumentation.Cron();
151            cron.start();
152        }
153
154        public void run() {
155            XCallable<?> callable = null;
156            try {
157                removeFromUniqueCallables();
158                if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
159                    log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay", getElement().getType(),
160                            SAFE_MODE_DELAY);
161                    setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
162                    queue(this, true);
163                    return;
164                }
165                callable = getElement();
166                if (callableBegin(callable)) {
167                    cron.stop();
168                    addInQueueCron(cron);
169                    XLog.Info.get().clear();
170                    XLog log = XLog.getLog(getClass());
171                    log.trace("executing callable [{0}]", callable.getName());
172
173                    try {
174                        callable.call();
175                        incrCounter(INSTR_EXECUTED_COUNTER, 1);
176                        log.trace("executed callable [{0}]", callable.getName());
177                    }
178                    catch (Exception ex) {
179                        incrCounter(INSTR_FAILED_COUNTER, 1);
180                        log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
181                    }
182                    finally {
183                        XLog.Info.get().clear();
184                    }
185                }
186                else {
187                    log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay", callable
188                            .getType(), CONCURRENCY_DELAY);
189                    setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
190                    queue(this, true);
191                    incrCounter(callable.getType() + "#exceeded.concurrency", 1);
192                }
193            }
194            catch (Throwable t) {
195                incrCounter(INSTR_FAILED_COUNTER, 1);
196                log.warn("exception callable [{0}], {1}", callable == null ? "N/A" : callable.getName(),
197                        t.getMessage(), t);
198            }
199            finally {
200                if (callable != null) {
201                    callableEnd(callable);
202                }
203            }
204        }
205
206        /**
207         * Filter the duplicate callables from the list before queue this.
208         * <p/>
209         * If it is single callable, checking if key is in unique map or not.
210         * <p/>
211         * If it is composite callable, remove duplicates callables from the composite.
212         *
213         * @return true if this callable should be queued
214         */
215        public boolean filterDuplicates() {
216            XCallable<?> callable = getElement();
217            if (callable instanceof CompositeCallable) {
218                return ((CompositeCallable) callable).removeDuplicates();
219            }
220            else {
221                return uniqueCallables.containsKey(callable.getKey()) == false;
222            }
223        }
224
225        /**
226         * Add the keys to the set
227         */
228        public void addToUniqueCallables() {
229            XCallable<?> callable = getElement();
230            if (callable instanceof CompositeCallable) {
231                ((CompositeCallable) callable).addToUniqueCallables();
232            }
233            else {
234                ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
235            }
236        }
237
238        /**
239         * Remove the keys from the set
240         */
241        public void removeFromUniqueCallables() {
242            XCallable<?> callable = getElement();
243            if (callable instanceof CompositeCallable) {
244                ((CompositeCallable) callable).removeFromUniqueCallables();
245            }
246            else {
247                uniqueCallables.remove(callable.getKey());
248            }
249        }
250    }
251
252    class CompositeCallable implements XCallable<Void> {
253        private List<XCallable<?>> callables;
254        private String name;
255        private int priority;
256        private long createdTime;
257
258        public CompositeCallable(List<? extends XCallable<?>> callables) {
259            this.callables = new ArrayList<XCallable<?>>(callables);
260            priority = 0;
261            createdTime = Long.MAX_VALUE;
262            StringBuilder sb = new StringBuilder();
263            String separator = "[";
264            for (XCallable<?> callable : callables) {
265                priority = Math.max(priority, callable.getPriority());
266                createdTime = Math.min(createdTime, callable.getCreatedTime());
267                sb.append(separator).append(callable.getName());
268                separator = ",";
269            }
270            sb.append("]");
271            name = sb.toString();
272        }
273
274        @Override
275        public String getName() {
276            return name;
277        }
278
279        @Override
280        public String getType() {
281            return "#composite#" + callables.get(0).getType();
282        }
283
284        @Override
285        public String getKey() {
286            return "#composite#" + callables.get(0).getKey();
287        }
288
289        @Override
290        public String getEntityKey() {
291            return "#composite#" + callables.get(0).getEntityKey();
292        }
293
294        @Override
295        public int getPriority() {
296            return priority;
297        }
298
299        @Override
300        public long getCreatedTime() {
301            return createdTime;
302        }
303
304        @Override
305        public void setInterruptMode(boolean mode) {
306        }
307
308        @Override
309        public boolean inInterruptMode() {
310            return false;
311        }
312
313        public List<XCallable<?>> getCallables() {
314            return this.callables;
315        }
316
317        public Void call() throws Exception {
318            XLog log = XLog.getLog(getClass());
319
320            for (XCallable<?> callable : callables) {
321                log.trace("executing callable [{0}]", callable.getName());
322                try {
323                    callable.call();
324                    incrCounter(INSTR_EXECUTED_COUNTER, 1);
325                    log.trace("executed callable [{0}]", callable.getName());
326                }
327                catch (Exception ex) {
328                    incrCounter(INSTR_FAILED_COUNTER, 1);
329                    log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
330                }
331            }
332
333            // ticking -1 not to count the call to the composite callable
334            incrCounter(INSTR_EXECUTED_COUNTER, -1);
335            return null;
336        }
337
338        /*
339         * (non-Javadoc)
340         *
341         * @see java.lang.Object#toString()
342         */
343        @Override
344        public String toString() {
345            if (callables.size() == 0) {
346                return null;
347            }
348            StringBuilder sb = new StringBuilder();
349            int size = callables.size();
350            for (int i = 0; i < size; i++) {
351                XCallable<?> callable = callables.get(i);
352                sb.append("(");
353                sb.append(callable.toString());
354                if (i + 1 == size) {
355                    sb.append(")");
356                }
357                else {
358                    sb.append("),");
359                }
360            }
361            return sb.toString();
362        }
363
364        /**
365         * Remove the duplicate callables from the list before queue them
366         *
367         * @return true if callables should be queued
368         */
369        public boolean removeDuplicates() {
370            Set<String> set = new HashSet<String>();
371            List<XCallable<?>> filteredCallables = new ArrayList<XCallable<?>>();
372            if (callables.size() == 0) {
373                return false;
374            }
375            for (XCallable<?> callable : callables) {
376                if (!uniqueCallables.containsKey(callable.getKey()) && !set.contains(callable.getKey())) {
377                    filteredCallables.add(callable);
378                    set.add(callable.getKey());
379                }
380            }
381            callables = filteredCallables;
382            if (callables.size() == 0) {
383                return false;
384            }
385            return true;
386        }
387
388        /**
389         * Add the keys to the set
390         */
391        public void addToUniqueCallables() {
392            for (XCallable<?> callable : callables) {
393                ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
394            }
395        }
396
397        /**
398         * Remove the keys from the set
399         */
400        public void removeFromUniqueCallables() {
401            for (XCallable<?> callable : callables) {
402                uniqueCallables.remove(callable.getKey());
403            }
404        }
405    }
406
407    private XLog log = XLog.getLog(getClass());
408
409    private int queueSize;
410    private PriorityDelayQueue<CallableWrapper> queue;
411    private ThreadPoolExecutor executor;
412    private Instrumentation instrumentation;
413
414    /**
415     * Convenience method for instrumentation counters.
416     *
417     * @param name counter name.
418     * @param count count to increment the counter.
419     */
420    private void incrCounter(String name, int count) {
421        if (instrumentation != null) {
422            instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
423        }
424    }
425
426    private void addInQueueCron(Instrumentation.Cron cron) {
427        if (instrumentation != null) {
428            instrumentation.addCron(INSTRUMENTATION_GROUP, INSTR_IN_QUEUE_TIME_TIMER, cron);
429        }
430    }
431
432    /**
433     * Initialize the command queue service.
434     *
435     * @param services services instance.
436     */
437    @Override
438    @SuppressWarnings({ "unchecked", "rawtypes" })
439    public void init(Services services) {
440        Configuration conf = services.getConf();
441
442        queueSize = conf.getInt(CONF_QUEUE_SIZE, 10000);
443        int threads = conf.getInt(CONF_THREADS, 10);
444        boolean callableNextEligible = conf.getBoolean(CONF_CALLABLE_NEXT_ELIGIBLE, true);
445
446        for (String type : conf.getStringCollection(CONF_CALLABLE_INTERRUPT_TYPES)) {
447            log.debug("Adding interrupt type [{0}]", type);
448            INTERRUPT_TYPES.add(type);
449        }
450
451        if (!callableNextEligible) {
452            queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
453                @Override
454                protected void debug(String msgTemplate, Object... msgArgs) {
455                    log.trace(msgTemplate, msgArgs);
456                }
457            };
458        }
459        else {
460            // If the head of this queue has already reached max concurrency,
461            // continuously find next one
462            // which has not yet reach max concurrency.Overrided method
463            // 'eligibleToPoll' to check if the
464            // element of this queue has reached the maximum concurrency.
465            queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
466                @Override
467                protected void debug(String msgTemplate, Object... msgArgs) {
468                    log.trace(msgTemplate, msgArgs);
469                }
470
471                @Override
472                protected boolean eligibleToPoll(QueueElement<?> element) {
473                    if (element != null) {
474                        CallableWrapper wrapper = (CallableWrapper) element;
475                        if (element.getElement() != null) {
476                            return callableReachMaxConcurrency(wrapper.getElement());
477                        }
478                    }
479                    return false;
480                }
481
482            };
483        }
484
485        interruptMapMaxSize = conf.getInt(CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE, 100);
486
487        // IMPORTANT: The ThreadPoolExecutor does not always the execute
488        // commands out of the queue, there are
489        // certain conditions where commands are pushed directly to a thread.
490        // As we are using a queue with DELAYED semantics (i.e. execute the
491        // command in 5 mins) we need to make
492        // sure that the commands are always pushed to the queue.
493        // To achieve this (by looking a the ThreadPoolExecutor.execute()
494        // implementation, we are making the pool
495        // minimum size equals to the maximum size (thus threads are keep always
496        // running) and we are warming up
497        // all those threads (the for loop that runs dummy runnables).
498        executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue);
499
500        for (int i = 0; i < threads; i++) {
501            executor.execute(new Runnable() {
502                public void run() {
503                    try {
504                        Thread.sleep(100);
505                    }
506                    catch (InterruptedException ex) {
507                        log.warn("Could not warm up threadpool {0}", ex.getMessage(), ex);
508                    }
509                }
510            });
511        }
512
513        maxCallableConcurrency = conf.getInt(CONF_CALLABLE_CONCURRENCY, 3);
514    }
515
516    /**
517     * Destroy the command queue service.
518     */
519    @Override
520    public void destroy() {
521        try {
522            long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
523            executor.shutdown();
524            queue.clear();
525            while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
526                log.info("Waiting for executor to shutdown");
527                if (System.currentTimeMillis() > limit) {
528                    log.warn("Gave up, continuing without waiting for executor to shutdown");
529                    break;
530                }
531            }
532        }
533        catch (InterruptedException ex) {
534            log.warn(ex);
535        }
536    }
537
538    /**
539     * Return the public interface for command queue service.
540     *
541     * @return {@link CallableQueueService}.
542     */
543    @Override
544    public Class<? extends Service> getInterface() {
545        return CallableQueueService.class;
546    }
547
548    /**
549     * @return int size of queue
550     */
551    public synchronized int queueSize() {
552        return queue.size();
553    }
554
555    private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) {
556        if (!ignoreQueueSize && queue.size() >= queueSize) {
557            log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey());
558            return false;
559        }
560        if (!executor.isShutdown()) {
561            if (wrapper.filterDuplicates()) {
562                wrapper.addToUniqueCallables();
563                try {
564                    executor.execute(wrapper);
565                }
566                catch (Throwable ree) {
567                    wrapper.removeFromUniqueCallables();
568                    throw new RuntimeException(ree);
569                }
570            }
571        }
572        else {
573            log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey());
574        }
575        return true;
576    }
577
578    /**
579     * Queue a callable for asynchronous execution.
580     *
581     * @param callable callable to queue.
582     * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable
583     *         was not queued.
584     */
585    public boolean queue(XCallable<?> callable) {
586        return queue(callable, 0);
587    }
588
589    /**
590     * Queue a list of callables for serial execution.
591     * <p/>
592     * Useful to serialize callables that may compete with each other for resources.
593     * <p/>
594     * All callables will be processed with the priority of the highest priority of all callables.
595     *
596     * @param callables callables to be executed by the composite callable.
597     * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
598     *         were not queued.
599     */
600    public boolean queueSerial(List<? extends XCallable<?>> callables) {
601        return queueSerial(callables, 0);
602    }
603
604    /**
605     * Queue a callable for asynchronous execution sometime in the future.
606     *
607     * @param callable callable to queue for delayed execution
608     * @param delay time, in milliseconds, that the callable should be delayed.
609     * @return <code>true</code> if the callable was queued, <code>false</code>
610     *         if the queue is full and the callable was not queued.
611     */
612    public synchronized boolean queue(XCallable<?> callable, long delay) {
613        if (callable == null) {
614            return true;
615        }
616        boolean queued = false;
617        if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
618            log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size());
619        }
620        else {
621            checkInterruptTypes(callable);
622            queued = queue(new CallableWrapper(callable, delay), false);
623            if (queued) {
624                incrCounter(INSTR_QUEUED_COUNTER, 1);
625            }
626            else {
627                log.warn("Could not queue callable");
628            }
629        }
630        return queued;
631    }
632
633    /**
634     * Queue a list of callables for serial execution sometime in the future.
635     * <p/>
636     * Useful to serialize callables that may compete with each other for resources.
637     * <p/>
638     * All callables will be processed with the priority of the highest priority of all callables.
639     *
640     * @param callables callables to be executed by the composite callable.
641     * @param delay time, in milliseconds, that the callable should be delayed.
642     * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
643     *         were not queued.
644     */
645    public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) {
646        boolean queued;
647        if (callables == null || callables.size() == 0) {
648            queued = true;
649        }
650        else if (callables.size() == 1) {
651            queued = queue(callables.get(0), delay);
652        }
653        else {
654            XCallable<?> callable = new CompositeCallable(callables);
655            queued = queue(callable, delay);
656            if (queued) {
657                incrCounter(INSTR_QUEUED_COUNTER, callables.size());
658            }
659        }
660        return queued;
661    }
662
663    /**
664     * Instruments the callable queue service.
665     *
666     * @param instr instance to instrument the callable queue service to.
667     */
668    public void instrument(Instrumentation instr) {
669        instrumentation = instr;
670        instr.addSampler(INSTRUMENTATION_GROUP, INSTR_QUEUE_SIZE_SAMPLER, 60, 1, new Instrumentation.Variable<Long>() {
671            public Long getValue() {
672                return (long) queue.size();
673            }
674        });
675        instr.addSampler(INSTRUMENTATION_GROUP, INSTR_THREADS_ACTIVE_SAMPLER, 60, 1,
676                new Instrumentation.Variable<Long>() {
677                    public Long getValue() {
678                        return (long) executor.getActiveCount();
679                    }
680                });
681    }
682
683    /**
684     * check the interrupt map for the existence of an interrupt commands if
685     * exist a List of Interrupt Callable for the same lock key will bereturned,
686     * otherwise it will return null
687     */
688    public Set<XCallable<?>> checkInterrupts(String lockKey) {
689
690        if (lockKey != null) {
691            return interruptCommandsMap.remove(lockKey);
692        }
693        return null;
694    }
695
696    /**
697     * check if the callable is of an interrupt type and insert it into the map
698     * accordingly
699     *
700     * @param callable
701     */
702    public void checkInterruptTypes(XCallable<?> callable) {
703        if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) {
704            for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) {
705                if (INTERRUPT_TYPES.contains(singleCallable.getType())) {
706                    insertCallableIntoInterruptMap(singleCallable);
707                }
708            }
709        }
710        else if (INTERRUPT_TYPES.contains(callable.getType())) {
711            insertCallableIntoInterruptMap(callable);
712        }
713    }
714
715    /**
716     * insert a new callable in the Interrupt Command Map add a new element to
717     * the list or create a new list accordingly
718     *
719     * @param callable
720     */
721    public void insertCallableIntoInterruptMap(XCallable<?> callable) {
722        if (interruptCommandsMap.size() < interruptMapMaxSize) {
723            Set<XCallable<?>> newSet = Collections.synchronizedSet(new LinkedHashSet<XCallable<?>>());
724            Set<XCallable<?>> interruptSet = interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newSet);
725            if (interruptSet == null) {
726                interruptSet = newSet;
727            }
728            if (interruptSet.add(callable)) {
729                log.trace("Inserting an interrupt element [{0}] to the interrupt map", callable.toString());
730            } else {
731                log.trace("Interrupt element [{0}] already present", callable.toString());
732            }
733        }
734        else {
735            log.warn(
736                    "The interrupt map reached max size of [{0}], an interrupt element [{1}] will not added to the map [{1}]",
737                    interruptCommandsMap.size(), callable.toString());
738        }
739    }
740
741    /**
742     * Get the list of strings of queue dump
743     *
744     * @return the list of string that representing each CallableWrapper
745     */
746    public List<String> getQueueDump() {
747        List<String> list = new ArrayList<String>();
748        for (QueueElement<CallableWrapper> qe : queue) {
749            if (qe.toString() == null) {
750                continue;
751            }
752            list.add(qe.toString());
753        }
754        return list;
755    }
756
757    /**
758     * Get the list of strings of uniqueness map dump
759     *
760     * @return the list of string that representing the key of each command in the queue
761     */
762    public List<String> getUniqueDump() {
763        List<String> list = new ArrayList<String>();
764        for (Entry<String, Date> entry : uniqueCallables.entrySet()) {
765            list.add(entry.toString());
766        }
767        return list;
768    }
769
770}