001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.Date; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.LinkedHashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Map.Entry; 030import java.util.Set; 031import java.util.concurrent.BlockingQueue; 032import java.util.concurrent.Callable; 033import java.util.concurrent.ConcurrentHashMap; 034import java.util.concurrent.Future; 035import java.util.concurrent.RunnableFuture; 036import java.util.concurrent.ThreadPoolExecutor; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicInteger; 039 040import org.apache.hadoop.conf.Configuration; 041import org.apache.oozie.client.OozieClient.SYSTEM_MODE; 042import org.apache.oozie.util.Instrumentable; 043import org.apache.oozie.util.Instrumentation; 044import org.apache.oozie.util.NamedThreadFactory; 045import org.apache.oozie.util.PollablePriorityDelayQueue; 046import org.apache.oozie.util.PriorityDelayQueue; 047import org.apache.oozie.util.PriorityDelayQueue.QueueElement; 048import org.apache.oozie.util.XCallable; 049import org.apache.oozie.util.XLog; 050 051/** 052 * The callable queue service queues {@link XCallable}s for asynchronous execution. 053 * <p> 054 * Callables can be queued for immediate execution or for delayed execution (some time in the future). 055 * <p> 056 * Callables are consumed from the queue for execution based on their priority. 057 * <p> 058 * When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops 059 * queuing callables. 060 * <p> 061 * A thread-pool is used to execute the callables asynchronously. 062 * <p> 063 * The following configuration parameters control the callable queue service: 064 * <p> 065 * {@link #CONF_QUEUE_SIZE} size of the immediate execution queue. Defaulf value is 10000. 066 * <p> 067 * {@link #CONF_THREADS} number of threads in the thread-pool used for asynchronous command execution. When this number 068 * of threads is reached, commands remain the queue until threads become available. Sets up a priority queue for the 069 * execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for execution 070 * sometime in the future. 071 */ 072public class CallableQueueService implements Service, Instrumentable { 073 private static final String INSTRUMENTATION_GROUP = "callablequeue"; 074 private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue"; 075 private static final String INSTR_EXECUTED_COUNTER = "executed"; 076 private static final String INSTR_FAILED_COUNTER = "failed"; 077 private static final String INSTR_QUEUED_COUNTER = "queued"; 078 private static final String INSTR_QUEUE_SIZE_SAMPLER = "queue.size"; 079 private static final String INSTR_THREADS_ACTIVE_SAMPLER = "threads.active"; 080 081 public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallableQueueService."; 082 083 public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size"; 084 public static final String CONF_THREADS = CONF_PREFIX + "threads"; 085 public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency"; 086 public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible"; 087 public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes"; 088 public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize"; 089 090 public static final int CONCURRENCY_DELAY = 500; 091 092 public static final int SAFE_MODE_DELAY = 60000; 093 094 private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>(); 095 096 private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>(); 097 098 private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<String, Set<XCallable<?>>>(); 099 100 public static final HashSet<String> INTERRUPT_TYPES = new HashSet<String>(); 101 102 private int interruptMapMaxSize; 103 104 private int maxCallableConcurrency; 105 106 private boolean callableBegin(XCallable<?> callable) { 107 synchronized (activeCallables) { 108 AtomicInteger counter = activeCallables.get(callable.getType()); 109 if (counter == null) { 110 counter = new AtomicInteger(1); 111 activeCallables.put(callable.getType(), counter); 112 return true; 113 } 114 else { 115 int i = counter.incrementAndGet(); 116 return i <= maxCallableConcurrency; 117 } 118 } 119 } 120 121 private void callableEnd(XCallable<?> callable) { 122 synchronized (activeCallables) { 123 AtomicInteger counter = activeCallables.get(callable.getType()); 124 if (counter == null) { 125 throw new IllegalStateException("It should not happen"); 126 } 127 else { 128 counter.decrementAndGet(); 129 } 130 } 131 } 132 133 private boolean callableReachMaxConcurrency(XCallable<?> callable) { 134 synchronized (activeCallables) { 135 AtomicInteger counter = activeCallables.get(callable.getType()); 136 if (counter == null) { 137 return true; 138 } 139 else { 140 int i = counter.get(); 141 return i < maxCallableConcurrency; 142 } 143 } 144 } 145 146 // Callables are wrapped with the this wrapper for execution, for logging 147 // and instrumentation. 148 // The wrapper implements Runnable and Comparable to be able to work with an 149 // executor and a priority queue. 150 public class CallableWrapper<E> extends PriorityDelayQueue.QueueElement<E> implements Runnable, Callable<E> { 151 private Instrumentation.Cron cron; 152 153 public CallableWrapper(XCallable<E> callable, long delay) { 154 super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS); 155 cron = new Instrumentation.Cron(); 156 cron.start(); 157 } 158 159 public void run() { 160 XCallable<?> callable = null; 161 try { 162 removeFromUniqueCallables(); 163 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 164 log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay", getElement().getType(), 165 SAFE_MODE_DELAY); 166 setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS); 167 queue(this, true); 168 return; 169 } 170 callable = getElement(); 171 if (callableBegin(callable)) { 172 cron.stop(); 173 addInQueueCron(cron); 174 XLog log = XLog.getLog(getClass()); 175 log.trace("executing callable [{0}]", callable.getName()); 176 177 try { 178 //FutureTask.run() will invoke cllable.call() 179 super.run(); 180 incrCounter(INSTR_EXECUTED_COUNTER, 1); 181 log.trace("executed callable [{0}]", callable.getName()); 182 } 183 catch (Exception ex) { 184 incrCounter(INSTR_FAILED_COUNTER, 1); 185 log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex); 186 } 187 } 188 else { 189 log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay", callable 190 .getType(), CONCURRENCY_DELAY); 191 setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS); 192 queue(this, true); 193 incrCounter(callable.getType() + "#exceeded.concurrency", 1); 194 } 195 } 196 catch (Throwable t) { 197 incrCounter(INSTR_FAILED_COUNTER, 1); 198 log.warn("exception callable [{0}], {1}", callable == null ? "N/A" : callable.getName(), 199 t.getMessage(), t); 200 } 201 finally { 202 if (callable != null) { 203 callableEnd(callable); 204 } 205 } 206 } 207 208 /** 209 * Filter the duplicate callables from the list before queue this. 210 * <p> 211 * If it is single callable, checking if key is in unique map or not. 212 * <p> 213 * If it is composite callable, remove duplicates callables from the composite. 214 * 215 * @return true if this callable should be queued 216 */ 217 public boolean filterDuplicates() { 218 XCallable<?> callable = getElement(); 219 if (callable instanceof CompositeCallable) { 220 return ((CompositeCallable) callable).removeDuplicates(); 221 } 222 else { 223 return uniqueCallables.containsKey(callable.getKey()) == false; 224 } 225 } 226 227 /** 228 * Add the keys to the set 229 */ 230 public void addToUniqueCallables() { 231 XCallable<?> callable = getElement(); 232 if (callable instanceof CompositeCallable) { 233 ((CompositeCallable) callable).addToUniqueCallables(); 234 } 235 else { 236 ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date()); 237 } 238 } 239 240 /** 241 * Remove the keys from the set 242 */ 243 public void removeFromUniqueCallables() { 244 XCallable<?> callable = getElement(); 245 if (callable instanceof CompositeCallable) { 246 ((CompositeCallable) callable).removeFromUniqueCallables(); 247 } 248 else { 249 uniqueCallables.remove(callable.getKey()); 250 } 251 } 252 253 //this will not get called, bcz newTaskFor of threadpool will convert it in futureTask which is a runnable. 254 // futureTask will call the cllable.call from run method. so we override run to call super.run method. 255 @Override 256 public E call() throws Exception { 257 return null; 258 } 259 } 260 261 class CompositeCallable implements XCallable<Void> { 262 private List<XCallable<?>> callables; 263 private String name; 264 private int priority; 265 private long createdTime; 266 267 public CompositeCallable(List<? extends XCallable<?>> callables) { 268 this.callables = new ArrayList<XCallable<?>>(callables); 269 priority = 0; 270 createdTime = Long.MAX_VALUE; 271 StringBuilder sb = new StringBuilder(); 272 String separator = "["; 273 for (XCallable<?> callable : callables) { 274 priority = Math.max(priority, callable.getPriority()); 275 createdTime = Math.min(createdTime, callable.getCreatedTime()); 276 sb.append(separator).append(callable.getName()); 277 separator = ","; 278 } 279 sb.append("]"); 280 name = sb.toString(); 281 } 282 283 @Override 284 public String getName() { 285 return name; 286 } 287 288 @Override 289 public String getType() { 290 return "#composite#" + callables.get(0).getType(); 291 } 292 293 @Override 294 public String getKey() { 295 return "#composite#" + callables.get(0).getKey(); 296 } 297 298 @Override 299 public String getEntityKey() { 300 return "#composite#" + callables.get(0).getEntityKey(); 301 } 302 303 @Override 304 public int getPriority() { 305 return priority; 306 } 307 308 @Override 309 public long getCreatedTime() { 310 return createdTime; 311 } 312 313 @Override 314 public void setInterruptMode(boolean mode) { 315 } 316 317 @Override 318 public boolean inInterruptMode() { 319 return false; 320 } 321 322 public List<XCallable<?>> getCallables() { 323 return this.callables; 324 } 325 326 public Void call() throws Exception { 327 XLog log = XLog.getLog(getClass()); 328 329 for (XCallable<?> callable : callables) { 330 log.trace("executing callable [{0}]", callable.getName()); 331 try { 332 callable.call(); 333 incrCounter(INSTR_EXECUTED_COUNTER, 1); 334 log.trace("executed callable [{0}]", callable.getName()); 335 } 336 catch (Exception ex) { 337 incrCounter(INSTR_FAILED_COUNTER, 1); 338 log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex); 339 } 340 } 341 342 // ticking -1 not to count the call to the composite callable 343 incrCounter(INSTR_EXECUTED_COUNTER, -1); 344 return null; 345 } 346 347 /* 348 * (non-Javadoc) 349 * 350 * @see java.lang.Object#toString() 351 */ 352 @Override 353 public String toString() { 354 if (callables.size() == 0) { 355 return null; 356 } 357 StringBuilder sb = new StringBuilder(); 358 int size = callables.size(); 359 for (int i = 0; i < size; i++) { 360 XCallable<?> callable = callables.get(i); 361 sb.append("("); 362 sb.append(callable.toString()); 363 if (i + 1 == size) { 364 sb.append(")"); 365 } 366 else { 367 sb.append("),"); 368 } 369 } 370 return sb.toString(); 371 } 372 373 /** 374 * Remove the duplicate callables from the list before queue them 375 * 376 * @return true if callables should be queued 377 */ 378 public boolean removeDuplicates() { 379 Set<String> set = new HashSet<String>(); 380 List<XCallable<?>> filteredCallables = new ArrayList<XCallable<?>>(); 381 if (callables.size() == 0) { 382 return false; 383 } 384 for (XCallable<?> callable : callables) { 385 if (!uniqueCallables.containsKey(callable.getKey()) && !set.contains(callable.getKey())) { 386 filteredCallables.add(callable); 387 set.add(callable.getKey()); 388 } 389 } 390 callables = filteredCallables; 391 if (callables.size() == 0) { 392 return false; 393 } 394 return true; 395 } 396 397 /** 398 * Add the keys to the set 399 */ 400 public void addToUniqueCallables() { 401 for (XCallable<?> callable : callables) { 402 ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date()); 403 } 404 } 405 406 /** 407 * Remove the keys from the set 408 */ 409 public void removeFromUniqueCallables() { 410 for (XCallable<?> callable : callables) { 411 uniqueCallables.remove(callable.getKey()); 412 } 413 } 414 } 415 416 private XLog log = XLog.getLog(getClass()); 417 418 private int queueSize; 419 private PriorityDelayQueue<CallableWrapper> queue; 420 private ThreadPoolExecutor executor; 421 private Instrumentation instrumentation; 422 423 /** 424 * Convenience method for instrumentation counters. 425 * 426 * @param name counter name. 427 * @param count count to increment the counter. 428 */ 429 private void incrCounter(String name, int count) { 430 if (instrumentation != null) { 431 instrumentation.incr(INSTRUMENTATION_GROUP, name, count); 432 } 433 } 434 435 private void addInQueueCron(Instrumentation.Cron cron) { 436 if (instrumentation != null) { 437 instrumentation.addCron(INSTRUMENTATION_GROUP, INSTR_IN_QUEUE_TIME_TIMER, cron); 438 } 439 } 440 441 /** 442 * Initialize the command queue service. 443 * 444 * @param services services instance. 445 */ 446 @Override 447 @SuppressWarnings({ "unchecked", "rawtypes" }) 448 public void init(Services services) { 449 Configuration conf = services.getConf(); 450 451 queueSize = ConfigurationService.getInt(conf, CONF_QUEUE_SIZE); 452 int threads = ConfigurationService.getInt(conf, CONF_THREADS); 453 boolean callableNextEligible = ConfigurationService.getBoolean(conf, CONF_CALLABLE_NEXT_ELIGIBLE); 454 455 for (String type : ConfigurationService.getStrings(conf, CONF_CALLABLE_INTERRUPT_TYPES)) { 456 log.debug("Adding interrupt type [{0}]", type); 457 INTERRUPT_TYPES.add(type); 458 } 459 460 if (!callableNextEligible) { 461 queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) { 462 @Override 463 protected void debug(String msgTemplate, Object... msgArgs) { 464 log.trace(msgTemplate, msgArgs); 465 } 466 }; 467 } 468 else { 469 // If the head of this queue has already reached max concurrency, 470 // continuously find next one 471 // which has not yet reach max concurrency.Overrided method 472 // 'eligibleToPoll' to check if the 473 // element of this queue has reached the maximum concurrency. 474 queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) { 475 @Override 476 protected void debug(String msgTemplate, Object... msgArgs) { 477 log.trace(msgTemplate, msgArgs); 478 } 479 480 @Override 481 protected boolean eligibleToPoll(QueueElement<?> element) { 482 if (element != null) { 483 CallableWrapper wrapper = (CallableWrapper) element; 484 if (element.getElement() != null) { 485 return callableReachMaxConcurrency(wrapper.getElement()); 486 } 487 } 488 return false; 489 } 490 491 }; 492 } 493 494 interruptMapMaxSize = ConfigurationService.getInt(conf, CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE); 495 496 // IMPORTANT: The ThreadPoolExecutor does not always the execute 497 // commands out of the queue, there are 498 // certain conditions where commands are pushed directly to a thread. 499 // As we are using a queue with DELAYED semantics (i.e. execute the 500 // command in 5 mins) we need to make 501 // sure that the commands are always pushed to the queue. 502 // To achieve this (by looking a the ThreadPoolExecutor.execute() 503 // implementation, we are making the pool 504 // minimum size equals to the maximum size (thus threads are keep always 505 // running) and we are warming up 506 // all those threads (the for loop that runs dummy runnables). 507 executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue, 508 new NamedThreadFactory("CallableQueue")) { 509 protected void beforeExecute(Thread t, Runnable r) { 510 super.beforeExecute(t,r); 511 XLog.Info.get().clear(); 512 } 513 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 514 return (RunnableFuture<T>)callable; 515 } 516 }; 517 518 for (int i = 0; i < threads; i++) { 519 executor.execute(new Runnable() { 520 public void run() { 521 try { 522 Thread.sleep(100); 523 } 524 catch (InterruptedException ex) { 525 log.warn("Could not warm up threadpool {0}", ex.getMessage(), ex); 526 } 527 } 528 }); 529 } 530 531 maxCallableConcurrency = ConfigurationService.getInt(conf, CONF_CALLABLE_CONCURRENCY); 532 } 533 534 /** 535 * Destroy the command queue service. 536 */ 537 @Override 538 public void destroy() { 539 try { 540 long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds 541 executor.shutdown(); 542 queue.clear(); 543 while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) { 544 log.info("Waiting for executor to shutdown"); 545 if (System.currentTimeMillis() > limit) { 546 log.warn("Gave up, continuing without waiting for executor to shutdown"); 547 break; 548 } 549 } 550 } 551 catch (InterruptedException ex) { 552 log.warn(ex); 553 } 554 } 555 556 /** 557 * Return the public interface for command queue service. 558 * 559 * @return {@link CallableQueueService}. 560 */ 561 @Override 562 public Class<? extends Service> getInterface() { 563 return CallableQueueService.class; 564 } 565 566 /** 567 * @return int size of queue 568 */ 569 public synchronized int queueSize() { 570 return queue.size(); 571 } 572 573 private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) { 574 if (!ignoreQueueSize && queue.size() >= queueSize) { 575 log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey()); 576 return false; 577 } 578 if (!executor.isShutdown()) { 579 if (wrapper.filterDuplicates()) { 580 wrapper.addToUniqueCallables(); 581 try { 582 executor.execute(wrapper); 583 } 584 catch (Throwable ree) { 585 wrapper.removeFromUniqueCallables(); 586 throw new RuntimeException(ree); 587 } 588 } 589 } 590 else { 591 log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey()); 592 } 593 return true; 594 } 595 596 /** 597 * Queue a callable for asynchronous execution. 598 * 599 * @param callable callable to queue. 600 * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable 601 * was not queued. 602 */ 603 public boolean queue(XCallable<?> callable) { 604 return queue(callable, 0); 605 } 606 607 /** 608 * Queue a list of callables for serial execution. 609 * <p> 610 * Useful to serialize callables that may compete with each other for resources. 611 * <p> 612 * All callables will be processed with the priority of the highest priority of all callables. 613 * 614 * @param callables callables to be executed by the composite callable. 615 * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables 616 * were not queued. 617 */ 618 public boolean queueSerial(List<? extends XCallable<?>> callables) { 619 return queueSerial(callables, 0); 620 } 621 622 /** 623 * Queue a callable for asynchronous execution sometime in the future. 624 * 625 * @param callable callable to queue for delayed execution 626 * @param delay time, in milliseconds, that the callable should be delayed. 627 * @return <code>true</code> if the callable was queued, <code>false</code> 628 * if the queue is full and the callable was not queued. 629 */ 630 public synchronized boolean queue(XCallable<?> callable, long delay) { 631 if (callable == null) { 632 return true; 633 } 634 boolean queued = false; 635 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 636 log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size()); 637 } 638 else { 639 checkInterruptTypes(callable); 640 queued = queue(new CallableWrapper(callable, delay), false); 641 if (queued) { 642 incrCounter(INSTR_QUEUED_COUNTER, 1); 643 } 644 else { 645 log.warn("Could not queue callable"); 646 } 647 } 648 return queued; 649 } 650 651 /** 652 * Queue a list of callables for serial execution sometime in the future. 653 * <p> 654 * Useful to serialize callables that may compete with each other for resources. 655 * <p> 656 * All callables will be processed with the priority of the highest priority of all callables. 657 * 658 * @param callables callables to be executed by the composite callable. 659 * @param delay time, in milliseconds, that the callable should be delayed. 660 * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables 661 * were not queued. 662 */ 663 public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) { 664 boolean queued; 665 if (callables == null || callables.size() == 0) { 666 queued = true; 667 } 668 else if (callables.size() == 1) { 669 queued = queue(callables.get(0), delay); 670 } 671 else { 672 XCallable<?> callable = new CompositeCallable(callables); 673 queued = queue(callable, delay); 674 if (queued) { 675 incrCounter(INSTR_QUEUED_COUNTER, callables.size()); 676 } 677 } 678 return queued; 679 } 680 681 /** 682 * Instruments the callable queue service. 683 * 684 * @param instr instance to instrument the callable queue service to. 685 */ 686 public void instrument(Instrumentation instr) { 687 instrumentation = instr; 688 instr.addSampler(INSTRUMENTATION_GROUP, INSTR_QUEUE_SIZE_SAMPLER, 60, 1, new Instrumentation.Variable<Long>() { 689 public Long getValue() { 690 return (long) queue.size(); 691 } 692 }); 693 instr.addSampler(INSTRUMENTATION_GROUP, INSTR_THREADS_ACTIVE_SAMPLER, 60, 1, 694 new Instrumentation.Variable<Long>() { 695 public Long getValue() { 696 return (long) executor.getActiveCount(); 697 } 698 }); 699 } 700 701 /** 702 * check the interrupt map for the existence of an interrupt commands if 703 * exist a List of Interrupt Callable for the same lock key will bereturned, 704 * otherwise it will return null 705 */ 706 public Set<XCallable<?>> checkInterrupts(String lockKey) { 707 708 if (lockKey != null) { 709 return interruptCommandsMap.remove(lockKey); 710 } 711 return null; 712 } 713 714 /** 715 * check if the callable is of an interrupt type and insert it into the map 716 * accordingly 717 * 718 * @param callable 719 */ 720 public void checkInterruptTypes(XCallable<?> callable) { 721 if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) { 722 for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) { 723 if (INTERRUPT_TYPES.contains(singleCallable.getType())) { 724 insertCallableIntoInterruptMap(singleCallable); 725 } 726 } 727 } 728 else if (INTERRUPT_TYPES.contains(callable.getType())) { 729 insertCallableIntoInterruptMap(callable); 730 } 731 } 732 733 /** 734 * insert a new callable in the Interrupt Command Map add a new element to 735 * the list or create a new list accordingly 736 * 737 * @param callable 738 */ 739 public void insertCallableIntoInterruptMap(XCallable<?> callable) { 740 if (interruptCommandsMap.size() < interruptMapMaxSize) { 741 Set<XCallable<?>> newSet = Collections.synchronizedSet(new LinkedHashSet<XCallable<?>>()); 742 Set<XCallable<?>> interruptSet = interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newSet); 743 if (interruptSet == null) { 744 interruptSet = newSet; 745 } 746 if (interruptSet.add(callable)) { 747 log.trace("Inserting an interrupt element [{0}] to the interrupt map", callable.toString()); 748 } else { 749 log.trace("Interrupt element [{0}] already present", callable.toString()); 750 } 751 } 752 else { 753 log.warn( 754 "The interrupt map reached max size of [{0}], an interrupt element [{1}] will not added to the map [{1}]", 755 interruptCommandsMap.size(), callable.toString()); 756 } 757 } 758 759 /** 760 * Get the list of strings of queue dump 761 * 762 * @return the list of string that representing each CallableWrapper 763 */ 764 public List<String> getQueueDump() { 765 List<String> list = new ArrayList<String>(); 766 for (QueueElement<CallableWrapper> qe : queue) { 767 if (qe.toString() == null) { 768 continue; 769 } 770 list.add(qe.toString()); 771 } 772 return list; 773 } 774 775 /** 776 * Get the list of strings of uniqueness map dump 777 * 778 * @return the list of string that representing the key of each command in the queue 779 */ 780 public List<String> getUniqueDump() { 781 List<String> list = new ArrayList<String>(); 782 for (Entry<String, Date> entry : uniqueCallables.entrySet()) { 783 list.add(entry.toString()); 784 } 785 return list; 786 } 787 788 // Refer executor.invokeAll 789 public <T> List<Future<T>> invokeAll(List<CallableWrapper<T>> tasks) 790 throws InterruptedException { 791 return executor.invokeAll(tasks); 792 } 793 794}