001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command; 020 021import org.apache.oozie.ErrorCode; 022import org.apache.oozie.FaultInjection; 023import org.apache.oozie.XException; 024import org.apache.oozie.service.CallableQueueService; 025import org.apache.oozie.service.ConfigurationService; 026import org.apache.oozie.service.EventHandlerService; 027import org.apache.oozie.service.InstrumentationService; 028import org.apache.oozie.service.MemoryLocksService; 029import org.apache.oozie.service.Services; 030import org.apache.oozie.util.Instrumentation; 031import org.apache.oozie.lock.LockToken; 032import org.apache.oozie.util.XCallable; 033import org.apache.oozie.util.XLog; 034 035import java.util.ArrayList; 036import java.util.HashMap; 037import java.util.List; 038import java.util.Map; 039import java.util.Set; 040import java.util.UUID; 041import java.util.concurrent.atomic.AtomicBoolean; 042 043/** 044 * Base class for synchronous and asynchronous commands. 045 * <p/> 046 * It enables by API the following pattern: 047 * <p/> 048 * <ul> 049 * <li>single execution: a command instance can be executed only once</li> 050 * <li>eager data loading: loads data for eager precondition check</li> 051 * <li>eager precondition check: verify precondition before obtaining lock</li> 052 * <li>data loading: loads data for precondition check and execution</li> 053 * <li>precondition check: verifies precondition for execution is still met</li> 054 * <li>locking: obtains exclusive lock on key before executing the command</li> 055 * <li>execution: command logic</li> 056 * </ul> 057 * <p/> 058 * It has built in instrumentation and logging. 059 */ 060public abstract class XCommand<T> implements XCallable<T> { 061 public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout"; 062 063 public static final String INSTRUMENTATION_GROUP = "commands"; 064 065 public static final String DEFAULT_REQUEUE_DELAY = "oozie.command.default.requeue.delay"; 066 067 public XLog LOG = XLog.getLog(getClass()); 068 069 private String key; 070 private String name; 071 private int priority; 072 private String type; 073 private long createdTime; 074 private LockToken lock; 075 private AtomicBoolean used = new AtomicBoolean(false); 076 private boolean inInterrupt = false; 077 private boolean isSynchronous = false; 078 079 private Map<Long, List<XCommand<?>>> commandQueue; 080 protected boolean dryrun = false; 081 protected Instrumentation instrumentation; 082 083 protected static EventHandlerService eventService; 084 085 /** 086 * Create a command. 087 * 088 * @param name command name. 089 * @param type command type. 090 * @param priority command priority. 091 */ 092 public XCommand(String name, String type, int priority) { 093 this.name = name; 094 this.type = type; 095 this.priority = priority; 096 this.key = name + "_" + UUID.randomUUID(); 097 createdTime = System.currentTimeMillis(); 098 instrumentation = Services.get().get(InstrumentationService.class).get(); 099 eventService = Services.get().get(EventHandlerService.class); 100 } 101 102 /** 103 * @param name command name. 104 * @param type command type. 105 * @param priority command priority. 106 * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without 107 * really running the job 108 */ 109 public XCommand(String name, String type, int priority, boolean dryrun) { 110 this(name, type, priority); 111 this.dryrun = dryrun; 112 } 113 114 /** 115 * Set the thread local logInfo with the context of this command and reset log prefix. 116 */ 117 protected void setLogInfo() { 118 } 119 120 /** 121 * Return the command name. 122 * 123 * @return the command name. 124 */ 125 @Override 126 public String getName() { 127 return name; 128 } 129 130 /** 131 * Return the callable type. 132 * <p/> 133 * The command type is used for concurrency throttling in the {@link CallableQueueService}. 134 * 135 * @return the command type. 136 */ 137 @Override 138 public String getType() { 139 return type; 140 } 141 142 /** 143 * Return the priority of the command. 144 * 145 * @return the command priority. 146 */ 147 @Override 148 public int getPriority() { 149 return priority; 150 } 151 152 /** 153 * Returns the creation time of the command. 154 * 155 * @return the command creation time, in milliseconds. 156 */ 157 @Override 158 public long getCreatedTime() { 159 return createdTime; 160 } 161 162 /** 163 * Queue a command for execution after the current command execution completes. 164 * <p/> 165 * All commands queued during the execution of the current command will be queued for a single serial execution. 166 * <p/> 167 * If the command execution throws an exception, no command will be effectively queued. 168 * 169 * @param command command to queue. 170 */ 171 protected void queue(XCommand<?> command) { 172 queue(command, 0); 173 } 174 175 /** 176 * Queue a command for delayed execution after the current command execution completes. 177 * <p/> 178 * All commands queued during the execution of the current command with the same delay will be queued for a single 179 * serial execution. 180 * <p/> 181 * If the command execution throws an exception, no command will be effectively queued. 182 * 183 * @param command command to queue. 184 * @param msDelay delay in milliseconds. 185 */ 186 protected void queue(XCommand<?> command, long msDelay) { 187 if (commandQueue == null) { 188 commandQueue = new HashMap<Long, List<XCommand<?>>>(); 189 } 190 List<XCommand<?>> list = commandQueue.get(msDelay); 191 if (list == null) { 192 list = new ArrayList<XCommand<?>>(); 193 commandQueue.put(msDelay, list); 194 } 195 list.add(command); 196 } 197 198 /** 199 * Obtain an exclusive lock on the {link #getEntityKey}. 200 * <p/> 201 * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock. 202 * 203 * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock 204 * @throws CommandException thrown i the lock could not be obtained. 205 */ 206 private void acquireLock() throws InterruptedException, CommandException { 207 if (getEntityKey() == null) { 208 // no lock for null entity key 209 return; 210 } 211 lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut()); 212 if (lock == null) { 213 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1); 214 if (isReQueueRequired()) { 215 //if not acquire the lock, re-queue itself with default delay 216 queue(this, getRequeueDelay()); 217 LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName()); 218 } else { 219 throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut()); 220 } 221 } else { 222 LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName()); 223 } 224 } 225 226 /** 227 * Release the lock on the {link #getEntityKey}. 228 */ 229 private void releaseLock() { 230 if (lock != null) { 231 lock.release(); 232 LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName()); 233 } 234 } 235 236 /** 237 * Implements the XCommand life-cycle. 238 * 239 * @return the {link #execute} return value. 240 * @throws Exception thrown if the command could not be executed. 241 */ 242 @Override 243 public final T call() throws CommandException { 244 setLogInfo(); 245 if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) { 246 LOG.debug("Command [{0}] key [{1}] already used for [{2}]", getName(), getEntityKey(), this.toString()); 247 return null; 248 } 249 250 commandQueue = null; 251 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1); 252 Instrumentation.Cron callCron = new Instrumentation.Cron(); 253 try { 254 callCron.start(); 255 if (!isSynchronous) { 256 eagerLoadState(); 257 eagerVerifyPrecondition(); 258 } 259 try { 260 T ret = null; 261 if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) { 262 Instrumentation.Cron acquireLockCron = new Instrumentation.Cron(); 263 acquireLockCron.start(); 264 acquireLock(); 265 acquireLockCron.stop(); 266 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron); 267 } 268 // executing interrupts only in case of the lock required commands 269 if (lock != null) { 270 this.executeInterrupts(); 271 } 272 273 if (isSynchronous || !isLockRequired() || (lock != null) || this.inInterruptMode()) { 274 if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) 275 && !used.compareAndSet(false, true)) { 276 LOG.debug("Command [{0}] key [{1}] already executed for [{2}]", getName(), getEntityKey(), this.toString()); 277 return null; 278 } 279 LOG.trace("Load state for [{0}]", getEntityKey()); 280 loadState(); 281 LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey()); 282 verifyPrecondition(); 283 LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey()); 284 Instrumentation.Cron executeCron = new Instrumentation.Cron(); 285 executeCron.start(); 286 ret = execute(); 287 executeCron.stop(); 288 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron); 289 } 290 if (commandQueue != null) { 291 CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class); 292 for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) { 293 LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey()); 294 if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) { 295 LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue() 296 .size(), entry.getKey()); 297 } 298 } 299 } 300 return ret; 301 } 302 finally { 303 if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) { 304 releaseLock(); 305 } 306 } 307 } 308 catch(PreconditionException pex){ 309 LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString()); 310 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1); 311 return null; 312 } 313 catch (XException ex) { 314 LOG.error("XException, ", ex); 315 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1); 316 if (ex instanceof CommandException) { 317 throw (CommandException) ex; 318 } 319 else { 320 throw new CommandException(ex); 321 } 322 } 323 catch (Exception ex) { 324 LOG.error("Exception, ", ex); 325 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1); 326 throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex); 327 } 328 catch (Error er) { 329 LOG.error("Error, ", er); 330 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".errors", 1); 331 throw er; 332 } 333 finally { 334 FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection"); 335 callCron.stop(); 336 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron); 337 } 338 } 339 340 /** 341 * Call this command synchronously from its caller. This benefits faster 342 * execution of command lifecycle for control nodes and kicking off 343 * subsequent actions 344 * 345 * @param callerEntityKey 346 * @return the {link #execute} return value. 347 * @throws CommandException 348 */ 349 public final T call(String callerEntityKey) throws CommandException { 350 if (!callerEntityKey.equals(this.getEntityKey())) { 351 throw new CommandException(ErrorCode.E0607, "Entity Keys mismatch during synchronous call", "caller=" 352 + callerEntityKey + ", callee=" + getEntityKey()); 353 } 354 isSynchronous = true; //setting to true so lock acquiring and release is not repeated 355 LOG.trace("Executing synchronously command [{0}] on job [{1}]", this.getName(), this.getKey()); 356 return call(); 357 } 358 359 /** 360 * Check for the existence of interrupts for the same lock key 361 * Execute them if exist. 362 * 363 */ 364 protected void executeInterrupts() { 365 CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class); 366 // getting all the list of interrupts to be executed 367 Set<XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey()); 368 369 if (callables != null) { 370 // executing the list of interrupts in the given order of insertion 371 // in the list 372 for (XCallable<?> callable : callables) { 373 LOG.trace("executing interrupt callable [{0}]", callable.getName()); 374 try { 375 // executing the callable in interrupt mode 376 callable.setInterruptMode(true); 377 callable.call(); 378 LOG.trace("executed interrupt callable [{0}]", callable.getName()); 379 } 380 catch (Exception ex) { 381 LOG.warn("exception interrupt callable [{0}], {1}", callable.getName(), ex.getMessage(), ex); 382 } 383 finally { 384 // reseting the interrupt mode to false after the command is 385 // executed 386 callable.setInterruptMode(false); 387 } 388 } 389 } 390 } 391 392 /** 393 * Return the time out when acquiring a lock. 394 * <p/> 395 * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}. 396 * <p/> 397 * Subclasses should override this method if they want to use a different time out. 398 * 399 * @return the lock time out in milliseconds. 400 */ 401 protected long getLockTimeOut() { 402 return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000); 403 } 404 405 /** 406 * Indicate if the the command requires locking. 407 * <p/> 408 * Subclasses should override this method if they require locking. 409 * 410 * @return <code>true/false</code> 411 */ 412 protected abstract boolean isLockRequired(); 413 414 /** 415 * Return the entity key for the command. 416 * <p/> 417 * 418 * @return the entity key for the command. 419 */ 420 public abstract String getEntityKey(); 421 422 /** 423 * Indicate if the the command requires to requeue itself if the lock is not acquired. 424 * <p/> 425 * Subclasses should override this method if they don't want to requeue. 426 * <p/> 427 * Default is true. 428 * 429 * @return <code>true/false</code> 430 */ 431 protected boolean isReQueueRequired() { 432 return true; 433 } 434 435 /** 436 * Load the necessary state to perform an eager precondition check. 437 * <p/> 438 * This implementation does a NOP. 439 * <p/> 440 * Subclasses should override this method and load the state needed to do an eager precondition check. 441 * <p/> 442 * A trivial implementation is calling {link #loadState}. 443 */ 444 protected void eagerLoadState() throws CommandException{ 445 } 446 447 /** 448 * Verify the precondition for the command before obtaining a lock. 449 * <p/> 450 * This implementation does a NOP. 451 * <p/> 452 * A trivial implementation is calling {link #verifyPrecondition}. 453 * 454 * @throws CommandException thrown if the precondition is not met. 455 */ 456 protected void eagerVerifyPrecondition() throws CommandException,PreconditionException { 457 } 458 459 /** 460 * Load the necessary state to perform the precondition check and to execute the command. 461 * <p/> 462 * Subclasses must implement this method and load the state needed to do the precondition check and execute the 463 * command. 464 */ 465 protected abstract void loadState() throws CommandException; 466 467 /** 468 * Verify the precondition for the command after a lock has been obtain, just before executing the command. 469 * <p/> 470 * 471 * @throws CommandException thrown if the precondition is not met. 472 */ 473 protected abstract void verifyPrecondition() throws CommandException,PreconditionException; 474 475 /** 476 * Command execution body. 477 * <p/> 478 * This method will be invoked after the {link #loadState} and {link #verifyPrecondition} methods. 479 * <p/> 480 * If the command requires locking, this method will be invoked ONLY if the lock has been acquired. 481 * 482 * @return a return value from the execution of the command, only meaningful if the command is executed 483 * synchronously. 484 * @throws CommandException thrown if the command execution failed. 485 */ 486 protected abstract T execute() throws CommandException; 487 488 489 /** 490 * Return the {@link Instrumentation} instance in use. 491 * 492 * @return the {@link Instrumentation} instance in use. 493 */ 494 protected Instrumentation getInstrumentation() { 495 return instrumentation; 496 } 497 498 /** 499 * @param used set false to the used 500 */ 501 public void resetUsed() { 502 this.used.set(false); 503 } 504 505 506 /** 507 * Return the delay time for requeue 508 * <p/> 509 * The value is loaded from the Oozie configuration, the property {link #DEFAULT_REQUEUE_DELAY}. 510 * <p/> 511 * Subclasses should override this method if they want to use a different requeue delay time 512 * 513 * @return delay time when requeue itself 514 */ 515 protected long getRequeueDelay() { 516 return ConfigurationService.getLong(DEFAULT_REQUEUE_DELAY); 517 } 518 519 /** 520 * Get command key 521 * 522 * @return command key 523 */ 524 @Override 525 public String getKey(){ 526 return this.key; 527 } 528 529 /** 530 * set the mode of execution for the callable. True if in interrupt, false 531 * if not 532 */ 533 public void setInterruptMode(boolean mode) { 534 this.inInterrupt = mode; 535 } 536 537 /** 538 * @return the mode of execution. true if it is executed as an Interrupt, 539 * false otherwise 540 */ 541 public boolean inInterruptMode() { 542 return this.inInterrupt; 543 } 544 545 /** 546 * Get XLog log 547 * 548 * @return XLog 549 */ 550 public XLog getLog() { 551 return LOG; 552 } 553 554 /** 555 * String for the command - key 556 * @return String 557 */ 558 @Override 559 public String toString() { 560 return getKey(); 561 } 562}