001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import java.util.ArrayList; 021 import java.util.Collections; 022 import java.util.Date; 023 import java.util.HashMap; 024 import java.util.HashSet; 025 import java.util.LinkedHashSet; 026 import java.util.List; 027 import java.util.Map; 028 import java.util.Set; 029 import java.util.Map.Entry; 030 import java.util.concurrent.BlockingQueue; 031 import java.util.concurrent.ConcurrentHashMap; 032 import java.util.concurrent.RejectedExecutionException; 033 import java.util.concurrent.ThreadPoolExecutor; 034 import java.util.concurrent.TimeUnit; 035 import java.util.concurrent.atomic.AtomicInteger; 036 037 import org.apache.hadoop.conf.Configuration; 038 import org.apache.oozie.client.OozieClient.SYSTEM_MODE; 039 import org.apache.oozie.util.Instrumentable; 040 import org.apache.oozie.util.Instrumentation; 041 import org.apache.oozie.util.PollablePriorityDelayQueue; 042 import org.apache.oozie.util.PriorityDelayQueue; 043 import org.apache.oozie.util.XCallable; 044 import org.apache.oozie.util.XLog; 045 import org.apache.oozie.util.PriorityDelayQueue.QueueElement; 046 047 /** 048 * The callable queue service queues {@link XCallable}s for asynchronous execution. 049 * <p/> 050 * Callables can be queued for immediate execution or for delayed execution (some time in the future). 051 * <p/> 052 * Callables are consumed from the queue for execution based on their priority. 053 * <p/> 054 * When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops 055 * queuing callables. 056 * <p/> 057 * A thread-pool is used to execute the callables asynchronously. 058 * <p/> 059 * The following configuration parameters control the callable queue service: 060 * <p/> 061 * {@link #CONF_QUEUE_SIZE} size of the immediate execution queue. Defaulf value is 10000. 062 * <p/> 063 * {@link #CONF_THREADS} number of threads in the thread-pool used for asynchronous command execution. When this number 064 * of threads is reached, commands remain the queue until threads become available. Sets up a priority queue for the 065 * execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for execution 066 * sometime in the future. 067 */ 068 public class CallableQueueService implements Service, Instrumentable { 069 private static final String INSTRUMENTATION_GROUP = "callablequeue"; 070 private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue"; 071 private static final String INSTR_EXECUTED_COUNTER = "executed"; 072 private static final String INSTR_FAILED_COUNTER = "failed"; 073 private static final String INSTR_QUEUED_COUNTER = "queued"; 074 private static final String INSTR_QUEUE_SIZE_SAMPLER = "queue.size"; 075 private static final String INSTR_THREADS_ACTIVE_SAMPLER = "threads.active"; 076 077 public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallableQueueService."; 078 079 public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size"; 080 public static final String CONF_THREADS = CONF_PREFIX + "threads"; 081 public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency"; 082 public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible"; 083 public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes"; 084 public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize"; 085 086 public static final int CONCURRENCY_DELAY = 500; 087 088 public static final int SAFE_MODE_DELAY = 60000; 089 090 private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>(); 091 092 private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>(); 093 094 private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<String, Set<XCallable<?>>>(); 095 096 public static final HashSet<String> INTERRUPT_TYPES = new HashSet<String>(); 097 098 private int interruptMapMaxSize; 099 100 private int maxCallableConcurrency; 101 102 private boolean callableBegin(XCallable<?> callable) { 103 synchronized (activeCallables) { 104 AtomicInteger counter = activeCallables.get(callable.getType()); 105 if (counter == null) { 106 counter = new AtomicInteger(1); 107 activeCallables.put(callable.getType(), counter); 108 return true; 109 } 110 else { 111 int i = counter.incrementAndGet(); 112 return i <= maxCallableConcurrency; 113 } 114 } 115 } 116 117 private void callableEnd(XCallable<?> callable) { 118 synchronized (activeCallables) { 119 AtomicInteger counter = activeCallables.get(callable.getType()); 120 if (counter == null) { 121 throw new IllegalStateException("It should not happen"); 122 } 123 else { 124 counter.decrementAndGet(); 125 } 126 } 127 } 128 129 private boolean callableReachMaxConcurrency(XCallable<?> callable) { 130 synchronized (activeCallables) { 131 AtomicInteger counter = activeCallables.get(callable.getType()); 132 if (counter == null) { 133 return true; 134 } 135 else { 136 int i = counter.get(); 137 return i < maxCallableConcurrency; 138 } 139 } 140 } 141 142 // Callables are wrapped with the this wrapper for execution, for logging 143 // and instrumentation. 144 // The wrapper implements Runnable and Comparable to be able to work with an 145 // executor and a priority queue. 146 class CallableWrapper extends PriorityDelayQueue.QueueElement<XCallable<?>> implements Runnable { 147 private Instrumentation.Cron cron; 148 149 public CallableWrapper(XCallable<?> callable, long delay) { 150 super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS); 151 cron = new Instrumentation.Cron(); 152 cron.start(); 153 } 154 155 public void run() { 156 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 157 log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay", getElement().getType(), 158 SAFE_MODE_DELAY); 159 setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS); 160 removeFromUniqueCallables(); 161 queue(this, true); 162 return; 163 } 164 XCallable<?> callable = getElement(); 165 try { 166 if (callableBegin(callable)) { 167 cron.stop(); 168 addInQueueCron(cron); 169 XLog.Info.get().clear(); 170 XLog log = XLog.getLog(getClass()); 171 log.trace("executing callable [{0}]", callable.getName()); 172 173 removeFromUniqueCallables(); 174 try { 175 callable.call(); 176 incrCounter(INSTR_EXECUTED_COUNTER, 1); 177 log.trace("executed callable [{0}]", callable.getName()); 178 } 179 catch (Exception ex) { 180 incrCounter(INSTR_FAILED_COUNTER, 1); 181 log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex); 182 } 183 finally { 184 XLog.Info.get().clear(); 185 } 186 } 187 else { 188 log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay", callable 189 .getType(), CONCURRENCY_DELAY); 190 setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS); 191 removeFromUniqueCallables(); 192 queue(this, true); 193 incrCounter(callable.getType() + "#exceeded.concurrency", 1); 194 } 195 } 196 finally { 197 callableEnd(callable); 198 } 199 } 200 201 /** 202 * @return String the queue dump 203 */ 204 @Override 205 public String toString() { 206 return "delay=" + getDelay(TimeUnit.MILLISECONDS) + ", elements=" + getElement().toString(); 207 } 208 209 /** 210 * Filter the duplicate callables from the list before queue this. 211 * <p/> 212 * If it is single callable, checking if key is in unique map or not. 213 * <p/> 214 * If it is composite callable, remove duplicates callables from the composite. 215 * 216 * @return true if this callable should be queued 217 */ 218 public boolean filterDuplicates() { 219 XCallable<?> callable = getElement(); 220 if (callable instanceof CompositeCallable) { 221 return ((CompositeCallable) callable).removeDuplicates(); 222 } 223 else { 224 return uniqueCallables.containsKey(callable.getKey()) == false; 225 } 226 } 227 228 /** 229 * Add the keys to the set 230 */ 231 public void addToUniqueCallables() { 232 XCallable<?> callable = getElement(); 233 if (callable instanceof CompositeCallable) { 234 ((CompositeCallable) callable).addToUniqueCallables(); 235 } 236 else { 237 ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date()); 238 } 239 } 240 241 /** 242 * Remove the keys from the set 243 */ 244 public void removeFromUniqueCallables() { 245 XCallable<?> callable = getElement(); 246 if (callable instanceof CompositeCallable) { 247 ((CompositeCallable) callable).removeFromUniqueCallables(); 248 } 249 else { 250 uniqueCallables.remove(callable.getKey()); 251 } 252 } 253 } 254 255 class CompositeCallable implements XCallable<Void> { 256 private List<XCallable<?>> callables; 257 private String name; 258 private int priority; 259 private long createdTime; 260 261 public CompositeCallable(List<? extends XCallable<?>> callables) { 262 this.callables = new ArrayList<XCallable<?>>(callables); 263 priority = 0; 264 createdTime = Long.MAX_VALUE; 265 StringBuilder sb = new StringBuilder(); 266 String separator = "["; 267 for (XCallable<?> callable : callables) { 268 priority = Math.max(priority, callable.getPriority()); 269 createdTime = Math.min(createdTime, callable.getCreatedTime()); 270 sb.append(separator).append(callable.getName()); 271 separator = ","; 272 } 273 sb.append("]"); 274 name = sb.toString(); 275 } 276 277 @Override 278 public String getName() { 279 return name; 280 } 281 282 @Override 283 public String getType() { 284 return "#composite#" + callables.get(0).getType(); 285 } 286 287 @Override 288 public String getKey() { 289 return "#composite#" + callables.get(0).getKey(); 290 } 291 292 @Override 293 public String getEntityKey() { 294 return "#composite#" + callables.get(0).getEntityKey(); 295 } 296 297 @Override 298 public int getPriority() { 299 return priority; 300 } 301 302 @Override 303 public long getCreatedTime() { 304 return createdTime; 305 } 306 307 @Override 308 public void setInterruptMode(boolean mode) { 309 } 310 311 @Override 312 public boolean inInterruptMode() { 313 return false; 314 } 315 316 public List<XCallable<?>> getCallables() { 317 return this.callables; 318 } 319 320 public Void call() throws Exception { 321 XLog log = XLog.getLog(getClass()); 322 323 for (XCallable<?> callable : callables) { 324 log.trace("executing callable [{0}]", callable.getName()); 325 try { 326 callable.call(); 327 incrCounter(INSTR_EXECUTED_COUNTER, 1); 328 log.trace("executed callable [{0}]", callable.getName()); 329 } 330 catch (Exception ex) { 331 incrCounter(INSTR_FAILED_COUNTER, 1); 332 log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex); 333 } 334 } 335 336 // ticking -1 not to count the call to the composite callable 337 incrCounter(INSTR_EXECUTED_COUNTER, -1); 338 return null; 339 } 340 341 /* 342 * (non-Javadoc) 343 * 344 * @see java.lang.Object#toString() 345 */ 346 @Override 347 public String toString() { 348 if (callables.size() == 0) { 349 return null; 350 } 351 StringBuilder sb = new StringBuilder(); 352 int size = callables.size(); 353 for (int i = 0; i < size; i++) { 354 XCallable<?> callable = callables.get(i); 355 sb.append("("); 356 sb.append(callable.toString()); 357 if (i + 1 == size) { 358 sb.append(")"); 359 } 360 else { 361 sb.append("),"); 362 } 363 } 364 return sb.toString(); 365 } 366 367 /** 368 * Remove the duplicate callables from the list before queue them 369 * 370 * @return true if callables should be queued 371 */ 372 public boolean removeDuplicates() { 373 Set<String> set = new HashSet<String>(); 374 List<XCallable<?>> filteredCallables = new ArrayList<XCallable<?>>(); 375 if (callables.size() == 0) { 376 return false; 377 } 378 for (XCallable<?> callable : callables) { 379 if (!uniqueCallables.containsKey(callable.getKey()) && !set.contains(callable.getKey())) { 380 filteredCallables.add(callable); 381 set.add(callable.getKey()); 382 } 383 } 384 callables = filteredCallables; 385 if (callables.size() == 0) { 386 return false; 387 } 388 return true; 389 } 390 391 /** 392 * Add the keys to the set 393 */ 394 public void addToUniqueCallables() { 395 for (XCallable<?> callable : callables) { 396 ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date()); 397 } 398 } 399 400 /** 401 * Remove the keys from the set 402 */ 403 public void removeFromUniqueCallables() { 404 for (XCallable<?> callable : callables) { 405 uniqueCallables.remove(callable.getKey()); 406 } 407 } 408 } 409 410 private XLog log = XLog.getLog(getClass()); 411 412 private int queueSize; 413 private PriorityDelayQueue<CallableWrapper> queue; 414 private ThreadPoolExecutor executor; 415 private Instrumentation instrumentation; 416 417 /** 418 * Convenience method for instrumentation counters. 419 * 420 * @param name counter name. 421 * @param count count to increment the counter. 422 */ 423 private void incrCounter(String name, int count) { 424 if (instrumentation != null) { 425 instrumentation.incr(INSTRUMENTATION_GROUP, name, count); 426 } 427 } 428 429 private void addInQueueCron(Instrumentation.Cron cron) { 430 if (instrumentation != null) { 431 instrumentation.addCron(INSTRUMENTATION_GROUP, INSTR_IN_QUEUE_TIME_TIMER, cron); 432 } 433 } 434 435 /** 436 * Initialize the command queue service. 437 * 438 * @param services services instance. 439 */ 440 @Override 441 @SuppressWarnings({ "unchecked", "rawtypes" }) 442 public void init(Services services) { 443 Configuration conf = services.getConf(); 444 445 queueSize = conf.getInt(CONF_QUEUE_SIZE, 10000); 446 int threads = conf.getInt(CONF_THREADS, 10); 447 boolean callableNextEligible = conf.getBoolean(CONF_CALLABLE_NEXT_ELIGIBLE, true); 448 449 for (String type : conf.getStringCollection(CONF_CALLABLE_INTERRUPT_TYPES)) { 450 log.debug("Adding interrupt type [{0}]", type); 451 INTERRUPT_TYPES.add(type); 452 } 453 454 if (!callableNextEligible) { 455 queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) { 456 @Override 457 protected void debug(String msgTemplate, Object... msgArgs) { 458 log.trace(msgTemplate, msgArgs); 459 } 460 }; 461 } 462 else { 463 // If the head of this queue has already reached max concurrency, 464 // continuously find next one 465 // which has not yet reach max concurrency.Overrided method 466 // 'eligibleToPoll' to check if the 467 // element of this queue has reached the maximum concurrency. 468 queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) { 469 @Override 470 protected void debug(String msgTemplate, Object... msgArgs) { 471 log.trace(msgTemplate, msgArgs); 472 } 473 474 @Override 475 protected boolean eligibleToPoll(QueueElement<?> element) { 476 if (element != null) { 477 CallableWrapper wrapper = (CallableWrapper) element; 478 if (element.getElement() != null) { 479 return callableReachMaxConcurrency(wrapper.getElement()); 480 } 481 } 482 return false; 483 } 484 485 }; 486 } 487 488 interruptMapMaxSize = conf.getInt(CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE, 100); 489 490 // IMPORTANT: The ThreadPoolExecutor does not always the execute 491 // commands out of the queue, there are 492 // certain conditions where commands are pushed directly to a thread. 493 // As we are using a queue with DELAYED semantics (i.e. execute the 494 // command in 5 mins) we need to make 495 // sure that the commands are always pushed to the queue. 496 // To achieve this (by looking a the ThreadPoolExecutor.execute() 497 // implementation, we are making the pool 498 // minimum size equals to the maximum size (thus threads are keep always 499 // running) and we are warming up 500 // all those threads (the for loop that runs dummy runnables). 501 executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue); 502 503 for (int i = 0; i < threads; i++) { 504 executor.execute(new Runnable() { 505 public void run() { 506 try { 507 Thread.sleep(100); 508 } 509 catch (InterruptedException ex) { 510 log.warn("Could not warm up threadpool {0}", ex.getMessage(), ex); 511 } 512 } 513 }); 514 } 515 516 maxCallableConcurrency = conf.getInt(CONF_CALLABLE_CONCURRENCY, 3); 517 } 518 519 /** 520 * Destroy the command queue service. 521 */ 522 @Override 523 public void destroy() { 524 try { 525 long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds 526 executor.shutdown(); 527 queue.clear(); 528 while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) { 529 log.info("Waiting for executor to shutdown"); 530 if (System.currentTimeMillis() > limit) { 531 log.warn("Gave up, continuing without waiting for executor to shutdown"); 532 break; 533 } 534 } 535 } 536 catch (InterruptedException ex) { 537 log.warn(ex); 538 } 539 } 540 541 /** 542 * Return the public interface for command queue service. 543 * 544 * @return {@link CallableQueueService}. 545 */ 546 @Override 547 public Class<? extends Service> getInterface() { 548 return CallableQueueService.class; 549 } 550 551 /** 552 * @return int size of queue 553 */ 554 public synchronized int queueSize() { 555 return queue.size(); 556 } 557 558 private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) { 559 if (!ignoreQueueSize && queue.size() >= queueSize) { 560 log.warn("queue if full, ignoring queuing for [{0}]", wrapper.getElement()); 561 return false; 562 } 563 if (!executor.isShutdown()) { 564 if (wrapper.filterDuplicates()) { 565 wrapper.addToUniqueCallables(); 566 try { 567 executor.execute(wrapper); 568 } 569 catch (RejectedExecutionException ree) { 570 wrapper.removeFromUniqueCallables(); 571 throw ree; 572 } 573 } 574 } 575 else { 576 log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement()); 577 } 578 return true; 579 } 580 581 /** 582 * Queue a callable for asynchronous execution. 583 * 584 * @param callable callable to queue. 585 * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable 586 * was not queued. 587 */ 588 public boolean queue(XCallable<?> callable) { 589 return queue(callable, 0); 590 } 591 592 /** 593 * Queue a list of callables for serial execution. 594 * <p/> 595 * Useful to serialize callables that may compete with each other for resources. 596 * <p/> 597 * All callables will be processed with the priority of the highest priority of all callables. 598 * 599 * @param callables callables to be executed by the composite callable. 600 * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables 601 * were not queued. 602 */ 603 public boolean queueSerial(List<? extends XCallable<?>> callables) { 604 return queueSerial(callables, 0); 605 } 606 607 /** 608 * Queue a callable for asynchronous execution sometime in the future. 609 * 610 * @param callable callable to queue for delayed execution 611 * @param delay time, in milliseconds, that the callable should be delayed. 612 * @return <code>true</code> if the callable was queued, <code>false</code> 613 * if the queue is full and the callable was not queued. 614 */ 615 public synchronized boolean queue(XCallable<?> callable, long delay) { 616 if (callable == null) { 617 return true; 618 } 619 boolean queued = false; 620 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 621 log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size()); 622 } 623 else { 624 checkInterruptTypes(callable); 625 queued = queue(new CallableWrapper(callable, delay), false); 626 if (queued) { 627 incrCounter(INSTR_QUEUED_COUNTER, 1); 628 } 629 else { 630 log.warn("Could not queue callable"); 631 } 632 } 633 return queued; 634 } 635 636 /** 637 * Queue a list of callables for serial execution sometime in the future. 638 * <p/> 639 * Useful to serialize callables that may compete with each other for resources. 640 * <p/> 641 * All callables will be processed with the priority of the highest priority of all callables. 642 * 643 * @param callables callables to be executed by the composite callable. 644 * @param delay time, in milliseconds, that the callable should be delayed. 645 * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables 646 * were not queued. 647 */ 648 public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) { 649 boolean queued; 650 if (callables == null || callables.size() == 0) { 651 queued = true; 652 } 653 else if (callables.size() == 1) { 654 queued = queue(callables.get(0), delay); 655 } 656 else { 657 XCallable<?> callable = new CompositeCallable(callables); 658 queued = queue(callable, delay); 659 if (queued) { 660 incrCounter(INSTR_QUEUED_COUNTER, callables.size()); 661 } 662 } 663 return queued; 664 } 665 666 /** 667 * Instruments the callable queue service. 668 * 669 * @param instr instance to instrument the callable queue service to. 670 */ 671 public void instrument(Instrumentation instr) { 672 instrumentation = instr; 673 instr.addSampler(INSTRUMENTATION_GROUP, INSTR_QUEUE_SIZE_SAMPLER, 60, 1, new Instrumentation.Variable<Long>() { 674 public Long getValue() { 675 return (long) queue.size(); 676 } 677 }); 678 instr.addSampler(INSTRUMENTATION_GROUP, INSTR_THREADS_ACTIVE_SAMPLER, 60, 1, 679 new Instrumentation.Variable<Long>() { 680 public Long getValue() { 681 return (long) executor.getActiveCount(); 682 } 683 }); 684 } 685 686 /** 687 * check the interrupt map for the existence of an interrupt commands if 688 * exist a List of Interrupt Callable for the same lock key will bereturned, 689 * otherwise it will return null 690 */ 691 public Set<XCallable<?>> checkInterrupts(String lockKey) { 692 693 if (lockKey != null) { 694 return interruptCommandsMap.remove(lockKey); 695 } 696 return null; 697 } 698 699 /** 700 * check if the callable is of an interrupt type and insert it into the map 701 * accordingly 702 * 703 * @param callable 704 */ 705 public void checkInterruptTypes(XCallable<?> callable) { 706 if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) { 707 for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) { 708 if (INTERRUPT_TYPES.contains(singleCallable.getType())) { 709 insertCallableIntoInterruptMap(singleCallable); 710 } 711 } 712 } 713 else if (INTERRUPT_TYPES.contains(callable.getType())) { 714 insertCallableIntoInterruptMap(callable); 715 } 716 } 717 718 /** 719 * insert a new callable in the Interrupt Command Map add a new element to 720 * the list or create a new list accordingly 721 * 722 * @param callable 723 */ 724 public void insertCallableIntoInterruptMap(XCallable<?> callable) { 725 if (interruptCommandsMap.size() < interruptMapMaxSize) { 726 Set<XCallable<?>> newSet = Collections.synchronizedSet(new LinkedHashSet<XCallable<?>>()); 727 Set<XCallable<?>> interruptSet = interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newSet); 728 if (interruptSet == null) { 729 interruptSet = newSet; 730 } 731 if (interruptSet.add(callable)) { 732 log.trace("Inserting an interrupt element [{0}] to the interrupt map", callable.toString()); 733 } else { 734 log.trace("Interrupt element [{0}] already present", callable.toString()); 735 } 736 } 737 else { 738 log.warn( 739 "The interrupt map reached max size of [{0}], an interrupt element [{1}] will not added to the map [{1}]", 740 interruptCommandsMap.size(), callable.toString()); 741 } 742 } 743 744 /** 745 * Get the list of strings of queue dump 746 * 747 * @return the list of string that representing each CallableWrapper 748 */ 749 public List<String> getQueueDump() { 750 List<String> list = new ArrayList<String>(); 751 for (QueueElement<CallableWrapper> qe : queue) { 752 if (qe.toString() == null) { 753 continue; 754 } 755 list.add(qe.toString()); 756 } 757 return list; 758 } 759 760 /** 761 * Get the list of strings of uniqueness map dump 762 * 763 * @return the list of string that representing the key of each command in the queue 764 */ 765 public List<String> getUniqueDump() { 766 List<String> list = new ArrayList<String>(); 767 for (Entry<String, Date> entry : uniqueCallables.entrySet()) { 768 list.add(entry.toString()); 769 } 770 return list; 771 } 772 773 }