This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.Date;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.LinkedHashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.Map.Entry;
031import java.util.concurrent.BlockingQueue;
032import java.util.concurrent.ConcurrentHashMap;
033import java.util.concurrent.ThreadPoolExecutor;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicInteger;
036
037import org.apache.hadoop.conf.Configuration;
038import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
039import org.apache.oozie.command.XCommand;
040import org.apache.oozie.util.Instrumentable;
041import org.apache.oozie.util.Instrumentation;
042import org.apache.oozie.util.PollablePriorityDelayQueue;
043import org.apache.oozie.util.PriorityDelayQueue;
044import org.apache.oozie.util.XCallable;
045import org.apache.oozie.util.XLog;
046import org.apache.oozie.util.PriorityDelayQueue.QueueElement;
047
048/**
049 * The callable queue service queues {@link XCallable}s for asynchronous execution.
050 * <p/>
051 * Callables can be queued for immediate execution or for delayed execution (some time in the future).
052 * <p/>
053 * Callables are consumed from the queue for execution based on their priority.
054 * <p/>
055 * When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops
056 * queuing callables.
057 * <p/>
058 * A thread-pool is used to execute the callables asynchronously.
059 * <p/>
060 * The following configuration parameters control the callable queue service:
061 * <p/>
062 * {@link #CONF_QUEUE_SIZE} size of the immediate execution queue. Defaulf value is 10000.
063 * <p/>
064 * {@link #CONF_THREADS} number of threads in the thread-pool used for asynchronous command execution. When this number
065 * of threads is reached, commands remain the queue until threads become available. Sets up a priority queue for the
066 * execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for execution
067 * sometime in the future.
068 */
069public class CallableQueueService implements Service, Instrumentable {
070    private static final String INSTRUMENTATION_GROUP = "callablequeue";
071    private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue";
072    private static final String INSTR_EXECUTED_COUNTER = "executed";
073    private static final String INSTR_FAILED_COUNTER = "failed";
074    private static final String INSTR_QUEUED_COUNTER = "queued";
075    private static final String INSTR_QUEUE_SIZE_SAMPLER = "queue.size";
076    private static final String INSTR_THREADS_ACTIVE_SAMPLER = "threads.active";
077
078    public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallableQueueService.";
079
080    public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
081    public static final String CONF_THREADS = CONF_PREFIX + "threads";
082    public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
083    public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";
084    public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes";
085    public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize";
086
087    public static final int CONCURRENCY_DELAY = 500;
088
089    public static final int SAFE_MODE_DELAY = 60000;
090
091    private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
092
093    private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
094
095    private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<String, Set<XCallable<?>>>();
096
097    public static final HashSet<String> INTERRUPT_TYPES = new HashSet<String>();
098
099    private int interruptMapMaxSize;
100
101    private int maxCallableConcurrency;
102
103    private boolean callableBegin(XCallable<?> callable) {
104        synchronized (activeCallables) {
105            AtomicInteger counter = activeCallables.get(callable.getType());
106            if (counter == null) {
107                counter = new AtomicInteger(1);
108                activeCallables.put(callable.getType(), counter);
109                return true;
110            }
111            else {
112                int i = counter.incrementAndGet();
113                return i <= maxCallableConcurrency;
114            }
115        }
116    }
117
118    private void callableEnd(XCallable<?> callable) {
119        synchronized (activeCallables) {
120            AtomicInteger counter = activeCallables.get(callable.getType());
121            if (counter == null) {
122                throw new IllegalStateException("It should not happen");
123            }
124            else {
125                counter.decrementAndGet();
126            }
127        }
128    }
129
130    private boolean callableReachMaxConcurrency(XCallable<?> callable) {
131        synchronized (activeCallables) {
132            AtomicInteger counter = activeCallables.get(callable.getType());
133            if (counter == null) {
134                return true;
135            }
136            else {
137                int i = counter.get();
138                return i < maxCallableConcurrency;
139            }
140        }
141    }
142
143    // Callables are wrapped with the this wrapper for execution, for logging
144    // and instrumentation.
145    // The wrapper implements Runnable and Comparable to be able to work with an
146    // executor and a priority queue.
147    class CallableWrapper extends PriorityDelayQueue.QueueElement<XCallable<?>> implements Runnable {
148        private Instrumentation.Cron cron;
149
150        public CallableWrapper(XCallable<?> callable, long delay) {
151            super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS);
152            cron = new Instrumentation.Cron();
153            cron.start();
154        }
155
156        public void run() {
157            XCallable<?> callable = null;
158            try {
159                removeFromUniqueCallables();
160                if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
161                    log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay", getElement().getType(),
162                            SAFE_MODE_DELAY);
163                    setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
164                    queue(this, true);
165                    return;
166                }
167                callable = getElement();
168                if (callableBegin(callable)) {
169                    cron.stop();
170                    addInQueueCron(cron);
171                    XLog log = XLog.getLog(getClass());
172                    log.trace("executing callable [{0}]", callable.getName());
173
174                    try {
175                        callable.call();
176                        incrCounter(INSTR_EXECUTED_COUNTER, 1);
177                        log.trace("executed callable [{0}]", callable.getName());
178                    }
179                    catch (Exception ex) {
180                        incrCounter(INSTR_FAILED_COUNTER, 1);
181                        log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
182                    }
183                }
184                else {
185                    log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay", callable
186                            .getType(), CONCURRENCY_DELAY);
187                    setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
188                    queue(this, true);
189                    incrCounter(callable.getType() + "#exceeded.concurrency", 1);
190                }
191            }
192            catch (Throwable t) {
193                incrCounter(INSTR_FAILED_COUNTER, 1);
194                log.warn("exception callable [{0}], {1}", callable == null ? "N/A" : callable.getName(),
195                        t.getMessage(), t);
196            }
197            finally {
198                if (callable != null) {
199                    callableEnd(callable);
200                }
201            }
202        }
203
204        /**
205         * Filter the duplicate callables from the list before queue this.
206         * <p/>
207         * If it is single callable, checking if key is in unique map or not.
208         * <p/>
209         * If it is composite callable, remove duplicates callables from the composite.
210         *
211         * @return true if this callable should be queued
212         */
213        public boolean filterDuplicates() {
214            XCallable<?> callable = getElement();
215            if (callable instanceof CompositeCallable) {
216                return ((CompositeCallable) callable).removeDuplicates();
217            }
218            else {
219                return uniqueCallables.containsKey(callable.getKey()) == false;
220            }
221        }
222
223        /**
224         * Add the keys to the set
225         */
226        public void addToUniqueCallables() {
227            XCallable<?> callable = getElement();
228            if (callable instanceof CompositeCallable) {
229                ((CompositeCallable) callable).addToUniqueCallables();
230            }
231            else {
232                ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
233            }
234        }
235
236        /**
237         * Remove the keys from the set
238         */
239        public void removeFromUniqueCallables() {
240            XCallable<?> callable = getElement();
241            if (callable instanceof CompositeCallable) {
242                ((CompositeCallable) callable).removeFromUniqueCallables();
243            }
244            else {
245                uniqueCallables.remove(callable.getKey());
246            }
247        }
248    }
249
250    class CompositeCallable implements XCallable<Void> {
251        private List<XCallable<?>> callables;
252        private String name;
253        private int priority;
254        private long createdTime;
255
256        public CompositeCallable(List<? extends XCallable<?>> callables) {
257            this.callables = new ArrayList<XCallable<?>>(callables);
258            priority = 0;
259            createdTime = Long.MAX_VALUE;
260            StringBuilder sb = new StringBuilder();
261            String separator = "[";
262            for (XCallable<?> callable : callables) {
263                priority = Math.max(priority, callable.getPriority());
264                createdTime = Math.min(createdTime, callable.getCreatedTime());
265                sb.append(separator).append(callable.getName());
266                separator = ",";
267            }
268            sb.append("]");
269            name = sb.toString();
270        }
271
272        @Override
273        public String getName() {
274            return name;
275        }
276
277        @Override
278        public String getType() {
279            return "#composite#" + callables.get(0).getType();
280        }
281
282        @Override
283        public String getKey() {
284            return "#composite#" + callables.get(0).getKey();
285        }
286
287        @Override
288        public String getEntityKey() {
289            return "#composite#" + callables.get(0).getEntityKey();
290        }
291
292        @Override
293        public int getPriority() {
294            return priority;
295        }
296
297        @Override
298        public long getCreatedTime() {
299            return createdTime;
300        }
301
302        @Override
303        public void setInterruptMode(boolean mode) {
304        }
305
306        @Override
307        public boolean inInterruptMode() {
308            return false;
309        }
310
311        public List<XCallable<?>> getCallables() {
312            return this.callables;
313        }
314
315        public Void call() throws Exception {
316            XLog log = XLog.getLog(getClass());
317
318            for (XCallable<?> callable : callables) {
319                log.trace("executing callable [{0}]", callable.getName());
320                try {
321                    callable.call();
322                    incrCounter(INSTR_EXECUTED_COUNTER, 1);
323                    log.trace("executed callable [{0}]", callable.getName());
324                }
325                catch (Exception ex) {
326                    incrCounter(INSTR_FAILED_COUNTER, 1);
327                    log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
328                }
329            }
330
331            // ticking -1 not to count the call to the composite callable
332            incrCounter(INSTR_EXECUTED_COUNTER, -1);
333            return null;
334        }
335
336        /*
337         * (non-Javadoc)
338         *
339         * @see java.lang.Object#toString()
340         */
341        @Override
342        public String toString() {
343            if (callables.size() == 0) {
344                return null;
345            }
346            StringBuilder sb = new StringBuilder();
347            int size = callables.size();
348            for (int i = 0; i < size; i++) {
349                XCallable<?> callable = callables.get(i);
350                sb.append("(");
351                sb.append(callable.toString());
352                if (i + 1 == size) {
353                    sb.append(")");
354                }
355                else {
356                    sb.append("),");
357                }
358            }
359            return sb.toString();
360        }
361
362        /**
363         * Remove the duplicate callables from the list before queue them
364         *
365         * @return true if callables should be queued
366         */
367        public boolean removeDuplicates() {
368            Set<String> set = new HashSet<String>();
369            List<XCallable<?>> filteredCallables = new ArrayList<XCallable<?>>();
370            if (callables.size() == 0) {
371                return false;
372            }
373            for (XCallable<?> callable : callables) {
374                if (!uniqueCallables.containsKey(callable.getKey()) && !set.contains(callable.getKey())) {
375                    filteredCallables.add(callable);
376                    set.add(callable.getKey());
377                }
378            }
379            callables = filteredCallables;
380            if (callables.size() == 0) {
381                return false;
382            }
383            return true;
384        }
385
386        /**
387         * Add the keys to the set
388         */
389        public void addToUniqueCallables() {
390            for (XCallable<?> callable : callables) {
391                ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
392            }
393        }
394
395        /**
396         * Remove the keys from the set
397         */
398        public void removeFromUniqueCallables() {
399            for (XCallable<?> callable : callables) {
400                uniqueCallables.remove(callable.getKey());
401            }
402        }
403    }
404
405    private XLog log = XLog.getLog(getClass());
406
407    private int queueSize;
408    private PriorityDelayQueue<CallableWrapper> queue;
409    private ThreadPoolExecutor executor;
410    private Instrumentation instrumentation;
411
412    /**
413     * Convenience method for instrumentation counters.
414     *
415     * @param name counter name.
416     * @param count count to increment the counter.
417     */
418    private void incrCounter(String name, int count) {
419        if (instrumentation != null) {
420            instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
421        }
422    }
423
424    private void addInQueueCron(Instrumentation.Cron cron) {
425        if (instrumentation != null) {
426            instrumentation.addCron(INSTRUMENTATION_GROUP, INSTR_IN_QUEUE_TIME_TIMER, cron);
427        }
428    }
429
430    /**
431     * Initialize the command queue service.
432     *
433     * @param services services instance.
434     */
435    @Override
436    @SuppressWarnings({ "unchecked", "rawtypes" })
437    public void init(Services services) {
438        Configuration conf = services.getConf();
439
440        queueSize = ConfigurationService.getInt(conf, CONF_QUEUE_SIZE);
441        int threads = ConfigurationService.getInt(conf, CONF_THREADS);
442        boolean callableNextEligible = ConfigurationService.getBoolean(conf, CONF_CALLABLE_NEXT_ELIGIBLE);
443
444        for (String type : ConfigurationService.getStrings(conf, CONF_CALLABLE_INTERRUPT_TYPES)) {
445            log.debug("Adding interrupt type [{0}]", type);
446            INTERRUPT_TYPES.add(type);
447        }
448
449        if (!callableNextEligible) {
450            queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
451                @Override
452                protected void debug(String msgTemplate, Object... msgArgs) {
453                    log.trace(msgTemplate, msgArgs);
454                }
455            };
456        }
457        else {
458            // If the head of this queue has already reached max concurrency,
459            // continuously find next one
460            // which has not yet reach max concurrency.Overrided method
461            // 'eligibleToPoll' to check if the
462            // element of this queue has reached the maximum concurrency.
463            queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
464                @Override
465                protected void debug(String msgTemplate, Object... msgArgs) {
466                    log.trace(msgTemplate, msgArgs);
467                }
468
469                @Override
470                protected boolean eligibleToPoll(QueueElement<?> element) {
471                    if (element != null) {
472                        CallableWrapper wrapper = (CallableWrapper) element;
473                        if (element.getElement() != null) {
474                            return callableReachMaxConcurrency(wrapper.getElement());
475                        }
476                    }
477                    return false;
478                }
479
480            };
481        }
482
483        interruptMapMaxSize = ConfigurationService.getInt(conf, CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE);
484
485        // IMPORTANT: The ThreadPoolExecutor does not always the execute
486        // commands out of the queue, there are
487        // certain conditions where commands are pushed directly to a thread.
488        // As we are using a queue with DELAYED semantics (i.e. execute the
489        // command in 5 mins) we need to make
490        // sure that the commands are always pushed to the queue.
491        // To achieve this (by looking a the ThreadPoolExecutor.execute()
492        // implementation, we are making the pool
493        // minimum size equals to the maximum size (thus threads are keep always
494        // running) and we are warming up
495        // all those threads (the for loop that runs dummy runnables).
496        executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue){
497            protected void beforeExecute(Thread t, Runnable r) {
498                super.beforeExecute(t,r);
499                XLog.Info.get().clear();
500            }
501        };
502
503        for (int i = 0; i < threads; i++) {
504            executor.execute(new Runnable() {
505                public void run() {
506                    try {
507                        Thread.sleep(100);
508                    }
509                    catch (InterruptedException ex) {
510                        log.warn("Could not warm up threadpool {0}", ex.getMessage(), ex);
511                    }
512                }
513            });
514        }
515
516        maxCallableConcurrency = ConfigurationService.getInt(conf, CONF_CALLABLE_CONCURRENCY);
517    }
518
519    /**
520     * Destroy the command queue service.
521     */
522    @Override
523    public void destroy() {
524        try {
525            long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
526            executor.shutdown();
527            queue.clear();
528            while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
529                log.info("Waiting for executor to shutdown");
530                if (System.currentTimeMillis() > limit) {
531                    log.warn("Gave up, continuing without waiting for executor to shutdown");
532                    break;
533                }
534            }
535        }
536        catch (InterruptedException ex) {
537            log.warn(ex);
538        }
539    }
540
541    /**
542     * Return the public interface for command queue service.
543     *
544     * @return {@link CallableQueueService}.
545     */
546    @Override
547    public Class<? extends Service> getInterface() {
548        return CallableQueueService.class;
549    }
550
551    /**
552     * @return int size of queue
553     */
554    public synchronized int queueSize() {
555        return queue.size();
556    }
557
558    private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) {
559        if (!ignoreQueueSize && queue.size() >= queueSize) {
560            log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey());
561            return false;
562        }
563        if (!executor.isShutdown()) {
564            if (wrapper.filterDuplicates()) {
565                wrapper.addToUniqueCallables();
566                try {
567                    executor.execute(wrapper);
568                }
569                catch (Throwable ree) {
570                    wrapper.removeFromUniqueCallables();
571                    throw new RuntimeException(ree);
572                }
573            }
574        }
575        else {
576            log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey());
577        }
578        return true;
579    }
580
581    /**
582     * Queue a callable for asynchronous execution.
583     *
584     * @param callable callable to queue.
585     * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable
586     *         was not queued.
587     */
588    public boolean queue(XCallable<?> callable) {
589        return queue(callable, 0);
590    }
591
592    /**
593     * Queue a list of callables for serial execution.
594     * <p/>
595     * Useful to serialize callables that may compete with each other for resources.
596     * <p/>
597     * All callables will be processed with the priority of the highest priority of all callables.
598     *
599     * @param callables callables to be executed by the composite callable.
600     * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
601     *         were not queued.
602     */
603    public boolean queueSerial(List<? extends XCallable<?>> callables) {
604        return queueSerial(callables, 0);
605    }
606
607    /**
608     * Queue a callable for asynchronous execution sometime in the future.
609     *
610     * @param callable callable to queue for delayed execution
611     * @param delay time, in milliseconds, that the callable should be delayed.
612     * @return <code>true</code> if the callable was queued, <code>false</code>
613     *         if the queue is full and the callable was not queued.
614     */
615    public synchronized boolean queue(XCallable<?> callable, long delay) {
616        if (callable == null) {
617            return true;
618        }
619        boolean queued = false;
620        if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
621            log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size());
622        }
623        else {
624            checkInterruptTypes(callable);
625            queued = queue(new CallableWrapper(callable, delay), false);
626            if (queued) {
627                incrCounter(INSTR_QUEUED_COUNTER, 1);
628            }
629            else {
630                log.warn("Could not queue callable");
631            }
632        }
633        return queued;
634    }
635
636    /**
637     * Queue a list of callables for serial execution sometime in the future.
638     * <p/>
639     * Useful to serialize callables that may compete with each other for resources.
640     * <p/>
641     * All callables will be processed with the priority of the highest priority of all callables.
642     *
643     * @param callables callables to be executed by the composite callable.
644     * @param delay time, in milliseconds, that the callable should be delayed.
645     * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
646     *         were not queued.
647     */
648    public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) {
649        boolean queued;
650        if (callables == null || callables.size() == 0) {
651            queued = true;
652        }
653        else if (callables.size() == 1) {
654            queued = queue(callables.get(0), delay);
655        }
656        else {
657            XCallable<?> callable = new CompositeCallable(callables);
658            queued = queue(callable, delay);
659            if (queued) {
660                incrCounter(INSTR_QUEUED_COUNTER, callables.size());
661            }
662        }
663        return queued;
664    }
665
666    /**
667     * Instruments the callable queue service.
668     *
669     * @param instr instance to instrument the callable queue service to.
670     */
671    public void instrument(Instrumentation instr) {
672        instrumentation = instr;
673        instr.addSampler(INSTRUMENTATION_GROUP, INSTR_QUEUE_SIZE_SAMPLER, 60, 1, new Instrumentation.Variable<Long>() {
674            public Long getValue() {
675                return (long) queue.size();
676            }
677        });
678        instr.addSampler(INSTRUMENTATION_GROUP, INSTR_THREADS_ACTIVE_SAMPLER, 60, 1,
679                new Instrumentation.Variable<Long>() {
680                    public Long getValue() {
681                        return (long) executor.getActiveCount();
682                    }
683                });
684    }
685
686    /**
687     * check the interrupt map for the existence of an interrupt commands if
688     * exist a List of Interrupt Callable for the same lock key will bereturned,
689     * otherwise it will return null
690     */
691    public Set<XCallable<?>> checkInterrupts(String lockKey) {
692
693        if (lockKey != null) {
694            return interruptCommandsMap.remove(lockKey);
695        }
696        return null;
697    }
698
699    /**
700     * check if the callable is of an interrupt type and insert it into the map
701     * accordingly
702     *
703     * @param callable
704     */
705    public void checkInterruptTypes(XCallable<?> callable) {
706        if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) {
707            for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) {
708                if (INTERRUPT_TYPES.contains(singleCallable.getType())) {
709                    insertCallableIntoInterruptMap(singleCallable);
710                }
711            }
712        }
713        else if (INTERRUPT_TYPES.contains(callable.getType())) {
714            insertCallableIntoInterruptMap(callable);
715        }
716    }
717
718    /**
719     * insert a new callable in the Interrupt Command Map add a new element to
720     * the list or create a new list accordingly
721     *
722     * @param callable
723     */
724    public void insertCallableIntoInterruptMap(XCallable<?> callable) {
725        if (interruptCommandsMap.size() < interruptMapMaxSize) {
726            Set<XCallable<?>> newSet = Collections.synchronizedSet(new LinkedHashSet<XCallable<?>>());
727            Set<XCallable<?>> interruptSet = interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newSet);
728            if (interruptSet == null) {
729                interruptSet = newSet;
730            }
731            if (interruptSet.add(callable)) {
732                log.trace("Inserting an interrupt element [{0}] to the interrupt map", callable.toString());
733            } else {
734                log.trace("Interrupt element [{0}] already present", callable.toString());
735            }
736        }
737        else {
738            log.warn(
739                    "The interrupt map reached max size of [{0}], an interrupt element [{1}] will not added to the map [{1}]",
740                    interruptCommandsMap.size(), callable.toString());
741        }
742    }
743
744    /**
745     * Get the list of strings of queue dump
746     *
747     * @return the list of string that representing each CallableWrapper
748     */
749    public List<String> getQueueDump() {
750        List<String> list = new ArrayList<String>();
751        for (QueueElement<CallableWrapper> qe : queue) {
752            if (qe.toString() == null) {
753                continue;
754            }
755            list.add(qe.toString());
756        }
757        return list;
758    }
759
760    /**
761     * Get the list of strings of uniqueness map dump
762     *
763     * @return the list of string that representing the key of each command in the queue
764     */
765    public List<String> getUniqueDump() {
766        List<String> list = new ArrayList<String>();
767        for (Entry<String, Date> entry : uniqueCallables.entrySet()) {
768            list.add(entry.toString());
769        }
770        return list;
771    }
772
773}