001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.Date;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.LinkedHashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.Set;
031import java.util.concurrent.BlockingQueue;
032import java.util.concurrent.Callable;
033import java.util.concurrent.ConcurrentHashMap;
034import java.util.concurrent.Future;
035import java.util.concurrent.RunnableFuture;
036import java.util.concurrent.ThreadPoolExecutor;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicInteger;
039
040import org.apache.hadoop.conf.Configuration;
041import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
042import org.apache.oozie.util.Instrumentable;
043import org.apache.oozie.util.Instrumentation;
044import org.apache.oozie.util.NamedThreadFactory;
045import org.apache.oozie.util.PollablePriorityDelayQueue;
046import org.apache.oozie.util.PriorityDelayQueue;
047import org.apache.oozie.util.PriorityDelayQueue.QueueElement;
048import org.apache.oozie.util.XCallable;
049import org.apache.oozie.util.XLog;
050
051/**
052 * The callable queue service queues {@link XCallable}s for asynchronous execution.
053 * <p>
054 * Callables can be queued for immediate execution or for delayed execution (some time in the future).
055 * <p>
056 * Callables are consumed from the queue for execution based on their priority.
057 * <p>
058 * When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops
059 * queuing callables.
060 * <p>
061 * A thread-pool is used to execute the callables asynchronously.
062 * <p>
063 * The following configuration parameters control the callable queue service:
064 * <p>
065 * {@link #CONF_QUEUE_SIZE} size of the immediate execution queue. Defaulf value is 10000.
066 * <p>
067 * {@link #CONF_THREADS} number of threads in the thread-pool used for asynchronous command execution. When this number
068 * of threads is reached, commands remain the queue until threads become available. Sets up a priority queue for the
069 * execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for execution
070 * sometime in the future.
071 */
072public class CallableQueueService implements Service, Instrumentable {
073    private static final String INSTRUMENTATION_GROUP = "callablequeue";
074    private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue";
075    private static final String INSTR_EXECUTED_COUNTER = "executed";
076    private static final String INSTR_FAILED_COUNTER = "failed";
077    private static final String INSTR_QUEUED_COUNTER = "queued";
078    private static final String INSTR_QUEUE_SIZE_SAMPLER = "queue.size";
079    private static final String INSTR_THREADS_ACTIVE_SAMPLER = "threads.active";
080
081    public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallableQueueService.";
082
083    public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
084    public static final String CONF_THREADS = CONF_PREFIX + "threads";
085    public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
086    public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";
087    public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes";
088    public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize";
089
090    public static final int CONCURRENCY_DELAY = 500;
091
092    public static final int SAFE_MODE_DELAY = 60000;
093
094    private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
095
096    private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
097
098    private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<String, Set<XCallable<?>>>();
099
100    public static final HashSet<String> INTERRUPT_TYPES = new HashSet<String>();
101
102    private int interruptMapMaxSize;
103
104    private int maxCallableConcurrency;
105
106    private boolean callableBegin(XCallable<?> callable) {
107        synchronized (activeCallables) {
108            AtomicInteger counter = activeCallables.get(callable.getType());
109            if (counter == null) {
110                counter = new AtomicInteger(1);
111                activeCallables.put(callable.getType(), counter);
112                return true;
113            }
114            else {
115                int i = counter.incrementAndGet();
116                return i <= maxCallableConcurrency;
117            }
118        }
119    }
120
121    private void callableEnd(XCallable<?> callable) {
122        synchronized (activeCallables) {
123            AtomicInteger counter = activeCallables.get(callable.getType());
124            if (counter == null) {
125                throw new IllegalStateException("It should not happen");
126            }
127            else {
128                counter.decrementAndGet();
129            }
130        }
131    }
132
133    private boolean callableReachMaxConcurrency(XCallable<?> callable) {
134        synchronized (activeCallables) {
135            AtomicInteger counter = activeCallables.get(callable.getType());
136            if (counter == null) {
137                return true;
138            }
139            else {
140                int i = counter.get();
141                return i < maxCallableConcurrency;
142            }
143        }
144    }
145
146    // Callables are wrapped with the this wrapper for execution, for logging
147    // and instrumentation.
148    // The wrapper implements Runnable and Comparable to be able to work with an
149    // executor and a priority queue.
150    public class CallableWrapper<E> extends PriorityDelayQueue.QueueElement<E> implements Runnable, Callable<E> {
151        private Instrumentation.Cron cron;
152
153        public CallableWrapper(XCallable<E> callable, long delay) {
154            super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS);
155            cron = new Instrumentation.Cron();
156            cron.start();
157        }
158
159        public void run() {
160            XCallable<?> callable = null;
161            try {
162                removeFromUniqueCallables();
163                if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
164                    log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay", getElement().getType(),
165                            SAFE_MODE_DELAY);
166                    setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
167                    queue(this, true);
168                    return;
169                }
170                callable = getElement();
171                if (callableBegin(callable)) {
172                    cron.stop();
173                    addInQueueCron(cron);
174                    XLog log = XLog.getLog(getClass());
175                    log.trace("executing callable [{0}]", callable.getName());
176
177                    try {
178                        //FutureTask.run() will invoke cllable.call()
179                        super.run();
180                        incrCounter(INSTR_EXECUTED_COUNTER, 1);
181                        log.trace("executed callable [{0}]", callable.getName());
182                    }
183                    catch (Exception ex) {
184                        incrCounter(INSTR_FAILED_COUNTER, 1);
185                        log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
186                    }
187                }
188                else {
189                    log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay", callable
190                            .getType(), CONCURRENCY_DELAY);
191                    setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
192                    queue(this, true);
193                    incrCounter(callable.getType() + "#exceeded.concurrency", 1);
194                }
195            }
196            catch (Throwable t) {
197                incrCounter(INSTR_FAILED_COUNTER, 1);
198                log.warn("exception callable [{0}], {1}", callable == null ? "N/A" : callable.getName(),
199                        t.getMessage(), t);
200            }
201            finally {
202                if (callable != null) {
203                    callableEnd(callable);
204                }
205            }
206        }
207
208        /**
209         * Filter the duplicate callables from the list before queue this.
210         * <p>
211         * If it is single callable, checking if key is in unique map or not.
212         * <p>
213         * If it is composite callable, remove duplicates callables from the composite.
214         *
215         * @return true if this callable should be queued
216         */
217        public boolean filterDuplicates() {
218            XCallable<?> callable = getElement();
219            if (callable instanceof CompositeCallable) {
220                return ((CompositeCallable) callable).removeDuplicates();
221            }
222            else {
223                return uniqueCallables.containsKey(callable.getKey()) == false;
224            }
225        }
226
227        /**
228         * Add the keys to the set
229         */
230        public void addToUniqueCallables() {
231            XCallable<?> callable = getElement();
232            if (callable instanceof CompositeCallable) {
233                ((CompositeCallable) callable).addToUniqueCallables();
234            }
235            else {
236                ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
237            }
238        }
239
240        /**
241         * Remove the keys from the set
242         */
243        public void removeFromUniqueCallables() {
244            XCallable<?> callable = getElement();
245            if (callable instanceof CompositeCallable) {
246                ((CompositeCallable) callable).removeFromUniqueCallables();
247            }
248            else {
249                uniqueCallables.remove(callable.getKey());
250            }
251        }
252
253        //this will not get called, bcz  newTaskFor of threadpool will convert it in futureTask which is a runnable.
254        // futureTask  will call the cllable.call from run method. so we override run to call super.run method.
255        @Override
256        public E call() throws Exception {
257            return null;
258        }
259    }
260
261    class CompositeCallable implements XCallable<Void> {
262        private List<XCallable<?>> callables;
263        private String name;
264        private int priority;
265        private long createdTime;
266
267        public CompositeCallable(List<? extends XCallable<?>> callables) {
268            this.callables = new ArrayList<XCallable<?>>(callables);
269            priority = 0;
270            createdTime = Long.MAX_VALUE;
271            StringBuilder sb = new StringBuilder();
272            String separator = "[";
273            for (XCallable<?> callable : callables) {
274                priority = Math.max(priority, callable.getPriority());
275                createdTime = Math.min(createdTime, callable.getCreatedTime());
276                sb.append(separator).append(callable.getName());
277                separator = ",";
278            }
279            sb.append("]");
280            name = sb.toString();
281        }
282
283        @Override
284        public String getName() {
285            return name;
286        }
287
288        @Override
289        public String getType() {
290            return "#composite#" + callables.get(0).getType();
291        }
292
293        @Override
294        public String getKey() {
295            return "#composite#" + callables.get(0).getKey();
296        }
297
298        @Override
299        public String getEntityKey() {
300            return "#composite#" + callables.get(0).getEntityKey();
301        }
302
303        @Override
304        public int getPriority() {
305            return priority;
306        }
307
308        @Override
309        public long getCreatedTime() {
310            return createdTime;
311        }
312
313        @Override
314        public void setInterruptMode(boolean mode) {
315        }
316
317        @Override
318        public boolean inInterruptMode() {
319            return false;
320        }
321
322        public List<XCallable<?>> getCallables() {
323            return this.callables;
324        }
325
326        public Void call() throws Exception {
327            XLog log = XLog.getLog(getClass());
328
329            for (XCallable<?> callable : callables) {
330                log.trace("executing callable [{0}]", callable.getName());
331                try {
332                    callable.call();
333                    incrCounter(INSTR_EXECUTED_COUNTER, 1);
334                    log.trace("executed callable [{0}]", callable.getName());
335                }
336                catch (Exception ex) {
337                    incrCounter(INSTR_FAILED_COUNTER, 1);
338                    log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
339                }
340            }
341
342            // ticking -1 not to count the call to the composite callable
343            incrCounter(INSTR_EXECUTED_COUNTER, -1);
344            return null;
345        }
346
347        /*
348         * (non-Javadoc)
349         *
350         * @see java.lang.Object#toString()
351         */
352        @Override
353        public String toString() {
354            if (callables.size() == 0) {
355                return null;
356            }
357            StringBuilder sb = new StringBuilder();
358            int size = callables.size();
359            for (int i = 0; i < size; i++) {
360                XCallable<?> callable = callables.get(i);
361                sb.append("(");
362                sb.append(callable.toString());
363                if (i + 1 == size) {
364                    sb.append(")");
365                }
366                else {
367                    sb.append("),");
368                }
369            }
370            return sb.toString();
371        }
372
373        /**
374         * Remove the duplicate callables from the list before queue them
375         *
376         * @return true if callables should be queued
377         */
378        public boolean removeDuplicates() {
379            Set<String> set = new HashSet<String>();
380            List<XCallable<?>> filteredCallables = new ArrayList<XCallable<?>>();
381            if (callables.size() == 0) {
382                return false;
383            }
384            for (XCallable<?> callable : callables) {
385                if (!uniqueCallables.containsKey(callable.getKey()) && !set.contains(callable.getKey())) {
386                    filteredCallables.add(callable);
387                    set.add(callable.getKey());
388                }
389            }
390            callables = filteredCallables;
391            if (callables.size() == 0) {
392                return false;
393            }
394            return true;
395        }
396
397        /**
398         * Add the keys to the set
399         */
400        public void addToUniqueCallables() {
401            for (XCallable<?> callable : callables) {
402                ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
403            }
404        }
405
406        /**
407         * Remove the keys from the set
408         */
409        public void removeFromUniqueCallables() {
410            for (XCallable<?> callable : callables) {
411                uniqueCallables.remove(callable.getKey());
412            }
413        }
414    }
415
416    private XLog log = XLog.getLog(getClass());
417
418    private int queueSize;
419    private PriorityDelayQueue<CallableWrapper> queue;
420    private ThreadPoolExecutor executor;
421    private Instrumentation instrumentation;
422
423    /**
424     * Convenience method for instrumentation counters.
425     *
426     * @param name counter name.
427     * @param count count to increment the counter.
428     */
429    private void incrCounter(String name, int count) {
430        if (instrumentation != null) {
431            instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
432        }
433    }
434
435    private void addInQueueCron(Instrumentation.Cron cron) {
436        if (instrumentation != null) {
437            instrumentation.addCron(INSTRUMENTATION_GROUP, INSTR_IN_QUEUE_TIME_TIMER, cron);
438        }
439    }
440
441    /**
442     * Initialize the command queue service.
443     *
444     * @param services services instance.
445     */
446    @Override
447    @SuppressWarnings({ "unchecked", "rawtypes" })
448    public void init(Services services) {
449        Configuration conf = services.getConf();
450
451        queueSize = ConfigurationService.getInt(conf, CONF_QUEUE_SIZE);
452        int threads = ConfigurationService.getInt(conf, CONF_THREADS);
453        boolean callableNextEligible = ConfigurationService.getBoolean(conf, CONF_CALLABLE_NEXT_ELIGIBLE);
454
455        for (String type : ConfigurationService.getStrings(conf, CONF_CALLABLE_INTERRUPT_TYPES)) {
456            log.debug("Adding interrupt type [{0}]", type);
457            INTERRUPT_TYPES.add(type);
458        }
459
460        if (!callableNextEligible) {
461            queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
462                @Override
463                protected void debug(String msgTemplate, Object... msgArgs) {
464                    log.trace(msgTemplate, msgArgs);
465                }
466            };
467        }
468        else {
469            // If the head of this queue has already reached max concurrency,
470            // continuously find next one
471            // which has not yet reach max concurrency.Overrided method
472            // 'eligibleToPoll' to check if the
473            // element of this queue has reached the maximum concurrency.
474            queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
475                @Override
476                protected void debug(String msgTemplate, Object... msgArgs) {
477                    log.trace(msgTemplate, msgArgs);
478                }
479
480                @Override
481                protected boolean eligibleToPoll(QueueElement<?> element) {
482                    if (element != null) {
483                        CallableWrapper wrapper = (CallableWrapper) element;
484                        if (element.getElement() != null) {
485                            return callableReachMaxConcurrency(wrapper.getElement());
486                        }
487                    }
488                    return false;
489                }
490
491            };
492        }
493
494        interruptMapMaxSize = ConfigurationService.getInt(conf, CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE);
495
496        // IMPORTANT: The ThreadPoolExecutor does not always the execute
497        // commands out of the queue, there are
498        // certain conditions where commands are pushed directly to a thread.
499        // As we are using a queue with DELAYED semantics (i.e. execute the
500        // command in 5 mins) we need to make
501        // sure that the commands are always pushed to the queue.
502        // To achieve this (by looking a the ThreadPoolExecutor.execute()
503        // implementation, we are making the pool
504        // minimum size equals to the maximum size (thus threads are keep always
505        // running) and we are warming up
506        // all those threads (the for loop that runs dummy runnables).
507        executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue,
508                new NamedThreadFactory("CallableQueue")) {
509            protected void beforeExecute(Thread t, Runnable r) {
510                super.beforeExecute(t,r);
511                XLog.Info.get().clear();
512            }
513            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
514                return (RunnableFuture<T>)callable;
515            }
516        };
517
518        for (int i = 0; i < threads; i++) {
519            executor.execute(new Runnable() {
520                public void run() {
521                    try {
522                        Thread.sleep(100);
523                    }
524                    catch (InterruptedException ex) {
525                        log.warn("Could not warm up threadpool {0}", ex.getMessage(), ex);
526                    }
527                }
528            });
529        }
530
531        maxCallableConcurrency = ConfigurationService.getInt(conf, CONF_CALLABLE_CONCURRENCY);
532    }
533
534    /**
535     * Destroy the command queue service.
536     */
537    @Override
538    public void destroy() {
539        try {
540            long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
541            executor.shutdown();
542            queue.clear();
543            while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
544                log.info("Waiting for executor to shutdown");
545                if (System.currentTimeMillis() > limit) {
546                    log.warn("Gave up, continuing without waiting for executor to shutdown");
547                    break;
548                }
549            }
550        }
551        catch (InterruptedException ex) {
552            log.warn(ex);
553        }
554    }
555
556    /**
557     * Return the public interface for command queue service.
558     *
559     * @return {@link CallableQueueService}.
560     */
561    @Override
562    public Class<? extends Service> getInterface() {
563        return CallableQueueService.class;
564    }
565
566    /**
567     * @return int size of queue
568     */
569    public synchronized int queueSize() {
570        return queue.size();
571    }
572
573    private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) {
574        if (!ignoreQueueSize && queue.size() >= queueSize) {
575            log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey());
576            return false;
577        }
578        if (!executor.isShutdown()) {
579            if (wrapper.filterDuplicates()) {
580                wrapper.addToUniqueCallables();
581                try {
582                    executor.execute(wrapper);
583                }
584                catch (Throwable ree) {
585                    wrapper.removeFromUniqueCallables();
586                    throw new RuntimeException(ree);
587                }
588            }
589        }
590        else {
591            log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey());
592        }
593        return true;
594    }
595
596    /**
597     * Queue a callable for asynchronous execution.
598     *
599     * @param callable callable to queue.
600     * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable
601     *         was not queued.
602     */
603    public boolean queue(XCallable<?> callable) {
604        return queue(callable, 0);
605    }
606
607    /**
608     * Queue a list of callables for serial execution.
609     * <p>
610     * Useful to serialize callables that may compete with each other for resources.
611     * <p>
612     * All callables will be processed with the priority of the highest priority of all callables.
613     *
614     * @param callables callables to be executed by the composite callable.
615     * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
616     *         were not queued.
617     */
618    public boolean queueSerial(List<? extends XCallable<?>> callables) {
619        return queueSerial(callables, 0);
620    }
621
622    /**
623     * Queue a callable for asynchronous execution sometime in the future.
624     *
625     * @param callable callable to queue for delayed execution
626     * @param delay time, in milliseconds, that the callable should be delayed.
627     * @return <code>true</code> if the callable was queued, <code>false</code>
628     *         if the queue is full and the callable was not queued.
629     */
630    public synchronized boolean queue(XCallable<?> callable, long delay) {
631        if (callable == null) {
632            return true;
633        }
634        boolean queued = false;
635        if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
636            log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size());
637        }
638        else {
639            checkInterruptTypes(callable);
640            queued = queue(new CallableWrapper(callable, delay), false);
641            if (queued) {
642                incrCounter(INSTR_QUEUED_COUNTER, 1);
643            }
644            else {
645                log.warn("Could not queue callable");
646            }
647        }
648        return queued;
649    }
650
651    /**
652     * Queue a list of callables for serial execution sometime in the future.
653     * <p>
654     * Useful to serialize callables that may compete with each other for resources.
655     * <p>
656     * All callables will be processed with the priority of the highest priority of all callables.
657     *
658     * @param callables callables to be executed by the composite callable.
659     * @param delay time, in milliseconds, that the callable should be delayed.
660     * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
661     *         were not queued.
662     */
663    public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) {
664        boolean queued;
665        if (callables == null || callables.size() == 0) {
666            queued = true;
667        }
668        else if (callables.size() == 1) {
669            queued = queue(callables.get(0), delay);
670        }
671        else {
672            XCallable<?> callable = new CompositeCallable(callables);
673            queued = queue(callable, delay);
674            if (queued) {
675                incrCounter(INSTR_QUEUED_COUNTER, callables.size());
676            }
677        }
678        return queued;
679    }
680
681    /**
682     * Instruments the callable queue service.
683     *
684     * @param instr instance to instrument the callable queue service to.
685     */
686    public void instrument(Instrumentation instr) {
687        instrumentation = instr;
688        instr.addSampler(INSTRUMENTATION_GROUP, INSTR_QUEUE_SIZE_SAMPLER, 60, 1, new Instrumentation.Variable<Long>() {
689            public Long getValue() {
690                return (long) queue.size();
691            }
692        });
693        instr.addSampler(INSTRUMENTATION_GROUP, INSTR_THREADS_ACTIVE_SAMPLER, 60, 1,
694                new Instrumentation.Variable<Long>() {
695                    public Long getValue() {
696                        return (long) executor.getActiveCount();
697                    }
698                });
699    }
700
701    /**
702     * check the interrupt map for the existence of an interrupt commands if
703     * exist a List of Interrupt Callable for the same lock key will bereturned,
704     * otherwise it will return null
705     */
706    public Set<XCallable<?>> checkInterrupts(String lockKey) {
707
708        if (lockKey != null) {
709            return interruptCommandsMap.remove(lockKey);
710        }
711        return null;
712    }
713
714    /**
715     * check if the callable is of an interrupt type and insert it into the map
716     * accordingly
717     *
718     * @param callable
719     */
720    public void checkInterruptTypes(XCallable<?> callable) {
721        if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) {
722            for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) {
723                if (INTERRUPT_TYPES.contains(singleCallable.getType())) {
724                    insertCallableIntoInterruptMap(singleCallable);
725                }
726            }
727        }
728        else if (INTERRUPT_TYPES.contains(callable.getType())) {
729            insertCallableIntoInterruptMap(callable);
730        }
731    }
732
733    /**
734     * insert a new callable in the Interrupt Command Map add a new element to
735     * the list or create a new list accordingly
736     *
737     * @param callable
738     */
739    public void insertCallableIntoInterruptMap(XCallable<?> callable) {
740        if (interruptCommandsMap.size() < interruptMapMaxSize) {
741            Set<XCallable<?>> newSet = Collections.synchronizedSet(new LinkedHashSet<XCallable<?>>());
742            Set<XCallable<?>> interruptSet = interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newSet);
743            if (interruptSet == null) {
744                interruptSet = newSet;
745            }
746            if (interruptSet.add(callable)) {
747                log.trace("Inserting an interrupt element [{0}] to the interrupt map", callable.toString());
748            } else {
749                log.trace("Interrupt element [{0}] already present", callable.toString());
750            }
751        }
752        else {
753            log.warn(
754                    "The interrupt map reached max size of [{0}], an interrupt element [{1}] will not added to the map [{1}]",
755                    interruptCommandsMap.size(), callable.toString());
756        }
757    }
758
759    /**
760     * Get the list of strings of queue dump
761     *
762     * @return the list of string that representing each CallableWrapper
763     */
764    public List<String> getQueueDump() {
765        List<String> list = new ArrayList<String>();
766        for (QueueElement<CallableWrapper> qe : queue) {
767            if (qe.toString() == null) {
768                continue;
769            }
770            list.add(qe.toString());
771        }
772        return list;
773    }
774
775    /**
776     * Get the list of strings of uniqueness map dump
777     *
778     * @return the list of string that representing the key of each command in the queue
779     */
780    public List<String> getUniqueDump() {
781        List<String> list = new ArrayList<String>();
782        for (Entry<String, Date> entry : uniqueCallables.entrySet()) {
783            list.add(entry.toString());
784        }
785        return list;
786    }
787
788    // Refer executor.invokeAll
789    public <T> List<Future<T>> invokeAll(List<CallableWrapper<T>> tasks)
790            throws InterruptedException {
791        return executor.invokeAll(tasks);
792    }
793
794}