001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command; 019 020 import org.apache.oozie.ErrorCode; 021 import org.apache.oozie.FaultInjection; 022 import org.apache.oozie.XException; 023 import org.apache.oozie.service.CallableQueueService; 024 import org.apache.oozie.service.InstrumentationService; 025 import org.apache.oozie.service.MemoryLocksService; 026 import org.apache.oozie.service.Services; 027 import org.apache.oozie.util.Instrumentation; 028 import org.apache.oozie.util.MemoryLocks; 029 import org.apache.oozie.util.XCallable; 030 import org.apache.oozie.util.XLog; 031 032 import java.util.ArrayList; 033 import java.util.HashMap; 034 import java.util.List; 035 import java.util.Map; 036 import java.util.Set; 037 import java.util.UUID; 038 import java.util.concurrent.atomic.AtomicBoolean; 039 040 /** 041 * Base class for synchronous and asynchronous commands. 042 * <p/> 043 * It enables by API the following pattern: 044 * <p/> 045 * <ul> 046 * <li>single execution: a command instance can be executed only once</li> 047 * <li>eager data loading: loads data for eager precondition check</li> 048 * <li>eager precondition check: verify precondition before obtaining lock</li> 049 * <li>data loading: loads data for precondition check and execution</li> 050 * <li>precondition check: verifies precondition for execution is still met</li> 051 * <li>locking: obtains exclusive lock on key before executing the command</li> 052 * <li>execution: command logic</li> 053 * </ul> 054 * <p/> 055 * It has built in instrumentation and logging. 056 */ 057 public abstract class XCommand<T> implements XCallable<T> { 058 public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout"; 059 060 public static final String INSTRUMENTATION_GROUP = "commands"; 061 062 public static final Long DEFAULT_REQUEUE_DELAY = 10L; 063 064 public XLog LOG = XLog.getLog(getClass()); 065 066 private String key; 067 private String name; 068 private int priority; 069 private String type; 070 private long createdTime; 071 private MemoryLocks.LockToken lock; 072 private AtomicBoolean used = new AtomicBoolean(false); 073 private boolean inInterrupt = false; 074 075 private Map<Long, List<XCommand<?>>> commandQueue; 076 protected boolean dryrun = false; 077 protected Instrumentation instrumentation; 078 079 protected XLog.Info logInfo; 080 081 /** 082 * Create a command. 083 * 084 * @param name command name. 085 * @param type command type. 086 * @param priority command priority. 087 */ 088 public XCommand(String name, String type, int priority) { 089 this.name = name; 090 this.type = type; 091 this.priority = priority; 092 this.key = name + "_" + UUID.randomUUID(); 093 createdTime = System.currentTimeMillis(); 094 logInfo = new XLog.Info(); 095 instrumentation = Services.get().get(InstrumentationService.class).get(); 096 } 097 098 /** 099 * @param name command name. 100 * @param type command type. 101 * @param priority command priority. 102 * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without 103 * really running the job 104 */ 105 public XCommand(String name, String type, int priority, boolean dryrun) { 106 this(name, type, priority); 107 this.dryrun = dryrun; 108 } 109 110 /** 111 * Return the command name. 112 * 113 * @return the command name. 114 */ 115 @Override 116 public String getName() { 117 return name; 118 } 119 120 /** 121 * Return the callable type. 122 * <p/> 123 * The command type is used for concurrency throttling in the {@link CallableQueueService}. 124 * 125 * @return the command type. 126 */ 127 @Override 128 public String getType() { 129 return type; 130 } 131 132 /** 133 * Return the priority of the command. 134 * 135 * @return the command priority. 136 */ 137 @Override 138 public int getPriority() { 139 return priority; 140 } 141 142 /** 143 * Returns the creation time of the command. 144 * 145 * @return the command creation time, in milliseconds. 146 */ 147 @Override 148 public long getCreatedTime() { 149 return createdTime; 150 } 151 152 /** 153 * Queue a command for execution after the current command execution completes. 154 * <p/> 155 * All commands queued during the execution of the current command will be queued for a single serial execution. 156 * <p/> 157 * If the command execution throws an exception, no command will be effectively queued. 158 * 159 * @param command command to queue. 160 */ 161 protected void queue(XCommand<?> command) { 162 queue(command, 0); 163 } 164 165 /** 166 * Queue a command for delayed execution after the current command execution completes. 167 * <p/> 168 * All commands queued during the execution of the current command with the same delay will be queued for a single 169 * serial execution. 170 * <p/> 171 * If the command execution throws an exception, no command will be effectively queued. 172 * 173 * @param command command to queue. 174 * @param msDelay delay in milliseconds. 175 */ 176 protected void queue(XCommand<?> command, long msDelay) { 177 if (commandQueue == null) { 178 commandQueue = new HashMap<Long, List<XCommand<?>>>(); 179 } 180 List<XCommand<?>> list = commandQueue.get(msDelay); 181 if (list == null) { 182 list = new ArrayList<XCommand<?>>(); 183 commandQueue.put(msDelay, list); 184 } 185 list.add(command); 186 } 187 188 /** 189 * Obtain an exclusive lock on the {link #getEntityKey}. 190 * <p/> 191 * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock. 192 * 193 * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock 194 * @throws CommandException thrown i the lock could not be obtained. 195 */ 196 private void acquireLock() throws InterruptedException, CommandException { 197 if (getEntityKey() == null) { 198 // no lock for null entity key 199 return; 200 } 201 lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut()); 202 if (lock == null) { 203 Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get(); 204 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1); 205 if (isReQueueRequired()) { 206 //if not acquire the lock, re-queue itself with default delay 207 queue(this, getRequeueDelay()); 208 LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName()); 209 } else { 210 throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut()); 211 } 212 } else { 213 LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName()); 214 } 215 } 216 217 /** 218 * Release the lock on the {link #getEntityKey}. 219 */ 220 private void releaseLock() { 221 if (lock != null) { 222 lock.release(); 223 LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName()); 224 } 225 } 226 227 /** 228 * Implements the XCommand life-cycle. 229 * 230 * @return the {link #execute} return value. 231 * @throws Exception thrown if the command could not be executed. 232 */ 233 @Override 234 public final T call() throws CommandException { 235 if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) { 236 LOG.debug("Command [{0}] key [{1}] already used for [{2}]", getName(), getEntityKey(), this.toString()); 237 return null; 238 } 239 240 commandQueue = null; 241 Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get(); 242 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1); 243 Instrumentation.Cron callCron = new Instrumentation.Cron(); 244 try { 245 callCron.start(); 246 eagerLoadState(); 247 LOG = XLog.resetPrefix(LOG); 248 eagerVerifyPrecondition(); 249 try { 250 T ret = null; 251 if (isLockRequired() && !this.inInterruptMode()) { 252 Instrumentation.Cron acquireLockCron = new Instrumentation.Cron(); 253 acquireLockCron.start(); 254 acquireLock(); 255 acquireLockCron.stop(); 256 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron); 257 } 258 // executing interrupts only in case of the lock required commands 259 if (lock != null) { 260 this.executeInterrupts(); 261 } 262 263 if (!isLockRequired() || (lock != null) || this.inInterruptMode()) { 264 if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) 265 && !used.compareAndSet(false, true)) { 266 LOG.debug("Command [{0}] key [{1}] already executed for [{2}]", getName(), getEntityKey(), this.toString()); 267 return null; 268 } 269 LOG.trace("Load state for [{0}]", getEntityKey()); 270 loadState(); 271 LOG = XLog.resetPrefix(LOG); 272 LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey()); 273 verifyPrecondition(); 274 LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey()); 275 Instrumentation.Cron executeCron = new Instrumentation.Cron(); 276 executeCron.start(); 277 ret = execute(); 278 executeCron.stop(); 279 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron); 280 } 281 if (commandQueue != null) { 282 CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class); 283 for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) { 284 LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey()); 285 if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) { 286 LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue() 287 .size(), entry.getKey()); 288 } 289 } 290 } 291 return ret; 292 } 293 finally { 294 if (isLockRequired() && !this.inInterruptMode()) { 295 releaseLock(); 296 } 297 } 298 } 299 catch(PreconditionException pex){ 300 LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString()); 301 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1); 302 return null; 303 } 304 catch (XException ex) { 305 LOG.error("XException, ", ex); 306 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1); 307 if (ex instanceof CommandException) { 308 throw (CommandException) ex; 309 } 310 else { 311 throw new CommandException(ex); 312 } 313 } 314 catch (Exception ex) { 315 LOG.error("Exception, ", ex); 316 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1); 317 throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex); 318 } 319 catch (Error er) { 320 LOG.error("Error, ", er); 321 throw er; 322 } 323 finally { 324 FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection"); 325 callCron.stop(); 326 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron); 327 } 328 } 329 330 /** 331 * Check for the existence of interrupts for the same lock key 332 * Execute them if exist. 333 * 334 */ 335 protected void executeInterrupts() { 336 CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class); 337 // getting all the list of interrupts to be executed 338 Set<XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey()); 339 340 if (callables != null) { 341 // executing the list of interrupts in the given order of insertion 342 // in the list 343 for (XCallable<?> callable : callables) { 344 LOG.trace("executing interrupt callable [{0}]", callable.getName()); 345 try { 346 // executing the callable in interrupt mode 347 callable.setInterruptMode(true); 348 callable.call(); 349 LOG.trace("executed interrupt callable [{0}]", callable.getName()); 350 } 351 catch (Exception ex) { 352 LOG.warn("exception interrupt callable [{0}], {1}", callable.getName(), ex.getMessage(), ex); 353 } 354 finally { 355 // reseting the interrupt mode to false after the command is 356 // executed 357 callable.setInterruptMode(false); 358 } 359 } 360 } 361 } 362 363 /** 364 * Return the time out when acquiring a lock. 365 * <p/> 366 * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}. 367 * <p/> 368 * Subclasses should override this method if they want to use a different time out. 369 * 370 * @return the lock time out in milliseconds. 371 */ 372 protected long getLockTimeOut() { 373 return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000); 374 } 375 376 /** 377 * Indicate if the the command requires locking. 378 * <p/> 379 * Subclasses should override this method if they require locking. 380 * 381 * @return <code>true/false</code> 382 */ 383 protected abstract boolean isLockRequired(); 384 385 /** 386 * Return the entity key for the command. 387 * <p/> 388 * 389 * @return the entity key for the command. 390 */ 391 public abstract String getEntityKey(); 392 393 /** 394 * Indicate if the the command requires to requeue itself if the lock is not acquired. 395 * <p/> 396 * Subclasses should override this method if they don't want to requeue. 397 * <p/> 398 * Default is true. 399 * 400 * @return <code>true/false</code> 401 */ 402 protected boolean isReQueueRequired() { 403 return true; 404 } 405 406 /** 407 * Load the necessary state to perform an eager precondition check. 408 * <p/> 409 * This implementation does a NOP. 410 * <p/> 411 * Subclasses should override this method and load the state needed to do an eager precondition check. 412 * <p/> 413 * A trivial implementation is calling {link #loadState}. 414 */ 415 protected void eagerLoadState() throws CommandException{ 416 } 417 418 /** 419 * Verify the precondition for the command before obtaining a lock. 420 * <p/> 421 * This implementation does a NOP. 422 * <p/> 423 * A trivial implementation is calling {link #verifyPrecondition}. 424 * 425 * @throws CommandException thrown if the precondition is not met. 426 */ 427 protected void eagerVerifyPrecondition() throws CommandException,PreconditionException { 428 } 429 430 /** 431 * Load the necessary state to perform the precondition check and to execute the command. 432 * <p/> 433 * Subclasses must implement this method and load the state needed to do the precondition check and execute the 434 * command. 435 */ 436 protected abstract void loadState() throws CommandException; 437 438 /** 439 * Verify the precondition for the command after a lock has been obtain, just before executing the command. 440 * <p/> 441 * 442 * @throws CommandException thrown if the precondition is not met. 443 */ 444 protected abstract void verifyPrecondition() throws CommandException,PreconditionException; 445 446 /** 447 * Command execution body. 448 * <p/> 449 * This method will be invoked after the {link #loadState} and {link #verifyPrecondition} methods. 450 * <p/> 451 * If the command requires locking, this method will be invoked ONLY if the lock has been acquired. 452 * 453 * @return a return value from the execution of the command, only meaningful if the command is executed 454 * synchronously. 455 * @throws CommandException thrown if the command execution failed. 456 */ 457 protected abstract T execute() throws CommandException; 458 459 460 /** 461 * Return the {@link Instrumentation} instance in use. 462 * 463 * @return the {@link Instrumentation} instance in use. 464 */ 465 protected Instrumentation getInstrumentation() { 466 return instrumentation; 467 } 468 469 /** 470 * @param used set false to the used 471 */ 472 public void resetUsed() { 473 this.used.set(false); 474 } 475 476 477 /** 478 * Return the delay time for requeue 479 * 480 * @return delay time when requeue itself 481 */ 482 protected Long getRequeueDelay() { 483 return DEFAULT_REQUEUE_DELAY; 484 } 485 486 /** 487 * Get command key 488 * 489 * @return command key 490 */ 491 @Override 492 public String getKey(){ 493 return this.key; 494 } 495 496 /** 497 * set the mode of execution for the callable. True if in interrupt, false 498 * if not 499 */ 500 public void setInterruptMode(boolean mode) { 501 this.inInterrupt = mode; 502 } 503 504 /** 505 * @return the mode of execution. true if it is executed as an Interrupt, 506 * false otherwise 507 */ 508 public boolean inInterruptMode() { 509 return this.inInterrupt; 510 } 511 512 /** 513 * Get XLog log 514 * 515 * @return XLog 516 */ 517 public XLog getLog() { 518 return LOG; 519 } 520 521 }