/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.FaultInjection;
import org.apache.oozie.XException;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.MemoryLocksService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.MemoryLocks;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Base class for synchronous and asynchronous commands.
 * <p/>
 * It enables by API the following pattern:
 * <p/>
 * <ul>
 * <li>single execution: a command instance can be executed only once</li>
 * <li>eager data loading: loads data for eager precondition check</li>
 * <li>eager precondition check: verify precondition before obtaining lock</li>
 * <li>data loading: loads data for precondition check and execution</li>
 * <li>precondition check: verifies precondition for execution is still met</li>
 * <li>locking: obtains exclusive lock on key before executing the command</li>
 * <li>execution: command logic</li>
 * </ul>
 * <p/>
 * It has built in instrumentation and logging.
 */
public abstract class XCommand<T> implements XCallable<T> {
    public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout";

    public static final String INSTRUMENTATION_GROUP = "commands";

    public static final Long DEFAULT_REQUEUE_DELAY = 10L;

    // Reassigned by call() through XLog.resetPrefix(LOG), hence not final.
    public XLog LOG = XLog.getLog(getClass());

    private final String key;
    private final String name;
    private final int priority;
    private final String type;
    private final long createdTime;
    // Write lock token held while the command executes; null whenever no lock is held.
    private MemoryLocks.LockToken lock;
    // Single-execution guard; only consulted for types listed in CallableQueueService.INTERRUPT_TYPES.
    private final AtomicBoolean used = new AtomicBoolean(false);
    private boolean inInterrupt = false;

    // Commands queued during the current call(), grouped by their delay in milliseconds.
    private Map<Long, List<XCommand<?>>> commandQueue;
    protected boolean dryrun = false;
    protected Instrumentation instrumentation;

    protected XLog.Info logInfo;
    // NOTE(review): static, yet re-assigned by every constructor invocation.
    protected static EventHandlerService eventService;

    /**
     * Create a command.
     *
     * @param name command name.
     * @param type command type.
     * @param priority command priority.
     */
    public XCommand(String name, String type, int priority) {
        this.name = name;
        this.type = type;
        this.priority = priority;
        this.key = name + "_" + UUID.randomUUID();
        createdTime = System.currentTimeMillis();
        logInfo = new XLog.Info();
        instrumentation = Services.get().get(InstrumentationService.class).get();
        eventService = Services.get().get(EventHandlerService.class);
    }

    /**
     * Create a command, optionally in dryrun mode.
     *
     * @param name command name.
     * @param type command type.
     * @param priority command priority.
     * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without
     *        really running the job
     */
    public XCommand(String name, String type, int priority, boolean dryrun) {
        this(name, type, priority);
        this.dryrun = dryrun;
    }

    /**
     * Return the command name.
     *
     * @return the command name.
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * Return the callable type.
     * <p/>
     * The command type is used for concurrency throttling in the {@link CallableQueueService}.
     *
     * @return the command type.
     */
    @Override
    public String getType() {
        return type;
    }

    /**
     * Return the priority of the command.
     *
     * @return the command priority.
     */
    @Override
    public int getPriority() {
        return priority;
    }

    /**
     * Returns the creation time of the command.
     *
     * @return the command creation time, in milliseconds.
     */
    @Override
    public long getCreatedTime() {
        return createdTime;
    }

    /**
     * Queue a command for execution after the current command execution completes.
     * <p/>
     * All commands queued during the execution of the current command will be queued for a single serial execution.
     * <p/>
     * If the command execution throws an exception, no command will be effectively queued.
     *
     * @param command command to queue.
     */
    protected void queue(XCommand<?> command) {
        queue(command, 0);
    }

    /**
     * Queue a command for delayed execution after the current command execution completes.
     * <p/>
     * All commands queued during the execution of the current command with the same delay will be queued for a single
     * serial execution.
     * <p/>
     * If the command execution throws an exception, no command will be effectively queued.
     *
     * @param command command to queue.
     * @param msDelay delay in milliseconds.
     */
    protected void queue(XCommand<?> command, long msDelay) {
        if (commandQueue == null) {
            commandQueue = new HashMap<Long, List<XCommand<?>>>();
        }
        List<XCommand<?>> sameDelay = commandQueue.get(msDelay);
        if (sameDelay == null) {
            sameDelay = new ArrayList<XCommand<?>>();
            commandQueue.put(msDelay, sameDelay);
        }
        sameDelay.add(command);
    }

    /**
     * Obtain an exclusive lock on the {@link #getEntityKey}.
     * <p/>
     * A timeout of {@link #getLockTimeOut} is used when trying to obtain the lock. If the lock is not obtained and
     * {@link #isReQueueRequired} is <code>true</code>, the command queues itself again with a delay of
     * {@link #getRequeueDelay} instead of failing.
     *
     * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
     * @throws CommandException thrown if the lock could not be obtained and the command is not to be requeued.
     */
    private void acquireLock() throws InterruptedException, CommandException {
        String entityKey = getEntityKey();
        if (entityKey == null) {
            // no lock for null entity key
            return;
        }
        lock = Services.get().get(MemoryLocksService.class).getWriteLock(entityKey, getLockTimeOut());
        if (lock != null) {
            LOG.debug("Acquired lock for [{0}] in [{1}]", entityKey, getName());
            return;
        }
        Instrumentation instr = Services.get().get(InstrumentationService.class).get();
        instr.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1);
        if (!isReQueueRequired()) {
            throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
        }
        // if not acquire the lock, re-queue itself with default delay
        queue(this, getRequeueDelay());
        LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(),
                getLockTimeOut(), getName());
    }

    /**
     * Release the lock on the {@link #getEntityKey}, if one is held.
     * <p/>
     * The token reference is cleared before releasing, so a command instance that is executed again never sees
     * (nor releases a second time) the token of a previous execution.
     */
    private void releaseLock() {
        MemoryLocks.LockToken held = lock;
        if (held != null) {
            lock = null;
            held.release();
            LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName());
        }
    }

    /**
     * Implements the XCommand life-cycle.
     * <p/>
     * Returns <code>null</code> without executing when the command type is an interrupt type and the instance has
     * already been used, when a {@link PreconditionException} is raised, or when the lock could not be obtained and
     * the command requeued itself.
     *
     * @return the {@link #execute} return value.
     * @throws CommandException thrown if the command could not be executed.
     */
    @Override
    public final T call() throws CommandException {
        if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) {
            LOG.debug("Command [{0}] key [{1}] already used for [{2}]", getName(), getEntityKey(), this.toString());
            return null;
        }

        commandQueue = null;
        // Looked up from the services at call time (not read from the instrumentation field).
        Instrumentation instr = Services.get().get(InstrumentationService.class).get();
        instr.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1);
        Instrumentation.Cron callCron = new Instrumentation.Cron();
        try {
            callCron.start();
            eagerLoadState();
            LOG = XLog.resetPrefix(LOG);
            eagerVerifyPrecondition();
            try {
                T ret = null;
                if (isLockRequired() && !this.inInterruptMode()) {
                    Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
                    acquireLockCron.start();
                    acquireLock();
                    acquireLockCron.stop();
                    instr.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
                }
                // executing interrupts only in case of the lock required commands
                if (lock != null) {
                    this.executeInterrupts();
                }

                if (!isLockRequired() || (lock != null) || this.inInterruptMode()) {
                    if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
                            && !used.compareAndSet(false, true)) {
                        LOG.debug("Command [{0}] key [{1}] already executed for [{2}]", getName(), getEntityKey(),
                                this.toString());
                        return null;
                    }
                    LOG.trace("Load state for [{0}]", getEntityKey());
                    loadState();
                    LOG = XLog.resetPrefix(LOG);
                    LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey());
                    verifyPrecondition();
                    LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey());
                    Instrumentation.Cron executeCron = new Instrumentation.Cron();
                    executeCron.start();
                    ret = execute();
                    executeCron.stop();
                    instr.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
                }
                dispatchQueuedCommands();
                return ret;
            }
            finally {
                if (isLockRequired() && !this.inInterruptMode()) {
                    releaseLock();
                }
            }
        }
        catch (PreconditionException pex) {
            // Plain concatenation: a null message or error code must not raise a NullPointerException here.
            LOG.warn(pex.getMessage() + ", Error Code: " + pex.getErrorCode());
            instr.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
            return null;
        }
        catch (XException ex) {
            LOG.error("XException, ", ex);
            instr.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1);
            if (ex instanceof CommandException) {
                throw (CommandException) ex;
            }
            else {
                throw new CommandException(ex);
            }
        }
        catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Keep the interrupt visible to the caller; wrapping the exception would otherwise lose it.
                Thread.currentThread().interrupt();
            }
            LOG.error("Exception, ", ex);
            instr.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1);
            throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex);
        }
        catch (Error er) {
            LOG.error("Error, ", er);
            throw er;
        }
        finally {
            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
            callCron.stop();
            instr.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron);
        }
    }

    /**
     * Hand the commands queued during this execution over to the {@link CallableQueueService}, one serial batch
     * per distinct delay. A batch that is rejected is logged and dropped.
     */
    private void dispatchQueuedCommands() {
        if (commandQueue == null) {
            return;
        }
        CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
        for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
            LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
            if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
                LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue().size(),
                        entry.getKey());
            }
        }
    }

    /**
     * Check for the existence of interrupts for the same lock key
     * Execute them if exist.
     * <p/>
     * An exception thrown by an interrupt callable is logged and does not prevent the remaining ones from running.
     */
    protected void executeInterrupts() {
        CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
        // getting all the list of interrupts to be executed
        Set<XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey());

        if (callables != null) {
            // executing the list of interrupts in the given order of insertion
            // in the list
            for (XCallable<?> callable : callables) {
                LOG.trace("executing interrupt callable [{0}]", callable.getName());
                try {
                    // executing the callable in interrupt mode
                    callable.setInterruptMode(true);
                    callable.call();
                    LOG.trace("executed interrupt callable [{0}]", callable.getName());
                }
                catch (Exception ex) {
                    LOG.warn("exception interrupt callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
                }
                finally {
                    // reseting the interrupt mode to false after the command is
                    // executed
                    callable.setInterruptMode(false);
                }
            }
        }
    }

    /**
     * Return the time out when acquiring a lock.
     * <p/>
     * The value is loaded from the Oozie configuration, the property {@link #DEFAULT_LOCK_TIMEOUT}; 5000 is used
     * when the property is not set.
     * <p/>
     * Subclasses should override this method if they want to use a different time out.
     *
     * @return the lock time out in milliseconds.
     */
    protected long getLockTimeOut() {
        return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000);
    }

    /**
     * Indicate if the the command requires locking.
     * <p/>
     * Subclasses should override this method if they require locking.
     *
     * @return <code>true/false</code>
     */
    protected abstract boolean isLockRequired();

    /**
     * Return the entity key for the command.
     * <p/>
     * No lock is acquired when the key is <code>null</code>.
     *
     * @return the entity key for the command.
     */
    public abstract String getEntityKey();

    /**
     * Indicate if the the command requires to requeue itself if the lock is not acquired.
     * <p/>
     * Subclasses should override this method if they don't want to requeue.
     * <p/>
     * Default is true.
     *
     * @return <code>true/false</code>
     */
    protected boolean isReQueueRequired() {
        return true;
    }

    /**
     * Load the necessary state to perform an eager precondition check.
     * <p/>
     * This implementation does a NOP.
     * <p/>
     * Subclasses should override this method and load the state needed to do an eager precondition check.
     * <p/>
     * A trivial implementation is calling {@link #loadState}.
     *
     * @throws CommandException thrown if the state could not be loaded.
     */
    protected void eagerLoadState() throws CommandException {
    }

    /**
     * Verify the precondition for the command before obtaining a lock.
     * <p/>
     * This implementation does a NOP.
     * <p/>
     * A trivial implementation is calling {@link #verifyPrecondition}.
     *
     * @throws CommandException thrown if the precondition is not met.
     * @throws PreconditionException thrown if the precondition is not met; {@link #call} logs it and returns
     *         <code>null</code>.
     */
    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
    }

    /**
     * Load the necessary state to perform the precondition check and to execute the command.
     * <p/>
     * Subclasses must implement this method and load the state needed to do the precondition check and execute the
     * command.
     *
     * @throws CommandException thrown if the state could not be loaded.
     */
    protected abstract void loadState() throws CommandException;

    /**
     * Verify the precondition for the command after a lock has been obtain, just before executing the command.
     * <p/>
     *
     * @throws CommandException thrown if the precondition is not met.
     * @throws PreconditionException thrown if the precondition is not met; {@link #call} logs it and returns
     *         <code>null</code>.
     */
    protected abstract void verifyPrecondition() throws CommandException, PreconditionException;

    /**
     * Command execution body.
     * <p/>
     * This method will be invoked after the {@link #loadState} and {@link #verifyPrecondition} methods.
     * <p/>
     * If the command requires locking, this method will be invoked ONLY if the lock has been acquired.
     *
     * @return a return value from the execution of the command, only meaningful if the command is executed
     *         synchronously.
     * @throws CommandException thrown if the command execution failed.
     */
    protected abstract T execute() throws CommandException;


    /**
     * Return the {@link Instrumentation} instance in use.
     *
     * @return the {@link Instrumentation} instance in use.
     */
    protected Instrumentation getInstrumentation() {
        return instrumentation;
    }

    /**
     * Reset the used flag to <code>false</code>.
     */
    public void resetUsed() {
        this.used.set(false);
    }


    /**
     * Return the delay time for requeue
     *
     * @return delay time when requeue itself
     */
    protected Long getRequeueDelay() {
        return DEFAULT_REQUEUE_DELAY;
    }

    /**
     * Get command key
     *
     * @return command key
     */
    @Override
    public String getKey() {
        return this.key;
    }

    /**
     * set the mode of execution for the callable. True if in interrupt, false
     * if not
     *
     * @param mode <code>true</code> to execute as an interrupt, <code>false</code> otherwise.
     */
    public void setInterruptMode(boolean mode) {
        this.inInterrupt = mode;
    }

    /**
     * @return the mode of execution. true if it is executed as an Interrupt,
     *         false otherwise
     */
    public boolean inInterruptMode() {
        return this.inInterrupt;
    }

    /**
     * Get XLog log
     *
     * @return XLog
     */
    public XLog getLog() {
        return LOG;
    }

}