This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command;
020
021import org.apache.oozie.ErrorCode;
022import org.apache.oozie.FaultInjection;
023import org.apache.oozie.XException;
024import org.apache.oozie.service.CallableQueueService;
025import org.apache.oozie.service.ConfigurationService;
026import org.apache.oozie.service.EventHandlerService;
027import org.apache.oozie.service.InstrumentationService;
028import org.apache.oozie.service.MemoryLocksService;
029import org.apache.oozie.service.Services;
030import org.apache.oozie.util.Instrumentation;
031import org.apache.oozie.lock.LockToken;
032import org.apache.oozie.util.XCallable;
033import org.apache.oozie.util.XLog;
034
035import java.util.ArrayList;
036import java.util.HashMap;
037import java.util.List;
038import java.util.Map;
039import java.util.Set;
040import java.util.UUID;
041import java.util.concurrent.atomic.AtomicBoolean;
042
043/**
044 * Base class for synchronous and asynchronous commands.
045 * <p/>
046 * It enables by API the following pattern:
047 * <p/>
048 * <ul>
049 * <li>single execution: a command instance can be executed only once</li>
050 * <li>eager data loading: loads data for eager precondition check</li>
051 * <li>eager precondition check: verify precondition before obtaining lock</li>
052 * <li>data loading: loads data for precondition check and execution</li>
053 * <li>precondition check: verifies precondition for execution is still met</li>
054 * <li>locking: obtains exclusive lock on key before executing the command</li>
055 * <li>execution: command logic</li>
056 * </ul>
057 * <p/>
058 * It has built in instrumentation and logging.
059 */
060public abstract class XCommand<T> implements XCallable<T> {
061    public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout";
062
063    public static final String INSTRUMENTATION_GROUP = "commands";
064
065    public static final String DEFAULT_REQUEUE_DELAY = "oozie.command.default.requeue.delay";
066
067    public XLog LOG = XLog.getLog(getClass());
068
069    private String key;
070    private String name;
071    private int priority;
072    private String type;
073    private long createdTime;
074    private LockToken lock;
075    private AtomicBoolean used = new AtomicBoolean(false);
076    private boolean inInterrupt = false;
077    private boolean isSynchronous = false;
078
079    private Map<Long, List<XCommand<?>>> commandQueue;
080    protected boolean dryrun = false;
081    protected Instrumentation instrumentation;
082
083    protected static EventHandlerService eventService;
084
085    /**
086     * Create a command.
087     *
088     * @param name command name.
089     * @param type command type.
090     * @param priority command priority.
091     */
092    public XCommand(String name, String type, int priority) {
093        this.name = name;
094        this.type = type;
095        this.priority = priority;
096        this.key = name + "_" + UUID.randomUUID();
097        createdTime = System.currentTimeMillis();
098        instrumentation = Services.get().get(InstrumentationService.class).get();
099        eventService = Services.get().get(EventHandlerService.class);
100    }
101
102    /**
103     * @param name command name.
104     * @param type command type.
105     * @param priority command priority.
106     * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without
107     *        really running the job
108     */
109    public XCommand(String name, String type, int priority, boolean dryrun) {
110        this(name, type, priority);
111        this.dryrun = dryrun;
112    }
113
114    /**
115     * Set the thread local logInfo with the context of this command and reset log prefix.
116     */
117    protected void setLogInfo() {
118    }
119
120    /**
121     * Return the command name.
122     *
123     * @return the command name.
124     */
125    @Override
126    public String getName() {
127        return name;
128    }
129
130    /**
131     * Return the callable type.
132     * <p/>
133     * The command type is used for concurrency throttling in the {@link CallableQueueService}.
134     *
135     * @return the command type.
136     */
137    @Override
138    public String getType() {
139        return type;
140    }
141
142    /**
143     * Return the priority of the command.
144     *
145     * @return the command priority.
146     */
147    @Override
148    public int getPriority() {
149        return priority;
150    }
151
152    /**
153     * Returns the creation time of the command.
154     *
155     * @return the command creation time, in milliseconds.
156     */
157    @Override
158    public long getCreatedTime() {
159        return createdTime;
160    }
161
162    /**
163     * Queue a command for execution after the current command execution completes.
164     * <p/>
165     * All commands queued during the execution of the current command will be queued for a single serial execution.
166     * <p/>
167     * If the command execution throws an exception, no command will be effectively queued.
168     *
169     * @param command command to queue.
170     */
171    protected void queue(XCommand<?> command) {
172        queue(command, 0);
173    }
174
175    /**
176     * Queue a command for delayed execution after the current command execution completes.
177     * <p/>
178     * All commands queued during the execution of the current command with the same delay will be queued for a single
179     * serial execution.
180     * <p/>
181     * If the command execution throws an exception, no command will be effectively queued.
182     *
183     * @param command command to queue.
184     * @param msDelay delay in milliseconds.
185     */
186    protected void queue(XCommand<?> command, long msDelay) {
187        if (commandQueue == null) {
188            commandQueue = new HashMap<Long, List<XCommand<?>>>();
189        }
190        List<XCommand<?>> list = commandQueue.get(msDelay);
191        if (list == null) {
192            list = new ArrayList<XCommand<?>>();
193            commandQueue.put(msDelay, list);
194        }
195        list.add(command);
196    }
197
198    /**
199     * Obtain an exclusive lock on the {link #getEntityKey}.
200     * <p/>
201     * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock.
202     *
203     * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
204     * @throws CommandException thrown i the lock could not be obtained.
205     */
206    private void acquireLock() throws InterruptedException, CommandException {
207        if (getEntityKey() == null) {
208            // no lock for null entity key
209            return;
210        }
211        lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
212        if (lock == null) {
213            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1);
214            if (isReQueueRequired()) {
215                //if not acquire the lock, re-queue itself with default delay
216                queue(this, getRequeueDelay());
217                LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName());
218            } else {
219                throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
220            }
221        } else {
222            LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName());
223        }
224    }
225
226    /**
227     * Release the lock on the {link #getEntityKey}.
228     */
229    private void releaseLock() {
230        if (lock != null) {
231            lock.release();
232            LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName());
233        }
234    }
235
236    /**
237     * Implements the XCommand life-cycle.
238     *
239     * @return the {link #execute} return value.
240     * @throws Exception thrown if the command could not be executed.
241     */
242    @Override
243    public final T call() throws CommandException {
244        setLogInfo();
245        if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) {
246            LOG.debug("Command [{0}] key [{1}]  already used for [{2}]", getName(), getEntityKey(), this.toString());
247            return null;
248        }
249
250        commandQueue = null;
251        instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1);
252        Instrumentation.Cron callCron = new Instrumentation.Cron();
253        try {
254            callCron.start();
255            if (!isSynchronous) {
256                eagerLoadState();
257                eagerVerifyPrecondition();
258            }
259            try {
260                T ret = null;
261                if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) {
262                    Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
263                    acquireLockCron.start();
264                    acquireLock();
265                    acquireLockCron.stop();
266                    instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
267                }
268                // executing interrupts only in case of the lock required commands
269                if (lock != null) {
270                    this.executeInterrupts();
271                }
272
273                if (isSynchronous || !isLockRequired() || (lock != null) || this.inInterruptMode()) {
274                    if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
275                            && !used.compareAndSet(false, true)) {
276                        LOG.debug("Command [{0}] key [{1}]  already executed for [{2}]", getName(), getEntityKey(), this.toString());
277                        return null;
278                    }
279                    LOG.trace("Load state for [{0}]", getEntityKey());
280                    loadState();
281                    LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey());
282                    verifyPrecondition();
283                    LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey());
284                    Instrumentation.Cron executeCron = new Instrumentation.Cron();
285                    executeCron.start();
286                    ret = execute();
287                    executeCron.stop();
288                    instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
289                }
290                if (commandQueue != null) {
291                    CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
292                    for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
293                        LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
294                        if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
295                            LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue()
296                                    .size(), entry.getKey());
297                        }
298                    }
299                }
300                return ret;
301            }
302            finally {
303                if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) {
304                    releaseLock();
305                }
306            }
307        }
308        catch(PreconditionException pex){
309            LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString());
310            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
311            return null;
312        }
313        catch (XException ex) {
314            LOG.error("XException, ", ex);
315            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1);
316            if (ex instanceof CommandException) {
317                throw (CommandException) ex;
318            }
319            else {
320                throw new CommandException(ex);
321            }
322        }
323        catch (Exception ex) {
324            LOG.error("Exception, ", ex);
325            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1);
326            throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex);
327        }
328        catch (Error er) {
329            LOG.error("Error, ", er);
330            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".errors", 1);
331            throw er;
332        }
333        finally {
334            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
335            callCron.stop();
336            instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron);
337        }
338    }
339
340    /**
341     * Call this command synchronously from its caller. This benefits faster
342     * execution of command lifecycle for control nodes and kicking off
343     * subsequent actions
344     *
345     * @param callerEntityKey
346     * @return the {link #execute} return value.
347     * @throws CommandException
348     */
349    public final T call(String callerEntityKey) throws CommandException {
350        if (!callerEntityKey.equals(this.getEntityKey())) {
351            throw new CommandException(ErrorCode.E0607, "Entity Keys mismatch during synchronous call", "caller="
352                    + callerEntityKey + ", callee=" + getEntityKey());
353        }
354        isSynchronous = true; //setting to true so lock acquiring and release is not repeated
355        LOG.trace("Executing synchronously command [{0}] on job [{1}]", this.getName(), this.getKey());
356        return call();
357    }
358
359    /**
360     * Check for the existence of interrupts for the same lock key
361     * Execute them if exist.
362     *
363     */
364    protected void executeInterrupts() {
365        CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
366        // getting all the list of interrupts to be executed
367        Set<XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey());
368
369        if (callables != null) {
370            // executing the list of interrupts in the given order of insertion
371            // in the list
372            for (XCallable<?> callable : callables) {
373                LOG.trace("executing interrupt callable [{0}]", callable.getName());
374                try {
375                    // executing the callable in interrupt mode
376                    callable.setInterruptMode(true);
377                    callable.call();
378                    LOG.trace("executed interrupt callable [{0}]", callable.getName());
379                }
380                catch (Exception ex) {
381                    LOG.warn("exception interrupt callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
382                }
383                finally {
384                    // reseting the interrupt mode to false after the command is
385                    // executed
386                    callable.setInterruptMode(false);
387                }
388            }
389        }
390    }
391
392    /**
393     * Return the time out when acquiring a lock.
394     * <p/>
395     * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}.
396     * <p/>
397     * Subclasses should override this method if they want to use a different time out.
398     *
399     * @return the lock time out in milliseconds.
400     */
401    protected long getLockTimeOut() {
402        return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000);
403    }
404
405    /**
406     * Indicate if the the command requires locking.
407     * <p/>
408     * Subclasses should override this method if they require locking.
409     *
410     * @return <code>true/false</code>
411     */
412    protected abstract boolean isLockRequired();
413
414    /**
415     * Return the entity key for the command.
416     * <p/>
417     *
418     * @return the entity key for the command.
419     */
420    public abstract String getEntityKey();
421
422    /**
423     * Indicate if the the command requires to requeue itself if the lock is not acquired.
424     * <p/>
425     * Subclasses should override this method if they don't want to requeue.
426     * <p/>
427     * Default is true.
428     *
429     * @return <code>true/false</code>
430     */
431    protected boolean isReQueueRequired() {
432        return true;
433    }
434
435    /**
436     * Load the necessary state to perform an eager precondition check.
437     * <p/>
438     * This implementation does a NOP.
439     * <p/>
440     * Subclasses should override this method and load the state needed to do an eager precondition check.
441     * <p/>
442     * A trivial implementation is calling {link #loadState}.
443     */
444    protected void eagerLoadState() throws CommandException{
445    }
446
447    /**
448     * Verify the precondition for the command before obtaining a lock.
449     * <p/>
450     * This implementation does a NOP.
451     * <p/>
452     * A trivial implementation is calling {link #verifyPrecondition}.
453     *
454     * @throws CommandException thrown if the precondition is not met.
455     */
456    protected void eagerVerifyPrecondition() throws CommandException,PreconditionException {
457    }
458
459    /**
460     * Load the necessary state to perform the precondition check and to execute the command.
461     * <p/>
462     * Subclasses must implement this method and load the state needed to do the precondition check and execute the
463     * command.
464     */
465    protected abstract void loadState() throws CommandException;
466
467    /**
468     * Verify the precondition for the command after a lock has been obtain, just before executing the command.
469     * <p/>
470     *
471     * @throws CommandException thrown if the precondition is not met.
472     */
473    protected abstract void verifyPrecondition() throws CommandException,PreconditionException;
474
475    /**
476     * Command execution body.
477     * <p/>
478     * This method will be invoked after the {link #loadState} and {link #verifyPrecondition} methods.
479     * <p/>
480     * If the command requires locking, this method will be invoked ONLY if the lock has been acquired.
481     *
482     * @return a return value from the execution of the command, only meaningful if the command is executed
483     *         synchronously.
484     * @throws CommandException thrown if the command execution failed.
485     */
486    protected abstract T execute() throws CommandException;
487
488
489    /**
490     * Return the {@link Instrumentation} instance in use.
491     *
492     * @return the {@link Instrumentation} instance in use.
493     */
494    protected Instrumentation getInstrumentation() {
495        return instrumentation;
496    }
497
498    /**
499     * @param used set false to the used
500     */
501    public void resetUsed() {
502        this.used.set(false);
503    }
504
505
506    /**
507     * Return the delay time for requeue
508     * <p/>
509     * The value is loaded from the Oozie configuration, the property {link #DEFAULT_REQUEUE_DELAY}.
510     * <p/>
511     * Subclasses should override this method if they want to use a different requeue delay time
512     *
513     * @return delay time when requeue itself
514     */
515    protected long getRequeueDelay() {
516        return ConfigurationService.getLong(DEFAULT_REQUEUE_DELAY);
517    }
518
519    /**
520     * Get command key
521     *
522     * @return command key
523     */
524    @Override
525    public String getKey(){
526        return this.key;
527    }
528
529    /**
530     * set the mode of execution for the callable. True if in interrupt, false
531     * if not
532     */
533    public void setInterruptMode(boolean mode) {
534        this.inInterrupt = mode;
535    }
536
537    /**
538     * @return the mode of execution. true if it is executed as an Interrupt,
539     *         false otherwise
540     */
541    public boolean inInterruptMode() {
542        return this.inInterrupt;
543    }
544
545    /**
546     * Get XLog log
547     *
548     * @return XLog
549     */
550    public XLog getLog() {
551        return LOG;
552    }
553
554    /**
555     * String for the command - key
556     * @return String
557     */
558    @Override
559    public String toString() {
560        return getKey();
561    }
562}