001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command; 019 020 import org.apache.oozie.ErrorCode; 021 import org.apache.oozie.FaultInjection; 022 import org.apache.oozie.XException; 023 import org.apache.oozie.service.CallableQueueService; 024 import org.apache.oozie.service.InstrumentationService; 025 import org.apache.oozie.service.MemoryLocksService; 026 import org.apache.oozie.service.Services; 027 import org.apache.oozie.util.Instrumentation; 028 import org.apache.oozie.util.MemoryLocks; 029 import org.apache.oozie.util.XCallable; 030 import org.apache.oozie.util.XLog; 031 032 import java.util.ArrayList; 033 import java.util.HashMap; 034 import java.util.List; 035 import java.util.Map; 036 import java.util.Set; 037 import java.util.UUID; 038 import java.util.concurrent.atomic.AtomicBoolean; 039 040 /** 041 * Base class for synchronous and asynchronous commands. 042 * <p/> 043 * It enables by API the following pattern: 044 * <p/> 045 * <ul> 046 * <li>single execution: a command instance can be executed only once</li> 047 * <li>eager data loading: loads data for eager precondition check</li> 048 * <li>eager precondition check: verify precondition before obtaining lock</li> 049 * <li>data loading: loads data for precondition check and execution</li> 050 * <li>precondition check: verifies precondition for execution is still met</li> 051 * <li>locking: obtains exclusive lock on key before executing the command</li> 052 * <li>execution: command logic</li> 053 * </ul> 054 * <p/> 055 * It has built in instrumentation and logging. 056 */ 057 public abstract class XCommand<T> implements XCallable<T> { 058 public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout"; 059 060 public static final String INSTRUMENTATION_GROUP = "commands"; 061 062 public static final Long DEFAULT_REQUEUE_DELAY = 10L; 063 064 public XLog LOG = XLog.getLog(getClass()); 065 066 private String key; 067 private String name; 068 private int priority; 069 private String type; 070 private long createdTime; 071 private MemoryLocks.LockToken lock; 072 private AtomicBoolean used = new AtomicBoolean(false); 073 private boolean inInterrupt = false; 074 075 private Map<Long, List<XCommand<?>>> commandQueue; 076 protected boolean dryrun = false; 077 protected Instrumentation instrumentation; 078 079 protected XLog.Info logInfo; 080 081 /** 082 * Create a command. 083 * 084 * @param name command name. 085 * @param type command type. 086 * @param priority command priority. 087 */ 088 public XCommand(String name, String type, int priority) { 089 this.name = name; 090 this.type = type; 091 this.priority = priority; 092 this.key = name + "_" + UUID.randomUUID(); 093 createdTime = System.currentTimeMillis(); 094 logInfo = new XLog.Info(); 095 instrumentation = Services.get().get(InstrumentationService.class).get(); 096 } 097 098 /** 099 * @param name command name. 100 * @param type command type. 101 * @param priority command priority. 102 * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without 103 * really running the job 104 */ 105 public XCommand(String name, String type, int priority, boolean dryrun) { 106 this(name, type, priority); 107 this.dryrun = dryrun; 108 } 109 110 /** 111 * Return the command name. 112 * 113 * @return the command name. 114 */ 115 @Override 116 public String getName() { 117 return name; 118 } 119 120 /** 121 * Return the callable type. 122 * <p/> 123 * The command type is used for concurrency throttling in the {@link CallableQueueService}. 124 * 125 * @return the command type. 126 */ 127 @Override 128 public String getType() { 129 return type; 130 } 131 132 /** 133 * Return the priority of the command. 134 * 135 * @return the command priority. 136 */ 137 @Override 138 public int getPriority() { 139 return priority; 140 } 141 142 /** 143 * Returns the creation time of the command. 144 * 145 * @return the command creation time, in milliseconds. 146 */ 147 @Override 148 public long getCreatedTime() { 149 return createdTime; 150 } 151 152 /** 153 * Queue a command for execution after the current command execution completes. 154 * <p/> 155 * All commands queued during the execution of the current command will be queued for a single serial execution. 156 * <p/> 157 * If the command execution throws an exception, no command will be effectively queued. 158 * 159 * @param command command to queue. 160 */ 161 protected void queue(XCommand<?> command) { 162 queue(command, 0); 163 } 164 165 /** 166 * Queue a command for delayed execution after the current command execution completes. 167 * <p/> 168 * All commands queued during the execution of the current command with the same delay will be queued for a single 169 * serial execution. 170 * <p/> 171 * If the command execution throws an exception, no command will be effectively queued. 172 * 173 * @param command command to queue. 174 * @param msDelay delay in milliseconds. 175 */ 176 protected void queue(XCommand<?> command, long msDelay) { 177 if (commandQueue == null) { 178 commandQueue = new HashMap<Long, List<XCommand<?>>>(); 179 } 180 List<XCommand<?>> list = commandQueue.get(msDelay); 181 if (list == null) { 182 list = new ArrayList<XCommand<?>>(); 183 commandQueue.put(msDelay, list); 184 } 185 list.add(command); 186 } 187 188 /** 189 * Obtain an exclusive lock on the {link #getEntityKey}. 190 * <p/> 191 * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock. 192 * 193 * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock 194 * @throws CommandException thrown i the lock could not be obtained. 195 */ 196 private void acquireLock() throws InterruptedException, CommandException { 197 if (getEntityKey() == null) { 198 // no lock for null entity key 199 return; 200 } 201 lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut()); 202 if (lock == null) { 203 Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get(); 204 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1); 205 if (isReQueueRequired()) { 206 //if not acquire the lock, re-queue itself with default delay 207 queue(this, getRequeueDelay()); 208 LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName()); 209 } else { 210 throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut()); 211 } 212 } else { 213 LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName()); 214 } 215 } 216 217 /** 218 * Release the lock on the {link #getEntityKey}. 219 */ 220 private void releaseLock() { 221 if (lock != null) { 222 lock.release(); 223 LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName()); 224 } 225 } 226 227 /** 228 * Implements the XCommand life-cycle. 229 * 230 * @return the {link #execute} return value. 231 * @throws Exception thrown if the command could not be executed. 232 */ 233 @Override 234 public final T call() throws CommandException { 235 if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) { 236 LOG.debug("Command [{0}] key [{1}] already used for [{2}]", getName(), getEntityKey(), this.toString()); 237 return null; 238 } 239 240 commandQueue = null; 241 Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get(); 242 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1); 243 Instrumentation.Cron callCron = new Instrumentation.Cron(); 244 try { 245 callCron.start(); 246 eagerLoadState(); 247 LOG = XLog.resetPrefix(LOG); 248 eagerVerifyPrecondition(); 249 try { 250 T ret = null; 251 if (isLockRequired() && !this.inInterruptMode()) { 252 Instrumentation.Cron acquireLockCron = new Instrumentation.Cron(); 253 acquireLockCron.start(); 254 acquireLock(); 255 acquireLockCron.stop(); 256 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron); 257 } 258 // executing interrupts only in case of the lock required commands 259 if (lock != null) { 260 this.executeInterrupts(); 261 } 262 263 if (!isLockRequired() || (lock != null) || this.inInterruptMode()) { 264 if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) 265 && !used.compareAndSet(false, true)) { 266 LOG.debug("Command [{0}] key [{1}] already executed for [{2}]", getName(), getEntityKey(), this.toString()); 267 return null; 268 } 269 LOG.debug("Load state for [{0}]", getEntityKey()); 270 loadState(); 271 LOG = XLog.resetPrefix(LOG); 272 LOG.debug("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey()); 273 verifyPrecondition(); 274 LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey()); 275 Instrumentation.Cron executeCron = new Instrumentation.Cron(); 276 executeCron.start(); 277 ret = execute(); 278 executeCron.stop(); 279 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron); 280 } 281 if (commandQueue != null) { 282 CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class); 283 for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) { 284 LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey()); 285 if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) { 286 LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue() 287 .size(), entry.getKey()); 288 } 289 } 290 } 291 return ret; 292 } 293 finally { 294 if (isLockRequired()) { 295 releaseLock(); 296 } 297 } 298 } 299 catch(PreconditionException pex){ 300 LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString()); 301 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1); 302 return null; 303 } 304 catch (XException ex) { 305 LOG.error("XException, ", ex); 306 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1); 307 if (ex instanceof CommandException) { 308 throw (CommandException) ex; 309 } 310 else { 311 throw new CommandException(ex); 312 } 313 } 314 catch (Exception ex) { 315 LOG.error("Exception, ", ex); 316 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1); 317 throw new CommandException(ErrorCode.E0607, ex); 318 } 319 finally { 320 FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection"); 321 callCron.stop(); 322 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron); 323 } 324 } 325 326 /** 327 * Check for the existence of interrupts for the same lock key 328 * Execute them if exist. 329 * 330 */ 331 protected void executeInterrupts() { 332 CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class); 333 // getting all the list of interrupts to be executed 334 Set<XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey()); 335 336 if (callables != null) { 337 // executing the list of interrupts in the given order of insertion 338 // in the list 339 for (XCallable<?> callable : callables) { 340 LOG.trace("executing interrupt callable [{0}]", callable.getName()); 341 try { 342 // executing the callable in interrupt mode 343 callable.setInterruptMode(true); 344 callable.call(); 345 LOG.trace("executed interrupt callable [{0}]", callable.getName()); 346 } 347 catch (Exception ex) { 348 LOG.warn("exception interrupt callable [{0}], {1}", callable.getName(), ex.getMessage(), ex); 349 } 350 finally { 351 // reseting the interrupt mode to false after the command is 352 // executed 353 callable.setInterruptMode(false); 354 } 355 } 356 } 357 } 358 359 /** 360 * Return the time out when acquiring a lock. 361 * <p/> 362 * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}. 363 * <p/> 364 * Subclasses should override this method if they want to use a different time out. 365 * 366 * @return the lock time out in milliseconds. 367 */ 368 protected long getLockTimeOut() { 369 return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000); 370 } 371 372 /** 373 * Indicate if the the command requires locking. 374 * <p/> 375 * Subclasses should override this method if they require locking. 376 * 377 * @return <code>true/false</code> 378 */ 379 protected abstract boolean isLockRequired(); 380 381 /** 382 * Return the entity key for the command. 383 * <p/> 384 * 385 * @return the entity key for the command. 386 */ 387 public abstract String getEntityKey(); 388 389 /** 390 * Indicate if the the command requires to requeue itself if the lock is not acquired. 391 * <p/> 392 * Subclasses should override this method if they don't want to requeue. 393 * <p/> 394 * Default is true. 395 * 396 * @return <code>true/false</code> 397 */ 398 protected boolean isReQueueRequired() { 399 return true; 400 } 401 402 /** 403 * Load the necessary state to perform an eager precondition check. 404 * <p/> 405 * This implementation does a NOP. 406 * <p/> 407 * Subclasses should override this method and load the state needed to do an eager precondition check. 408 * <p/> 409 * A trivial implementation is calling {link #loadState}. 410 */ 411 protected void eagerLoadState() throws CommandException{ 412 } 413 414 /** 415 * Verify the precondition for the command before obtaining a lock. 416 * <p/> 417 * This implementation does a NOP. 418 * <p/> 419 * A trivial implementation is calling {link #verifyPrecondition}. 420 * 421 * @throws CommandException thrown if the precondition is not met. 422 */ 423 protected void eagerVerifyPrecondition() throws CommandException,PreconditionException { 424 } 425 426 /** 427 * Load the necessary state to perform the precondition check and to execute the command. 428 * <p/> 429 * Subclasses must implement this method and load the state needed to do the precondition check and execute the 430 * command. 431 */ 432 protected abstract void loadState() throws CommandException; 433 434 /** 435 * Verify the precondition for the command after a lock has been obtain, just before executing the command. 436 * <p/> 437 * 438 * @throws CommandException thrown if the precondition is not met. 439 */ 440 protected abstract void verifyPrecondition() throws CommandException,PreconditionException; 441 442 /** 443 * Command execution body. 444 * <p/> 445 * This method will be invoked after the {link #loadState} and {link #verifyPrecondition} methods. 446 * <p/> 447 * If the command requires locking, this method will be invoked ONLY if the lock has been acquired. 448 * 449 * @return a return value from the execution of the command, only meaningful if the command is executed 450 * synchronously. 451 * @throws CommandException thrown if the command execution failed. 452 */ 453 protected abstract T execute() throws CommandException; 454 455 456 /** 457 * Return the {@link Instrumentation} instance in use. 458 * 459 * @return the {@link Instrumentation} instance in use. 460 */ 461 protected Instrumentation getInstrumentation() { 462 return instrumentation; 463 } 464 465 /** 466 * @param used set false to the used 467 */ 468 public void resetUsed() { 469 this.used.set(false); 470 } 471 472 473 /** 474 * Return the delay time for requeue 475 * 476 * @return delay time when requeue itself 477 */ 478 protected Long getRequeueDelay() { 479 return DEFAULT_REQUEUE_DELAY; 480 } 481 482 /** 483 * Get command key 484 * 485 * @return command key 486 */ 487 @Override 488 public String getKey(){ 489 return this.key; 490 } 491 492 /** 493 * set the mode of execution for the callable. True if in interrupt, false 494 * if not 495 */ 496 public void setInterruptMode(boolean mode) { 497 this.inInterrupt = mode; 498 } 499 500 /** 501 * @return the mode of execution. true if it is executed as an Interrupt, 502 * false otherwise 503 */ 504 public boolean inInterruptMode() { 505 return this.inInterrupt; 506 } 507 508 /** 509 * Get XLog log 510 * 511 * @return XLog 512 */ 513 public XLog getLog() { 514 return LOG; 515 } 516 517 }