/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.PollablePriorityDelayQueue;
import org.apache.oozie.util.PriorityDelayQueue;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.PriorityDelayQueue.QueueElement;

/**
 * The callable queue service queues {@link XCallable}s for asynchronous execution.
 * <p/>
 * Callables can be queued for immediate execution or for delayed execution (some time in the future).
 * <p/>
 * Callables are consumed from the queue for execution based on their priority.
 * <p/>
 * When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops
 * queuing callables.
 * <p/>
 * A thread-pool is used to execute the callables asynchronously.
 * <p/>
 * The following configuration parameters control the callable queue service:
 * <p/>
 * {@link #CONF_QUEUE_SIZE} size of the immediate execution queue. Default value is 10000.
 * <p/>
 * {@link #CONF_THREADS} number of threads in the thread-pool used for asynchronous command execution. Default value
 * is 10. When this number of threads is reached, commands remain in the queue until threads become available.
 * <p/>
 * {@link #CONF_CALLABLE_CONCURRENCY} maximum number of callables of the same type that may run at the same time.
 * Default value is 3.
 * <p/>
 * {@link #CONF_CALLABLE_NEXT_ELIGIBLE} whether a {@link PollablePriorityDelayQueue} is used, so that elements whose
 * type already runs at maximum concurrency are skipped when polling. Default value is <code>true</code>.
 * <p/>
 * Sets up a priority queue for the execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions
 * which will be ready for execution sometime in the future.
 */
public class CallableQueueService implements Service, Instrumentable {
    private static final String INSTRUMENTATION_GROUP = "callablequeue";
    private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue";
    private static final String INSTR_EXECUTED_COUNTER = "executed";
    private static final String INSTR_FAILED_COUNTER = "failed";
    private static final String INSTR_QUEUED_COUNTER = "queued";
    private static final String INSTR_QUEUE_SIZE_SAMPLER = "queue.size";
    private static final String INSTR_THREADS_ACTIVE_SAMPLER = "threads.active";

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallableQueueService.";

    public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
    public static final String CONF_THREADS = CONF_PREFIX + "threads";
    public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
    public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";

    /** Delay, in milliseconds, applied when a callable is requeued because its type exceeded max concurrency. */
    public static final int CONCURRENCY_DELAY = 500;

    /** Delay, in milliseconds, applied when a callable is requeued because the system is in SAFEMODE. */
    public static final int SAFE_MODE_DELAY = 60000;

    // Per callable type counter of in-flight callables; every access is guarded by synchronized (activeCallables).
    private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();

    // Keys of the callables currently waiting in the queue, mapped to the time they were registered.
    // Declared as ConcurrentMap so putIfAbsent() is available without down-casting.
    private final ConcurrentMap<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();

    private int maxCallableConcurrency;

    /**
     * Registers the start of a callable execution by incrementing the counter of its type.
     * <p/>
     * The counter is incremented even when the limit is exceeded, so every call must be paired with a
     * {@link #callableEnd(XCallable)} call.
     *
     * @param callable callable about to be executed.
     * @return <code>true</code> if the callable may run, <code>false</code> if the max concurrency for its type is
     *         exceeded.
     */
    private boolean callableBegin(XCallable<?> callable) {
        synchronized (activeCallables) {
            AtomicInteger counter = activeCallables.get(callable.getType());
            if (counter == null) {
                // first callable of this type: always allowed to run
                counter = new AtomicInteger(1);
                activeCallables.put(callable.getType(), counter);
                return true;
            }
            else {
                int i = counter.incrementAndGet();
                return i <= maxCallableConcurrency;
            }
        }
    }

    /**
     * Registers the end of a callable execution by decrementing the counter of its type.
     *
     * @param callable callable that finished (or was refused) execution.
     * @throws IllegalStateException if no counter exists for the callable type, meaning
     *         {@link #callableBegin(XCallable)} was never called for it.
     */
    private void callableEnd(XCallable<?> callable) {
        synchronized (activeCallables) {
            AtomicInteger counter = activeCallables.get(callable.getType());
            if (counter == null) {
                throw new IllegalStateException("It should not happen");
            }
            else {
                counter.decrementAndGet();
            }
        }
    }

    /**
     * Checks if a callable is eligible to run with regard to the concurrency limit of its type.
     * <p/>
     * NOTE: despite its name, this method returns <code>true</code> when the max concurrency has <b>not</b> been
     * reached.
     *
     * @param callable callable to check.
     * @return <code>true</code> if no callable of this type has been registered yet or if the number of active
     *         callables of this type is below the max concurrency, <code>false</code> otherwise.
     */
    private boolean callableReachMaxConcurrency(XCallable<?> callable) {
        synchronized (activeCallables) {
            AtomicInteger counter = activeCallables.get(callable.getType());
            if (counter == null) {
                return true;
            }
            else {
                int i = counter.get();
                return i < maxCallableConcurrency;
            }
        }
    }

    // Callables are wrapped with this wrapper for execution, for logging
    // and instrumentation.
    // The wrapper implements Runnable and Comparable to be able to work with an
    // executor and a priority queue.
    class CallableWrapper extends PriorityDelayQueue.QueueElement<XCallable<?>> implements Runnable {
        // measures the time elapsed between the wrapper creation and the start of the execution
        private Instrumentation.Cron cron;

        public CallableWrapper(XCallable<?> callable, long delay) {
            super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS);
            cron = new Instrumentation.Cron();
            cron.start();
        }

        /**
         * Executes the wrapped callable.
         * <p/>
         * If the system is in SAFEMODE, or if the max concurrency for the callable type is exceeded, the wrapper is
         * requeued with a delay instead of being executed.
         */
        @Override
        public void run() {
            if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
                log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay", getElement().getType(),
                         SAFE_MODE_DELAY);
                setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
                // the key must be released first, otherwise the requeue would be filtered out as a duplicate
                removeFromUniqueCallables();
                queue(this, true);
                return;
            }
            XCallable<?> callable = getElement();
            try {
                if (callableBegin(callable)) {
                    cron.stop();
                    addInQueueCron(cron);
                    XLog.Info.get().clear();
                    XLog log = XLog.getLog(getClass());
                    log.trace("executing callable [{0}]", callable.getName());

                    removeFromUniqueCallables();
                    try {
                        callable.call();
                        incrCounter(INSTR_EXECUTED_COUNTER, 1);
                        log.trace("executed callable [{0}]", callable.getName());
                    }
                    catch (Exception ex) {
                        incrCounter(INSTR_FAILED_COUNTER, 1);
                        log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
                    }
                    finally {
                        XLog.Info.get().clear();
                    }
                }
                else {
                    log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay", callable
                            .getType(), CONCURRENCY_DELAY);
                    setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
                    // the key must be released first, otherwise the requeue would be filtered out as a duplicate
                    removeFromUniqueCallables();
                    queue(this, true);
                    incrCounter(callable.getType() + "#exceeded.concurrency", 1);
                }
            }
            finally {
                // callableBegin() increments the counter in both branches, so it is always decremented here
                callableEnd(callable);
            }
        }

        /**
         * @return String the queue dump
         */
        @Override
        public String toString() {
            return "delay=" + getDelay(TimeUnit.MILLISECONDS) + ", elements=" + getElement().toString();
        }

        /**
         * Filter the duplicate callables from the list before queuing this.
         * <p/>
         * If it is a single callable, check whether its key is in the unique map or not.
         * <p/>
         * If it is a composite callable, remove the duplicate callables from the composite.
         *
         * @return true if this callable should be queued
         */
        public boolean filterDuplicates() {
            XCallable<?> callable = getElement();
            if (callable instanceof CompositeCallable) {
                return ((CompositeCallable) callable).removeDuplicates();
            }
            else {
                return uniqueCallables.containsKey(callable.getKey()) == false;
            }
        }

        /**
         * Add the keys to the set
         */
        public void addToUniqueCallables() {
            XCallable<?> callable = getElement();
            if (callable instanceof CompositeCallable) {
                ((CompositeCallable) callable).addToUniqueCallables();
            }
            else {
                uniqueCallables.putIfAbsent(callable.getKey(), new Date());
            }
        }

        /**
         * Remove the keys from the set
         */
        public void removeFromUniqueCallables() {
            XCallable<?> callable = getElement();
            if (callable instanceof CompositeCallable) {
                ((CompositeCallable) callable).removeFromUniqueCallables();
            }
            else {
                uniqueCallables.remove(callable.getKey());
            }
        }

    }

    /**
     * Callable that executes a list of callables serially, in list order.
     * <p/>
     * Its priority is the highest priority of the given callables (never lower than 0) and its created time is the
     * earliest created time of the given callables.
     */
    class CompositeCallable implements XCallable<Void> {
        private List<XCallable<?>> callables;
        private String name;
        private int priority;
        private long createdTime;

        public CompositeCallable(List<? extends XCallable<?>> callables) {
            this.callables = new ArrayList<XCallable<?>>(callables);
            priority = 0;
            createdTime = Long.MAX_VALUE;
            StringBuilder sb = new StringBuilder();
            String separator = "[";
            for (XCallable<?> callable : callables) {
                priority = Math.max(priority, callable.getPriority());
                createdTime = Math.min(createdTime, callable.getCreatedTime());
                sb.append(separator).append(callable.getName());
                separator = ",";
            }
            sb.append("]");
            name = sb.toString();
        }

        @Override
        public String getName() {
            return name;
        }

        /**
         * @return the type of the first callable prefixed with <code>#composite#</code>; the list must not be empty.
         */
        @Override
        public String getType() {
            return "#composite#" + callables.get(0).getType();
        }

        /**
         * @return the key of the first callable prefixed with <code>#composite#</code>; the list must not be empty.
         */
        @Override
        public String getKey() {
            return "#composite#" + callables.get(0).getKey();
        }

        @Override
        public int getPriority() {
            return priority;
        }

        @Override
        public long getCreatedTime() {
            return createdTime;
        }

        /**
         * Executes all the callables in order. A failing callable is logged and counted as failed; it does not
         * prevent the execution of the following ones.
         *
         * @return always <code>null</code>.
         */
        public Void call() throws Exception {
            XLog log = XLog.getLog(getClass());

            for (XCallable<?> callable : callables) {
                log.trace("executing callable [{0}]", callable.getName());
                try {
                    callable.call();
                    incrCounter(INSTR_EXECUTED_COUNTER, 1);
                    log.trace("executed callable [{0}]", callable.getName());
                }
                catch (Exception ex) {
                    incrCounter(INSTR_FAILED_COUNTER, 1);
                    log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
                }
            }

            // ticking -1 not to count the call to the composite callable
            incrCounter(INSTR_EXECUTED_COUNTER, -1);
            return null;
        }

        /**
         * Returns the comma separated dump of the callables, each one enclosed in parentheses.
         *
         * @return the dump, or <code>null</code> if the composite holds no callables.
         * @see java.lang.Object#toString()
         */
        @Override
        public String toString() {
            if (callables.size() == 0) {
                return null;
            }
            StringBuilder sb = new StringBuilder();
            int size = callables.size();
            for (int i = 0; i < size; i++) {
                XCallable<?> callable = callables.get(i);
                sb.append("(");
                sb.append(callable.toString());
                if (i + 1 == size) {
                    sb.append(")");
                }
                else {
                    sb.append("),");
                }
            }
            return sb.toString();
        }

        /**
         * Remove the duplicate callables from the list before queuing them.
         * <p/>
         * A callable is a duplicate if its key is already in the unique map or if an earlier callable of this
         * composite has the same key.
         *
         * @return true if callables should be queued
         */
        public boolean removeDuplicates() {
            Set<String> set = new HashSet<String>();
            List<XCallable<?>> filteredCallables = new ArrayList<XCallable<?>>();
            if (callables.size() == 0) {
                return false;
            }
            for (XCallable<?> callable : callables) {
                if (!uniqueCallables.containsKey(callable.getKey()) && !set.contains(callable.getKey())) {
                    filteredCallables.add(callable);
                    set.add(callable.getKey());
                }
            }
            callables = filteredCallables;
            if (callables.size() == 0) {
                return false;
            }
            return true;
        }

        /**
         * Add the keys to the set
         */
        public void addToUniqueCallables() {
            for (XCallable<?> callable : callables) {
                uniqueCallables.putIfAbsent(callable.getKey(), new Date());
            }
        }

        /**
         * Remove the keys from the set
         */
        public void removeFromUniqueCallables() {
            for (XCallable<?> callable : callables) {
                uniqueCallables.remove(callable.getKey());
            }
        }

    }

    private XLog log = XLog.getLog(getClass());

    private int queueSize;
    private PriorityDelayQueue<CallableWrapper> queue;
    private AtomicLong delayQueueExecCounter = new AtomicLong(0);
    private ThreadPoolExecutor executor;
    private Instrumentation instrumentation;

    /**
     * Convenience method for instrumentation counters. Does nothing until {@link #instrument(Instrumentation)} has
     * been called.
     *
     * @param name counter name.
     * @param count count to increment the counter.
     */
    private void incrCounter(String name, int count) {
        if (instrumentation != null) {
            instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
        }
    }

    /**
     * Convenience method to record the time a callable spent in the queue. Does nothing until
     * {@link #instrument(Instrumentation)} has been called.
     *
     * @param cron cron holding the time spent in the queue.
     */
    private void addInQueueCron(Instrumentation.Cron cron) {
        if (instrumentation != null) {
            instrumentation.addCron(INSTRUMENTATION_GROUP, INSTR_IN_QUEUE_TIME_TIMER, cron);
        }
    }

    /**
     * Initialize the command queue service.
     *
     * @param services services instance.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void init(Services services) {
        Configuration conf = services.getConf();

        queueSize = conf.getInt(CONF_QUEUE_SIZE, 10000);
        int threads = conf.getInt(CONF_THREADS, 10);
        boolean callableNextEligible = conf.getBoolean(CONF_CALLABLE_NEXT_ELIGIBLE, true);
        // read before the worker threads are started so that the limit is set before anything can be executed
        maxCallableConcurrency = conf.getInt(CONF_CALLABLE_CONCURRENCY, 3);

        if (!callableNextEligible) {
            queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
                @Override
                protected void debug(String msgTemplate, Object... msgArgs) {
                    log.trace(msgTemplate, msgArgs);
                }
            };
        }
        else {
            // If the head of this queue has already reached max concurrency, continuously find the next one
            // which has not yet reached max concurrency. The overridden method 'eligibleToPoll' checks if the
            // element of this queue has reached the maximum concurrency.
            queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS,
                    queueSize) {
                @Override
                protected void debug(String msgTemplate, Object... msgArgs) {
                    log.trace(msgTemplate, msgArgs);
                }

                @Override
                protected boolean eligibleToPoll(QueueElement<?> element) {
                    if (element != null) {
                        CallableWrapper wrapper = (CallableWrapper) element;
                        if (element.getElement() != null) {
                            return callableReachMaxConcurrency(wrapper.getElement());
                        }
                    }
                    return false;
                }

            };
        }

        // IMPORTANT: The ThreadPoolExecutor does not always execute the
        // commands out of the queue, there are
        // certain conditions where commands are pushed directly to a thread.
        // As we are using a queue with DELAYED semantics (i.e. execute the
        // command in 5 mins) we need to make
        // sure that the commands are always pushed to the queue.
        // To achieve this (by looking at the ThreadPoolExecutor.execute()
        // implementation), we are making the pool
        // minimum size equal to the maximum size (thus threads are always kept
        // running) and we are warming up
        // all those threads (the for loop that runs dummy runnables).
        executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue);

        for (int i = 0; i < threads; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(100);
                    }
                    catch (InterruptedException ex) {
                        log.warn("Could not warm up threadpool {0}", ex.getMessage(), ex);
                        // restore the interrupt flag so the interruption is not lost
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    }

    /**
     * Destroy the command queue service.
     * <p/>
     * Shuts down the executor, discards the queued callables and waits up to 30 seconds for the running callables
     * to finish. If the calling thread is interrupted while waiting, the wait is aborted and the interrupt flag of
     * the thread is restored.
     */
    @Override
    public void destroy() {
        try {
            long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
            executor.shutdown();
            queue.clear();
            while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
                log.info("Waiting for executor to shutdown");
                if (System.currentTimeMillis() > limit) {
                    log.warn("Gave up, continuing without waiting for executor to shutdown");
                    break;
                }
            }
        }
        catch (InterruptedException ex) {
            log.warn(ex);
            // restore the interrupt flag so callers can still observe the interruption
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Return the public interface for command queue service.
     *
     * @return {@link CallableQueueService}.
     */
    @Override
    public Class<? extends Service> getInterface() {
        return CallableQueueService.class;
    }

    /**
     * @return int size of queue
     */
    public synchronized int queueSize() {
        return queue.size();
    }

    /**
     * Hands a wrapper over to the executor, unless it is a duplicate of an already queued callable.
     *
     * @param wrapper wrapper to queue.
     * @param ignoreQueueSize if <code>true</code> the wrapper is queued even if the queue size limit is reached.
     * @return <code>false</code> if the queue is full and the wrapper was not queued, <code>true</code> otherwise
     *         (including when the wrapper was dropped because it is a duplicate or the executor is shut down).
     * @throws RejectedExecutionException if the executor rejects the wrapper.
     */
    private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) {
        if (!ignoreQueueSize && queue.size() >= queueSize) {
            log.warn("queue if full, ignoring queuing for [{0}]", wrapper.getElement());
            return false;
        }
        if (!executor.isShutdown()) {
            if (wrapper.filterDuplicates()) {
                wrapper.addToUniqueCallables();
                try {
                    executor.execute(wrapper);
                }
                catch (RejectedExecutionException ree) {
                    // the wrapper never reached the queue, release its keys so it can be queued again later
                    wrapper.removeFromUniqueCallables();
                    throw ree;
                }
            }
        }
        else {
            log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement());
        }
        return true;
    }

    /**
     * Queue a callable for asynchronous execution.
     *
     * @param callable callable to queue.
     * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable
     *         was not queued.
     */
    public boolean queue(XCallable<?> callable) {
        return queue(callable, 0);
    }

    /**
     * Queue a list of callables for serial execution.
     * <p/>
     * Useful to serialize callables that may compete with each other for resources.
     * <p/>
     * All callables will be processed with the priority of the highest priority of all callables.
     *
     * @param callables callables to be executed by the composite callable.
     * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
     *         were not queued.
     */
    @SuppressWarnings("unchecked")
    public boolean queueSerial(List<? extends XCallable<?>> callables) {
        return queueSerial(callables, 0);
    }

    /**
     * Queue a callable for asynchronous execution sometime in the future.
     * <p/>
     * A <code>null</code> callable is ignored and reported as queued. Nothing is queued while the system is in
     * SAFEMODE.
     *
     * @param callable callable to queue for delayed execution
     * @param delay time, in milliseconds, that the callable should be delayed.
     * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full or the system is
     *         in SAFEMODE and the callable was not queued.
     */
    public synchronized boolean queue(XCallable<?> callable, long delay) {
        if (callable == null) {
            return true;
        }
        boolean queued = false;
        if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
            log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size());
        }
        else {
            queued = queue(new CallableWrapper(callable, delay), false);
            if (queued) {
                incrCounter(INSTR_QUEUED_COUNTER, 1);
            }
            else {
                log.warn("Could not queue callable");
            }
        }
        return queued;
    }

    /**
     * Queue a list of callables for serial execution sometime in the future.
     * <p/>
     * Useful to serialize callables that may compete with each other for resources.
     * <p/>
     * All callables will be processed with the priority of the highest priority of all callables.
     * <p/>
     * A <code>null</code> or empty list is ignored and reported as queued. A list with a single callable is queued
     * as a plain callable.
     *
     * @param callables callables to be executed by the composite callable.
     * @param delay time, in milliseconds, that the callable should be delayed.
     * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
     *         were not queued.
     */
    @SuppressWarnings("unchecked")
    public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) {
        boolean queued;
        if (callables == null || callables.size() == 0) {
            queued = true;
        }
        else if (callables.size() == 1) {
            queued = queue(callables.get(0), delay);
        }
        else {
            XCallable<?> callable = new CompositeCallable(callables);
            queued = queue(callable, delay);
            if (queued) {
                incrCounter(INSTR_QUEUED_COUNTER, callables.size());
            }
        }
        return queued;
    }

    /**
     * Instruments the callable queue service.
     * <p/>
     * Registers samplers for the queue size and for the number of active threads of the executor.
     *
     * @param instr instance to instrument the callable queue service to.
     */
    public void instrument(Instrumentation instr) {
        instrumentation = instr;
        instr.addSampler(INSTRUMENTATION_GROUP, INSTR_QUEUE_SIZE_SAMPLER, 60, 1, new Instrumentation.Variable<Long>() {
            public Long getValue() {
                return (long) queue.size();
            }
        });
        instr.addSampler(INSTRUMENTATION_GROUP, INSTR_THREADS_ACTIVE_SAMPLER, 60, 1,
                new Instrumentation.Variable<Long>() {
                    public Long getValue() {
                        return (long) executor.getActiveCount();
                    }
                });
    }

    /**
     * Get the list of strings of queue dump
     *
     * @return the list of strings representing each CallableWrapper; elements whose dump is <code>null</code> are
     *         skipped.
     */
    public List<String> getQueueDump() {
        List<String> list = new ArrayList<String>();
        for (QueueElement<CallableWrapper> qe : queue) {
            // computed once: the dump embeds the remaining delay, which changes between calls
            String dump = qe.toString();
            if (dump == null) {
                continue;
            }
            list.add(dump);
        }
        return list;
    }

    /**
     * Get the list of strings of uniqueness map dump
     *
     * @return the list of strings representing the key of each command in the queue
     */
    public List<String> getUniqueDump() {
        List<String> list = new ArrayList<String>();
        for (Entry<String, Date> entry : uniqueCallables.entrySet()) {
            list.add(entry.toString());
        }
        return list;
    }

}