001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command; 020 021import org.apache.oozie.ErrorCode; 022import org.apache.oozie.FaultInjection; 023import org.apache.oozie.XException; 024import org.apache.oozie.service.CallableQueueService; 025import org.apache.oozie.service.ConfigurationService; 026import org.apache.oozie.service.EventHandlerService; 027import org.apache.oozie.service.InstrumentationService; 028import org.apache.oozie.service.MemoryLocksService; 029import org.apache.oozie.service.Services; 030import org.apache.oozie.util.Instrumentation; 031import org.apache.oozie.lock.LockToken; 032import org.apache.oozie.util.XCallable; 033import org.apache.oozie.util.XLog; 034 035import java.util.ArrayList; 036import java.util.HashMap; 037import java.util.List; 038import java.util.Map; 039import java.util.Set; 040import java.util.UUID; 041import java.util.concurrent.atomic.AtomicBoolean; 042 043/** 044 * Base class for synchronous and asynchronous commands. 045 * <p> 046 * It enables by API the following pattern: 047 * <p> 048 * <ul> 049 * <li>single execution: a command instance can be executed only once</li> 050 * <li>eager data loading: loads data for eager precondition check</li> 051 * <li>eager precondition check: verify precondition before obtaining lock</li> 052 * <li>data loading: loads data for precondition check and execution</li> 053 * <li>precondition check: verifies precondition for execution is still met</li> 054 * <li>locking: obtains exclusive lock on key before executing the command</li> 055 * <li>execution: command logic</li> 056 * </ul> 057 * <p> 058 * It has built in instrumentation and logging. 059 */ 060public abstract class XCommand<T> implements XCallable<T> { 061 public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout"; 062 063 public static final String INSTRUMENTATION_GROUP = "commands"; 064 065 public static final String DEFAULT_REQUEUE_DELAY = "oozie.command.default.requeue.delay"; 066 067 public XLog LOG = XLog.getLog(getClass()); 068 069 private String key; 070 private String name; 071 private int priority; 072 private String type; 073 private long createdTime; 074 private LockToken lock; 075 private AtomicBoolean used = new AtomicBoolean(false); 076 private boolean inInterrupt = false; 077 078 private Map<Long, List<XCommand<?>>> commandQueue; 079 protected boolean dryrun = false; 080 protected Instrumentation instrumentation; 081 082 protected static EventHandlerService eventService; 083 084 /** 085 * Create a command. 086 * 087 * @param name command name. 088 * @param type command type. 089 * @param priority command priority. 090 */ 091 public XCommand(String name, String type, int priority) { 092 this.name = name; 093 this.type = type; 094 this.priority = priority; 095 this.key = name + "_" + UUID.randomUUID(); 096 createdTime = System.currentTimeMillis(); 097 instrumentation = Services.get().get(InstrumentationService.class).get(); 098 eventService = Services.get().get(EventHandlerService.class); 099 } 100 101 /** 102 * @param name command name. 103 * @param type command type. 104 * @param priority command priority. 105 * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without 106 * really running the job 107 */ 108 public XCommand(String name, String type, int priority, boolean dryrun) { 109 this(name, type, priority); 110 this.dryrun = dryrun; 111 } 112 113 /** 114 * Set the thread local logInfo with the context of this command and reset log prefix. 115 */ 116 protected void setLogInfo() { 117 } 118 119 /** 120 * Return the command name. 121 * 122 * @return the command name. 123 */ 124 @Override 125 public String getName() { 126 return name; 127 } 128 129 /** 130 * Return the callable type. 131 * <p> 132 * The command type is used for concurrency throttling in the {@link CallableQueueService}. 133 * 134 * @return the command type. 135 */ 136 @Override 137 public String getType() { 138 return type; 139 } 140 141 /** 142 * Return the priority of the command. 143 * 144 * @return the command priority. 145 */ 146 @Override 147 public int getPriority() { 148 return priority; 149 } 150 151 /** 152 * Returns the creation time of the command. 153 * 154 * @return the command creation time, in milliseconds. 155 */ 156 @Override 157 public long getCreatedTime() { 158 return createdTime; 159 } 160 161 /** 162 * Queue a command for execution after the current command execution completes. 163 * <p> 164 * All commands queued during the execution of the current command will be queued for a single serial execution. 165 * <p> 166 * If the command execution throws an exception, no command will be effectively queued. 167 * 168 * @param command command to queue. 169 */ 170 protected void queue(XCommand<?> command) { 171 queue(command, 0); 172 } 173 174 /** 175 * Queue a command for delayed execution after the current command execution completes. 176 * <p> 177 * All commands queued during the execution of the current command with the same delay will be queued for a single 178 * serial execution. 179 * <p> 180 * If the command execution throws an exception, no command will be effectively queued. 181 * 182 * @param command command to queue. 183 * @param msDelay delay in milliseconds. 184 */ 185 protected void queue(XCommand<?> command, long msDelay) { 186 if (commandQueue == null) { 187 commandQueue = new HashMap<Long, List<XCommand<?>>>(); 188 } 189 List<XCommand<?>> list = commandQueue.get(msDelay); 190 if (list == null) { 191 list = new ArrayList<XCommand<?>>(); 192 commandQueue.put(msDelay, list); 193 } 194 list.add(command); 195 } 196 197 /** 198 * Obtain an exclusive lock on the {link #getEntityKey}. 199 * <p> 200 * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock. 201 * 202 * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock 203 * @throws CommandException thrown i the lock could not be obtained. 204 */ 205 private void acquireLock() throws InterruptedException, CommandException { 206 if (getEntityKey() == null) { 207 // no lock for null entity key 208 return; 209 } 210 lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut()); 211 if (lock == null) { 212 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1); 213 if (isReQueueRequired()) { 214 // if not acquire the lock, re-queue itself with default delay 215 queue(this, getRequeueDelay()); 216 LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), 217 getLockTimeOut(), getName()); 218 } 219 else { 220 throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut()); 221 } 222 } 223 else { 224 LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName()); 225 } 226 } 227 228 /** 229 * Release the lock on the {link #getEntityKey}. 230 */ 231 private void releaseLock() { 232 if (lock != null) { 233 lock.release(); 234 LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName()); 235 } 236 } 237 238 /** 239 * Implements the XCommand life-cycle. 240 * 241 * @return the {link #execute} return value. 242 * @throws CommandException thrown if the command could not be executed. 243 */ 244 @Override 245 public final T call() throws CommandException { 246 setLogInfo(); 247 if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) { 248 LOG.debug("Command [{0}] key [{1}] already used for [{2}]", getName(), getEntityKey(), this.toString()); 249 return null; 250 } 251 252 commandQueue = null; 253 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1); 254 Instrumentation.Cron callCron = new Instrumentation.Cron(); 255 try { 256 callCron.start(); 257 eagerLoadState(); 258 eagerVerifyPrecondition(); 259 try { 260 T ret = null; 261 if (isLockRequired() && !this.inInterruptMode()) { 262 Instrumentation.Cron acquireLockCron = new Instrumentation.Cron(); 263 acquireLockCron.start(); 264 acquireLock(); 265 acquireLockCron.stop(); 266 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron); 267 } 268 // executing interrupts only in case of the lock required commands 269 if (lock != null) { 270 this.executeInterrupts(); 271 } 272 273 if (!isLockRequired() || (lock != null) || this.inInterruptMode()) { 274 if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) 275 && !used.compareAndSet(false, true)) { 276 LOG.debug("Command [{0}] key [{1}] already executed for [{2}]", getName(), getEntityKey(), 277 this.toString()); 278 return null; 279 } 280 LOG.trace("Load state for [{0}]", getEntityKey()); 281 loadState(); 282 LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey()); 283 verifyPrecondition(); 284 LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey()); 285 Instrumentation.Cron executeCron = new Instrumentation.Cron(); 286 executeCron.start(); 287 ret = execute(); 288 executeCron.stop(); 289 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron); 290 } 291 if (commandQueue != null) { 292 CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class); 293 for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) { 294 LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey()); 295 if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) { 296 LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue() 297 .size(), entry.getKey()); 298 } 299 } 300 } 301 return ret; 302 } 303 finally { 304 if (isLockRequired() && !this.inInterruptMode()) { 305 releaseLock(); 306 } 307 } 308 } 309 catch (PreconditionException pex) { 310 LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString()); 311 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1); 312 return null; 313 } 314 catch (XException ex) { 315 LOG.error("XException, ", ex); 316 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1); 317 if (ex instanceof CommandException) { 318 throw (CommandException) ex; 319 } 320 else { 321 throw new CommandException(ex); 322 } 323 } 324 catch (Exception ex) { 325 LOG.error("Exception, ", ex); 326 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1); 327 throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex); 328 } 329 catch (Error er) { 330 LOG.error("Error, ", er); 331 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".errors", 1); 332 throw er; 333 } 334 finally { 335 FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection"); 336 callCron.stop(); 337 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron); 338 } 339 } 340 341 /** 342 * Check for the existence of interrupts for the same lock key 343 * Execute them if exist. 344 * 345 */ 346 protected void executeInterrupts() { 347 CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class); 348 // getting all the list of interrupts to be executed 349 Set<XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey()); 350 351 if (callables != null) { 352 // executing the list of interrupts in the given order of insertion 353 // in the list 354 for (XCallable<?> callable : callables) { 355 LOG.trace("executing interrupt callable [{0}]", callable.getName()); 356 try { 357 // executing the callable in interrupt mode 358 callable.setInterruptMode(true); 359 callable.call(); 360 LOG.trace("executed interrupt callable [{0}]", callable.getName()); 361 } 362 catch (Exception ex) { 363 LOG.warn("exception interrupt callable [{0}], {1}", callable.getName(), ex.getMessage(), ex); 364 } 365 finally { 366 // reseting the interrupt mode to false after the command is 367 // executed 368 callable.setInterruptMode(false); 369 } 370 } 371 } 372 } 373 374 /** 375 * Return the time out when acquiring a lock. 376 * <p> 377 * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}. 378 * <p> 379 * Subclasses should override this method if they want to use a different time out. 380 * 381 * @return the lock time out in milliseconds. 382 */ 383 protected long getLockTimeOut() { 384 return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000); 385 } 386 387 /** 388 * Indicate if the the command requires locking. 389 * <p> 390 * Subclasses should override this method if they require locking. 391 * 392 * @return <code>true/false</code> 393 */ 394 protected abstract boolean isLockRequired(); 395 396 /** 397 * Return the entity key for the command. 398 * <p> 399 * 400 * @return the entity key for the command. 401 */ 402 public abstract String getEntityKey(); 403 404 /** 405 * Indicate if the the command requires to requeue itself if the lock is not acquired. 406 * <p> 407 * Subclasses should override this method if they don't want to requeue. 408 * <p> 409 * Default is true. 410 * 411 * @return <code>true/false</code> 412 */ 413 protected boolean isReQueueRequired() { 414 return true; 415 } 416 417 /** 418 * Load the necessary state to perform an eager precondition check. 419 * <p> 420 * This implementation does a NOP. 421 * <p> 422 * Subclasses should override this method and load the state needed to do an eager precondition check. 423 * <p> 424 * A trivial implementation is calling {link #loadState}. 425 */ 426 protected void eagerLoadState() throws CommandException { 427 } 428 429 /** 430 * Verify the precondition for the command before obtaining a lock. 431 * <p> 432 * This implementation does a NOP. 433 * <p> 434 * A trivial implementation is calling {link #verifyPrecondition}. 435 * 436 * @throws CommandException thrown if the precondition is not met. 437 */ 438 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 439 } 440 441 /** 442 * Load the necessary state to perform the precondition check and to execute the command. 443 * <p> 444 * Subclasses must implement this method and load the state needed to do the precondition check and execute the 445 * command. 446 */ 447 protected abstract void loadState() throws CommandException; 448 449 /** 450 * Verify the precondition for the command after a lock has been obtain, just before executing the command. 451 * <p> 452 * 453 * @throws CommandException thrown if the precondition is not met. 454 */ 455 protected abstract void verifyPrecondition() throws CommandException, PreconditionException; 456 457 /** 458 * Command execution body. 459 * <p> 460 * This method will be invoked after the {link #loadState} and {link #verifyPrecondition} methods. 461 * <p> 462 * If the command requires locking, this method will be invoked ONLY if the lock has been acquired. 463 * 464 * @return a return value from the execution of the command, only meaningful if the command is executed 465 * synchronously. 466 * @throws CommandException thrown if the command execution failed. 467 */ 468 protected abstract T execute() throws CommandException; 469 470 /** 471 * Return the {@link Instrumentation} instance in use. 472 * 473 * @return the {@link Instrumentation} instance in use. 474 */ 475 protected Instrumentation getInstrumentation() { 476 return instrumentation; 477 } 478 479 /** 480 * Set false to the used 481 */ 482 public void resetUsed() { 483 this.used.set(false); 484 } 485 486 /** 487 * Return the delay time for requeue 488 * <p> 489 * The value is loaded from the Oozie configuration, the property {link #DEFAULT_REQUEUE_DELAY}. 490 * <p> 491 * Subclasses should override this method if they want to use a different requeue delay time 492 * 493 * @return delay time when requeue itself 494 */ 495 protected long getRequeueDelay() { 496 return ConfigurationService.getLong(DEFAULT_REQUEUE_DELAY); 497 } 498 499 /** 500 * Get command key 501 * 502 * @return command key 503 */ 504 @Override 505 public String getKey() { 506 return this.key; 507 } 508 509 /** 510 * set the mode of execution for the callable. True if in interrupt, false 511 * if not 512 */ 513 public void setInterruptMode(boolean mode) { 514 this.inInterrupt = mode; 515 } 516 517 /** 518 * @return the mode of execution. true if it is executed as an Interrupt, 519 * false otherwise 520 */ 521 public boolean inInterruptMode() { 522 return this.inInterrupt; 523 } 524 525 /** 526 * Get XLog log 527 * 528 * @return XLog 529 */ 530 public XLog getLog() { 531 return LOG; 532 } 533 534 /** 535 * String for the command - key 536 * @return String 537 */ 538 @Override 539 public String toString() { 540 return getKey(); 541 } 542}