This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command;
019
020 import org.apache.oozie.ErrorCode;
021 import org.apache.oozie.FaultInjection;
022 import org.apache.oozie.XException;
023 import org.apache.oozie.service.CallableQueueService;
024 import org.apache.oozie.service.EventHandlerService;
025 import org.apache.oozie.service.InstrumentationService;
026 import org.apache.oozie.service.MemoryLocksService;
027 import org.apache.oozie.service.Services;
028 import org.apache.oozie.util.Instrumentation;
029 import org.apache.oozie.util.MemoryLocks;
030 import org.apache.oozie.util.XCallable;
031 import org.apache.oozie.util.XLog;
032
033 import java.util.ArrayList;
034 import java.util.HashMap;
035 import java.util.List;
036 import java.util.Map;
037 import java.util.Set;
038 import java.util.UUID;
039 import java.util.concurrent.atomic.AtomicBoolean;
040
041 /**
042 * Base class for synchronous and asynchronous commands.
043 * <p/>
044 * It enables by API the following pattern:
045 * <p/>
046 * <ul>
047 * <li>single execution: a command instance can be executed only once</li>
048 * <li>eager data loading: loads data for eager precondition check</li>
049 * <li>eager precondition check: verify precondition before obtaining lock</li>
050 * <li>data loading: loads data for precondition check and execution</li>
051 * <li>precondition check: verifies precondition for execution is still met</li>
052 * <li>locking: obtains exclusive lock on key before executing the command</li>
053 * <li>execution: command logic</li>
054 * </ul>
055 * <p/>
056 * It has built in instrumentation and logging.
057 */
058 public abstract class XCommand<T> implements XCallable<T> {
059 public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout";
060
061 public static final String INSTRUMENTATION_GROUP = "commands";
062
063 public static final Long DEFAULT_REQUEUE_DELAY = 10L;
064
065 public XLog LOG = XLog.getLog(getClass());
066
067 private String key;
068 private String name;
069 private int priority;
070 private String type;
071 private long createdTime;
072 private MemoryLocks.LockToken lock;
073 private AtomicBoolean used = new AtomicBoolean(false);
074 private boolean inInterrupt = false;
075
076 private Map<Long, List<XCommand<?>>> commandQueue;
077 protected boolean dryrun = false;
078 protected Instrumentation instrumentation;
079
080 protected XLog.Info logInfo;
081 protected static EventHandlerService eventService;
082
083 /**
084 * Create a command.
085 *
086 * @param name command name.
087 * @param type command type.
088 * @param priority command priority.
089 */
090 public XCommand(String name, String type, int priority) {
091 this.name = name;
092 this.type = type;
093 this.priority = priority;
094 this.key = name + "_" + UUID.randomUUID();
095 createdTime = System.currentTimeMillis();
096 logInfo = new XLog.Info();
097 instrumentation = Services.get().get(InstrumentationService.class).get();
098 eventService = Services.get().get(EventHandlerService.class);
099 }
100
101 /**
102 * @param name command name.
103 * @param type command type.
104 * @param priority command priority.
105 * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without
106 * really running the job
107 */
108 public XCommand(String name, String type, int priority, boolean dryrun) {
109 this(name, type, priority);
110 this.dryrun = dryrun;
111 }
112
113 /**
114 * Return the command name.
115 *
116 * @return the command name.
117 */
118 @Override
119 public String getName() {
120 return name;
121 }
122
123 /**
124 * Return the callable type.
125 * <p/>
126 * The command type is used for concurrency throttling in the {@link CallableQueueService}.
127 *
128 * @return the command type.
129 */
130 @Override
131 public String getType() {
132 return type;
133 }
134
135 /**
136 * Return the priority of the command.
137 *
138 * @return the command priority.
139 */
140 @Override
141 public int getPriority() {
142 return priority;
143 }
144
145 /**
146 * Returns the creation time of the command.
147 *
148 * @return the command creation time, in milliseconds.
149 */
150 @Override
151 public long getCreatedTime() {
152 return createdTime;
153 }
154
155 /**
156 * Queue a command for execution after the current command execution completes.
157 * <p/>
158 * All commands queued during the execution of the current command will be queued for a single serial execution.
159 * <p/>
160 * If the command execution throws an exception, no command will be effectively queued.
161 *
162 * @param command command to queue.
163 */
164 protected void queue(XCommand<?> command) {
165 queue(command, 0);
166 }
167
168 /**
169 * Queue a command for delayed execution after the current command execution completes.
170 * <p/>
171 * All commands queued during the execution of the current command with the same delay will be queued for a single
172 * serial execution.
173 * <p/>
174 * If the command execution throws an exception, no command will be effectively queued.
175 *
176 * @param command command to queue.
177 * @param msDelay delay in milliseconds.
178 */
179 protected void queue(XCommand<?> command, long msDelay) {
180 if (commandQueue == null) {
181 commandQueue = new HashMap<Long, List<XCommand<?>>>();
182 }
183 List<XCommand<?>> list = commandQueue.get(msDelay);
184 if (list == null) {
185 list = new ArrayList<XCommand<?>>();
186 commandQueue.put(msDelay, list);
187 }
188 list.add(command);
189 }
190
191 /**
192 * Obtain an exclusive lock on the {link #getEntityKey}.
193 * <p/>
194 * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock.
195 *
196 * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
197 * @throws CommandException thrown i the lock could not be obtained.
198 */
199 private void acquireLock() throws InterruptedException, CommandException {
200 if (getEntityKey() == null) {
201 // no lock for null entity key
202 return;
203 }
204 lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
205 if (lock == null) {
206 Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get();
207 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1);
208 if (isReQueueRequired()) {
209 //if not acquire the lock, re-queue itself with default delay
210 queue(this, getRequeueDelay());
211 LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName());
212 } else {
213 throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
214 }
215 } else {
216 LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName());
217 }
218 }
219
220 /**
221 * Release the lock on the {link #getEntityKey}.
222 */
223 private void releaseLock() {
224 if (lock != null) {
225 lock.release();
226 LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName());
227 }
228 }
229
230 /**
231 * Implements the XCommand life-cycle.
232 *
233 * @return the {link #execute} return value.
234 * @throws Exception thrown if the command could not be executed.
235 */
236 @Override
237 public final T call() throws CommandException {
238 if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) {
239 LOG.debug("Command [{0}] key [{1}] already used for [{2}]", getName(), getEntityKey(), this.toString());
240 return null;
241 }
242
243 commandQueue = null;
244 Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get();
245 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1);
246 Instrumentation.Cron callCron = new Instrumentation.Cron();
247 try {
248 callCron.start();
249 eagerLoadState();
250 LOG = XLog.resetPrefix(LOG);
251 eagerVerifyPrecondition();
252 try {
253 T ret = null;
254 if (isLockRequired() && !this.inInterruptMode()) {
255 Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
256 acquireLockCron.start();
257 acquireLock();
258 acquireLockCron.stop();
259 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
260 }
261 // executing interrupts only in case of the lock required commands
262 if (lock != null) {
263 this.executeInterrupts();
264 }
265
266 if (!isLockRequired() || (lock != null) || this.inInterruptMode()) {
267 if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
268 && !used.compareAndSet(false, true)) {
269 LOG.debug("Command [{0}] key [{1}] already executed for [{2}]", getName(), getEntityKey(), this.toString());
270 return null;
271 }
272 LOG.trace("Load state for [{0}]", getEntityKey());
273 loadState();
274 LOG = XLog.resetPrefix(LOG);
275 LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey());
276 verifyPrecondition();
277 LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey());
278 Instrumentation.Cron executeCron = new Instrumentation.Cron();
279 executeCron.start();
280 ret = execute();
281 executeCron.stop();
282 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
283 }
284 if (commandQueue != null) {
285 CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
286 for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
287 LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
288 if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
289 LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue()
290 .size(), entry.getKey());
291 }
292 }
293 }
294 return ret;
295 }
296 finally {
297 if (isLockRequired() && !this.inInterruptMode()) {
298 releaseLock();
299 }
300 }
301 }
302 catch(PreconditionException pex){
303 LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString());
304 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
305 return null;
306 }
307 catch (XException ex) {
308 LOG.error("XException, ", ex);
309 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1);
310 if (ex instanceof CommandException) {
311 throw (CommandException) ex;
312 }
313 else {
314 throw new CommandException(ex);
315 }
316 }
317 catch (Exception ex) {
318 LOG.error("Exception, ", ex);
319 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1);
320 throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex);
321 }
322 catch (Error er) {
323 LOG.error("Error, ", er);
324 throw er;
325 }
326 finally {
327 FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
328 callCron.stop();
329 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron);
330 }
331 }
332
333 /**
334 * Check for the existence of interrupts for the same lock key
335 * Execute them if exist.
336 *
337 */
338 protected void executeInterrupts() {
339 CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
340 // getting all the list of interrupts to be executed
341 Set<XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey());
342
343 if (callables != null) {
344 // executing the list of interrupts in the given order of insertion
345 // in the list
346 for (XCallable<?> callable : callables) {
347 LOG.trace("executing interrupt callable [{0}]", callable.getName());
348 try {
349 // executing the callable in interrupt mode
350 callable.setInterruptMode(true);
351 callable.call();
352 LOG.trace("executed interrupt callable [{0}]", callable.getName());
353 }
354 catch (Exception ex) {
355 LOG.warn("exception interrupt callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
356 }
357 finally {
358 // reseting the interrupt mode to false after the command is
359 // executed
360 callable.setInterruptMode(false);
361 }
362 }
363 }
364 }
365
366 /**
367 * Return the time out when acquiring a lock.
368 * <p/>
369 * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}.
370 * <p/>
371 * Subclasses should override this method if they want to use a different time out.
372 *
373 * @return the lock time out in milliseconds.
374 */
375 protected long getLockTimeOut() {
376 return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000);
377 }
378
379 /**
380 * Indicate if the the command requires locking.
381 * <p/>
382 * Subclasses should override this method if they require locking.
383 *
384 * @return <code>true/false</code>
385 */
386 protected abstract boolean isLockRequired();
387
388 /**
389 * Return the entity key for the command.
390 * <p/>
391 *
392 * @return the entity key for the command.
393 */
394 public abstract String getEntityKey();
395
396 /**
397 * Indicate if the the command requires to requeue itself if the lock is not acquired.
398 * <p/>
399 * Subclasses should override this method if they don't want to requeue.
400 * <p/>
401 * Default is true.
402 *
403 * @return <code>true/false</code>
404 */
405 protected boolean isReQueueRequired() {
406 return true;
407 }
408
409 /**
410 * Load the necessary state to perform an eager precondition check.
411 * <p/>
412 * This implementation does a NOP.
413 * <p/>
414 * Subclasses should override this method and load the state needed to do an eager precondition check.
415 * <p/>
416 * A trivial implementation is calling {link #loadState}.
417 */
418 protected void eagerLoadState() throws CommandException{
419 }
420
421 /**
422 * Verify the precondition for the command before obtaining a lock.
423 * <p/>
424 * This implementation does a NOP.
425 * <p/>
426 * A trivial implementation is calling {link #verifyPrecondition}.
427 *
428 * @throws CommandException thrown if the precondition is not met.
429 */
430 protected void eagerVerifyPrecondition() throws CommandException,PreconditionException {
431 }
432
433 /**
434 * Load the necessary state to perform the precondition check and to execute the command.
435 * <p/>
436 * Subclasses must implement this method and load the state needed to do the precondition check and execute the
437 * command.
438 */
439 protected abstract void loadState() throws CommandException;
440
441 /**
442 * Verify the precondition for the command after a lock has been obtain, just before executing the command.
443 * <p/>
444 *
445 * @throws CommandException thrown if the precondition is not met.
446 */
447 protected abstract void verifyPrecondition() throws CommandException,PreconditionException;
448
449 /**
450 * Command execution body.
451 * <p/>
452 * This method will be invoked after the {link #loadState} and {link #verifyPrecondition} methods.
453 * <p/>
454 * If the command requires locking, this method will be invoked ONLY if the lock has been acquired.
455 *
456 * @return a return value from the execution of the command, only meaningful if the command is executed
457 * synchronously.
458 * @throws CommandException thrown if the command execution failed.
459 */
460 protected abstract T execute() throws CommandException;
461
462
463 /**
464 * Return the {@link Instrumentation} instance in use.
465 *
466 * @return the {@link Instrumentation} instance in use.
467 */
468 protected Instrumentation getInstrumentation() {
469 return instrumentation;
470 }
471
472 /**
473 * @param used set false to the used
474 */
475 public void resetUsed() {
476 this.used.set(false);
477 }
478
479
480 /**
481 * Return the delay time for requeue
482 *
483 * @return delay time when requeue itself
484 */
485 protected Long getRequeueDelay() {
486 return DEFAULT_REQUEUE_DELAY;
487 }
488
489 /**
490 * Get command key
491 *
492 * @return command key
493 */
494 @Override
495 public String getKey(){
496 return this.key;
497 }
498
499 /**
500 * set the mode of execution for the callable. True if in interrupt, false
501 * if not
502 */
503 public void setInterruptMode(boolean mode) {
504 this.inInterrupt = mode;
505 }
506
507 /**
508 * @return the mode of execution. true if it is executed as an Interrupt,
509 * false otherwise
510 */
511 public boolean inInterruptMode() {
512 return this.inInterrupt;
513 }
514
515 /**
516 * Get XLog log
517 *
518 * @return XLog
519 */
520 public XLog getLog() {
521 return LOG;
522 }
523
524 }