001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command; 019 020 import org.apache.oozie.ErrorCode; 021 import org.apache.oozie.FaultInjection; 022 import org.apache.oozie.XException; 023 import org.apache.oozie.service.CallableQueueService; 024 import org.apache.oozie.service.InstrumentationService; 025 import org.apache.oozie.service.MemoryLocksService; 026 import org.apache.oozie.service.Services; 027 import org.apache.oozie.util.Instrumentation; 028 import org.apache.oozie.util.MemoryLocks; 029 import org.apache.oozie.util.XCallable; 030 import org.apache.oozie.util.XLog; 031 032 import java.util.ArrayList; 033 import java.util.HashMap; 034 import java.util.List; 035 import java.util.Map; 036 import java.util.UUID; 037 038 /** 039 * Base class for synchronous and asynchronous commands. 040 * <p/> 041 * It enables by API the following pattern: 042 * <p/> 043 * <ul> 044 * <li>single execution: a command instance can be executed only once</li> 045 * <li>eager data loading: loads data for eager precondition check</li> 046 * <li>eager precondition check: verify precondition before obtaining lock</li> 047 * <li>data loading: loads data for precondition check and execution</li> 048 * <li>precondition check: verifies precondition for execution is still met</li> 049 * <li>locking: obtains exclusive lock on key before executing the command</li> 050 * <li>execution: command logic</li> 051 * </ul> 052 * <p/> 053 * It has built in instrumentation and logging. 054 */ 055 public abstract class XCommand<T> implements XCallable<T> { 056 public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout"; 057 058 public static final String INSTRUMENTATION_GROUP = "commands"; 059 060 public static final Long DEFAULT_REQUEUE_DELAY = 10L; 061 062 public XLog LOG = XLog.getLog(getClass()); 063 064 private String key; 065 private String name; 066 private int priority; 067 private String type; 068 private long createdTime; 069 private MemoryLocks.LockToken lock; 070 private boolean used = false; 071 072 private Map<Long, List<XCommand<?>>> commandQueue; 073 protected boolean dryrun = false; 074 protected Instrumentation instrumentation; 075 076 protected XLog.Info logInfo; 077 078 /** 079 * Create a command. 080 * 081 * @param name command name. 082 * @param type command type. 083 * @param priority command priority. 084 */ 085 public XCommand(String name, String type, int priority) { 086 this.name = name; 087 this.type = type; 088 this.priority = priority; 089 this.key = name + "_" + UUID.randomUUID(); 090 createdTime = System.currentTimeMillis(); 091 logInfo = new XLog.Info(); 092 instrumentation = Services.get().get(InstrumentationService.class).get(); 093 } 094 095 /** 096 * @param name command name. 097 * @param type command type. 098 * @param priority command priority. 099 * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without 100 * really running the job 101 */ 102 public XCommand(String name, String type, int priority, boolean dryrun) { 103 this(name, type, priority); 104 this.dryrun = dryrun; 105 } 106 107 /** 108 * Return the command name. 109 * 110 * @return the command name. 111 */ 112 @Override 113 public String getName() { 114 return name; 115 } 116 117 /** 118 * Return the callable type. 119 * <p/> 120 * The command type is used for concurrency throttling in the {@link CallableQueueService}. 121 * 122 * @return the command type. 123 */ 124 @Override 125 public String getType() { 126 return type; 127 } 128 129 /** 130 * Return the priority of the command. 131 * 132 * @return the command priority. 133 */ 134 @Override 135 public int getPriority() { 136 return priority; 137 } 138 139 /** 140 * Returns the creation time of the command. 141 * 142 * @return the command creation time, in milliseconds. 143 */ 144 @Override 145 public long getCreatedTime() { 146 return createdTime; 147 } 148 149 /** 150 * Queue a command for execution after the current command execution completes. 151 * <p/> 152 * All commands queued during the execution of the current command will be queued for a single serial execution. 153 * <p/> 154 * If the command execution throws an exception, no command will be effectively queued. 155 * 156 * @param command command to queue. 157 */ 158 protected void queue(XCommand<?> command) { 159 queue(command, 0); 160 } 161 162 /** 163 * Queue a command for delayed execution after the current command execution completes. 164 * <p/> 165 * All commands queued during the execution of the current command with the same delay will be queued for a single 166 * serial execution. 167 * <p/> 168 * If the command execution throws an exception, no command will be effectively queued. 169 * 170 * @param command command to queue. 171 * @param msDelay delay in milliseconds. 172 */ 173 protected void queue(XCommand<?> command, long msDelay) { 174 if (commandQueue == null) { 175 commandQueue = new HashMap<Long, List<XCommand<?>>>(); 176 } 177 List<XCommand<?>> list = commandQueue.get(msDelay); 178 if (list == null) { 179 list = new ArrayList<XCommand<?>>(); 180 commandQueue.put(msDelay, list); 181 } 182 list.add(command); 183 } 184 185 /** 186 * Obtain an exclusive lock on the {link #getEntityKey}. 187 * <p/> 188 * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock. 189 * 190 * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock 191 * @throws CommandException thrown i the lock could not be obtained. 192 */ 193 private void acquireLock() throws InterruptedException, CommandException { 194 lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut()); 195 if (lock == null) { 196 Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get(); 197 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1); 198 if (isReQueueRequired()) { 199 //if not acquire the lock, re-queue itself with default delay 200 resetUsed(); 201 queue(this, getRequeueDelay()); 202 LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName()); 203 } else { 204 throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut()); 205 } 206 } else { 207 LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName()); 208 } 209 } 210 211 /** 212 * Release the lock on the {link #getEntityKey}. 213 */ 214 private void releaseLock() { 215 if (lock != null) { 216 lock.release(); 217 LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName()); 218 } 219 } 220 221 /** 222 * Implements the XCommand life-cycle. 223 * 224 * @return the {link #execute} return value. 225 * @throws Exception thrown if the command could not be executed. 226 */ 227 @Override 228 public final T call() throws CommandException { 229 if (used) { 230 throw new IllegalStateException(this.getClass().getSimpleName() + " already used."); 231 } 232 used = true; 233 commandQueue = null; 234 Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get(); 235 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1); 236 Instrumentation.Cron callCron = new Instrumentation.Cron(); 237 try { 238 callCron.start(); 239 eagerLoadState(); 240 LOG = XLog.resetPrefix(LOG); 241 eagerVerifyPrecondition(); 242 try { 243 T ret = null; 244 if (isLockRequired()) { 245 Instrumentation.Cron acquireLockCron = new Instrumentation.Cron(); 246 acquireLockCron.start(); 247 acquireLock(); 248 acquireLockCron.stop(); 249 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron); 250 } 251 if (!isLockRequired() || (isLockRequired() && lock != null)) { 252 LOG.debug("Load state for [{0}]", getEntityKey()); 253 loadState(); 254 LOG = XLog.resetPrefix(LOG); 255 LOG.debug("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey()); 256 verifyPrecondition(); 257 LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey()); 258 Instrumentation.Cron executeCron = new Instrumentation.Cron(); 259 executeCron.start(); 260 ret = execute(); 261 executeCron.stop(); 262 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron); 263 } 264 if (commandQueue != null) { 265 CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class); 266 for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) { 267 LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey()); 268 if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) { 269 LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue() 270 .size(), entry.getKey()); 271 } 272 } 273 } 274 return ret; 275 } 276 finally { 277 if (isLockRequired()) { 278 releaseLock(); 279 } 280 } 281 } 282 catch(PreconditionException pex){ 283 LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString()); 284 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1); 285 return null; 286 } 287 catch (XException ex) { 288 LOG.error("XException, ", ex); 289 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1); 290 if (ex instanceof CommandException) { 291 throw (CommandException) ex; 292 } 293 else { 294 throw new CommandException(ex); 295 } 296 } 297 catch (Exception ex) { 298 LOG.error("Exception, ", ex); 299 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1); 300 throw new CommandException(ErrorCode.E0607, ex); 301 } 302 finally { 303 FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection"); 304 callCron.stop(); 305 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron); 306 } 307 } 308 309 /** 310 * Return the time out when acquiring a lock. 311 * <p/> 312 * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}. 313 * <p/> 314 * Subclasses should override this method if they want to use a different time out. 315 * 316 * @return the lock time out in milliseconds. 317 */ 318 protected long getLockTimeOut() { 319 return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000); 320 } 321 322 /** 323 * Indicate if the the command requires locking. 324 * <p/> 325 * Subclasses should override this method if they require locking. 326 * 327 * @return <code>true/false</code> 328 */ 329 protected abstract boolean isLockRequired(); 330 331 /** 332 * Return the entity key for the command. 333 * <p/> 334 * 335 * @return the entity key for the command. 336 */ 337 protected abstract String getEntityKey(); 338 339 /** 340 * Indicate if the the command requires to requeue itself if the lock is not acquired. 341 * <p/> 342 * Subclasses should override this method if they don't want to requeue. 343 * <p/> 344 * Default is true. 345 * 346 * @return <code>true/false</code> 347 */ 348 protected boolean isReQueueRequired() { 349 return true; 350 } 351 352 /** 353 * Load the necessary state to perform an eager precondition check. 354 * <p/> 355 * This implementation does a NOP. 356 * <p/> 357 * Subclasses should override this method and load the state needed to do an eager precondition check. 358 * <p/> 359 * A trivial implementation is calling {link #loadState}. 360 */ 361 protected void eagerLoadState() throws CommandException{ 362 } 363 364 /** 365 * Verify the precondition for the command before obtaining a lock. 366 * <p/> 367 * This implementation does a NOP. 368 * <p/> 369 * A trivial implementation is calling {link #verifyPrecondition}. 370 * 371 * @throws CommandException thrown if the precondition is not met. 372 */ 373 protected void eagerVerifyPrecondition() throws CommandException,PreconditionException { 374 } 375 376 /** 377 * Load the necessary state to perform the precondition check and to execute the command. 378 * <p/> 379 * Subclasses must implement this method and load the state needed to do the precondition check and execute the 380 * command. 381 */ 382 protected abstract void loadState() throws CommandException; 383 384 /** 385 * Verify the precondition for the command after a lock has been obtain, just before executing the command. 386 * <p/> 387 * 388 * @throws CommandException thrown if the precondition is not met. 389 */ 390 protected abstract void verifyPrecondition() throws CommandException,PreconditionException; 391 392 /** 393 * Command execution body. 394 * <p/> 395 * This method will be invoked after the {link #loadState} and {link #verifyPrecondition} methods. 396 * <p/> 397 * If the command requires locking, this method will be invoked ONLY if the lock has been acquired. 398 * 399 * @return a return value from the execution of the command, only meaningful if the command is executed 400 * synchronously. 401 * @throws CommandException thrown if the command execution failed. 402 */ 403 protected abstract T execute() throws CommandException; 404 405 406 /** 407 * Return the {@link Instrumentation} instance in use. 408 * 409 * @return the {@link Instrumentation} instance in use. 410 */ 411 protected Instrumentation getInstrumentation() { 412 return instrumentation; 413 } 414 415 /** 416 * @param used set false to the used 417 */ 418 public void resetUsed() { 419 this.used = false; 420 } 421 422 423 /** 424 * Return the delay time for requeue 425 * 426 * @return delay time when requeue itself 427 */ 428 protected Long getRequeueDelay() { 429 return DEFAULT_REQUEUE_DELAY; 430 } 431 432 /** 433 * Get command key 434 * 435 * @return command key 436 */ 437 @Override 438 public String getKey(){ 439 return this.key; 440 } 441 442 /** 443 * Get XLog log 444 * 445 * @return XLog 446 */ 447 public XLog getLog() { 448 return LOG; 449 } 450 451 }