001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.Date; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.LinkedHashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import java.util.Map.Entry; 030import java.util.concurrent.BlockingQueue; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.ThreadPoolExecutor; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicInteger; 035 036import org.apache.hadoop.conf.Configuration; 037import org.apache.oozie.client.OozieClient.SYSTEM_MODE; 038import org.apache.oozie.util.Instrumentable; 039import org.apache.oozie.util.Instrumentation; 040import org.apache.oozie.util.PollablePriorityDelayQueue; 041import org.apache.oozie.util.PriorityDelayQueue; 042import org.apache.oozie.util.XCallable; 043import org.apache.oozie.util.XLog; 044import org.apache.oozie.util.PriorityDelayQueue.QueueElement; 045 046/** 047 * The callable queue service queues {@link XCallable}s for asynchronous execution. 048 * <p/> 049 * Callables can be queued for immediate execution or for delayed execution (some time in the future). 050 * <p/> 051 * Callables are consumed from the queue for execution based on their priority. 052 * <p/> 053 * When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops 054 * queuing callables. 055 * <p/> 056 * A thread-pool is used to execute the callables asynchronously. 057 * <p/> 058 * The following configuration parameters control the callable queue service: 059 * <p/> 060 * {@link #CONF_QUEUE_SIZE} size of the immediate execution queue. Defaulf value is 10000. 061 * <p/> 062 * {@link #CONF_THREADS} number of threads in the thread-pool used for asynchronous command execution. When this number 063 * of threads is reached, commands remain the queue until threads become available. Sets up a priority queue for the 064 * execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for execution 065 * sometime in the future. 066 */ 067public class CallableQueueService implements Service, Instrumentable { 068 private static final String INSTRUMENTATION_GROUP = "callablequeue"; 069 private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue"; 070 private static final String INSTR_EXECUTED_COUNTER = "executed"; 071 private static final String INSTR_FAILED_COUNTER = "failed"; 072 private static final String INSTR_QUEUED_COUNTER = "queued"; 073 private static final String INSTR_QUEUE_SIZE_SAMPLER = "queue.size"; 074 private static final String INSTR_THREADS_ACTIVE_SAMPLER = "threads.active"; 075 076 public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallableQueueService."; 077 078 public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size"; 079 public static final String CONF_THREADS = CONF_PREFIX + "threads"; 080 public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency"; 081 public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible"; 082 public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes"; 083 public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize"; 084 085 public static final int CONCURRENCY_DELAY = 500; 086 087 public static final int SAFE_MODE_DELAY = 60000; 088 089 private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>(); 090 091 private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>(); 092 093 private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<String, Set<XCallable<?>>>(); 094 095 public static final HashSet<String> INTERRUPT_TYPES = new HashSet<String>(); 096 097 private int interruptMapMaxSize; 098 099 private int maxCallableConcurrency; 100 101 private boolean callableBegin(XCallable<?> callable) { 102 synchronized (activeCallables) { 103 AtomicInteger counter = activeCallables.get(callable.getType()); 104 if (counter == null) { 105 counter = new AtomicInteger(1); 106 activeCallables.put(callable.getType(), counter); 107 return true; 108 } 109 else { 110 int i = counter.incrementAndGet(); 111 return i <= maxCallableConcurrency; 112 } 113 } 114 } 115 116 private void callableEnd(XCallable<?> callable) { 117 synchronized (activeCallables) { 118 AtomicInteger counter = activeCallables.get(callable.getType()); 119 if (counter == null) { 120 throw new IllegalStateException("It should not happen"); 121 } 122 else { 123 counter.decrementAndGet(); 124 } 125 } 126 } 127 128 private boolean callableReachMaxConcurrency(XCallable<?> callable) { 129 synchronized (activeCallables) { 130 AtomicInteger counter = activeCallables.get(callable.getType()); 131 if (counter == null) { 132 return true; 133 } 134 else { 135 int i = counter.get(); 136 return i < maxCallableConcurrency; 137 } 138 } 139 } 140 141 // Callables are wrapped with the this wrapper for execution, for logging 142 // and instrumentation. 143 // The wrapper implements Runnable and Comparable to be able to work with an 144 // executor and a priority queue. 145 class CallableWrapper extends PriorityDelayQueue.QueueElement<XCallable<?>> implements Runnable { 146 private Instrumentation.Cron cron; 147 148 public CallableWrapper(XCallable<?> callable, long delay) { 149 super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS); 150 cron = new Instrumentation.Cron(); 151 cron.start(); 152 } 153 154 public void run() { 155 XCallable<?> callable = null; 156 try { 157 removeFromUniqueCallables(); 158 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 159 log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay", getElement().getType(), 160 SAFE_MODE_DELAY); 161 setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS); 162 queue(this, true); 163 return; 164 } 165 callable = getElement(); 166 if (callableBegin(callable)) { 167 cron.stop(); 168 addInQueueCron(cron); 169 XLog.Info.get().clear(); 170 XLog log = XLog.getLog(getClass()); 171 log.trace("executing callable [{0}]", callable.getName()); 172 173 try { 174 callable.call(); 175 incrCounter(INSTR_EXECUTED_COUNTER, 1); 176 log.trace("executed callable [{0}]", callable.getName()); 177 } 178 catch (Exception ex) { 179 incrCounter(INSTR_FAILED_COUNTER, 1); 180 log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex); 181 } 182 finally { 183 XLog.Info.get().clear(); 184 } 185 } 186 else { 187 log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay", callable 188 .getType(), CONCURRENCY_DELAY); 189 setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS); 190 queue(this, true); 191 incrCounter(callable.getType() + "#exceeded.concurrency", 1); 192 } 193 } 194 catch (Throwable t) { 195 incrCounter(INSTR_FAILED_COUNTER, 1); 196 log.warn("exception callable [{0}], {1}", callable == null ? "N/A" : callable.getName(), 197 t.getMessage(), t); 198 } 199 finally { 200 if (callable != null) { 201 callableEnd(callable); 202 } 203 } 204 } 205 206 /** 207 * Filter the duplicate callables from the list before queue this. 208 * <p/> 209 * If it is single callable, checking if key is in unique map or not. 210 * <p/> 211 * If it is composite callable, remove duplicates callables from the composite. 212 * 213 * @return true if this callable should be queued 214 */ 215 public boolean filterDuplicates() { 216 XCallable<?> callable = getElement(); 217 if (callable instanceof CompositeCallable) { 218 return ((CompositeCallable) callable).removeDuplicates(); 219 } 220 else { 221 return uniqueCallables.containsKey(callable.getKey()) == false; 222 } 223 } 224 225 /** 226 * Add the keys to the set 227 */ 228 public void addToUniqueCallables() { 229 XCallable<?> callable = getElement(); 230 if (callable instanceof CompositeCallable) { 231 ((CompositeCallable) callable).addToUniqueCallables(); 232 } 233 else { 234 ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date()); 235 } 236 } 237 238 /** 239 * Remove the keys from the set 240 */ 241 public void removeFromUniqueCallables() { 242 XCallable<?> callable = getElement(); 243 if (callable instanceof CompositeCallable) { 244 ((CompositeCallable) callable).removeFromUniqueCallables(); 245 } 246 else { 247 uniqueCallables.remove(callable.getKey()); 248 } 249 } 250 } 251 252 class CompositeCallable implements XCallable<Void> { 253 private List<XCallable<?>> callables; 254 private String name; 255 private int priority; 256 private long createdTime; 257 258 public CompositeCallable(List<? extends XCallable<?>> callables) { 259 this.callables = new ArrayList<XCallable<?>>(callables); 260 priority = 0; 261 createdTime = Long.MAX_VALUE; 262 StringBuilder sb = new StringBuilder(); 263 String separator = "["; 264 for (XCallable<?> callable : callables) { 265 priority = Math.max(priority, callable.getPriority()); 266 createdTime = Math.min(createdTime, callable.getCreatedTime()); 267 sb.append(separator).append(callable.getName()); 268 separator = ","; 269 } 270 sb.append("]"); 271 name = sb.toString(); 272 } 273 274 @Override 275 public String getName() { 276 return name; 277 } 278 279 @Override 280 public String getType() { 281 return "#composite#" + callables.get(0).getType(); 282 } 283 284 @Override 285 public String getKey() { 286 return "#composite#" + callables.get(0).getKey(); 287 } 288 289 @Override 290 public String getEntityKey() { 291 return "#composite#" + callables.get(0).getEntityKey(); 292 } 293 294 @Override 295 public int getPriority() { 296 return priority; 297 } 298 299 @Override 300 public long getCreatedTime() { 301 return createdTime; 302 } 303 304 @Override 305 public void setInterruptMode(boolean mode) { 306 } 307 308 @Override 309 public boolean inInterruptMode() { 310 return false; 311 } 312 313 public List<XCallable<?>> getCallables() { 314 return this.callables; 315 } 316 317 public Void call() throws Exception { 318 XLog log = XLog.getLog(getClass()); 319 320 for (XCallable<?> callable : callables) { 321 log.trace("executing callable [{0}]", callable.getName()); 322 try { 323 callable.call(); 324 incrCounter(INSTR_EXECUTED_COUNTER, 1); 325 log.trace("executed callable [{0}]", callable.getName()); 326 } 327 catch (Exception ex) { 328 incrCounter(INSTR_FAILED_COUNTER, 1); 329 log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex); 330 } 331 } 332 333 // ticking -1 not to count the call to the composite callable 334 incrCounter(INSTR_EXECUTED_COUNTER, -1); 335 return null; 336 } 337 338 /* 339 * (non-Javadoc) 340 * 341 * @see java.lang.Object#toString() 342 */ 343 @Override 344 public String toString() { 345 if (callables.size() == 0) { 346 return null; 347 } 348 StringBuilder sb = new StringBuilder(); 349 int size = callables.size(); 350 for (int i = 0; i < size; i++) { 351 XCallable<?> callable = callables.get(i); 352 sb.append("("); 353 sb.append(callable.toString()); 354 if (i + 1 == size) { 355 sb.append(")"); 356 } 357 else { 358 sb.append("),"); 359 } 360 } 361 return sb.toString(); 362 } 363 364 /** 365 * Remove the duplicate callables from the list before queue them 366 * 367 * @return true if callables should be queued 368 */ 369 public boolean removeDuplicates() { 370 Set<String> set = new HashSet<String>(); 371 List<XCallable<?>> filteredCallables = new ArrayList<XCallable<?>>(); 372 if (callables.size() == 0) { 373 return false; 374 } 375 for (XCallable<?> callable : callables) { 376 if (!uniqueCallables.containsKey(callable.getKey()) && !set.contains(callable.getKey())) { 377 filteredCallables.add(callable); 378 set.add(callable.getKey()); 379 } 380 } 381 callables = filteredCallables; 382 if (callables.size() == 0) { 383 return false; 384 } 385 return true; 386 } 387 388 /** 389 * Add the keys to the set 390 */ 391 public void addToUniqueCallables() { 392 for (XCallable<?> callable : callables) { 393 ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date()); 394 } 395 } 396 397 /** 398 * Remove the keys from the set 399 */ 400 public void removeFromUniqueCallables() { 401 for (XCallable<?> callable : callables) { 402 uniqueCallables.remove(callable.getKey()); 403 } 404 } 405 } 406 407 private XLog log = XLog.getLog(getClass()); 408 409 private int queueSize; 410 private PriorityDelayQueue<CallableWrapper> queue; 411 private ThreadPoolExecutor executor; 412 private Instrumentation instrumentation; 413 414 /** 415 * Convenience method for instrumentation counters. 416 * 417 * @param name counter name. 418 * @param count count to increment the counter. 419 */ 420 private void incrCounter(String name, int count) { 421 if (instrumentation != null) { 422 instrumentation.incr(INSTRUMENTATION_GROUP, name, count); 423 } 424 } 425 426 private void addInQueueCron(Instrumentation.Cron cron) { 427 if (instrumentation != null) { 428 instrumentation.addCron(INSTRUMENTATION_GROUP, INSTR_IN_QUEUE_TIME_TIMER, cron); 429 } 430 } 431 432 /** 433 * Initialize the command queue service. 434 * 435 * @param services services instance. 436 */ 437 @Override 438 @SuppressWarnings({ "unchecked", "rawtypes" }) 439 public void init(Services services) { 440 Configuration conf = services.getConf(); 441 442 queueSize = conf.getInt(CONF_QUEUE_SIZE, 10000); 443 int threads = conf.getInt(CONF_THREADS, 10); 444 boolean callableNextEligible = conf.getBoolean(CONF_CALLABLE_NEXT_ELIGIBLE, true); 445 446 for (String type : conf.getStringCollection(CONF_CALLABLE_INTERRUPT_TYPES)) { 447 log.debug("Adding interrupt type [{0}]", type); 448 INTERRUPT_TYPES.add(type); 449 } 450 451 if (!callableNextEligible) { 452 queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) { 453 @Override 454 protected void debug(String msgTemplate, Object... msgArgs) { 455 log.trace(msgTemplate, msgArgs); 456 } 457 }; 458 } 459 else { 460 // If the head of this queue has already reached max concurrency, 461 // continuously find next one 462 // which has not yet reach max concurrency.Overrided method 463 // 'eligibleToPoll' to check if the 464 // element of this queue has reached the maximum concurrency. 465 queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) { 466 @Override 467 protected void debug(String msgTemplate, Object... msgArgs) { 468 log.trace(msgTemplate, msgArgs); 469 } 470 471 @Override 472 protected boolean eligibleToPoll(QueueElement<?> element) { 473 if (element != null) { 474 CallableWrapper wrapper = (CallableWrapper) element; 475 if (element.getElement() != null) { 476 return callableReachMaxConcurrency(wrapper.getElement()); 477 } 478 } 479 return false; 480 } 481 482 }; 483 } 484 485 interruptMapMaxSize = conf.getInt(CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE, 100); 486 487 // IMPORTANT: The ThreadPoolExecutor does not always the execute 488 // commands out of the queue, there are 489 // certain conditions where commands are pushed directly to a thread. 490 // As we are using a queue with DELAYED semantics (i.e. execute the 491 // command in 5 mins) we need to make 492 // sure that the commands are always pushed to the queue. 493 // To achieve this (by looking a the ThreadPoolExecutor.execute() 494 // implementation, we are making the pool 495 // minimum size equals to the maximum size (thus threads are keep always 496 // running) and we are warming up 497 // all those threads (the for loop that runs dummy runnables). 498 executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue); 499 500 for (int i = 0; i < threads; i++) { 501 executor.execute(new Runnable() { 502 public void run() { 503 try { 504 Thread.sleep(100); 505 } 506 catch (InterruptedException ex) { 507 log.warn("Could not warm up threadpool {0}", ex.getMessage(), ex); 508 } 509 } 510 }); 511 } 512 513 maxCallableConcurrency = conf.getInt(CONF_CALLABLE_CONCURRENCY, 3); 514 } 515 516 /** 517 * Destroy the command queue service. 518 */ 519 @Override 520 public void destroy() { 521 try { 522 long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds 523 executor.shutdown(); 524 queue.clear(); 525 while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) { 526 log.info("Waiting for executor to shutdown"); 527 if (System.currentTimeMillis() > limit) { 528 log.warn("Gave up, continuing without waiting for executor to shutdown"); 529 break; 530 } 531 } 532 } 533 catch (InterruptedException ex) { 534 log.warn(ex); 535 } 536 } 537 538 /** 539 * Return the public interface for command queue service. 540 * 541 * @return {@link CallableQueueService}. 542 */ 543 @Override 544 public Class<? extends Service> getInterface() { 545 return CallableQueueService.class; 546 } 547 548 /** 549 * @return int size of queue 550 */ 551 public synchronized int queueSize() { 552 return queue.size(); 553 } 554 555 private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) { 556 if (!ignoreQueueSize && queue.size() >= queueSize) { 557 log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey()); 558 return false; 559 } 560 if (!executor.isShutdown()) { 561 if (wrapper.filterDuplicates()) { 562 wrapper.addToUniqueCallables(); 563 try { 564 executor.execute(wrapper); 565 } 566 catch (Throwable ree) { 567 wrapper.removeFromUniqueCallables(); 568 throw new RuntimeException(ree); 569 } 570 } 571 } 572 else { 573 log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey()); 574 } 575 return true; 576 } 577 578 /** 579 * Queue a callable for asynchronous execution. 580 * 581 * @param callable callable to queue. 582 * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable 583 * was not queued. 584 */ 585 public boolean queue(XCallable<?> callable) { 586 return queue(callable, 0); 587 } 588 589 /** 590 * Queue a list of callables for serial execution. 591 * <p/> 592 * Useful to serialize callables that may compete with each other for resources. 593 * <p/> 594 * All callables will be processed with the priority of the highest priority of all callables. 595 * 596 * @param callables callables to be executed by the composite callable. 597 * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables 598 * were not queued. 599 */ 600 public boolean queueSerial(List<? extends XCallable<?>> callables) { 601 return queueSerial(callables, 0); 602 } 603 604 /** 605 * Queue a callable for asynchronous execution sometime in the future. 606 * 607 * @param callable callable to queue for delayed execution 608 * @param delay time, in milliseconds, that the callable should be delayed. 609 * @return <code>true</code> if the callable was queued, <code>false</code> 610 * if the queue is full and the callable was not queued. 611 */ 612 public synchronized boolean queue(XCallable<?> callable, long delay) { 613 if (callable == null) { 614 return true; 615 } 616 boolean queued = false; 617 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 618 log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size()); 619 } 620 else { 621 checkInterruptTypes(callable); 622 queued = queue(new CallableWrapper(callable, delay), false); 623 if (queued) { 624 incrCounter(INSTR_QUEUED_COUNTER, 1); 625 } 626 else { 627 log.warn("Could not queue callable"); 628 } 629 } 630 return queued; 631 } 632 633 /** 634 * Queue a list of callables for serial execution sometime in the future. 635 * <p/> 636 * Useful to serialize callables that may compete with each other for resources. 637 * <p/> 638 * All callables will be processed with the priority of the highest priority of all callables. 639 * 640 * @param callables callables to be executed by the composite callable. 641 * @param delay time, in milliseconds, that the callable should be delayed. 642 * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables 643 * were not queued. 644 */ 645 public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) { 646 boolean queued; 647 if (callables == null || callables.size() == 0) { 648 queued = true; 649 } 650 else if (callables.size() == 1) { 651 queued = queue(callables.get(0), delay); 652 } 653 else { 654 XCallable<?> callable = new CompositeCallable(callables); 655 queued = queue(callable, delay); 656 if (queued) { 657 incrCounter(INSTR_QUEUED_COUNTER, callables.size()); 658 } 659 } 660 return queued; 661 } 662 663 /** 664 * Instruments the callable queue service. 665 * 666 * @param instr instance to instrument the callable queue service to. 667 */ 668 public void instrument(Instrumentation instr) { 669 instrumentation = instr; 670 instr.addSampler(INSTRUMENTATION_GROUP, INSTR_QUEUE_SIZE_SAMPLER, 60, 1, new Instrumentation.Variable<Long>() { 671 public Long getValue() { 672 return (long) queue.size(); 673 } 674 }); 675 instr.addSampler(INSTRUMENTATION_GROUP, INSTR_THREADS_ACTIVE_SAMPLER, 60, 1, 676 new Instrumentation.Variable<Long>() { 677 public Long getValue() { 678 return (long) executor.getActiveCount(); 679 } 680 }); 681 } 682 683 /** 684 * check the interrupt map for the existence of an interrupt commands if 685 * exist a List of Interrupt Callable for the same lock key will bereturned, 686 * otherwise it will return null 687 */ 688 public Set<XCallable<?>> checkInterrupts(String lockKey) { 689 690 if (lockKey != null) { 691 return interruptCommandsMap.remove(lockKey); 692 } 693 return null; 694 } 695 696 /** 697 * check if the callable is of an interrupt type and insert it into the map 698 * accordingly 699 * 700 * @param callable 701 */ 702 public void checkInterruptTypes(XCallable<?> callable) { 703 if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) { 704 for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) { 705 if (INTERRUPT_TYPES.contains(singleCallable.getType())) { 706 insertCallableIntoInterruptMap(singleCallable); 707 } 708 } 709 } 710 else if (INTERRUPT_TYPES.contains(callable.getType())) { 711 insertCallableIntoInterruptMap(callable); 712 } 713 } 714 715 /** 716 * insert a new callable in the Interrupt Command Map add a new element to 717 * the list or create a new list accordingly 718 * 719 * @param callable 720 */ 721 public void insertCallableIntoInterruptMap(XCallable<?> callable) { 722 if (interruptCommandsMap.size() < interruptMapMaxSize) { 723 Set<XCallable<?>> newSet = Collections.synchronizedSet(new LinkedHashSet<XCallable<?>>()); 724 Set<XCallable<?>> interruptSet = interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newSet); 725 if (interruptSet == null) { 726 interruptSet = newSet; 727 } 728 if (interruptSet.add(callable)) { 729 log.trace("Inserting an interrupt element [{0}] to the interrupt map", callable.toString()); 730 } else { 731 log.trace("Interrupt element [{0}] already present", callable.toString()); 732 } 733 } 734 else { 735 log.warn( 736 "The interrupt map reached max size of [{0}], an interrupt element [{1}] will not added to the map [{1}]", 737 interruptCommandsMap.size(), callable.toString()); 738 } 739 } 740 741 /** 742 * Get the list of strings of queue dump 743 * 744 * @return the list of string that representing each CallableWrapper 745 */ 746 public List<String> getQueueDump() { 747 List<String> list = new ArrayList<String>(); 748 for (QueueElement<CallableWrapper> qe : queue) { 749 if (qe.toString() == null) { 750 continue; 751 } 752 list.add(qe.toString()); 753 } 754 return list; 755 } 756 757 /** 758 * Get the list of strings of uniqueness map dump 759 * 760 * @return the list of string that representing the key of each command in the queue 761 */ 762 public List<String> getUniqueDump() { 763 List<String> list = new ArrayList<String>(); 764 for (Entry<String, Date> entry : uniqueCallables.entrySet()) { 765 list.add(entry.toString()); 766 } 767 return list; 768 } 769 770}