/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.command;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.FaultInjection;
import org.apache.oozie.XException;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.MemoryLocksService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.lock.LockToken;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Base class for synchronous and asynchronous commands.
 * <p>
 * It enables by API the following pattern:
 * <p>
 * <ul>
 * <li>single execution: a command instance can be executed only once</li>
 * <li>eager data loading: loads data for eager precondition check</li>
 * <li>eager precondition check: verify precondition before obtaining lock</li>
 * <li>data loading: loads data for precondition check and execution</li>
 * <li>precondition check: verifies precondition for execution is still met</li>
 * <li>locking: obtains exclusive lock on key before executing the command</li>
 * <li>execution: command logic</li>
 * </ul>
 * <p>
 * It has built in instrumentation and logging.
 *
 * @param <T> type of the value returned by {@link #execute()} and therefore by {@link #call()}.
 */
public abstract class XCommand<T> implements XCallable<T> {
    /** Configuration property read by {@link #getLockTimeOut()}. */
    public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout";

    /** Instrumentation group under which all command counters and timers are recorded. */
    public static final String INSTRUMENTATION_GROUP = "commands";

    /** Configuration property read by {@link #getRequeueDelay()}. */
    public static final String DEFAULT_REQUEUE_DELAY = "oozie.command.default.requeue.delay";

    /** Logger bound to the concrete command class. */
    public XLog LOG = XLog.getLog(getClass());

    private final String key;
    private final String name;
    private final int priority;
    private final String type;
    private final long createdTime;
    /** Lock token held while the command executes; <code>null</code> whenever no lock is held. */
    private LockToken lock;
    private final AtomicBoolean used = new AtomicBoolean(false);
    private boolean inInterrupt = false;

    /** Commands queued during the current execution, grouped by their delay in milliseconds. */
    private Map<Long, List<XCommand<?>>> commandQueue;
    protected boolean dryrun = false;
    protected Instrumentation instrumentation;

    protected static EventHandlerService eventService;

    /**
     * Create a command.
     * <p>
     * The command key is built from the command name and a random UUID.
     *
     * @param name command name.
     * @param type command type.
     * @param priority command priority.
     */
    public XCommand(String name, String type, int priority) {
        this.name = name;
        this.type = type;
        this.priority = priority;
        this.key = name + "_" + UUID.randomUUID();
        createdTime = System.currentTimeMillis();
        instrumentation = Services.get().get(InstrumentationService.class).get();
        eventService = Services.get().get(EventHandlerService.class);
    }

    /**
     * Create a command, optionally in dryrun mode.
     *
     * @param name command name.
     * @param type command type.
     * @param priority command priority.
     * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without
     *        really running the job
     */
    public XCommand(String name, String type, int priority, boolean dryrun) {
        this(name, type, priority);
        this.dryrun = dryrun;
    }

    /**
     * Set the thread local logInfo with the context of this command and reset log prefix.
     * <p>
     * This implementation does a NOP; it is invoked first thing by {@link #call()}.
     */
    protected void setLogInfo() {
    }

    /**
     * Return the command name.
     *
     * @return the command name.
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * Return the callable type.
     * <p>
     * The command type is used for concurrency throttling in the {@link CallableQueueService}.
     *
     * @return the command type.
     */
    @Override
    public String getType() {
        return type;
    }

    /**
     * Return the priority of the command.
     *
     * @return the command priority.
     */
    @Override
    public int getPriority() {
        return priority;
    }

    /**
     * Returns the creation time of the command.
     *
     * @return the command creation time, in milliseconds.
     */
    @Override
    public long getCreatedTime() {
        return createdTime;
    }

    /**
     * Queue a command for execution after the current command execution completes.
     * <p>
     * All commands queued during the execution of the current command will be queued for a single serial execution.
     * <p>
     * If the command execution throws an exception, no command will be effectively queued.
     *
     * @param command command to queue.
     */
    protected void queue(XCommand<?> command) {
        queue(command, 0);
    }

    /**
     * Queue a command for delayed execution after the current command execution completes.
     * <p>
     * All commands queued during the execution of the current command with the same delay will be queued for a single
     * serial execution.
     * <p>
     * If the command execution throws an exception, no command will be effectively queued.
     *
     * @param command command to queue.
     * @param msDelay delay in milliseconds.
     */
    protected void queue(XCommand<?> command, long msDelay) {
        if (commandQueue == null) {
            commandQueue = new HashMap<Long, List<XCommand<?>>>();
        }
        List<XCommand<?>> list = commandQueue.get(msDelay);
        if (list == null) {
            list = new ArrayList<XCommand<?>>();
            commandQueue.put(msDelay, list);
        }
        list.add(command);
    }

    /**
     * Obtain an exclusive lock on the {@link #getEntityKey}.
     * <p>
     * A timeout of {@link #getLockTimeOut} is used when trying to obtain the lock.
     * <p>
     * If the lock cannot be obtained and {@link #isReQueueRequired} is <code>true</code>, the command queues itself
     * again with a delay of {@link #getRequeueDelay} and this method returns without holding a lock.
     *
     * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
     * @throws CommandException thrown if the lock could not be obtained and the command does not requeue itself.
     */
    private void acquireLock() throws InterruptedException, CommandException {
        if (getEntityKey() == null) {
            // no lock for null entity key
            return;
        }
        lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
        if (lock == null) {
            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1);
            if (isReQueueRequired()) {
                // if not acquire the lock, re-queue itself with default delay
                queue(this, getRequeueDelay());
                LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(),
                        getLockTimeOut(), getName());
            }
            else {
                throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
            }
        }
        else {
            LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName());
        }
    }

    /**
     * Release the lock on the {@link #getEntityKey}, if one is held.
     * <p>
     * The token reference is always cleared afterwards, so that a later {@link #call()} on this same instance
     * (requeue, {@link #resetUsed()}, interrupt mode) can neither mistake a released token for a held lock nor
     * release it a second time.
     */
    private void releaseLock() {
        if (lock != null) {
            try {
                lock.release();
                LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName());
            }
            finally {
                lock = null;
            }
        }
    }

    /**
     * Implements the XCommand life-cycle.
     * <p>
     * Order of operations: eager load and eager precondition check, lock acquisition (when required and not in
     * interrupt mode), execution of pending interrupts for the same entity key, state load, precondition check,
     * {@link #execute}, and finally the serial queuing of every command registered through {@link #queue}.
     * <p>
     * A {@link PreconditionException} is logged and swallowed; in that case, and when a command of an interrupt
     * type was already used, <code>null</code> is returned.
     *
     * @return the {@link #execute} return value, or <code>null</code> if the command was not executed.
     * @throws CommandException thrown if the command could not be executed.
     */
    @Override
    public final T call() throws CommandException {
        setLogInfo();
        CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
        Set<String> interruptTypes = callableQueueService.getInterruptTypes();

        // cheap early exit: commands of an interrupt type run at most once per 'used' cycle
        if (interruptTypes.contains(this.getType()) && used.get()) {
            LOG.debug("Command [{0}] key [{1}] already used for [{2}]", getName(), getEntityKey(), this.toString());
            return null;
        }

        // discard anything left over from a previous run of this instance
        commandQueue = null;
        instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1);
        Instrumentation.Cron callCron = new Instrumentation.Cron();
        try {
            callCron.start();
            eagerLoadState();
            eagerVerifyPrecondition();
            try {
                T ret = null;
                if (isLockRequired() && !this.inInterruptMode()) {
                    Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
                    acquireLockCron.start();
                    acquireLock();
                    acquireLockCron.stop();
                    instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
                }
                // executing interrupts only in case of the lock required commands
                if (lock != null) {
                    this.executeInterrupts();
                }

                if (!isLockRequired() || (lock != null) || this.inInterruptMode()) {
                    // atomic claim: only one caller may flip 'used' from false to true
                    if (interruptTypes.contains(this.getType())
                            && !used.compareAndSet(false, true)) {
                        LOG.debug("Command [{0}] key [{1}] already executed for [{2}]", getName(), getEntityKey(),
                                this.toString());
                        return null;
                    }
                    LOG.trace("Load state for [{0}]", getEntityKey());
                    loadState();
                    LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey());
                    verifyPrecondition();
                    LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey());
                    Instrumentation.Cron executeCron = new Instrumentation.Cron();
                    executeCron.start();
                    ret = execute();
                    executeCron.stop();
                    instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
                }
                // only reached when no exception was thrown, so a failed command queues nothing
                if (commandQueue != null) {
                    for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
                        LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
                        if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
                            LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue()
                                    .size(), entry.getKey());
                        }
                    }
                }
                return ret;
            }
            finally {
                if (isLockRequired() && !this.inInterruptMode()) {
                    releaseLock();
                }
            }
        }
        catch (PreconditionException pex) {
            // plain concatenation renders null parts as "null" instead of throwing a NullPointerException
            LOG.warn(pex.getMessage() + ", Error Code: " + pex.getErrorCode());
            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
            return null;
        }
        catch (XException ex) {
            LOG.error("XException, ", ex);
            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1);
            if (ex instanceof CommandException) {
                throw (CommandException) ex;
            }
            else {
                throw new CommandException(ex);
            }
        }
        catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // acquireLock() was interrupted: restore the interrupt status so callers can still observe it
                Thread.currentThread().interrupt();
            }
            LOG.error("Exception, ", ex);
            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1);
            throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex);
        }
        catch (Error er) {
            LOG.error("Error, ", er);
            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".errors", 1);
            throw er;
        }
        finally {
            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
            callCron.stop();
            instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron);
        }
    }

    /**
     * Check for the existence of interrupts for the same lock key
     * Execute them if exist.
     * <p>
     * Each interrupt callable is run synchronously in interrupt mode; an exception thrown by one callable is logged
     * and does not prevent the remaining callables from running.
     */
    protected void executeInterrupts() {
        CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
        // getting all the list of interrupts to be executed
        Set<XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey());

        if (callables != null) {
            // executing the list of interrupts in the given order of insertion
            // in the list
            for (XCallable<?> callable : callables) {
                LOG.trace("executing interrupt callable [{0}]", callable.getName());
                try {
                    // executing the callable in interrupt mode
                    callable.setInterruptMode(true);
                    callable.call();
                    LOG.trace("executed interrupt callable [{0}]", callable.getName());
                }
                catch (Exception ex) {
                    LOG.warn("exception interrupt callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
                }
                finally {
                    // resetting the interrupt mode to false after the command is
                    // executed
                    callable.setInterruptMode(false);
                }
            }
        }
    }

    /**
     * Return the time out when acquiring a lock.
     * <p>
     * The value is loaded from the Oozie configuration, the property {@link #DEFAULT_LOCK_TIMEOUT}; 5000 is used
     * when the property is not set.
     * <p>
     * Subclasses should override this method if they want to use a different time out.
     *
     * @return the lock time out in milliseconds.
     */
    protected long getLockTimeOut() {
        return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000);
    }

    /**
     * Indicate if the command requires locking.
     * <p>
     * Subclasses should override this method if they require locking.
     *
     * @return <code>true/false</code>
     */
    protected abstract boolean isLockRequired();

    /**
     * Return the entity key for the command.
     * <p>
     * The entity key is what the command locks on; no lock is taken when it is <code>null</code>.
     *
     * @return the entity key for the command.
     */
    public abstract String getEntityKey();

    /**
     * Indicate if the command requires to requeue itself if the lock is not acquired.
     * <p>
     * Subclasses should override this method if they don't want to requeue.
     * <p>
     * Default is true.
     *
     * @return <code>true/false</code>
     */
    protected boolean isReQueueRequired() {
        return true;
    }

    /**
     * Load the necessary state to perform an eager precondition check.
     * <p>
     * This implementation does a NOP.
     * <p>
     * Subclasses should override this method and load the state needed to do an eager precondition check.
     * <p>
     * A trivial implementation is calling {@link #loadState}.
     *
     * @throws CommandException thrown if the state could not be loaded.
     */
    protected void eagerLoadState() throws CommandException {
    }

    /**
     * Verify the precondition for the command before obtaining a lock.
     * <p>
     * This implementation does a NOP.
     * <p>
     * A trivial implementation is calling {@link #verifyPrecondition}.
     *
     * @throws CommandException thrown if the precondition could not be verified.
     * @throws PreconditionException thrown if the precondition is not met.
     */
    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
    }

    /**
     * Load the necessary state to perform the precondition check and to execute the command.
     * <p>
     * Subclasses must implement this method and load the state needed to do the precondition check and execute the
     * command.
     *
     * @throws CommandException thrown if the state could not be loaded.
     */
    protected abstract void loadState() throws CommandException;

    /**
     * Verify the precondition for the command after a lock has been obtained, just before executing the command.
     * <p>
     *
     * @throws CommandException thrown if the precondition could not be verified.
     * @throws PreconditionException thrown if the precondition is not met.
     */
    protected abstract void verifyPrecondition() throws CommandException, PreconditionException;

    /**
     * Command execution body.
     * <p>
     * This method will be invoked after the {@link #loadState} and {@link #verifyPrecondition} methods.
     * <p>
     * If the command requires locking, this method will be invoked ONLY if the lock has been acquired.
     *
     * @return a return value from the execution of the command, only meaningful if the command is executed
     *         synchronously.
     * @throws CommandException thrown if the command execution failed.
     */
    protected abstract T execute() throws CommandException;

    /**
     * Return the {@link Instrumentation} instance in use.
     *
     * @return the {@link Instrumentation} instance in use.
     */
    protected Instrumentation getInstrumentation() {
        return instrumentation;
    }

    /**
     * Set false to the used flag, allowing a command of an interrupt type to be executed again.
     */
    public void resetUsed() {
        this.used.set(false);
    }

    /**
     * Return the delay time for requeue
     * <p>
     * The value is loaded from the Oozie configuration, the property {@link #DEFAULT_REQUEUE_DELAY}.
     * <p>
     * Subclasses should override this method if they want to use a different requeue delay time
     *
     * @return delay time when requeue itself
     */
    protected long getRequeueDelay() {
        return ConfigurationService.getLong(DEFAULT_REQUEUE_DELAY);
    }

    /**
     * Get command key
     *
     * @return command key
     */
    @Override
    public String getKey() {
        return this.key;
    }

    /**
     * set the mode of execution for the callable. True if in interrupt, false
     * if not
     *
     * @param mode <code>true</code> to run the command in interrupt mode, <code>false</code> otherwise.
     */
    @Override
    public void setInterruptMode(boolean mode) {
        this.inInterrupt = mode;
    }

    /**
     * @return the mode of execution. true if it is executed as an Interrupt,
     *         false otherwise
     */
    public boolean inInterruptMode() {
        return this.inInterrupt;
    }

    /**
     * Get XLog log
     *
     * @return XLog
     */
    public XLog getLog() {
        return LOG;
    }

    /**
     * String for the command - key
     *
     * @return String
     */
    @Override
    public String toString() {
        return getKey();
    }
}