001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.Date; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.LinkedHashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Map.Entry; 030import java.util.Set; 031import java.util.concurrent.BlockingQueue; 032import java.util.concurrent.Callable; 033import java.util.concurrent.ConcurrentHashMap; 034import java.util.concurrent.Future; 035import java.util.concurrent.RunnableFuture; 036import java.util.concurrent.ThreadPoolExecutor; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicInteger; 039 040import org.apache.hadoop.conf.Configuration; 041import org.apache.oozie.client.OozieClient.SYSTEM_MODE; 042import org.apache.oozie.util.Instrumentable; 043import org.apache.oozie.util.Instrumentation; 044import org.apache.oozie.util.NamedThreadFactory; 045import org.apache.oozie.util.PollablePriorityDelayQueue; 046import org.apache.oozie.util.PriorityDelayQueue; 047import org.apache.oozie.util.PriorityDelayQueue.QueueElement; 048import org.apache.oozie.util.XCallable; 049import org.apache.oozie.util.XLog; 050 051import com.google.common.collect.ImmutableSet; 052 053/** 054 * The callable queue service queues {@link XCallable}s for asynchronous execution. 055 * <p> 056 * Callables can be queued for immediate execution or for delayed execution (some time in the future). 057 * <p> 058 * Callables are consumed from the queue for execution based on their priority. 059 * <p> 060 * When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops 061 * queuing callables. 062 * <p> 063 * A thread-pool is used to execute the callables asynchronously. 064 * <p> 065 * The following configuration parameters control the callable queue service: 066 * <p> 067 * {@link #CONF_QUEUE_SIZE} size of the immediate execution queue. Defaulf value is 10000. 068 * <p> 069 * {@link #CONF_THREADS} number of threads in the thread-pool used for asynchronous command execution. When this number 070 * of threads is reached, commands remain the queue until threads become available. Sets up a priority queue for the 071 * execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for execution 072 * sometime in the future. 073 */ 074public class CallableQueueService implements Service, Instrumentable { 075 private static final String INSTRUMENTATION_GROUP = "callablequeue"; 076 private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue"; 077 private static final String INSTR_EXECUTED_COUNTER = "executed"; 078 private static final String INSTR_FAILED_COUNTER = "failed"; 079 private static final String INSTR_QUEUED_COUNTER = "queued"; 080 private static final String INSTR_QUEUE_SIZE_SAMPLER = "queue.size"; 081 private static final String INSTR_THREADS_ACTIVE_SAMPLER = "threads.active"; 082 083 public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallableQueueService."; 084 085 public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size"; 086 public static final String CONF_THREADS = CONF_PREFIX + "threads"; 087 public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency"; 088 public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible"; 089 public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes"; 090 public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize"; 091 092 public static final int CONCURRENCY_DELAY = 500; 093 094 public static final int SAFE_MODE_DELAY = 60000; 095 096 private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>(); 097 098 private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>(); 099 100 private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<>(); 101 102 private Set<String> interruptTypes; 103 104 private int interruptMapMaxSize; 105 106 private int maxCallableConcurrency; 107 108 private boolean callableBegin(XCallable<?> callable) { 109 synchronized (activeCallables) { 110 AtomicInteger counter = activeCallables.get(callable.getType()); 111 if (counter == null) { 112 counter = new AtomicInteger(1); 113 activeCallables.put(callable.getType(), counter); 114 return true; 115 } 116 else { 117 int i = counter.incrementAndGet(); 118 return i <= maxCallableConcurrency; 119 } 120 } 121 } 122 123 private void callableEnd(XCallable<?> callable) { 124 synchronized (activeCallables) { 125 AtomicInteger counter = activeCallables.get(callable.getType()); 126 if (counter == null) { 127 throw new IllegalStateException("It should not happen"); 128 } 129 else { 130 counter.decrementAndGet(); 131 } 132 } 133 } 134 135 private boolean callableReachMaxConcurrency(XCallable<?> callable) { 136 synchronized (activeCallables) { 137 AtomicInteger counter = activeCallables.get(callable.getType()); 138 if (counter == null) { 139 return true; 140 } 141 else { 142 int i = counter.get(); 143 return i < maxCallableConcurrency; 144 } 145 } 146 } 147 148 // Callables are wrapped with the this wrapper for execution, for logging 149 // and instrumentation. 150 // The wrapper implements Runnable and Comparable to be able to work with an 151 // executor and a priority queue. 152 public class CallableWrapper<E> extends PriorityDelayQueue.QueueElement<E> implements Runnable, Callable<E> { 153 private Instrumentation.Cron cron; 154 155 public CallableWrapper(XCallable<E> callable, long delay) { 156 super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS); 157 cron = new Instrumentation.Cron(); 158 cron.start(); 159 } 160 161 public void run() { 162 XCallable<?> callable = null; 163 try { 164 removeFromUniqueCallables(); 165 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 166 log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay", getElement().getType(), 167 SAFE_MODE_DELAY); 168 setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS); 169 queue(this, true); 170 return; 171 } 172 callable = getElement(); 173 if (callableBegin(callable)) { 174 cron.stop(); 175 addInQueueCron(cron); 176 XLog log = XLog.getLog(getClass()); 177 log.trace("executing callable [{0}]", callable.getName()); 178 179 try { 180 //FutureTask.run() will invoke cllable.call() 181 super.run(); 182 incrCounter(INSTR_EXECUTED_COUNTER, 1); 183 log.trace("executed callable [{0}]", callable.getName()); 184 } 185 catch (Exception ex) { 186 incrCounter(INSTR_FAILED_COUNTER, 1); 187 log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex); 188 } 189 } 190 else { 191 log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay", callable 192 .getType(), CONCURRENCY_DELAY); 193 setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS); 194 queue(this, true); 195 incrCounter(callable.getType() + "#exceeded.concurrency", 1); 196 } 197 } 198 catch (Throwable t) { 199 incrCounter(INSTR_FAILED_COUNTER, 1); 200 log.warn("exception callable [{0}], {1}", callable == null ? "N/A" : callable.getName(), 201 t.getMessage(), t); 202 } 203 finally { 204 if (callable != null) { 205 callableEnd(callable); 206 } 207 } 208 } 209 210 /** 211 * Filter the duplicate callables from the list before queue this. 212 * <p> 213 * If it is single callable, checking if key is in unique map or not. 214 * <p> 215 * If it is composite callable, remove duplicates callables from the composite. 216 * 217 * @return true if this callable should be queued 218 */ 219 public boolean filterDuplicates() { 220 XCallable<?> callable = getElement(); 221 if (callable instanceof CompositeCallable) { 222 return ((CompositeCallable) callable).removeDuplicates(); 223 } 224 else { 225 return uniqueCallables.containsKey(callable.getKey()) == false; 226 } 227 } 228 229 /** 230 * Add the keys to the set 231 */ 232 public void addToUniqueCallables() { 233 XCallable<?> callable = getElement(); 234 if (callable instanceof CompositeCallable) { 235 ((CompositeCallable) callable).addToUniqueCallables(); 236 } 237 else { 238 ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date()); 239 } 240 } 241 242 /** 243 * Remove the keys from the set 244 */ 245 public void removeFromUniqueCallables() { 246 XCallable<?> callable = getElement(); 247 if (callable instanceof CompositeCallable) { 248 ((CompositeCallable) callable).removeFromUniqueCallables(); 249 } 250 else { 251 uniqueCallables.remove(callable.getKey()); 252 } 253 } 254 255 //this will not get called, bcz newTaskFor of threadpool will convert it in futureTask which is a runnable. 256 // futureTask will call the cllable.call from run method. so we override run to call super.run method. 257 @Override 258 public E call() throws Exception { 259 return null; 260 } 261 } 262 263 class CompositeCallable implements XCallable<Void> { 264 private List<XCallable<?>> callables; 265 private String name; 266 private int priority; 267 private long createdTime; 268 269 public CompositeCallable(List<? extends XCallable<?>> callables) { 270 this.callables = new ArrayList<XCallable<?>>(callables); 271 priority = 0; 272 createdTime = Long.MAX_VALUE; 273 StringBuilder sb = new StringBuilder(); 274 String separator = "["; 275 for (XCallable<?> callable : callables) { 276 priority = Math.max(priority, callable.getPriority()); 277 createdTime = Math.min(createdTime, callable.getCreatedTime()); 278 sb.append(separator).append(callable.getName()); 279 separator = ","; 280 } 281 sb.append("]"); 282 name = sb.toString(); 283 } 284 285 @Override 286 public String getName() { 287 return name; 288 } 289 290 @Override 291 public String getType() { 292 return "#composite#" + callables.get(0).getType(); 293 } 294 295 @Override 296 public String getKey() { 297 return "#composite#" + callables.get(0).getKey(); 298 } 299 300 @Override 301 public String getEntityKey() { 302 return "#composite#" + callables.get(0).getEntityKey(); 303 } 304 305 @Override 306 public int getPriority() { 307 return priority; 308 } 309 310 @Override 311 public long getCreatedTime() { 312 return createdTime; 313 } 314 315 @Override 316 public void setInterruptMode(boolean mode) { 317 } 318 319 @Override 320 public boolean inInterruptMode() { 321 return false; 322 } 323 324 public List<XCallable<?>> getCallables() { 325 return this.callables; 326 } 327 328 public Void call() throws Exception { 329 XLog log = XLog.getLog(getClass()); 330 331 for (XCallable<?> callable : callables) { 332 log.trace("executing callable [{0}]", callable.getName()); 333 try { 334 callable.call(); 335 incrCounter(INSTR_EXECUTED_COUNTER, 1); 336 log.trace("executed callable [{0}]", callable.getName()); 337 } 338 catch (Exception ex) { 339 incrCounter(INSTR_FAILED_COUNTER, 1); 340 log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex); 341 } 342 } 343 344 // ticking -1 not to count the call to the composite callable 345 incrCounter(INSTR_EXECUTED_COUNTER, -1); 346 return null; 347 } 348 349 /* 350 * (non-Javadoc) 351 * 352 * @see java.lang.Object#toString() 353 */ 354 @Override 355 public String toString() { 356 if (callables.size() == 0) { 357 return null; 358 } 359 StringBuilder sb = new StringBuilder(); 360 int size = callables.size(); 361 for (int i = 0; i < size; i++) { 362 XCallable<?> callable = callables.get(i); 363 sb.append("("); 364 sb.append(callable.toString()); 365 if (i + 1 == size) { 366 sb.append(")"); 367 } 368 else { 369 sb.append("),"); 370 } 371 } 372 return sb.toString(); 373 } 374 375 /** 376 * Remove the duplicate callables from the list before queue them 377 * 378 * @return true if callables should be queued 379 */ 380 public boolean removeDuplicates() { 381 Set<String> set = new HashSet<String>(); 382 List<XCallable<?>> filteredCallables = new ArrayList<XCallable<?>>(); 383 if (callables.size() == 0) { 384 return false; 385 } 386 for (XCallable<?> callable : callables) { 387 if (!uniqueCallables.containsKey(callable.getKey()) && !set.contains(callable.getKey())) { 388 filteredCallables.add(callable); 389 set.add(callable.getKey()); 390 } 391 } 392 callables = filteredCallables; 393 if (callables.size() == 0) { 394 return false; 395 } 396 return true; 397 } 398 399 /** 400 * Add the keys to the set 401 */ 402 public void addToUniqueCallables() { 403 for (XCallable<?> callable : callables) { 404 ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date()); 405 } 406 } 407 408 /** 409 * Remove the keys from the set 410 */ 411 public void removeFromUniqueCallables() { 412 for (XCallable<?> callable : callables) { 413 uniqueCallables.remove(callable.getKey()); 414 } 415 } 416 } 417 418 private XLog log = XLog.getLog(getClass()); 419 420 private int queueSize; 421 private PriorityDelayQueue<CallableWrapper> queue; 422 private ThreadPoolExecutor executor; 423 private Instrumentation instrumentation; 424 425 /** 426 * Convenience method for instrumentation counters. 427 * 428 * @param name counter name. 429 * @param count count to increment the counter. 430 */ 431 private void incrCounter(String name, int count) { 432 if (instrumentation != null) { 433 instrumentation.incr(INSTRUMENTATION_GROUP, name, count); 434 } 435 } 436 437 private void addInQueueCron(Instrumentation.Cron cron) { 438 if (instrumentation != null) { 439 instrumentation.addCron(INSTRUMENTATION_GROUP, INSTR_IN_QUEUE_TIME_TIMER, cron); 440 } 441 } 442 443 /** 444 * Initialize the command queue service. 445 * 446 * @param services services instance. 447 */ 448 @Override 449 @SuppressWarnings({ "unchecked", "rawtypes" }) 450 public void init(Services services) { 451 Configuration conf = services.getConf(); 452 453 queueSize = ConfigurationService.getInt(conf, CONF_QUEUE_SIZE); 454 int threads = ConfigurationService.getInt(conf, CONF_THREADS); 455 boolean callableNextEligible = ConfigurationService.getBoolean(conf, CONF_CALLABLE_NEXT_ELIGIBLE); 456 457 interruptTypes = new HashSet<>(); 458 for (String type : ConfigurationService.getStrings(conf, CONF_CALLABLE_INTERRUPT_TYPES)) { 459 log.debug("Adding interrupt type [{0}]", type); 460 interruptTypes.add(type); 461 } 462 interruptTypes = ImmutableSet.copyOf(interruptTypes); 463 464 if (!callableNextEligible) { 465 queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) { 466 @Override 467 protected void debug(String msgTemplate, Object... msgArgs) { 468 log.trace(msgTemplate, msgArgs); 469 } 470 }; 471 } 472 else { 473 // If the head of this queue has already reached max concurrency, 474 // continuously find next one 475 // which has not yet reach max concurrency.Overrided method 476 // 'eligibleToPoll' to check if the 477 // element of this queue has reached the maximum concurrency. 478 queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) { 479 @Override 480 protected void debug(String msgTemplate, Object... msgArgs) { 481 log.trace(msgTemplate, msgArgs); 482 } 483 484 @Override 485 protected boolean eligibleToPoll(QueueElement<?> element) { 486 if (element != null) { 487 CallableWrapper wrapper = (CallableWrapper) element; 488 if (element.getElement() != null) { 489 return callableReachMaxConcurrency(wrapper.getElement()); 490 } 491 } 492 return false; 493 } 494 495 }; 496 } 497 498 interruptMapMaxSize = ConfigurationService.getInt(conf, CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE); 499 500 // IMPORTANT: The ThreadPoolExecutor does not always the execute 501 // commands out of the queue, there are 502 // certain conditions where commands are pushed directly to a thread. 503 // As we are using a queue with DELAYED semantics (i.e. execute the 504 // command in 5 mins) we need to make 505 // sure that the commands are always pushed to the queue. 506 // To achieve this (by looking a the ThreadPoolExecutor.execute() 507 // implementation, we are making the pool 508 // minimum size equals to the maximum size (thus threads are keep always 509 // running) and we are warming up 510 // all those threads (the for loop that runs dummy runnables). 511 executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue, 512 new NamedThreadFactory("CallableQueue")) { 513 protected void beforeExecute(Thread t, Runnable r) { 514 super.beforeExecute(t,r); 515 XLog.Info.get().clear(); 516 } 517 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 518 return (RunnableFuture<T>)callable; 519 } 520 }; 521 522 for (int i = 0; i < threads; i++) { 523 executor.execute(new Runnable() { 524 public void run() { 525 try { 526 Thread.sleep(100); 527 } 528 catch (InterruptedException ex) { 529 log.warn("Could not warm up threadpool {0}", ex.getMessage(), ex); 530 } 531 } 532 }); 533 } 534 535 maxCallableConcurrency = ConfigurationService.getInt(conf, CONF_CALLABLE_CONCURRENCY); 536 } 537 538 /** 539 * Destroy the command queue service. 540 */ 541 @Override 542 public void destroy() { 543 try { 544 long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds 545 executor.shutdown(); 546 queue.clear(); 547 while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) { 548 log.info("Waiting for executor to shutdown"); 549 if (System.currentTimeMillis() > limit) { 550 log.warn("Gave up, continuing without waiting for executor to shutdown"); 551 break; 552 } 553 } 554 } 555 catch (InterruptedException ex) { 556 log.warn(ex); 557 } 558 } 559 560 /** 561 * Return the public interface for command queue service. 562 * 563 * @return {@link CallableQueueService}. 564 */ 565 @Override 566 public Class<? extends Service> getInterface() { 567 return CallableQueueService.class; 568 } 569 570 /** 571 * @return int size of queue 572 */ 573 public synchronized int queueSize() { 574 return queue.size(); 575 } 576 577 private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) { 578 if (!ignoreQueueSize && queue.size() >= queueSize) { 579 log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey()); 580 return false; 581 } 582 if (!executor.isShutdown()) { 583 if (wrapper.filterDuplicates()) { 584 wrapper.addToUniqueCallables(); 585 try { 586 executor.execute(wrapper); 587 } 588 catch (Throwable ree) { 589 wrapper.removeFromUniqueCallables(); 590 throw new RuntimeException(ree); 591 } 592 } 593 } 594 else { 595 log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey()); 596 } 597 return true; 598 } 599 600 /** 601 * Queue a callable for asynchronous execution. 602 * 603 * @param callable callable to queue. 604 * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable 605 * was not queued. 606 */ 607 public boolean queue(XCallable<?> callable) { 608 return queue(callable, 0); 609 } 610 611 /** 612 * Queue a list of callables for serial execution. 613 * <p> 614 * Useful to serialize callables that may compete with each other for resources. 615 * <p> 616 * All callables will be processed with the priority of the highest priority of all callables. 617 * 618 * @param callables callables to be executed by the composite callable. 619 * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables 620 * were not queued. 621 */ 622 public boolean queueSerial(List<? extends XCallable<?>> callables) { 623 return queueSerial(callables, 0); 624 } 625 626 /** 627 * Queue a callable for asynchronous execution sometime in the future. 628 * 629 * @param callable callable to queue for delayed execution 630 * @param delay time, in milliseconds, that the callable should be delayed. 631 * @return <code>true</code> if the callable was queued, <code>false</code> 632 * if the queue is full and the callable was not queued. 633 */ 634 public synchronized boolean queue(XCallable<?> callable, long delay) { 635 if (callable == null) { 636 return true; 637 } 638 boolean queued = false; 639 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 640 log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size()); 641 } 642 else { 643 checkInterruptTypes(callable); 644 queued = queue(new CallableWrapper(callable, delay), false); 645 if (queued) { 646 incrCounter(INSTR_QUEUED_COUNTER, 1); 647 } 648 else { 649 log.warn("Could not queue callable"); 650 } 651 } 652 return queued; 653 } 654 655 /** 656 * Queue a list of callables for serial execution sometime in the future. 657 * <p> 658 * Useful to serialize callables that may compete with each other for resources. 659 * <p> 660 * All callables will be processed with the priority of the highest priority of all callables. 661 * 662 * @param callables callables to be executed by the composite callable. 663 * @param delay time, in milliseconds, that the callable should be delayed. 664 * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables 665 * were not queued. 666 */ 667 public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) { 668 boolean queued; 669 if (callables == null || callables.size() == 0) { 670 queued = true; 671 } 672 else if (callables.size() == 1) { 673 queued = queue(callables.get(0), delay); 674 } 675 else { 676 XCallable<?> callable = new CompositeCallable(callables); 677 queued = queue(callable, delay); 678 if (queued) { 679 incrCounter(INSTR_QUEUED_COUNTER, callables.size()); 680 } 681 } 682 return queued; 683 } 684 685 /** 686 * Instruments the callable queue service. 687 * 688 * @param instr instance to instrument the callable queue service to. 689 */ 690 public void instrument(Instrumentation instr) { 691 instrumentation = instr; 692 instr.addSampler(INSTRUMENTATION_GROUP, INSTR_QUEUE_SIZE_SAMPLER, 60, 1, new Instrumentation.Variable<Long>() { 693 public Long getValue() { 694 return (long) queue.size(); 695 } 696 }); 697 instr.addSampler(INSTRUMENTATION_GROUP, INSTR_THREADS_ACTIVE_SAMPLER, 60, 1, 698 new Instrumentation.Variable<Long>() { 699 public Long getValue() { 700 return (long) executor.getActiveCount(); 701 } 702 }); 703 } 704 705 /** 706 * check the interrupt map for the existence of an interrupt commands if 707 * exist a List of Interrupt Callable for the same lock key will bereturned, 708 * otherwise it will return null 709 */ 710 public Set<XCallable<?>> checkInterrupts(String lockKey) { 711 712 if (lockKey != null) { 713 return interruptCommandsMap.remove(lockKey); 714 } 715 return null; 716 } 717 718 /** 719 * check if the callable is of an interrupt type and insert it into the map 720 * accordingly 721 * 722 * @param callable 723 */ 724 public void checkInterruptTypes(XCallable<?> callable) { 725 if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) { 726 for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) { 727 if (interruptTypes.contains(singleCallable.getType())) { 728 insertCallableIntoInterruptMap(singleCallable); 729 } 730 } 731 } 732 else if (interruptTypes.contains(callable.getType())) { 733 insertCallableIntoInterruptMap(callable); 734 } 735 } 736 737 /** 738 * insert a new callable in the Interrupt Command Map add a new element to 739 * the list or create a new list accordingly 740 * 741 * @param callable 742 */ 743 public void insertCallableIntoInterruptMap(XCallable<?> callable) { 744 if (interruptCommandsMap.size() < interruptMapMaxSize) { 745 Set<XCallable<?>> newSet = Collections.synchronizedSet(new LinkedHashSet<XCallable<?>>()); 746 Set<XCallable<?>> interruptSet = interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newSet); 747 if (interruptSet == null) { 748 interruptSet = newSet; 749 } 750 if (interruptSet.add(callable)) { 751 log.trace("Inserting an interrupt element [{0}] to the interrupt map", callable.toString()); 752 } else { 753 log.trace("Interrupt element [{0}] already present", callable.toString()); 754 } 755 } 756 else { 757 log.warn( 758 "The interrupt map reached max size of [{0}], an interrupt element [{1}] will not added to the map [{1}]", 759 interruptCommandsMap.size(), callable.toString()); 760 } 761 } 762 763 /** 764 * Get the list of strings of queue dump 765 * 766 * @return the list of string that representing each CallableWrapper 767 */ 768 public List<String> getQueueDump() { 769 List<String> list = new ArrayList<String>(); 770 for (QueueElement<CallableWrapper> qe : queue) { 771 if (qe.toString() == null) { 772 continue; 773 } 774 list.add(qe.toString()); 775 } 776 return list; 777 } 778 779 /** 780 * Get the list of strings of uniqueness map dump 781 * 782 * @return the list of string that representing the key of each command in the queue 783 */ 784 public List<String> getUniqueDump() { 785 List<String> list = new ArrayList<String>(); 786 for (Entry<String, Date> entry : uniqueCallables.entrySet()) { 787 list.add(entry.toString()); 788 } 789 return list; 790 } 791 792 // Refer executor.invokeAll 793 public <T> List<Future<T>> invokeAll(List<CallableWrapper<T>> tasks) 794 throws InterruptedException { 795 return executor.invokeAll(tasks); 796 } 797 798 public Set<String> getInterruptTypes() { 799 return interruptTypes; 800 } 801 802}