/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
import org.apache.oozie.command.XCommand;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.PollablePriorityDelayQueue;
import org.apache.oozie.util.PriorityDelayQueue;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.PriorityDelayQueue.QueueElement;

/**
 * The callable queue service queues {@link XCallable}s for asynchronous execution.
 * <p/>
 * Callables can be queued for immediate execution or for delayed execution (some time in the future).
 * <p/>
 * Callables are consumed from the queue for execution based on their priority.
 * <p/>
 * When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops
 * queuing callables.
 * <p/>
 * A thread-pool is used to execute the callables asynchronously.
 * <p/>
 * The following configuration parameters control the callable queue service:
 * <p/>
 * {@link #CONF_QUEUE_SIZE} size of the immediate execution queue. Default value is 10000.
 * <p/>
 * {@link #CONF_THREADS} number of threads in the thread-pool used for asynchronous command execution. When this number
 * of threads is reached, commands remain in the queue until threads become available. Sets up a priority queue for
 * the execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for
 * execution sometime in the future.
 */
public class CallableQueueService implements Service, Instrumentable {
    private static final String INSTRUMENTATION_GROUP = "callablequeue";
    private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue";
    private static final String INSTR_EXECUTED_COUNTER = "executed";
    private static final String INSTR_FAILED_COUNTER = "failed";
    private static final String INSTR_QUEUED_COUNTER = "queued";
    private static final String INSTR_QUEUE_SIZE_SAMPLER = "queue.size";
    private static final String INSTR_THREADS_ACTIVE_SAMPLER = "threads.active";

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallableQueueService.";

    public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
    public static final String CONF_THREADS = CONF_PREFIX + "threads";
    public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
    public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";
    public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes";
    public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize";

    /** Delay, in milliseconds, applied when a callable is requeued because its type exceeded max concurrency. */
    public static final int CONCURRENCY_DELAY = 500;

    /** Delay, in milliseconds, applied when a callable is requeued because the system is in SAFEMODE. */
    public static final int SAFE_MODE_DELAY = 60000;

    // Number of callables per type between callableBegin() and callableEnd(); guarded by its own monitor.
    private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();

    // Keys of the callables currently queued, used to filter out duplicates. Declared with the concrete
    // concurrent type so that putIfAbsent() is available without a downcast.
    private final ConcurrentHashMap<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();

    // Interrupt callables grouped by entity key.
    private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<String, Set<XCallable<?>>>();

    public static final HashSet<String> INTERRUPT_TYPES = new HashSet<String>();

    private int interruptMapMaxSize;

    private int maxCallableConcurrency;

    /**
     * Register the start of a callable and check it against the per-type concurrency limit.
     * <p/>
     * The per-type counter is incremented even when <code>false</code> is returned; {@link CallableWrapper#run()}
     * always pairs this call with {@link #callableEnd(XCallable)} in its <code>finally</code> block, which
     * restores the counter.
     *
     * @param callable callable about to be executed.
     * @return <code>true</code> if the callable may execute, <code>false</code> if the max concurrency for its type
     *         has been exceeded.
     */
    private boolean callableBegin(XCallable<?> callable) {
        synchronized (activeCallables) {
            AtomicInteger counter = activeCallables.get(callable.getType());
            if (counter == null) {
                // first callable of this type, always allowed
                counter = new AtomicInteger(1);
                activeCallables.put(callable.getType(), counter);
                return true;
            }
            else {
                int i = counter.incrementAndGet();
                return i <= maxCallableConcurrency;
            }
        }
    }

    /**
     * Register the end of a callable, decrementing the counter of its type.
     *
     * @param callable callable that finished (or was rejected by {@link #callableBegin(XCallable)}).
     * @throws IllegalStateException if there is no counter for the type of the callable.
     */
    private void callableEnd(XCallable<?> callable) {
        synchronized (activeCallables) {
            AtomicInteger counter = activeCallables.get(callable.getType());
            if (counter == null) {
                throw new IllegalStateException("It should not happen");
            }
            else {
                counter.decrementAndGet();
            }
        }
    }

    /**
     * Check if a callable could start without exceeding the concurrency limit of its type.
     * <p/>
     * NOTE: despite its name, this method returns <code>true</code> when the max concurrency has <b>not</b> been
     * reached yet; it is used as the eligibility test of the pollable queue.
     *
     * @param callable callable to check.
     * @return <code>true</code> if the number of active callables of the same type is below the max concurrency
     *         (or no callable of that type has been seen yet), <code>false</code> otherwise.
     */
    private boolean callableReachMaxConcurrency(XCallable<?> callable) {
        synchronized (activeCallables) {
            AtomicInteger counter = activeCallables.get(callable.getType());
            if (counter == null) {
                return true;
            }
            else {
                int i = counter.get();
                return i < maxCallableConcurrency;
            }
        }
    }

    // Callables are wrapped with this wrapper for execution, for logging
    // and instrumentation.
    // The wrapper implements Runnable and Comparable to be able to work with an
    // executor and a priority queue.
    class CallableWrapper extends PriorityDelayQueue.QueueElement<XCallable<?>> implements Runnable {
        // measures the time the callable spends in the queue
        private Instrumentation.Cron cron;

        public CallableWrapper(XCallable<?> callable, long delay) {
            super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS);
            cron = new Instrumentation.Cron();
            cron.start();
        }

        /**
         * Execute the wrapped callable.
         * <p/>
         * The callable is requeued, with a delay, if the system is in SAFEMODE or if the max concurrency for its
         * type has been exceeded. Failures of the callable are logged and counted, they are never propagated.
         */
        @Override
        public void run() {
            // stays null until callableBegin() is about to be invoked, the finally block relies on this
            XCallable<?> callable = null;
            try {
                removeFromUniqueCallables();
                if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
                    log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay", getElement().getType(),
                            SAFE_MODE_DELAY);
                    setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
                    queue(this, true);
                    return;
                }
                callable = getElement();
                if (callableBegin(callable)) {
                    cron.stop();
                    addInQueueCron(cron);
                    XLog log = XLog.getLog(getClass());
                    log.trace("executing callable [{0}]", callable.getName());

                    try {
                        callable.call();
                        incrCounter(INSTR_EXECUTED_COUNTER, 1);
                        log.trace("executed callable [{0}]", callable.getName());
                    }
                    catch (Exception ex) {
                        incrCounter(INSTR_FAILED_COUNTER, 1);
                        log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
                    }
                }
                else {
                    log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay", callable
                            .getType(), CONCURRENCY_DELAY);
                    setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
                    queue(this, true);
                    incrCounter(callable.getType() + "#exceeded.concurrency", 1);
                }
            }
            catch (Throwable t) {
                incrCounter(INSTR_FAILED_COUNTER, 1);
                log.warn("exception callable [{0}], {1}", callable == null ? "N/A" : callable.getName(),
                        t.getMessage(), t);
            }
            finally {
                // undo the increment done by callableBegin(), whether the callable ran or was rejected
                if (callable != null) {
                    callableEnd(callable);
                }
            }
        }

        /**
         * Filter the duplicate callables from the list before queuing this.
         * <p/>
         * If it is a single callable, check if its key is in the unique map or not.
         * <p/>
         * If it is a composite callable, remove the duplicate callables from the composite.
         *
         * @return true if this callable should be queued
         */
        public boolean filterDuplicates() {
            XCallable<?> callable = getElement();
            if (callable instanceof CompositeCallable) {
                return ((CompositeCallable) callable).removeDuplicates();
            }
            else {
                return uniqueCallables.containsKey(callable.getKey()) == false;
            }
        }

        /**
         * Add the keys to the set
         */
        public void addToUniqueCallables() {
            XCallable<?> callable = getElement();
            if (callable instanceof CompositeCallable) {
                ((CompositeCallable) callable).addToUniqueCallables();
            }
            else {
                uniqueCallables.putIfAbsent(callable.getKey(), new Date());
            }
        }

        /**
         * Remove the keys from the set
         */
        public void removeFromUniqueCallables() {
            XCallable<?> callable = getElement();
            if (callable instanceof CompositeCallable) {
                ((CompositeCallable) callable).removeFromUniqueCallables();
            }
            else {
                uniqueCallables.remove(callable.getKey());
            }
        }
    }

    /**
     * Callable that executes a list of callables serially, in list order.
     * <p/>
     * Its priority is the highest priority of the given callables and its created time is the earliest created
     * time of the given callables. Type, key and entity key are derived from the first callable of the list.
     */
    class CompositeCallable implements XCallable<Void> {
        private List<XCallable<?>> callables;
        private String name;
        private int priority;
        private long createdTime;

        public CompositeCallable(List<? extends XCallable<?>> callables) {
            this.callables = new ArrayList<XCallable<?>>(callables);
            priority = 0;
            createdTime = Long.MAX_VALUE;
            StringBuilder sb = new StringBuilder();
            String separator = "[";
            for (XCallable<?> callable : callables) {
                priority = Math.max(priority, callable.getPriority());
                createdTime = Math.min(createdTime, callable.getCreatedTime());
                sb.append(separator).append(callable.getName());
                separator = ",";
            }
            sb.append("]");
            name = sb.toString();
        }

        @Override
        public String getName() {
            return name;
        }

        @Override
        public String getType() {
            return "#composite#" + callables.get(0).getType();
        }

        @Override
        public String getKey() {
            return "#composite#" + callables.get(0).getKey();
        }

        @Override
        public String getEntityKey() {
            return "#composite#" + callables.get(0).getEntityKey();
        }

        @Override
        public int getPriority() {
            return priority;
        }

        @Override
        public long getCreatedTime() {
            return createdTime;
        }

        @Override
        public void setInterruptMode(boolean mode) {
        }

        @Override
        public boolean inInterruptMode() {
            return false;
        }

        public List<XCallable<?>> getCallables() {
            return this.callables;
        }

        /**
         * Execute all the callables of the composite, one after the other.
         * <p/>
         * A failing callable is logged and counted as failed, it does not prevent the execution of the following
         * ones.
         *
         * @return always <code>null</code>.
         */
        public Void call() throws Exception {
            XLog log = XLog.getLog(getClass());

            for (XCallable<?> callable : callables) {
                log.trace("executing callable [{0}]", callable.getName());
                try {
                    callable.call();
                    incrCounter(INSTR_EXECUTED_COUNTER, 1);
                    log.trace("executed callable [{0}]", callable.getName());
                }
                catch (Exception ex) {
                    incrCounter(INSTR_FAILED_COUNTER, 1);
                    log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
                }
            }

            // ticking -1 not to count the call to the composite callable
            incrCounter(INSTR_EXECUTED_COUNTER, -1);
            return null;
        }

        /**
         * Return the string representations of the callables of the composite, each one between parentheses,
         * separated by commas.
         * <p/>
         * NOTE: returns <code>null</code> when the composite has no callables.
         *
         * @see java.lang.Object#toString()
         */
        @Override
        public String toString() {
            if (callables.size() == 0) {
                return null;
            }
            StringBuilder sb = new StringBuilder();
            int size = callables.size();
            for (int i = 0; i < size; i++) {
                XCallable<?> callable = callables.get(i);
                sb.append("(");
                sb.append(callable.toString());
                if (i + 1 == size) {
                    sb.append(")");
                }
                else {
                    sb.append("),");
                }
            }
            return sb.toString();
        }

        /**
         * Remove the duplicate callables from the list before queuing them.
         * <p/>
         * A callable is a duplicate if its key is already in the unique map or if a previous callable of this
         * composite has the same key.
         *
         * @return true if callables should be queued
         */
        public boolean removeDuplicates() {
            Set<String> set = new HashSet<String>();
            List<XCallable<?>> filteredCallables = new ArrayList<XCallable<?>>();
            if (callables.size() == 0) {
                return false;
            }
            for (XCallable<?> callable : callables) {
                if (!uniqueCallables.containsKey(callable.getKey()) && !set.contains(callable.getKey())) {
                    filteredCallables.add(callable);
                    set.add(callable.getKey());
                }
            }
            callables = filteredCallables;
            if (callables.size() == 0) {
                return false;
            }
            return true;
        }

        /**
         * Add the keys to the set
         */
        public void addToUniqueCallables() {
            for (XCallable<?> callable : callables) {
                uniqueCallables.putIfAbsent(callable.getKey(), new Date());
            }
        }

        /**
         * Remove the keys from the set
         */
        public void removeFromUniqueCallables() {
            for (XCallable<?> callable : callables) {
                uniqueCallables.remove(callable.getKey());
            }
        }
    }

    private final XLog log = XLog.getLog(getClass());

    private int queueSize;
    private PriorityDelayQueue<CallableWrapper> queue;
    private ThreadPoolExecutor executor;
    private Instrumentation instrumentation;

    /**
     * Convenience method for instrumentation counters.
     *
     * @param name counter name.
     * @param count count to increment the counter.
     */
    private void incrCounter(String name, int count) {
        if (instrumentation != null) {
            instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
        }
    }

    /**
     * Convenience method to record the time a callable spent in the queue.
     *
     * @param cron cron with the measured time.
     */
    private void addInQueueCron(Instrumentation.Cron cron) {
        if (instrumentation != null) {
            instrumentation.addCron(INSTRUMENTATION_GROUP, INSTR_IN_QUEUE_TIME_TIMER, cron);
        }
    }

    /**
     * Initialize the command queue service.
     *
     * @param services services instance.
     */
    @Override
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public void init(Services services) {
        Configuration conf = services.getConf();

        queueSize = ConfigurationService.getInt(conf, CONF_QUEUE_SIZE);
        int threads = ConfigurationService.getInt(conf, CONF_THREADS);
        boolean callableNextEligible = ConfigurationService.getBoolean(conf, CONF_CALLABLE_NEXT_ELIGIBLE);

        for (String type : ConfigurationService.getStrings(conf, CONF_CALLABLE_INTERRUPT_TYPES)) {
            log.debug("Adding interrupt type [{0}]", type);
            INTERRUPT_TYPES.add(type);
        }

        if (!callableNextEligible) {
            queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
                @Override
                protected void debug(String msgTemplate, Object... msgArgs) {
                    log.trace(msgTemplate, msgArgs);
                }
            };
        }
        else {
            // If the head of this queue has already reached max concurrency,
            // continuously find the next one which has not yet reached max
            // concurrency. The overridden method 'eligibleToPoll' checks if the
            // element of this queue has reached the maximum concurrency.
            queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
                @Override
                protected void debug(String msgTemplate, Object... msgArgs) {
                    log.trace(msgTemplate, msgArgs);
                }

                @Override
                protected boolean eligibleToPoll(QueueElement<?> element) {
                    if (element != null) {
                        CallableWrapper wrapper = (CallableWrapper) element;
                        if (element.getElement() != null) {
                            return callableReachMaxConcurrency(wrapper.getElement());
                        }
                    }
                    return false;
                }

            };
        }

        interruptMapMaxSize = ConfigurationService.getInt(conf, CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE);

        // IMPORTANT: The ThreadPoolExecutor does not always execute the
        // commands out of the queue, there are certain conditions where
        // commands are pushed directly to a thread.
        // As we are using a queue with DELAYED semantics (i.e. execute the
        // command in 5 mins) we need to make sure that the commands are
        // always pushed to the queue.
        // To achieve this (by looking at the ThreadPoolExecutor.execute()
        // implementation), we are making the pool minimum size equal to the
        // maximum size (thus threads are kept always running) and we are
        // warming up all those threads (the for loop that runs dummy runnables).
        executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                super.beforeExecute(t, r);
                XLog.Info.get().clear();
            }
        };

        for (int i = 0; i < threads; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(100);
                    }
                    catch (InterruptedException ex) {
                        log.warn("Could not warm up threadpool {0}", ex.getMessage(), ex);
                        // keep the interruption visible to the code running this task
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }

        maxCallableConcurrency = ConfigurationService.getInt(conf, CONF_CALLABLE_CONCURRENCY);
    }

    /**
     * Destroy the command queue service.
     * <p/>
     * Shuts down the executor, discards the queued callables and waits up to 30 seconds for the running ones to
     * finish. If the calling thread is interrupted while waiting, the wait is abandoned and the interrupt status
     * of the thread is restored.
     */
    @Override
    public void destroy() {
        try {
            long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
            executor.shutdown();
            queue.clear();
            while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
                log.info("Waiting for executor to shutdown");
                if (System.currentTimeMillis() > limit) {
                    log.warn("Gave up, continuing without waiting for executor to shutdown");
                    break;
                }
            }
        }
        catch (InterruptedException ex) {
            log.warn(ex);
            // do not swallow the interruption, let the caller observe it
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Return the public interface for command queue service.
     *
     * @return {@link CallableQueueService}.
     */
    @Override
    public Class<? extends Service> getInterface() {
        return CallableQueueService.class;
    }

    /**
     * @return int size of queue
     */
    public synchronized int queueSize() {
        return queue.size();
    }

    /**
     * Hand a wrapped callable over to the executor.
     * <p/>
     * Nothing is queued, but <code>true</code> is still returned, if the executor has been shut down or if the
     * callable is filtered out as a duplicate.
     *
     * @param wrapper wrapped callable to queue.
     * @param ignoreQueueSize if <code>true</code> the callable is queued even if the queue is full (used for
     *        requeuing).
     * @return <code>false</code> if the queue is full and the callable was not queued, <code>true</code> otherwise.
     */
    private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) {
        if (!ignoreQueueSize && queue.size() >= queueSize) {
            log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey());
            return false;
        }
        if (!executor.isShutdown()) {
            if (wrapper.filterDuplicates()) {
                wrapper.addToUniqueCallables();
                try {
                    executor.execute(wrapper);
                }
                catch (Throwable ree) {
                    // the callable did not make it to the queue, its keys must not block future queuing
                    wrapper.removeFromUniqueCallables();
                    throw new RuntimeException(ree);
                }
            }
        }
        else {
            log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey());
        }
        return true;
    }

    /**
     * Queue a callable for asynchronous execution.
     *
     * @param callable callable to queue.
     * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable
     *         was not queued.
     */
    public boolean queue(XCallable<?> callable) {
        return queue(callable, 0);
    }

    /**
     * Queue a list of callables for serial execution.
     * <p/>
     * Useful to serialize callables that may compete with each other for resources.
     * <p/>
     * All callables will be processed with the priority of the highest priority of all callables.
     *
     * @param callables callables to be executed by the composite callable.
     * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
     *         were not queued.
     */
    public boolean queueSerial(List<? extends XCallable<?>> callables) {
        return queueSerial(callables, 0);
    }

    /**
     * Queue a callable for asynchronous execution sometime in the future.
     * <p/>
     * A <code>null</code> callable is ignored and reported as queued. No callable is queued while the system is in
     * SAFEMODE.
     *
     * @param callable callable to queue for delayed execution
     * @param delay time, in milliseconds, that the callable should be delayed.
     * @return <code>true</code> if the callable was queued, <code>false</code>
     *         if the queue is full and the callable was not queued.
     */
    public synchronized boolean queue(XCallable<?> callable, long delay) {
        if (callable == null) {
            return true;
        }
        boolean queued = false;
        if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
            log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size());
        }
        else {
            checkInterruptTypes(callable);
            queued = queue(new CallableWrapper(callable, delay), false);
            if (queued) {
                incrCounter(INSTR_QUEUED_COUNTER, 1);
            }
            else {
                log.warn("Could not queue callable");
            }
        }
        return queued;
    }

    /**
     * Queue a list of callables for serial execution sometime in the future.
     * <p/>
     * Useful to serialize callables that may compete with each other for resources.
     * <p/>
     * All callables will be processed with the priority of the highest priority of all callables.
     *
     * @param callables callables to be executed by the composite callable.
     * @param delay time, in milliseconds, that the callable should be delayed.
     * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
     *         were not queued.
     */
    public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) {
        boolean queued;
        if (callables == null || callables.size() == 0) {
            queued = true;
        }
        else if (callables.size() == 1) {
            // no need for a composite
            queued = queue(callables.get(0), delay);
        }
        else {
            XCallable<?> callable = new CompositeCallable(callables);
            queued = queue(callable, delay);
            if (queued) {
                incrCounter(INSTR_QUEUED_COUNTER, callables.size());
            }
        }
        return queued;
    }

    /**
     * Instruments the callable queue service.
     *
     * @param instr instance to instrument the callable queue service to.
     */
    public void instrument(Instrumentation instr) {
        instrumentation = instr;
        instr.addSampler(INSTRUMENTATION_GROUP, INSTR_QUEUE_SIZE_SAMPLER, 60, 1, new Instrumentation.Variable<Long>() {
            public Long getValue() {
                return (long) queue.size();
            }
        });
        instr.addSampler(INSTRUMENTATION_GROUP, INSTR_THREADS_ACTIVE_SAMPLER, 60, 1,
                new Instrumentation.Variable<Long>() {
                    public Long getValue() {
                        return (long) executor.getActiveCount();
                    }
                });
    }

    /**
     * Check the interrupt map for the existence of interrupt commands for a lock key.
     * <p/>
     * The commands found are removed from the interrupt map.
     *
     * @param lockKey lock key to look for, it may be <code>null</code>.
     * @return the set of interrupt callables registered for the lock key, <code>null</code> if there are none or
     *         if the lock key is <code>null</code>.
     */
    public Set<XCallable<?>> checkInterrupts(String lockKey) {

        if (lockKey != null) {
            return interruptCommandsMap.remove(lockKey);
        }
        return null;
    }

    /**
     * Check if the callable is of an interrupt type and insert it into the interrupt map accordingly.
     * <p/>
     * For a composite callable each of its callables is checked individually.
     *
     * @param callable callable to check.
     */
    public void checkInterruptTypes(XCallable<?> callable) {
        if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) {
            for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) {
                if (INTERRUPT_TYPES.contains(singleCallable.getType())) {
                    insertCallableIntoInterruptMap(singleCallable);
                }
            }
        }
        else if (INTERRUPT_TYPES.contains(callable.getType())) {
            insertCallableIntoInterruptMap(callable);
        }
    }

    /**
     * Insert a callable in the interrupt command map, adding it to the set registered for its entity key or
     * creating a new set accordingly.
     * <p/>
     * The callable is not inserted, and a warning is logged, if the map has reached its max size.
     *
     * @param callable callable to insert.
     */
    public void insertCallableIntoInterruptMap(XCallable<?> callable) {
        if (interruptCommandsMap.size() < interruptMapMaxSize) {
            Set<XCallable<?>> newSet = Collections.synchronizedSet(new LinkedHashSet<XCallable<?>>());
            Set<XCallable<?>> interruptSet = interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newSet);
            if (interruptSet == null) {
                // there was no set for the entity key, the new one has been registered
                interruptSet = newSet;
            }
            if (interruptSet.add(callable)) {
                log.trace("Inserting an interrupt element [{0}] to the interrupt map", callable.toString());
            }
            else {
                log.trace("Interrupt element [{0}] already present", callable.toString());
            }
        }
        else {
            log.warn(
                    "The interrupt map reached max size of [{0}], an interrupt element [{1}] will not be added to the map",
                    interruptCommandsMap.size(), callable.toString());
        }
    }

    /**
     * Get the list of strings of queue dump
     *
     * @return the list of strings representing each CallableWrapper, elements with a <code>null</code> string
     *         representation are skipped.
     */
    public List<String> getQueueDump() {
        List<String> list = new ArrayList<String>();
        for (QueueElement<CallableWrapper> qe : queue) {
            // render each element only once
            String dump = qe.toString();
            if (dump == null) {
                continue;
            }
            list.add(dump);
        }
        return list;
    }

    /**
     * Get the list of strings of uniqueness map dump
     *
     * @return the list of strings representing the entry (key and queuing date) of each command in the queue
     */
    public List<String> getUniqueDump() {
        List<String> list = new ArrayList<String>();
        for (Entry<String, Date> entry : uniqueCallables.entrySet()) {
            list.add(entry.toString());
        }
        return list;
    }

}