001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.util.ArrayList;
021    import java.util.Collections;
022    import java.util.Date;
023    import java.util.HashMap;
024    import java.util.HashSet;
025    import java.util.LinkedHashSet;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Set;
029    import java.util.Map.Entry;
030    import java.util.concurrent.BlockingQueue;
031    import java.util.concurrent.ConcurrentHashMap;
032    import java.util.concurrent.RejectedExecutionException;
033    import java.util.concurrent.ThreadPoolExecutor;
034    import java.util.concurrent.TimeUnit;
035    import java.util.concurrent.atomic.AtomicInteger;
036    
037    import org.apache.hadoop.conf.Configuration;
038    import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
039    import org.apache.oozie.util.Instrumentable;
040    import org.apache.oozie.util.Instrumentation;
041    import org.apache.oozie.util.PollablePriorityDelayQueue;
042    import org.apache.oozie.util.PriorityDelayQueue;
043    import org.apache.oozie.util.XCallable;
044    import org.apache.oozie.util.XLog;
045    import org.apache.oozie.util.PriorityDelayQueue.QueueElement;
046    
047    /**
048     * The callable queue service queues {@link XCallable}s for asynchronous execution.
049     * <p/>
050     * Callables can be queued for immediate execution or for delayed execution (some time in the future).
051     * <p/>
052     * Callables are consumed from the queue for execution based on their priority.
053     * <p/>
054     * When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops
055     * queuing callables.
056     * <p/>
057     * A thread-pool is used to execute the callables asynchronously.
058     * <p/>
059     * The following configuration parameters control the callable queue service:
060     * <p/>
061     * {@link #CONF_QUEUE_SIZE} size of the immediate execution queue. Defaulf value is 10000.
062     * <p/>
063     * {@link #CONF_THREADS} number of threads in the thread-pool used for asynchronous command execution. When this number
064     * of threads is reached, commands remain the queue until threads become available. Sets up a priority queue for the
065     * execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for execution
066     * sometime in the future.
067     */
068    public class CallableQueueService implements Service, Instrumentable {
069        private static final String INSTRUMENTATION_GROUP = "callablequeue";
070        private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue";
071        private static final String INSTR_EXECUTED_COUNTER = "executed";
072        private static final String INSTR_FAILED_COUNTER = "failed";
073        private static final String INSTR_QUEUED_COUNTER = "queued";
074        private static final String INSTR_QUEUE_SIZE_SAMPLER = "queue.size";
075        private static final String INSTR_THREADS_ACTIVE_SAMPLER = "threads.active";
076    
077        public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallableQueueService.";
078    
079        public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
080        public static final String CONF_THREADS = CONF_PREFIX + "threads";
081        public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
082        public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";
083        public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes";
084        public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize";
085    
086        public static final int CONCURRENCY_DELAY = 500;
087    
088        public static final int SAFE_MODE_DELAY = 60000;
089    
090        private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
091    
092        private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
093    
094        private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<String, Set<XCallable<?>>>();
095    
096        public static final HashSet<String> INTERRUPT_TYPES = new HashSet<String>();
097    
098        private int interruptMapMaxSize;
099    
100        private int maxCallableConcurrency;
101    
102        private boolean callableBegin(XCallable<?> callable) {
103            synchronized (activeCallables) {
104                AtomicInteger counter = activeCallables.get(callable.getType());
105                if (counter == null) {
106                    counter = new AtomicInteger(1);
107                    activeCallables.put(callable.getType(), counter);
108                    return true;
109                }
110                else {
111                    int i = counter.incrementAndGet();
112                    return i <= maxCallableConcurrency;
113                }
114            }
115        }
116    
117        private void callableEnd(XCallable<?> callable) {
118            synchronized (activeCallables) {
119                AtomicInteger counter = activeCallables.get(callable.getType());
120                if (counter == null) {
121                    throw new IllegalStateException("It should not happen");
122                }
123                else {
124                    counter.decrementAndGet();
125                }
126            }
127        }
128    
129        private boolean callableReachMaxConcurrency(XCallable<?> callable) {
130            synchronized (activeCallables) {
131                AtomicInteger counter = activeCallables.get(callable.getType());
132                if (counter == null) {
133                    return true;
134                }
135                else {
136                    int i = counter.get();
137                    return i < maxCallableConcurrency;
138                }
139            }
140        }
141    
142        // Callables are wrapped with the this wrapper for execution, for logging
143        // and instrumentation.
144        // The wrapper implements Runnable and Comparable to be able to work with an
145        // executor and a priority queue.
146        class CallableWrapper extends PriorityDelayQueue.QueueElement<XCallable<?>> implements Runnable {
147            private Instrumentation.Cron cron;
148    
149            public CallableWrapper(XCallable<?> callable, long delay) {
150                super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS);
151                cron = new Instrumentation.Cron();
152                cron.start();
153            }
154    
155            public void run() {
156                if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
157                    log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay", getElement().getType(),
158                            SAFE_MODE_DELAY);
159                    setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
160                    removeFromUniqueCallables();
161                    queue(this, true);
162                    return;
163                }
164                XCallable<?> callable = getElement();
165                try {
166                    if (callableBegin(callable)) {
167                        cron.stop();
168                        addInQueueCron(cron);
169                        XLog.Info.get().clear();
170                        XLog log = XLog.getLog(getClass());
171                        log.trace("executing callable [{0}]", callable.getName());
172    
173                        removeFromUniqueCallables();
174                        try {
175                            callable.call();
176                            incrCounter(INSTR_EXECUTED_COUNTER, 1);
177                            log.trace("executed callable [{0}]", callable.getName());
178                        }
179                        catch (Exception ex) {
180                            incrCounter(INSTR_FAILED_COUNTER, 1);
181                            log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
182                        }
183                        finally {
184                            XLog.Info.get().clear();
185                        }
186                    }
187                    else {
188                        log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay", callable
189                                .getType(), CONCURRENCY_DELAY);
190                        setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
191                        removeFromUniqueCallables();
192                        queue(this, true);
193                        incrCounter(callable.getType() + "#exceeded.concurrency", 1);
194                    }
195                }
196                finally {
197                    callableEnd(callable);
198                }
199            }
200    
201            /**
202             * @return String the queue dump
203             */
204            @Override
205            public String toString() {
206                return "delay=" + getDelay(TimeUnit.MILLISECONDS) + ", elements=" + getElement().toString();
207            }
208    
209            /**
210             * Filter the duplicate callables from the list before queue this.
211             * <p/>
212             * If it is single callable, checking if key is in unique map or not.
213             * <p/>
214             * If it is composite callable, remove duplicates callables from the composite.
215             *
216             * @return true if this callable should be queued
217             */
218            public boolean filterDuplicates() {
219                XCallable<?> callable = getElement();
220                if (callable instanceof CompositeCallable) {
221                    return ((CompositeCallable) callable).removeDuplicates();
222                }
223                else {
224                    return uniqueCallables.containsKey(callable.getKey()) == false;
225                }
226            }
227    
228            /**
229             * Add the keys to the set
230             */
231            public void addToUniqueCallables() {
232                XCallable<?> callable = getElement();
233                if (callable instanceof CompositeCallable) {
234                    ((CompositeCallable) callable).addToUniqueCallables();
235                }
236                else {
237                    ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
238                }
239            }
240    
241            /**
242             * Remove the keys from the set
243             */
244            public void removeFromUniqueCallables() {
245                XCallable<?> callable = getElement();
246                if (callable instanceof CompositeCallable) {
247                    ((CompositeCallable) callable).removeFromUniqueCallables();
248                }
249                else {
250                    uniqueCallables.remove(callable.getKey());
251                }
252            }
253        }
254    
255        class CompositeCallable implements XCallable<Void> {
256            private List<XCallable<?>> callables;
257            private String name;
258            private int priority;
259            private long createdTime;
260    
261            public CompositeCallable(List<? extends XCallable<?>> callables) {
262                this.callables = new ArrayList<XCallable<?>>(callables);
263                priority = 0;
264                createdTime = Long.MAX_VALUE;
265                StringBuilder sb = new StringBuilder();
266                String separator = "[";
267                for (XCallable<?> callable : callables) {
268                    priority = Math.max(priority, callable.getPriority());
269                    createdTime = Math.min(createdTime, callable.getCreatedTime());
270                    sb.append(separator).append(callable.getName());
271                    separator = ",";
272                }
273                sb.append("]");
274                name = sb.toString();
275            }
276    
277            @Override
278            public String getName() {
279                return name;
280            }
281    
282            @Override
283            public String getType() {
284                return "#composite#" + callables.get(0).getType();
285            }
286    
287            @Override
288            public String getKey() {
289                return "#composite#" + callables.get(0).getKey();
290            }
291    
292            @Override
293            public String getEntityKey() {
294                return "#composite#" + callables.get(0).getEntityKey();
295            }
296    
297            @Override
298            public int getPriority() {
299                return priority;
300            }
301    
302            @Override
303            public long getCreatedTime() {
304                return createdTime;
305            }
306    
307            @Override
308            public void setInterruptMode(boolean mode) {
309            }
310    
311            @Override
312            public boolean inInterruptMode() {
313                return false;
314            }
315    
316            public List<XCallable<?>> getCallables() {
317                return this.callables;
318            }
319    
320            public Void call() throws Exception {
321                XLog log = XLog.getLog(getClass());
322    
323                for (XCallable<?> callable : callables) {
324                    log.trace("executing callable [{0}]", callable.getName());
325                    try {
326                        callable.call();
327                        incrCounter(INSTR_EXECUTED_COUNTER, 1);
328                        log.trace("executed callable [{0}]", callable.getName());
329                    }
330                    catch (Exception ex) {
331                        incrCounter(INSTR_FAILED_COUNTER, 1);
332                        log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
333                    }
334                }
335    
336                // ticking -1 not to count the call to the composite callable
337                incrCounter(INSTR_EXECUTED_COUNTER, -1);
338                return null;
339            }
340    
341            /*
342             * (non-Javadoc)
343             *
344             * @see java.lang.Object#toString()
345             */
346            @Override
347            public String toString() {
348                if (callables.size() == 0) {
349                    return null;
350                }
351                StringBuilder sb = new StringBuilder();
352                int size = callables.size();
353                for (int i = 0; i < size; i++) {
354                    XCallable<?> callable = callables.get(i);
355                    sb.append("(");
356                    sb.append(callable.toString());
357                    if (i + 1 == size) {
358                        sb.append(")");
359                    }
360                    else {
361                        sb.append("),");
362                    }
363                }
364                return sb.toString();
365            }
366    
367            /**
368             * Remove the duplicate callables from the list before queue them
369             *
370             * @return true if callables should be queued
371             */
372            public boolean removeDuplicates() {
373                Set<String> set = new HashSet<String>();
374                List<XCallable<?>> filteredCallables = new ArrayList<XCallable<?>>();
375                if (callables.size() == 0) {
376                    return false;
377                }
378                for (XCallable<?> callable : callables) {
379                    if (!uniqueCallables.containsKey(callable.getKey()) && !set.contains(callable.getKey())) {
380                        filteredCallables.add(callable);
381                        set.add(callable.getKey());
382                    }
383                }
384                callables = filteredCallables;
385                if (callables.size() == 0) {
386                    return false;
387                }
388                return true;
389            }
390    
391            /**
392             * Add the keys to the set
393             */
394            public void addToUniqueCallables() {
395                for (XCallable<?> callable : callables) {
396                    ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
397                }
398            }
399    
400            /**
401             * Remove the keys from the set
402             */
403            public void removeFromUniqueCallables() {
404                for (XCallable<?> callable : callables) {
405                    uniqueCallables.remove(callable.getKey());
406                }
407            }
408        }
409    
410        private XLog log = XLog.getLog(getClass());
411    
412        private int queueSize;
413        private PriorityDelayQueue<CallableWrapper> queue;
414        private ThreadPoolExecutor executor;
415        private Instrumentation instrumentation;
416    
417        /**
418         * Convenience method for instrumentation counters.
419         *
420         * @param name counter name.
421         * @param count count to increment the counter.
422         */
423        private void incrCounter(String name, int count) {
424            if (instrumentation != null) {
425                instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
426            }
427        }
428    
429        private void addInQueueCron(Instrumentation.Cron cron) {
430            if (instrumentation != null) {
431                instrumentation.addCron(INSTRUMENTATION_GROUP, INSTR_IN_QUEUE_TIME_TIMER, cron);
432            }
433        }
434    
435        /**
436         * Initialize the command queue service.
437         *
438         * @param services services instance.
439         */
440        @Override
441        @SuppressWarnings({ "unchecked", "rawtypes" })
442        public void init(Services services) {
443            Configuration conf = services.getConf();
444    
445            queueSize = conf.getInt(CONF_QUEUE_SIZE, 10000);
446            int threads = conf.getInt(CONF_THREADS, 10);
447            boolean callableNextEligible = conf.getBoolean(CONF_CALLABLE_NEXT_ELIGIBLE, true);
448    
449            for (String type : conf.getStringCollection(CONF_CALLABLE_INTERRUPT_TYPES)) {
450                log.debug("Adding interrupt type [{0}]", type);
451                INTERRUPT_TYPES.add(type);
452            }
453    
454            if (!callableNextEligible) {
455                queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
456                    @Override
457                    protected void debug(String msgTemplate, Object... msgArgs) {
458                        log.trace(msgTemplate, msgArgs);
459                    }
460                };
461            }
462            else {
463                // If the head of this queue has already reached max concurrency,
464                // continuously find next one
465                // which has not yet reach max concurrency.Overrided method
466                // 'eligibleToPoll' to check if the
467                // element of this queue has reached the maximum concurrency.
468                queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
469                    @Override
470                    protected void debug(String msgTemplate, Object... msgArgs) {
471                        log.trace(msgTemplate, msgArgs);
472                    }
473    
474                    @Override
475                    protected boolean eligibleToPoll(QueueElement<?> element) {
476                        if (element != null) {
477                            CallableWrapper wrapper = (CallableWrapper) element;
478                            if (element.getElement() != null) {
479                                return callableReachMaxConcurrency(wrapper.getElement());
480                            }
481                        }
482                        return false;
483                    }
484    
485                };
486            }
487    
488            interruptMapMaxSize = conf.getInt(CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE, 100);
489    
490            // IMPORTANT: The ThreadPoolExecutor does not always the execute
491            // commands out of the queue, there are
492            // certain conditions where commands are pushed directly to a thread.
493            // As we are using a queue with DELAYED semantics (i.e. execute the
494            // command in 5 mins) we need to make
495            // sure that the commands are always pushed to the queue.
496            // To achieve this (by looking a the ThreadPoolExecutor.execute()
497            // implementation, we are making the pool
498            // minimum size equals to the maximum size (thus threads are keep always
499            // running) and we are warming up
500            // all those threads (the for loop that runs dummy runnables).
501            executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue);
502    
503            for (int i = 0; i < threads; i++) {
504                executor.execute(new Runnable() {
505                    public void run() {
506                        try {
507                            Thread.sleep(100);
508                        }
509                        catch (InterruptedException ex) {
510                            log.warn("Could not warm up threadpool {0}", ex.getMessage(), ex);
511                        }
512                    }
513                });
514            }
515    
516            maxCallableConcurrency = conf.getInt(CONF_CALLABLE_CONCURRENCY, 3);
517        }
518    
519        /**
520         * Destroy the command queue service.
521         */
522        @Override
523        public void destroy() {
524            try {
525                long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
526                executor.shutdown();
527                queue.clear();
528                while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
529                    log.info("Waiting for executor to shutdown");
530                    if (System.currentTimeMillis() > limit) {
531                        log.warn("Gave up, continuing without waiting for executor to shutdown");
532                        break;
533                    }
534                }
535            }
536            catch (InterruptedException ex) {
537                log.warn(ex);
538            }
539        }
540    
541        /**
542         * Return the public interface for command queue service.
543         *
544         * @return {@link CallableQueueService}.
545         */
546        @Override
547        public Class<? extends Service> getInterface() {
548            return CallableQueueService.class;
549        }
550    
551        /**
552         * @return int size of queue
553         */
554        public synchronized int queueSize() {
555            return queue.size();
556        }
557    
558        private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) {
559            if (!ignoreQueueSize && queue.size() >= queueSize) {
560                log.warn("queue if full, ignoring queuing for [{0}]", wrapper.getElement());
561                return false;
562            }
563            if (!executor.isShutdown()) {
564                if (wrapper.filterDuplicates()) {
565                    wrapper.addToUniqueCallables();
566                    try {
567                        executor.execute(wrapper);
568                    }
569                    catch (RejectedExecutionException ree) {
570                        wrapper.removeFromUniqueCallables();
571                        throw ree;
572                    }
573                }
574            }
575            else {
576                log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement());
577            }
578            return true;
579        }
580    
581        /**
582         * Queue a callable for asynchronous execution.
583         *
584         * @param callable callable to queue.
585         * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable
586         *         was not queued.
587         */
588        public boolean queue(XCallable<?> callable) {
589            return queue(callable, 0);
590        }
591    
592        /**
593         * Queue a list of callables for serial execution.
594         * <p/>
595         * Useful to serialize callables that may compete with each other for resources.
596         * <p/>
597         * All callables will be processed with the priority of the highest priority of all callables.
598         *
599         * @param callables callables to be executed by the composite callable.
600         * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
601         *         were not queued.
602         */
603        public boolean queueSerial(List<? extends XCallable<?>> callables) {
604            return queueSerial(callables, 0);
605        }
606    
607        /**
608         * Queue a callable for asynchronous execution sometime in the future.
609         *
610         * @param callable callable to queue for delayed execution
611         * @param delay time, in milliseconds, that the callable should be delayed.
612         * @return <code>true</code> if the callable was queued, <code>false</code>
613         *         if the queue is full and the callable was not queued.
614         */
615        public synchronized boolean queue(XCallable<?> callable, long delay) {
616            if (callable == null) {
617                return true;
618            }
619            boolean queued = false;
620            if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
621                log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size());
622            }
623            else {
624                checkInterruptTypes(callable);
625                queued = queue(new CallableWrapper(callable, delay), false);
626                if (queued) {
627                    incrCounter(INSTR_QUEUED_COUNTER, 1);
628                }
629                else {
630                    log.warn("Could not queue callable");
631                }
632            }
633            return queued;
634        }
635    
636        /**
637         * Queue a list of callables for serial execution sometime in the future.
638         * <p/>
639         * Useful to serialize callables that may compete with each other for resources.
640         * <p/>
641         * All callables will be processed with the priority of the highest priority of all callables.
642         *
643         * @param callables callables to be executed by the composite callable.
644         * @param delay time, in milliseconds, that the callable should be delayed.
645         * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
646         *         were not queued.
647         */
648        public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) {
649            boolean queued;
650            if (callables == null || callables.size() == 0) {
651                queued = true;
652            }
653            else if (callables.size() == 1) {
654                queued = queue(callables.get(0), delay);
655            }
656            else {
657                XCallable<?> callable = new CompositeCallable(callables);
658                queued = queue(callable, delay);
659                if (queued) {
660                    incrCounter(INSTR_QUEUED_COUNTER, callables.size());
661                }
662            }
663            return queued;
664        }
665    
666        /**
667         * Instruments the callable queue service.
668         *
669         * @param instr instance to instrument the callable queue service to.
670         */
671        public void instrument(Instrumentation instr) {
672            instrumentation = instr;
673            instr.addSampler(INSTRUMENTATION_GROUP, INSTR_QUEUE_SIZE_SAMPLER, 60, 1, new Instrumentation.Variable<Long>() {
674                public Long getValue() {
675                    return (long) queue.size();
676                }
677            });
678            instr.addSampler(INSTRUMENTATION_GROUP, INSTR_THREADS_ACTIVE_SAMPLER, 60, 1,
679                    new Instrumentation.Variable<Long>() {
680                        public Long getValue() {
681                            return (long) executor.getActiveCount();
682                        }
683                    });
684        }
685    
686        /**
687         * check the interrupt map for the existence of an interrupt commands if
688         * exist a List of Interrupt Callable for the same lock key will bereturned,
689         * otherwise it will return null
690         */
691        public Set<XCallable<?>> checkInterrupts(String lockKey) {
692    
693            if (lockKey != null) {
694                return interruptCommandsMap.remove(lockKey);
695            }
696            return null;
697        }
698    
699        /**
700         * check if the callable is of an interrupt type and insert it into the map
701         * accordingly
702         *
703         * @param callable
704         */
705        public void checkInterruptTypes(XCallable<?> callable) {
706            if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) {
707                for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) {
708                    if (INTERRUPT_TYPES.contains(singleCallable.getType())) {
709                        insertCallableIntoInterruptMap(singleCallable);
710                    }
711                }
712            }
713            else if (INTERRUPT_TYPES.contains(callable.getType())) {
714                insertCallableIntoInterruptMap(callable);
715            }
716        }
717    
718        /**
719         * insert a new callable in the Interrupt Command Map add a new element to
720         * the list or create a new list accordingly
721         *
722         * @param callable
723         */
724        public void insertCallableIntoInterruptMap(XCallable<?> callable) {
725            if (interruptCommandsMap.size() < interruptMapMaxSize) {
726                Set<XCallable<?>> newSet = Collections.synchronizedSet(new LinkedHashSet<XCallable<?>>());
727                Set<XCallable<?>> interruptSet = interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newSet);
728                if (interruptSet == null) {
729                    interruptSet = newSet;
730                }
731                if (interruptSet.add(callable)) {
732                    log.trace("Inserting an interrupt element [{0}] to the interrupt map", callable.toString());
733                } else {
734                    log.trace("Interrupt element [{0}] already present", callable.toString());
735                }
736            }
737            else {
738                log.warn(
739                        "The interrupt map reached max size of [{0}], an interrupt element [{1}] will not added to the map [{1}]",
740                        interruptCommandsMap.size(), callable.toString());
741            }
742        }
743    
744        /**
745         * Get the list of strings of queue dump
746         *
747         * @return the list of string that representing each CallableWrapper
748         */
749        public List<String> getQueueDump() {
750            List<String> list = new ArrayList<String>();
751            for (QueueElement<CallableWrapper> qe : queue) {
752                if (qe.toString() == null) {
753                    continue;
754                }
755                list.add(qe.toString());
756            }
757            return list;
758        }
759    
760        /**
761         * Get the list of strings of uniqueness map dump
762         *
763         * @return the list of string that representing the key of each command in the queue
764         */
765        public List<String> getUniqueDump() {
766            List<String> list = new ArrayList<String>();
767            for (Entry<String, Date> entry : uniqueCallables.entrySet()) {
768                list.add(entry.toString());
769            }
770            return list;
771        }
772    
773    }