001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command; 019 020import org.apache.oozie.ErrorCode; 021import org.apache.oozie.FaultInjection; 022import org.apache.oozie.XException; 023import org.apache.oozie.service.CallableQueueService; 024import org.apache.oozie.service.EventHandlerService; 025import org.apache.oozie.service.InstrumentationService; 026import org.apache.oozie.service.MemoryLocksService; 027import org.apache.oozie.service.Services; 028import org.apache.oozie.util.Instrumentation; 029import org.apache.oozie.lock.LockToken; 030import org.apache.oozie.util.XCallable; 031import org.apache.oozie.util.XLog; 032 033import java.util.ArrayList; 034import java.util.HashMap; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.UUID; 039import java.util.concurrent.atomic.AtomicBoolean; 040 041/** 042 * Base class for synchronous and asynchronous commands. 043 * <p/> 044 * It enables by API the following pattern: 045 * <p/> 046 * <ul> 047 * <li>single execution: a command instance can be executed only once</li> 048 * <li>eager data loading: loads data for eager precondition check</li> 049 * <li>eager precondition check: verify precondition before obtaining lock</li> 050 * <li>data loading: loads data for precondition check and execution</li> 051 * <li>precondition check: verifies precondition for execution is still met</li> 052 * <li>locking: obtains exclusive lock on key before executing the command</li> 053 * <li>execution: command logic</li> 054 * </ul> 055 * <p/> 056 * It has built in instrumentation and logging. 057 */ 058public abstract class XCommand<T> implements XCallable<T> { 059 public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout"; 060 061 public static final String INSTRUMENTATION_GROUP = "commands"; 062 063 public static final Long DEFAULT_REQUEUE_DELAY = 10L; 064 065 public XLog LOG = XLog.getLog(getClass()); 066 067 private String key; 068 private String name; 069 private int priority; 070 private String type; 071 private long createdTime; 072 private LockToken lock; 073 private AtomicBoolean used = new AtomicBoolean(false); 074 private boolean inInterrupt = false; 075 private boolean isSynchronous = false; 076 077 private Map<Long, List<XCommand<?>>> commandQueue; 078 protected boolean dryrun = false; 079 protected Instrumentation instrumentation; 080 081 protected XLog.Info logInfo; 082 protected static EventHandlerService eventService; 083 084 /** 085 * Create a command. 086 * 087 * @param name command name. 088 * @param type command type. 089 * @param priority command priority. 090 */ 091 public XCommand(String name, String type, int priority) { 092 this.name = name; 093 this.type = type; 094 this.priority = priority; 095 this.key = name + "_" + UUID.randomUUID(); 096 createdTime = System.currentTimeMillis(); 097 logInfo = new XLog.Info(); 098 instrumentation = Services.get().get(InstrumentationService.class).get(); 099 eventService = Services.get().get(EventHandlerService.class); 100 } 101 102 /** 103 * @param name command name. 104 * @param type command type. 105 * @param priority command priority. 106 * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without 107 * really running the job 108 */ 109 public XCommand(String name, String type, int priority, boolean dryrun) { 110 this(name, type, priority); 111 this.dryrun = dryrun; 112 } 113 114 /** 115 * Return the command name. 116 * 117 * @return the command name. 118 */ 119 @Override 120 public String getName() { 121 return name; 122 } 123 124 /** 125 * Return the callable type. 126 * <p/> 127 * The command type is used for concurrency throttling in the {@link CallableQueueService}. 128 * 129 * @return the command type. 130 */ 131 @Override 132 public String getType() { 133 return type; 134 } 135 136 /** 137 * Return the priority of the command. 138 * 139 * @return the command priority. 140 */ 141 @Override 142 public int getPriority() { 143 return priority; 144 } 145 146 /** 147 * Returns the creation time of the command. 148 * 149 * @return the command creation time, in milliseconds. 150 */ 151 @Override 152 public long getCreatedTime() { 153 return createdTime; 154 } 155 156 /** 157 * Queue a command for execution after the current command execution completes. 158 * <p/> 159 * All commands queued during the execution of the current command will be queued for a single serial execution. 160 * <p/> 161 * If the command execution throws an exception, no command will be effectively queued. 162 * 163 * @param command command to queue. 164 */ 165 protected void queue(XCommand<?> command) { 166 queue(command, 0); 167 } 168 169 /** 170 * Queue a command for delayed execution after the current command execution completes. 171 * <p/> 172 * All commands queued during the execution of the current command with the same delay will be queued for a single 173 * serial execution. 174 * <p/> 175 * If the command execution throws an exception, no command will be effectively queued. 176 * 177 * @param command command to queue. 178 * @param msDelay delay in milliseconds. 179 */ 180 protected void queue(XCommand<?> command, long msDelay) { 181 if (commandQueue == null) { 182 commandQueue = new HashMap<Long, List<XCommand<?>>>(); 183 } 184 List<XCommand<?>> list = commandQueue.get(msDelay); 185 if (list == null) { 186 list = new ArrayList<XCommand<?>>(); 187 commandQueue.put(msDelay, list); 188 } 189 list.add(command); 190 } 191 192 /** 193 * Obtain an exclusive lock on the {link #getEntityKey}. 194 * <p/> 195 * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock. 196 * 197 * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock 198 * @throws CommandException thrown i the lock could not be obtained. 199 */ 200 private void acquireLock() throws InterruptedException, CommandException { 201 if (getEntityKey() == null) { 202 // no lock for null entity key 203 return; 204 } 205 lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut()); 206 if (lock == null) { 207 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1); 208 if (isReQueueRequired()) { 209 //if not acquire the lock, re-queue itself with default delay 210 queue(this, getRequeueDelay()); 211 LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName()); 212 } else { 213 throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut()); 214 } 215 } else { 216 LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName()); 217 } 218 } 219 220 /** 221 * Release the lock on the {link #getEntityKey}. 222 */ 223 private void releaseLock() { 224 if (lock != null) { 225 lock.release(); 226 LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName()); 227 } 228 } 229 230 /** 231 * Implements the XCommand life-cycle. 232 * 233 * @return the {link #execute} return value. 234 * @throws Exception thrown if the command could not be executed. 235 */ 236 @Override 237 public final T call() throws CommandException { 238 if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) { 239 LOG.debug("Command [{0}] key [{1}] already used for [{2}]", getName(), getEntityKey(), this.toString()); 240 return null; 241 } 242 243 commandQueue = null; 244 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1); 245 Instrumentation.Cron callCron = new Instrumentation.Cron(); 246 try { 247 callCron.start(); 248 if (!isSynchronous) { 249 eagerLoadState(); 250 LOG = XLog.resetPrefix(LOG); 251 eagerVerifyPrecondition(); 252 } 253 try { 254 T ret = null; 255 if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) { 256 Instrumentation.Cron acquireLockCron = new Instrumentation.Cron(); 257 acquireLockCron.start(); 258 acquireLock(); 259 acquireLockCron.stop(); 260 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron); 261 } 262 // executing interrupts only in case of the lock required commands 263 if (lock != null) { 264 this.executeInterrupts(); 265 } 266 267 if (isSynchronous || !isLockRequired() || (lock != null) || this.inInterruptMode()) { 268 if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) 269 && !used.compareAndSet(false, true)) { 270 LOG.debug("Command [{0}] key [{1}] already executed for [{2}]", getName(), getEntityKey(), this.toString()); 271 return null; 272 } 273 LOG.trace("Load state for [{0}]", getEntityKey()); 274 loadState(); 275 LOG = XLog.resetPrefix(LOG); 276 LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey()); 277 verifyPrecondition(); 278 LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey()); 279 Instrumentation.Cron executeCron = new Instrumentation.Cron(); 280 executeCron.start(); 281 ret = execute(); 282 executeCron.stop(); 283 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron); 284 } 285 if (commandQueue != null) { 286 CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class); 287 for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) { 288 LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey()); 289 if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) { 290 LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue() 291 .size(), entry.getKey()); 292 } 293 } 294 } 295 return ret; 296 } 297 finally { 298 if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) { 299 releaseLock(); 300 } 301 } 302 } 303 catch(PreconditionException pex){ 304 LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString()); 305 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1); 306 return null; 307 } 308 catch (XException ex) { 309 LOG.error("XException, ", ex); 310 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1); 311 if (ex instanceof CommandException) { 312 throw (CommandException) ex; 313 } 314 else { 315 throw new CommandException(ex); 316 } 317 } 318 catch (Exception ex) { 319 LOG.error("Exception, ", ex); 320 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1); 321 throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex); 322 } 323 catch (Error er) { 324 LOG.error("Error, ", er); 325 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".errors", 1); 326 throw er; 327 } 328 finally { 329 FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection"); 330 callCron.stop(); 331 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron); 332 } 333 } 334 335 /** 336 * Call this command synchronously from its caller. This benefits faster 337 * execution of command lifecycle for control nodes and kicking off 338 * subsequent actions 339 * 340 * @param callerEntityKey 341 * @return the {link #execute} return value. 342 * @throws CommandException 343 */ 344 public final T call(String callerEntityKey) throws CommandException { 345 if (!callerEntityKey.equals(this.getEntityKey())) { 346 throw new CommandException(ErrorCode.E0607, "Entity Keys mismatch during synchronous call", "caller=" 347 + callerEntityKey + ", callee=" + getEntityKey()); 348 } 349 isSynchronous = true; //setting to true so lock acquiring and release is not repeated 350 LOG.trace("Executing synchronously command [{0}] on job [{1}]", this.getName(), this.getKey()); 351 return call(); 352 } 353 354 /** 355 * Check for the existence of interrupts for the same lock key 356 * Execute them if exist. 357 * 358 */ 359 protected void executeInterrupts() { 360 CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class); 361 // getting all the list of interrupts to be executed 362 Set<XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey()); 363 364 if (callables != null) { 365 // executing the list of interrupts in the given order of insertion 366 // in the list 367 for (XCallable<?> callable : callables) { 368 LOG.trace("executing interrupt callable [{0}]", callable.getName()); 369 try { 370 // executing the callable in interrupt mode 371 callable.setInterruptMode(true); 372 callable.call(); 373 LOG.trace("executed interrupt callable [{0}]", callable.getName()); 374 } 375 catch (Exception ex) { 376 LOG.warn("exception interrupt callable [{0}], {1}", callable.getName(), ex.getMessage(), ex); 377 } 378 finally { 379 // reseting the interrupt mode to false after the command is 380 // executed 381 callable.setInterruptMode(false); 382 } 383 } 384 } 385 } 386 387 /** 388 * Return the time out when acquiring a lock. 389 * <p/> 390 * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}. 391 * <p/> 392 * Subclasses should override this method if they want to use a different time out. 393 * 394 * @return the lock time out in milliseconds. 395 */ 396 protected long getLockTimeOut() { 397 return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000); 398 } 399 400 /** 401 * Indicate if the the command requires locking. 402 * <p/> 403 * Subclasses should override this method if they require locking. 404 * 405 * @return <code>true/false</code> 406 */ 407 protected abstract boolean isLockRequired(); 408 409 /** 410 * Return the entity key for the command. 411 * <p/> 412 * 413 * @return the entity key for the command. 414 */ 415 public abstract String getEntityKey(); 416 417 /** 418 * Indicate if the the command requires to requeue itself if the lock is not acquired. 419 * <p/> 420 * Subclasses should override this method if they don't want to requeue. 421 * <p/> 422 * Default is true. 423 * 424 * @return <code>true/false</code> 425 */ 426 protected boolean isReQueueRequired() { 427 return true; 428 } 429 430 /** 431 * Load the necessary state to perform an eager precondition check. 432 * <p/> 433 * This implementation does a NOP. 434 * <p/> 435 * Subclasses should override this method and load the state needed to do an eager precondition check. 436 * <p/> 437 * A trivial implementation is calling {link #loadState}. 438 */ 439 protected void eagerLoadState() throws CommandException{ 440 } 441 442 /** 443 * Verify the precondition for the command before obtaining a lock. 444 * <p/> 445 * This implementation does a NOP. 446 * <p/> 447 * A trivial implementation is calling {link #verifyPrecondition}. 448 * 449 * @throws CommandException thrown if the precondition is not met. 450 */ 451 protected void eagerVerifyPrecondition() throws CommandException,PreconditionException { 452 } 453 454 /** 455 * Load the necessary state to perform the precondition check and to execute the command. 456 * <p/> 457 * Subclasses must implement this method and load the state needed to do the precondition check and execute the 458 * command. 459 */ 460 protected abstract void loadState() throws CommandException; 461 462 /** 463 * Verify the precondition for the command after a lock has been obtain, just before executing the command. 464 * <p/> 465 * 466 * @throws CommandException thrown if the precondition is not met. 467 */ 468 protected abstract void verifyPrecondition() throws CommandException,PreconditionException; 469 470 /** 471 * Command execution body. 472 * <p/> 473 * This method will be invoked after the {link #loadState} and {link #verifyPrecondition} methods. 474 * <p/> 475 * If the command requires locking, this method will be invoked ONLY if the lock has been acquired. 476 * 477 * @return a return value from the execution of the command, only meaningful if the command is executed 478 * synchronously. 479 * @throws CommandException thrown if the command execution failed. 480 */ 481 protected abstract T execute() throws CommandException; 482 483 484 /** 485 * Return the {@link Instrumentation} instance in use. 486 * 487 * @return the {@link Instrumentation} instance in use. 488 */ 489 protected Instrumentation getInstrumentation() { 490 return instrumentation; 491 } 492 493 /** 494 * @param used set false to the used 495 */ 496 public void resetUsed() { 497 this.used.set(false); 498 } 499 500 501 /** 502 * Return the delay time for requeue 503 * 504 * @return delay time when requeue itself 505 */ 506 protected Long getRequeueDelay() { 507 return DEFAULT_REQUEUE_DELAY; 508 } 509 510 /** 511 * Get command key 512 * 513 * @return command key 514 */ 515 @Override 516 public String getKey(){ 517 return this.key; 518 } 519 520 /** 521 * set the mode of execution for the callable. True if in interrupt, false 522 * if not 523 */ 524 public void setInterruptMode(boolean mode) { 525 this.inInterrupt = mode; 526 } 527 528 /** 529 * @return the mode of execution. true if it is executed as an Interrupt, 530 * false otherwise 531 */ 532 public boolean inInterruptMode() { 533 return this.inInterrupt; 534 } 535 536 /** 537 * Get XLog log 538 * 539 * @return XLog 540 */ 541 public XLog getLog() { 542 return LOG; 543 } 544 545 /** 546 * String for the command - key 547 * @return String 548 */ 549 @Override 550 public String toString() { 551 return getKey(); 552 } 553}