001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.Date;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.LinkedHashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.Set;
031import java.util.concurrent.BlockingQueue;
032import java.util.concurrent.Callable;
033import java.util.concurrent.ConcurrentHashMap;
034import java.util.concurrent.Future;
035import java.util.concurrent.RunnableFuture;
036import java.util.concurrent.ThreadPoolExecutor;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicInteger;
039
040import org.apache.hadoop.conf.Configuration;
041import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
042import org.apache.oozie.util.Instrumentable;
043import org.apache.oozie.util.Instrumentation;
044import org.apache.oozie.util.NamedThreadFactory;
045import org.apache.oozie.util.PollablePriorityDelayQueue;
046import org.apache.oozie.util.PriorityDelayQueue;
047import org.apache.oozie.util.PriorityDelayQueue.QueueElement;
048import org.apache.oozie.util.XCallable;
049import org.apache.oozie.util.XLog;
050
051import com.google.common.collect.ImmutableSet;
052
053/**
054 * The callable queue service queues {@link XCallable}s for asynchronous execution.
055 * <p>
056 * Callables can be queued for immediate execution or for delayed execution (some time in the future).
057 * <p>
058 * Callables are consumed from the queue for execution based on their priority.
059 * <p>
060 * When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops
061 * queuing callables.
062 * <p>
063 * A thread-pool is used to execute the callables asynchronously.
064 * <p>
065 * The following configuration parameters control the callable queue service:
066 * <p>
067 * {@link #CONF_QUEUE_SIZE} size of the immediate execution queue. Defaulf value is 10000.
068 * <p>
069 * {@link #CONF_THREADS} number of threads in the thread-pool used for asynchronous command execution. When this number
070 * of threads is reached, commands remain the queue until threads become available. Sets up a priority queue for the
071 * execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for execution
072 * sometime in the future.
073 */
074public class CallableQueueService implements Service, Instrumentable {
075    private static final String INSTRUMENTATION_GROUP = "callablequeue";
076    private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue";
077    private static final String INSTR_EXECUTED_COUNTER = "executed";
078    private static final String INSTR_FAILED_COUNTER = "failed";
079    private static final String INSTR_QUEUED_COUNTER = "queued";
080    private static final String INSTR_QUEUE_SIZE_SAMPLER = "queue.size";
081    private static final String INSTR_THREADS_ACTIVE_SAMPLER = "threads.active";
082
083    public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallableQueueService.";
084
085    public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
086    public static final String CONF_THREADS = CONF_PREFIX + "threads";
087    public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
088    public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";
089    public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes";
090    public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize";
091
092    public static final int CONCURRENCY_DELAY = 500;
093
094    public static final int SAFE_MODE_DELAY = 60000;
095
096    private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
097
098    private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
099
100    private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<>();
101
102    private Set<String> interruptTypes;
103
104    private int interruptMapMaxSize;
105
106    private int maxCallableConcurrency;
107
108    private boolean callableBegin(XCallable<?> callable) {
109        synchronized (activeCallables) {
110            AtomicInteger counter = activeCallables.get(callable.getType());
111            if (counter == null) {
112                counter = new AtomicInteger(1);
113                activeCallables.put(callable.getType(), counter);
114                return true;
115            }
116            else {
117                int i = counter.incrementAndGet();
118                return i <= maxCallableConcurrency;
119            }
120        }
121    }
122
123    private void callableEnd(XCallable<?> callable) {
124        synchronized (activeCallables) {
125            AtomicInteger counter = activeCallables.get(callable.getType());
126            if (counter == null) {
127                throw new IllegalStateException("It should not happen");
128            }
129            else {
130                counter.decrementAndGet();
131            }
132        }
133    }
134
135    private boolean callableReachMaxConcurrency(XCallable<?> callable) {
136        synchronized (activeCallables) {
137            AtomicInteger counter = activeCallables.get(callable.getType());
138            if (counter == null) {
139                return true;
140            }
141            else {
142                int i = counter.get();
143                return i < maxCallableConcurrency;
144            }
145        }
146    }
147
148    // Callables are wrapped with the this wrapper for execution, for logging
149    // and instrumentation.
150    // The wrapper implements Runnable and Comparable to be able to work with an
151    // executor and a priority queue.
152    public class CallableWrapper<E> extends PriorityDelayQueue.QueueElement<E> implements Runnable, Callable<E> {
153        private Instrumentation.Cron cron;
154
155        public CallableWrapper(XCallable<E> callable, long delay) {
156            super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS);
157            cron = new Instrumentation.Cron();
158            cron.start();
159        }
160
161        public void run() {
162            XCallable<?> callable = null;
163            try {
164                removeFromUniqueCallables();
165                if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
166                    log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay", getElement().getType(),
167                            SAFE_MODE_DELAY);
168                    setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
169                    queue(this, true);
170                    return;
171                }
172                callable = getElement();
173                if (callableBegin(callable)) {
174                    cron.stop();
175                    addInQueueCron(cron);
176                    XLog log = XLog.getLog(getClass());
177                    log.trace("executing callable [{0}]", callable.getName());
178
179                    try {
180                        //FutureTask.run() will invoke cllable.call()
181                        super.run();
182                        incrCounter(INSTR_EXECUTED_COUNTER, 1);
183                        log.trace("executed callable [{0}]", callable.getName());
184                    }
185                    catch (Exception ex) {
186                        incrCounter(INSTR_FAILED_COUNTER, 1);
187                        log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
188                    }
189                }
190                else {
191                    log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay", callable
192                            .getType(), CONCURRENCY_DELAY);
193                    setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
194                    queue(this, true);
195                    incrCounter(callable.getType() + "#exceeded.concurrency", 1);
196                }
197            }
198            catch (Throwable t) {
199                incrCounter(INSTR_FAILED_COUNTER, 1);
200                log.warn("exception callable [{0}], {1}", callable == null ? "N/A" : callable.getName(),
201                        t.getMessage(), t);
202            }
203            finally {
204                if (callable != null) {
205                    callableEnd(callable);
206                }
207            }
208        }
209
210        /**
211         * Filter the duplicate callables from the list before queue this.
212         * <p>
213         * If it is single callable, checking if key is in unique map or not.
214         * <p>
215         * If it is composite callable, remove duplicates callables from the composite.
216         *
217         * @return true if this callable should be queued
218         */
219        public boolean filterDuplicates() {
220            XCallable<?> callable = getElement();
221            if (callable instanceof CompositeCallable) {
222                return ((CompositeCallable) callable).removeDuplicates();
223            }
224            else {
225                return uniqueCallables.containsKey(callable.getKey()) == false;
226            }
227        }
228
229        /**
230         * Add the keys to the set
231         */
232        public void addToUniqueCallables() {
233            XCallable<?> callable = getElement();
234            if (callable instanceof CompositeCallable) {
235                ((CompositeCallable) callable).addToUniqueCallables();
236            }
237            else {
238                ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
239            }
240        }
241
242        /**
243         * Remove the keys from the set
244         */
245        public void removeFromUniqueCallables() {
246            XCallable<?> callable = getElement();
247            if (callable instanceof CompositeCallable) {
248                ((CompositeCallable) callable).removeFromUniqueCallables();
249            }
250            else {
251                uniqueCallables.remove(callable.getKey());
252            }
253        }
254
255        //this will not get called, bcz  newTaskFor of threadpool will convert it in futureTask which is a runnable.
256        // futureTask  will call the cllable.call from run method. so we override run to call super.run method.
257        @Override
258        public E call() throws Exception {
259            return null;
260        }
261    }
262
263    class CompositeCallable implements XCallable<Void> {
264        private List<XCallable<?>> callables;
265        private String name;
266        private int priority;
267        private long createdTime;
268
269        public CompositeCallable(List<? extends XCallable<?>> callables) {
270            this.callables = new ArrayList<XCallable<?>>(callables);
271            priority = 0;
272            createdTime = Long.MAX_VALUE;
273            StringBuilder sb = new StringBuilder();
274            String separator = "[";
275            for (XCallable<?> callable : callables) {
276                priority = Math.max(priority, callable.getPriority());
277                createdTime = Math.min(createdTime, callable.getCreatedTime());
278                sb.append(separator).append(callable.getName());
279                separator = ",";
280            }
281            sb.append("]");
282            name = sb.toString();
283        }
284
285        @Override
286        public String getName() {
287            return name;
288        }
289
290        @Override
291        public String getType() {
292            return "#composite#" + callables.get(0).getType();
293        }
294
295        @Override
296        public String getKey() {
297            return "#composite#" + callables.get(0).getKey();
298        }
299
300        @Override
301        public String getEntityKey() {
302            return "#composite#" + callables.get(0).getEntityKey();
303        }
304
305        @Override
306        public int getPriority() {
307            return priority;
308        }
309
310        @Override
311        public long getCreatedTime() {
312            return createdTime;
313        }
314
315        @Override
316        public void setInterruptMode(boolean mode) {
317        }
318
319        @Override
320        public boolean inInterruptMode() {
321            return false;
322        }
323
324        public List<XCallable<?>> getCallables() {
325            return this.callables;
326        }
327
328        public Void call() throws Exception {
329            XLog log = XLog.getLog(getClass());
330
331            for (XCallable<?> callable : callables) {
332                log.trace("executing callable [{0}]", callable.getName());
333                try {
334                    callable.call();
335                    incrCounter(INSTR_EXECUTED_COUNTER, 1);
336                    log.trace("executed callable [{0}]", callable.getName());
337                }
338                catch (Exception ex) {
339                    incrCounter(INSTR_FAILED_COUNTER, 1);
340                    log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
341                }
342            }
343
344            // ticking -1 not to count the call to the composite callable
345            incrCounter(INSTR_EXECUTED_COUNTER, -1);
346            return null;
347        }
348
349        /*
350         * (non-Javadoc)
351         *
352         * @see java.lang.Object#toString()
353         */
354        @Override
355        public String toString() {
356            if (callables.size() == 0) {
357                return null;
358            }
359            StringBuilder sb = new StringBuilder();
360            int size = callables.size();
361            for (int i = 0; i < size; i++) {
362                XCallable<?> callable = callables.get(i);
363                sb.append("(");
364                sb.append(callable.toString());
365                if (i + 1 == size) {
366                    sb.append(")");
367                }
368                else {
369                    sb.append("),");
370                }
371            }
372            return sb.toString();
373        }
374
375        /**
376         * Remove the duplicate callables from the list before queue them
377         *
378         * @return true if callables should be queued
379         */
380        public boolean removeDuplicates() {
381            Set<String> set = new HashSet<String>();
382            List<XCallable<?>> filteredCallables = new ArrayList<XCallable<?>>();
383            if (callables.size() == 0) {
384                return false;
385            }
386            for (XCallable<?> callable : callables) {
387                if (!uniqueCallables.containsKey(callable.getKey()) && !set.contains(callable.getKey())) {
388                    filteredCallables.add(callable);
389                    set.add(callable.getKey());
390                }
391            }
392            callables = filteredCallables;
393            if (callables.size() == 0) {
394                return false;
395            }
396            return true;
397        }
398
399        /**
400         * Add the keys to the set
401         */
402        public void addToUniqueCallables() {
403            for (XCallable<?> callable : callables) {
404                ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
405            }
406        }
407
408        /**
409         * Remove the keys from the set
410         */
411        public void removeFromUniqueCallables() {
412            for (XCallable<?> callable : callables) {
413                uniqueCallables.remove(callable.getKey());
414            }
415        }
416    }
417
418    private XLog log = XLog.getLog(getClass());
419
420    private int queueSize;
421    private PriorityDelayQueue<CallableWrapper> queue;
422    private ThreadPoolExecutor executor;
423    private Instrumentation instrumentation;
424
425    /**
426     * Convenience method for instrumentation counters.
427     *
428     * @param name counter name.
429     * @param count count to increment the counter.
430     */
431    private void incrCounter(String name, int count) {
432        if (instrumentation != null) {
433            instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
434        }
435    }
436
437    private void addInQueueCron(Instrumentation.Cron cron) {
438        if (instrumentation != null) {
439            instrumentation.addCron(INSTRUMENTATION_GROUP, INSTR_IN_QUEUE_TIME_TIMER, cron);
440        }
441    }
442
443    /**
444     * Initialize the command queue service.
445     *
446     * @param services services instance.
447     */
448    @Override
449    @SuppressWarnings({ "unchecked", "rawtypes" })
450    public void init(Services services) {
451        Configuration conf = services.getConf();
452
453        queueSize = ConfigurationService.getInt(conf, CONF_QUEUE_SIZE);
454        int threads = ConfigurationService.getInt(conf, CONF_THREADS);
455        boolean callableNextEligible = ConfigurationService.getBoolean(conf, CONF_CALLABLE_NEXT_ELIGIBLE);
456
457        interruptTypes = new HashSet<>();
458        for (String type : ConfigurationService.getStrings(conf, CONF_CALLABLE_INTERRUPT_TYPES)) {
459            log.debug("Adding interrupt type [{0}]", type);
460            interruptTypes.add(type);
461        }
462        interruptTypes = ImmutableSet.copyOf(interruptTypes);
463
464        if (!callableNextEligible) {
465            queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
466                @Override
467                protected void debug(String msgTemplate, Object... msgArgs) {
468                    log.trace(msgTemplate, msgArgs);
469                }
470            };
471        }
472        else {
473            // If the head of this queue has already reached max concurrency,
474            // continuously find next one
475            // which has not yet reach max concurrency.Overrided method
476            // 'eligibleToPoll' to check if the
477            // element of this queue has reached the maximum concurrency.
478            queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
479                @Override
480                protected void debug(String msgTemplate, Object... msgArgs) {
481                    log.trace(msgTemplate, msgArgs);
482                }
483
484                @Override
485                protected boolean eligibleToPoll(QueueElement<?> element) {
486                    if (element != null) {
487                        CallableWrapper wrapper = (CallableWrapper) element;
488                        if (element.getElement() != null) {
489                            return callableReachMaxConcurrency(wrapper.getElement());
490                        }
491                    }
492                    return false;
493                }
494
495            };
496        }
497
498        interruptMapMaxSize = ConfigurationService.getInt(conf, CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE);
499
500        // IMPORTANT: The ThreadPoolExecutor does not always the execute
501        // commands out of the queue, there are
502        // certain conditions where commands are pushed directly to a thread.
503        // As we are using a queue with DELAYED semantics (i.e. execute the
504        // command in 5 mins) we need to make
505        // sure that the commands are always pushed to the queue.
506        // To achieve this (by looking a the ThreadPoolExecutor.execute()
507        // implementation, we are making the pool
508        // minimum size equals to the maximum size (thus threads are keep always
509        // running) and we are warming up
510        // all those threads (the for loop that runs dummy runnables).
511        executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue,
512                new NamedThreadFactory("CallableQueue")) {
513            protected void beforeExecute(Thread t, Runnable r) {
514                super.beforeExecute(t,r);
515                XLog.Info.get().clear();
516            }
517            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
518                return (RunnableFuture<T>)callable;
519            }
520        };
521
522        for (int i = 0; i < threads; i++) {
523            executor.execute(new Runnable() {
524                public void run() {
525                    try {
526                        Thread.sleep(100);
527                    }
528                    catch (InterruptedException ex) {
529                        log.warn("Could not warm up threadpool {0}", ex.getMessage(), ex);
530                    }
531                }
532            });
533        }
534
535        maxCallableConcurrency = ConfigurationService.getInt(conf, CONF_CALLABLE_CONCURRENCY);
536    }
537
538    /**
539     * Destroy the command queue service.
540     */
541    @Override
542    public void destroy() {
543        try {
544            long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
545            executor.shutdown();
546            queue.clear();
547            while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
548                log.info("Waiting for executor to shutdown");
549                if (System.currentTimeMillis() > limit) {
550                    log.warn("Gave up, continuing without waiting for executor to shutdown");
551                    break;
552                }
553            }
554        }
555        catch (InterruptedException ex) {
556            log.warn(ex);
557        }
558    }
559
560    /**
561     * Return the public interface for command queue service.
562     *
563     * @return {@link CallableQueueService}.
564     */
565    @Override
566    public Class<? extends Service> getInterface() {
567        return CallableQueueService.class;
568    }
569
570    /**
571     * @return int size of queue
572     */
573    public synchronized int queueSize() {
574        return queue.size();
575    }
576
577    private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) {
578        if (!ignoreQueueSize && queue.size() >= queueSize) {
579            log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey());
580            return false;
581        }
582        if (!executor.isShutdown()) {
583            if (wrapper.filterDuplicates()) {
584                wrapper.addToUniqueCallables();
585                try {
586                    executor.execute(wrapper);
587                }
588                catch (Throwable ree) {
589                    wrapper.removeFromUniqueCallables();
590                    throw new RuntimeException(ree);
591                }
592            }
593        }
594        else {
595            log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey());
596        }
597        return true;
598    }
599
600    /**
601     * Queue a callable for asynchronous execution.
602     *
603     * @param callable callable to queue.
604     * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable
605     *         was not queued.
606     */
607    public boolean queue(XCallable<?> callable) {
608        return queue(callable, 0);
609    }
610
611    /**
612     * Queue a list of callables for serial execution.
613     * <p>
614     * Useful to serialize callables that may compete with each other for resources.
615     * <p>
616     * All callables will be processed with the priority of the highest priority of all callables.
617     *
618     * @param callables callables to be executed by the composite callable.
619     * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
620     *         were not queued.
621     */
622    public boolean queueSerial(List<? extends XCallable<?>> callables) {
623        return queueSerial(callables, 0);
624    }
625
626    /**
627     * Queue a callable for asynchronous execution sometime in the future.
628     *
629     * @param callable callable to queue for delayed execution
630     * @param delay time, in milliseconds, that the callable should be delayed.
631     * @return <code>true</code> if the callable was queued, <code>false</code>
632     *         if the queue is full and the callable was not queued.
633     */
634    public synchronized boolean queue(XCallable<?> callable, long delay) {
635        if (callable == null) {
636            return true;
637        }
638        boolean queued = false;
639        if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
640            log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size());
641        }
642        else {
643            checkInterruptTypes(callable);
644            queued = queue(new CallableWrapper(callable, delay), false);
645            if (queued) {
646                incrCounter(INSTR_QUEUED_COUNTER, 1);
647            }
648            else {
649                log.warn("Could not queue callable");
650            }
651        }
652        return queued;
653    }
654
655    /**
656     * Queue a list of callables for serial execution sometime in the future.
657     * <p>
658     * Useful to serialize callables that may compete with each other for resources.
659     * <p>
660     * All callables will be processed with the priority of the highest priority of all callables.
661     *
662     * @param callables callables to be executed by the composite callable.
663     * @param delay time, in milliseconds, that the callable should be delayed.
664     * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
665     *         were not queued.
666     */
667    public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) {
668        boolean queued;
669        if (callables == null || callables.size() == 0) {
670            queued = true;
671        }
672        else if (callables.size() == 1) {
673            queued = queue(callables.get(0), delay);
674        }
675        else {
676            XCallable<?> callable = new CompositeCallable(callables);
677            queued = queue(callable, delay);
678            if (queued) {
679                incrCounter(INSTR_QUEUED_COUNTER, callables.size());
680            }
681        }
682        return queued;
683    }
684
685    /**
686     * Instruments the callable queue service.
687     *
688     * @param instr instance to instrument the callable queue service to.
689     */
690    public void instrument(Instrumentation instr) {
691        instrumentation = instr;
692        instr.addSampler(INSTRUMENTATION_GROUP, INSTR_QUEUE_SIZE_SAMPLER, 60, 1, new Instrumentation.Variable<Long>() {
693            public Long getValue() {
694                return (long) queue.size();
695            }
696        });
697        instr.addSampler(INSTRUMENTATION_GROUP, INSTR_THREADS_ACTIVE_SAMPLER, 60, 1,
698                new Instrumentation.Variable<Long>() {
699                    public Long getValue() {
700                        return (long) executor.getActiveCount();
701                    }
702                });
703    }
704
705    /**
706     * check the interrupt map for the existence of an interrupt commands if
707     * exist a List of Interrupt Callable for the same lock key will bereturned,
708     * otherwise it will return null
709     */
710    public Set<XCallable<?>> checkInterrupts(String lockKey) {
711
712        if (lockKey != null) {
713            return interruptCommandsMap.remove(lockKey);
714        }
715        return null;
716    }
717
718    /**
719     * check if the callable is of an interrupt type and insert it into the map
720     * accordingly
721     *
722     * @param callable
723     */
724    public void checkInterruptTypes(XCallable<?> callable) {
725        if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) {
726            for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) {
727                if (interruptTypes.contains(singleCallable.getType())) {
728                    insertCallableIntoInterruptMap(singleCallable);
729                }
730            }
731        }
732        else if (interruptTypes.contains(callable.getType())) {
733            insertCallableIntoInterruptMap(callable);
734        }
735    }
736
737    /**
738     * insert a new callable in the Interrupt Command Map add a new element to
739     * the list or create a new list accordingly
740     *
741     * @param callable
742     */
743    public void insertCallableIntoInterruptMap(XCallable<?> callable) {
744        if (interruptCommandsMap.size() < interruptMapMaxSize) {
745            Set<XCallable<?>> newSet = Collections.synchronizedSet(new LinkedHashSet<XCallable<?>>());
746            Set<XCallable<?>> interruptSet = interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newSet);
747            if (interruptSet == null) {
748                interruptSet = newSet;
749            }
750            if (interruptSet.add(callable)) {
751                log.trace("Inserting an interrupt element [{0}] to the interrupt map", callable.toString());
752            } else {
753                log.trace("Interrupt element [{0}] already present", callable.toString());
754            }
755        }
756        else {
757            log.warn(
758                    "The interrupt map reached max size of [{0}], an interrupt element [{1}] will not added to the map [{1}]",
759                    interruptCommandsMap.size(), callable.toString());
760        }
761    }
762
763    /**
764     * Get the list of strings of queue dump
765     *
766     * @return the list of string that representing each CallableWrapper
767     */
768    public List<String> getQueueDump() {
769        List<String> list = new ArrayList<String>();
770        for (QueueElement<CallableWrapper> qe : queue) {
771            if (qe.toString() == null) {
772                continue;
773            }
774            list.add(qe.toString());
775        }
776        return list;
777    }
778
779    /**
780     * Get the list of strings of uniqueness map dump
781     *
782     * @return the list of string that representing the key of each command in the queue
783     */
784    public List<String> getUniqueDump() {
785        List<String> list = new ArrayList<String>();
786        for (Entry<String, Date> entry : uniqueCallables.entrySet()) {
787            list.add(entry.toString());
788        }
789        return list;
790    }
791
792    // Refer executor.invokeAll
793    public <T> List<Future<T>> invokeAll(List<CallableWrapper<T>> tasks)
794            throws InterruptedException {
795        return executor.invokeAll(tasks);
796    }
797
798    public Set<String> getInterruptTypes() {
799        return interruptTypes;
800    }
801
802}