This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command;
019
020 import org.apache.oozie.ErrorCode;
021 import org.apache.oozie.FaultInjection;
022 import org.apache.oozie.XException;
023 import org.apache.oozie.service.CallableQueueService;
024 import org.apache.oozie.service.InstrumentationService;
025 import org.apache.oozie.service.MemoryLocksService;
026 import org.apache.oozie.service.Services;
027 import org.apache.oozie.util.Instrumentation;
028 import org.apache.oozie.util.MemoryLocks;
029 import org.apache.oozie.util.XCallable;
030 import org.apache.oozie.util.XLog;
031
032 import java.util.ArrayList;
033 import java.util.HashMap;
034 import java.util.List;
035 import java.util.Map;
036 import java.util.Set;
037 import java.util.UUID;
038 import java.util.concurrent.atomic.AtomicBoolean;
039
040 /**
041 * Base class for synchronous and asynchronous commands.
042 * <p/>
043 * It enables by API the following pattern:
044 * <p/>
045 * <ul>
046 * <li>single execution: a command instance can be executed only once</li>
047 * <li>eager data loading: loads data for eager precondition check</li>
048 * <li>eager precondition check: verify precondition before obtaining lock</li>
049 * <li>data loading: loads data for precondition check and execution</li>
050 * <li>precondition check: verifies precondition for execution is still met</li>
051 * <li>locking: obtains exclusive lock on key before executing the command</li>
052 * <li>execution: command logic</li>
053 * </ul>
054 * <p/>
055 * It has built in instrumentation and logging.
056 */
057 public abstract class XCommand<T> implements XCallable<T> {
058 public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout";
059
060 public static final String INSTRUMENTATION_GROUP = "commands";
061
062 public static final Long DEFAULT_REQUEUE_DELAY = 10L;
063
064 public XLog LOG = XLog.getLog(getClass());
065
066 private String key;
067 private String name;
068 private int priority;
069 private String type;
070 private long createdTime;
071 private MemoryLocks.LockToken lock;
072 private AtomicBoolean used = new AtomicBoolean(false);
073 private boolean inInterrupt = false;
074
075 private Map<Long, List<XCommand<?>>> commandQueue;
076 protected boolean dryrun = false;
077 protected Instrumentation instrumentation;
078
079 protected XLog.Info logInfo;
080
081 /**
082 * Create a command.
083 *
084 * @param name command name.
085 * @param type command type.
086 * @param priority command priority.
087 */
088 public XCommand(String name, String type, int priority) {
089 this.name = name;
090 this.type = type;
091 this.priority = priority;
092 this.key = name + "_" + UUID.randomUUID();
093 createdTime = System.currentTimeMillis();
094 logInfo = new XLog.Info();
095 instrumentation = Services.get().get(InstrumentationService.class).get();
096 }
097
/**
 * Create a command, indicating if it runs in dryrun mode.
 *
 * @param name command name.
 * @param type command type.
 * @param priority command priority.
 * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without
 *        really running the job
 */
public XCommand(String name, String type, int priority, boolean dryrun) {
    // the 3-argument constructor does all the common initialization
    this(name, type, priority);
    this.dryrun = dryrun;
}
109
110 /**
111 * Return the command name.
112 *
113 * @return the command name.
114 */
115 @Override
116 public String getName() {
117 return name;
118 }
119
120 /**
121 * Return the callable type.
122 * <p/>
123 * The command type is used for concurrency throttling in the {@link CallableQueueService}.
124 *
125 * @return the command type.
126 */
127 @Override
128 public String getType() {
129 return type;
130 }
131
132 /**
133 * Return the priority of the command.
134 *
135 * @return the command priority.
136 */
137 @Override
138 public int getPriority() {
139 return priority;
140 }
141
142 /**
143 * Returns the creation time of the command.
144 *
145 * @return the command creation time, in milliseconds.
146 */
147 @Override
148 public long getCreatedTime() {
149 return createdTime;
150 }
151
152 /**
153 * Queue a command for execution after the current command execution completes.
154 * <p/>
155 * All commands queued during the execution of the current command will be queued for a single serial execution.
156 * <p/>
157 * If the command execution throws an exception, no command will be effectively queued.
158 *
159 * @param command command to queue.
160 */
161 protected void queue(XCommand<?> command) {
162 queue(command, 0);
163 }
164
165 /**
166 * Queue a command for delayed execution after the current command execution completes.
167 * <p/>
168 * All commands queued during the execution of the current command with the same delay will be queued for a single
169 * serial execution.
170 * <p/>
171 * If the command execution throws an exception, no command will be effectively queued.
172 *
173 * @param command command to queue.
174 * @param msDelay delay in milliseconds.
175 */
176 protected void queue(XCommand<?> command, long msDelay) {
177 if (commandQueue == null) {
178 commandQueue = new HashMap<Long, List<XCommand<?>>>();
179 }
180 List<XCommand<?>> list = commandQueue.get(msDelay);
181 if (list == null) {
182 list = new ArrayList<XCommand<?>>();
183 commandQueue.put(msDelay, list);
184 }
185 list.add(command);
186 }
187
188 /**
189 * Obtain an exclusive lock on the {link #getEntityKey}.
190 * <p/>
191 * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock.
192 *
193 * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
194 * @throws CommandException thrown i the lock could not be obtained.
195 */
196 private void acquireLock() throws InterruptedException, CommandException {
197 if (getEntityKey() == null) {
198 // no lock for null entity key
199 return;
200 }
201 lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
202 if (lock == null) {
203 Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get();
204 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1);
205 if (isReQueueRequired()) {
206 //if not acquire the lock, re-queue itself with default delay
207 queue(this, getRequeueDelay());
208 LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName());
209 } else {
210 throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
211 }
212 } else {
213 LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName());
214 }
215 }
216
217 /**
218 * Release the lock on the {link #getEntityKey}.
219 */
220 private void releaseLock() {
221 if (lock != null) {
222 lock.release();
223 LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName());
224 }
225 }
226
227 /**
228 * Implements the XCommand life-cycle.
229 *
230 * @return the {link #execute} return value.
231 * @throws Exception thrown if the command could not be executed.
232 */
233 @Override
234 public final T call() throws CommandException {
235 if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) {
236 LOG.debug("Command [{0}] key [{1}] already used for [{2}]", getName(), getEntityKey(), this.toString());
237 return null;
238 }
239
240 commandQueue = null;
241 Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get();
242 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1);
243 Instrumentation.Cron callCron = new Instrumentation.Cron();
244 try {
245 callCron.start();
246 eagerLoadState();
247 LOG = XLog.resetPrefix(LOG);
248 eagerVerifyPrecondition();
249 try {
250 T ret = null;
251 if (isLockRequired() && !this.inInterruptMode()) {
252 Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
253 acquireLockCron.start();
254 acquireLock();
255 acquireLockCron.stop();
256 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
257 }
258 // executing interrupts only in case of the lock required commands
259 if (lock != null) {
260 this.executeInterrupts();
261 }
262
263 if (!isLockRequired() || (lock != null) || this.inInterruptMode()) {
264 if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
265 && !used.compareAndSet(false, true)) {
266 LOG.debug("Command [{0}] key [{1}] already executed for [{2}]", getName(), getEntityKey(), this.toString());
267 return null;
268 }
269 LOG.trace("Load state for [{0}]", getEntityKey());
270 loadState();
271 LOG = XLog.resetPrefix(LOG);
272 LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey());
273 verifyPrecondition();
274 LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey());
275 Instrumentation.Cron executeCron = new Instrumentation.Cron();
276 executeCron.start();
277 ret = execute();
278 executeCron.stop();
279 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
280 }
281 if (commandQueue != null) {
282 CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
283 for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
284 LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
285 if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
286 LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue()
287 .size(), entry.getKey());
288 }
289 }
290 }
291 return ret;
292 }
293 finally {
294 if (isLockRequired() && !this.inInterruptMode()) {
295 releaseLock();
296 }
297 }
298 }
299 catch(PreconditionException pex){
300 LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString());
301 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
302 return null;
303 }
304 catch (XException ex) {
305 LOG.error("XException, ", ex);
306 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1);
307 if (ex instanceof CommandException) {
308 throw (CommandException) ex;
309 }
310 else {
311 throw new CommandException(ex);
312 }
313 }
314 catch (Exception ex) {
315 LOG.error("Exception, ", ex);
316 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1);
317 throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex);
318 }
319 catch (Error er) {
320 LOG.error("Error, ", er);
321 throw er;
322 }
323 finally {
324 FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
325 callCron.stop();
326 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron);
327 }
328 }
329
330 /**
331 * Check for the existence of interrupts for the same lock key
332 * Execute them if exist.
333 *
334 */
335 protected void executeInterrupts() {
336 CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
337 // getting all the list of interrupts to be executed
338 Set<XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey());
339
340 if (callables != null) {
341 // executing the list of interrupts in the given order of insertion
342 // in the list
343 for (XCallable<?> callable : callables) {
344 LOG.trace("executing interrupt callable [{0}]", callable.getName());
345 try {
346 // executing the callable in interrupt mode
347 callable.setInterruptMode(true);
348 callable.call();
349 LOG.trace("executed interrupt callable [{0}]", callable.getName());
350 }
351 catch (Exception ex) {
352 LOG.warn("exception interrupt callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
353 }
354 finally {
355 // reseting the interrupt mode to false after the command is
356 // executed
357 callable.setInterruptMode(false);
358 }
359 }
360 }
361 }
362
363 /**
364 * Return the time out when acquiring a lock.
365 * <p/>
366 * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}.
367 * <p/>
368 * Subclasses should override this method if they want to use a different time out.
369 *
370 * @return the lock time out in milliseconds.
371 */
372 protected long getLockTimeOut() {
373 return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000);
374 }
375
376 /**
377 * Indicate if the the command requires locking.
378 * <p/>
379 * Subclasses should override this method if they require locking.
380 *
381 * @return <code>true/false</code>
382 */
383 protected abstract boolean isLockRequired();
384
385 /**
386 * Return the entity key for the command.
387 * <p/>
388 *
389 * @return the entity key for the command.
390 */
391 public abstract String getEntityKey();
392
393 /**
394 * Indicate if the the command requires to requeue itself if the lock is not acquired.
395 * <p/>
396 * Subclasses should override this method if they don't want to requeue.
397 * <p/>
398 * Default is true.
399 *
400 * @return <code>true/false</code>
401 */
402 protected boolean isReQueueRequired() {
403 return true;
404 }
405
406 /**
407 * Load the necessary state to perform an eager precondition check.
408 * <p/>
409 * This implementation does a NOP.
410 * <p/>
411 * Subclasses should override this method and load the state needed to do an eager precondition check.
412 * <p/>
413 * A trivial implementation is calling {link #loadState}.
414 */
415 protected void eagerLoadState() throws CommandException{
416 }
417
418 /**
419 * Verify the precondition for the command before obtaining a lock.
420 * <p/>
421 * This implementation does a NOP.
422 * <p/>
423 * A trivial implementation is calling {link #verifyPrecondition}.
424 *
425 * @throws CommandException thrown if the precondition is not met.
426 */
427 protected void eagerVerifyPrecondition() throws CommandException,PreconditionException {
428 }
429
430 /**
431 * Load the necessary state to perform the precondition check and to execute the command.
432 * <p/>
433 * Subclasses must implement this method and load the state needed to do the precondition check and execute the
434 * command.
435 */
436 protected abstract void loadState() throws CommandException;
437
438 /**
439 * Verify the precondition for the command after a lock has been obtain, just before executing the command.
440 * <p/>
441 *
442 * @throws CommandException thrown if the precondition is not met.
443 */
444 protected abstract void verifyPrecondition() throws CommandException,PreconditionException;
445
446 /**
447 * Command execution body.
448 * <p/>
449 * This method will be invoked after the {link #loadState} and {link #verifyPrecondition} methods.
450 * <p/>
451 * If the command requires locking, this method will be invoked ONLY if the lock has been acquired.
452 *
453 * @return a return value from the execution of the command, only meaningful if the command is executed
454 * synchronously.
455 * @throws CommandException thrown if the command execution failed.
456 */
457 protected abstract T execute() throws CommandException;
458
459
460 /**
461 * Return the {@link Instrumentation} instance in use.
462 *
463 * @return the {@link Instrumentation} instance in use.
464 */
465 protected Instrumentation getInstrumentation() {
466 return instrumentation;
467 }
468
469 /**
470 * @param used set false to the used
471 */
472 public void resetUsed() {
473 this.used.set(false);
474 }
475
476
477 /**
478 * Return the delay time for requeue
479 *
480 * @return delay time when requeue itself
481 */
482 protected Long getRequeueDelay() {
483 return DEFAULT_REQUEUE_DELAY;
484 }
485
486 /**
487 * Get command key
488 *
489 * @return command key
490 */
491 @Override
492 public String getKey(){
493 return this.key;
494 }
495
496 /**
497 * set the mode of execution for the callable. True if in interrupt, false
498 * if not
499 */
500 public void setInterruptMode(boolean mode) {
501 this.inInterrupt = mode;
502 }
503
504 /**
505 * @return the mode of execution. true if it is executed as an Interrupt,
506 * false otherwise
507 */
508 public boolean inInterruptMode() {
509 return this.inInterrupt;
510 }
511
512 /**
513 * Get XLog log
514 *
515 * @return XLog
516 */
517 public XLog getLog() {
518 return LOG;
519 }
520
521 }