001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command;
019
020import org.apache.oozie.ErrorCode;
021import org.apache.oozie.FaultInjection;
022import org.apache.oozie.XException;
023import org.apache.oozie.service.CallableQueueService;
024import org.apache.oozie.service.EventHandlerService;
025import org.apache.oozie.service.InstrumentationService;
026import org.apache.oozie.service.MemoryLocksService;
027import org.apache.oozie.service.Services;
028import org.apache.oozie.util.Instrumentation;
029import org.apache.oozie.lock.LockToken;
030import org.apache.oozie.util.XCallable;
031import org.apache.oozie.util.XLog;
032
033import java.util.ArrayList;
034import java.util.HashMap;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.UUID;
039import java.util.concurrent.atomic.AtomicBoolean;
040
041/**
042 * Base class for synchronous and asynchronous commands.
043 * <p/>
044 * It enables by API the following pattern:
045 * <p/>
046 * <ul>
047 * <li>single execution: a command instance can be executed only once</li>
048 * <li>eager data loading: loads data for eager precondition check</li>
049 * <li>eager precondition check: verify precondition before obtaining lock</li>
050 * <li>data loading: loads data for precondition check and execution</li>
051 * <li>precondition check: verifies precondition for execution is still met</li>
052 * <li>locking: obtains exclusive lock on key before executing the command</li>
053 * <li>execution: command logic</li>
054 * </ul>
055 * <p/>
056 * It has built in instrumentation and logging.
057 */
058public abstract class XCommand<T> implements XCallable<T> {
059    public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout";
060
061    public static final String INSTRUMENTATION_GROUP = "commands";
062
063    public static final Long DEFAULT_REQUEUE_DELAY = 10L;
064
065    public XLog LOG = XLog.getLog(getClass());
066
067    private String key;
068    private String name;
069    private int priority;
070    private String type;
071    private long createdTime;
072    private LockToken lock;
073    private AtomicBoolean used = new AtomicBoolean(false);
074    private boolean inInterrupt = false;
075    private boolean isSynchronous = false;
076
077    private Map<Long, List<XCommand<?>>> commandQueue;
078    protected boolean dryrun = false;
079    protected Instrumentation instrumentation;
080
081    protected XLog.Info logInfo;
082    protected static EventHandlerService eventService;
083
084    /**
085     * Create a command.
086     *
087     * @param name command name.
088     * @param type command type.
089     * @param priority command priority.
090     */
091    public XCommand(String name, String type, int priority) {
092        this.name = name;
093        this.type = type;
094        this.priority = priority;
095        this.key = name + "_" + UUID.randomUUID();
096        createdTime = System.currentTimeMillis();
097        logInfo = new XLog.Info();
098        instrumentation = Services.get().get(InstrumentationService.class).get();
099        eventService = Services.get().get(EventHandlerService.class);
100    }
101
102    /**
103     * @param name command name.
104     * @param type command type.
105     * @param priority command priority.
106     * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without
107     *        really running the job
108     */
109    public XCommand(String name, String type, int priority, boolean dryrun) {
110        this(name, type, priority);
111        this.dryrun = dryrun;
112    }
113
114    /**
115     * Return the command name.
116     *
117     * @return the command name.
118     */
119    @Override
120    public String getName() {
121        return name;
122    }
123
124    /**
125     * Return the callable type.
126     * <p/>
127     * The command type is used for concurrency throttling in the {@link CallableQueueService}.
128     *
129     * @return the command type.
130     */
131    @Override
132    public String getType() {
133        return type;
134    }
135
136    /**
137     * Return the priority of the command.
138     *
139     * @return the command priority.
140     */
141    @Override
142    public int getPriority() {
143        return priority;
144    }
145
146    /**
147     * Returns the creation time of the command.
148     *
149     * @return the command creation time, in milliseconds.
150     */
151    @Override
152    public long getCreatedTime() {
153        return createdTime;
154    }
155
156    /**
157     * Queue a command for execution after the current command execution completes.
158     * <p/>
159     * All commands queued during the execution of the current command will be queued for a single serial execution.
160     * <p/>
161     * If the command execution throws an exception, no command will be effectively queued.
162     *
163     * @param command command to queue.
164     */
165    protected void queue(XCommand<?> command) {
166        queue(command, 0);
167    }
168
169    /**
170     * Queue a command for delayed execution after the current command execution completes.
171     * <p/>
172     * All commands queued during the execution of the current command with the same delay will be queued for a single
173     * serial execution.
174     * <p/>
175     * If the command execution throws an exception, no command will be effectively queued.
176     *
177     * @param command command to queue.
178     * @param msDelay delay in milliseconds.
179     */
180    protected void queue(XCommand<?> command, long msDelay) {
181        if (commandQueue == null) {
182            commandQueue = new HashMap<Long, List<XCommand<?>>>();
183        }
184        List<XCommand<?>> list = commandQueue.get(msDelay);
185        if (list == null) {
186            list = new ArrayList<XCommand<?>>();
187            commandQueue.put(msDelay, list);
188        }
189        list.add(command);
190    }
191
192    /**
193     * Obtain an exclusive lock on the {link #getEntityKey}.
194     * <p/>
195     * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock.
196     *
197     * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
198     * @throws CommandException thrown i the lock could not be obtained.
199     */
200    private void acquireLock() throws InterruptedException, CommandException {
201        if (getEntityKey() == null) {
202            // no lock for null entity key
203            return;
204        }
205        lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
206        if (lock == null) {
207            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1);
208            if (isReQueueRequired()) {
209                //if not acquire the lock, re-queue itself with default delay
210                queue(this, getRequeueDelay());
211                LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName());
212            } else {
213                throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
214            }
215        } else {
216            LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName());
217        }
218    }
219
220    /**
221     * Release the lock on the {link #getEntityKey}.
222     */
223    private void releaseLock() {
224        if (lock != null) {
225            lock.release();
226            LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName());
227        }
228    }
229
230    /**
231     * Implements the XCommand life-cycle.
232     *
233     * @return the {link #execute} return value.
234     * @throws Exception thrown if the command could not be executed.
235     */
236    @Override
237    public final T call() throws CommandException {
238        if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) {
239            LOG.debug("Command [{0}] key [{1}]  already used for [{2}]", getName(), getEntityKey(), this.toString());
240            return null;
241        }
242
243        commandQueue = null;
244        instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1);
245        Instrumentation.Cron callCron = new Instrumentation.Cron();
246        try {
247            callCron.start();
248            if (!isSynchronous) {
249                eagerLoadState();
250                LOG = XLog.resetPrefix(LOG);
251                eagerVerifyPrecondition();
252            }
253            try {
254                T ret = null;
255                if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) {
256                    Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
257                    acquireLockCron.start();
258                    acquireLock();
259                    acquireLockCron.stop();
260                    instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
261                }
262                // executing interrupts only in case of the lock required commands
263                if (lock != null) {
264                    this.executeInterrupts();
265                }
266
267                if (isSynchronous || !isLockRequired() || (lock != null) || this.inInterruptMode()) {
268                    if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
269                            && !used.compareAndSet(false, true)) {
270                        LOG.debug("Command [{0}] key [{1}]  already executed for [{2}]", getName(), getEntityKey(), this.toString());
271                        return null;
272                    }
273                    LOG.trace("Load state for [{0}]", getEntityKey());
274                    loadState();
275                    LOG = XLog.resetPrefix(LOG);
276                    LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey());
277                    verifyPrecondition();
278                    LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey());
279                    Instrumentation.Cron executeCron = new Instrumentation.Cron();
280                    executeCron.start();
281                    ret = execute();
282                    executeCron.stop();
283                    instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
284                }
285                if (commandQueue != null) {
286                    CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
287                    for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
288                        LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
289                        if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
290                            LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue()
291                                    .size(), entry.getKey());
292                        }
293                    }
294                }
295                return ret;
296            }
297            finally {
298                if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) {
299                    releaseLock();
300                }
301            }
302        }
303        catch(PreconditionException pex){
304            LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString());
305            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
306            return null;
307        }
308        catch (XException ex) {
309            LOG.error("XException, ", ex);
310            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1);
311            if (ex instanceof CommandException) {
312                throw (CommandException) ex;
313            }
314            else {
315                throw new CommandException(ex);
316            }
317        }
318        catch (Exception ex) {
319            LOG.error("Exception, ", ex);
320            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1);
321            throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex);
322        }
323        catch (Error er) {
324            LOG.error("Error, ", er);
325            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".errors", 1);
326            throw er;
327        }
328        finally {
329            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
330            callCron.stop();
331            instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron);
332        }
333    }
334
335    /**
336     * Call this command synchronously from its caller. This benefits faster
337     * execution of command lifecycle for control nodes and kicking off
338     * subsequent actions
339     *
340     * @param callerEntityKey
341     * @return the {link #execute} return value.
342     * @throws CommandException
343     */
344    public final T call(String callerEntityKey) throws CommandException {
345        if (!callerEntityKey.equals(this.getEntityKey())) {
346            throw new CommandException(ErrorCode.E0607, "Entity Keys mismatch during synchronous call", "caller="
347                    + callerEntityKey + ", callee=" + getEntityKey());
348        }
349        isSynchronous = true; //setting to true so lock acquiring and release is not repeated
350        LOG.trace("Executing synchronously command [{0}] on job [{1}]", this.getName(), this.getKey());
351        return call();
352    }
353
354    /**
355     * Check for the existence of interrupts for the same lock key
356     * Execute them if exist.
357     *
358     */
359    protected void executeInterrupts() {
360        CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
361        // getting all the list of interrupts to be executed
362        Set<XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey());
363
364        if (callables != null) {
365            // executing the list of interrupts in the given order of insertion
366            // in the list
367            for (XCallable<?> callable : callables) {
368                LOG.trace("executing interrupt callable [{0}]", callable.getName());
369                try {
370                    // executing the callable in interrupt mode
371                    callable.setInterruptMode(true);
372                    callable.call();
373                    LOG.trace("executed interrupt callable [{0}]", callable.getName());
374                }
375                catch (Exception ex) {
376                    LOG.warn("exception interrupt callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
377                }
378                finally {
379                    // reseting the interrupt mode to false after the command is
380                    // executed
381                    callable.setInterruptMode(false);
382                }
383            }
384        }
385    }
386
387    /**
388     * Return the time out when acquiring a lock.
389     * <p/>
390     * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}.
391     * <p/>
392     * Subclasses should override this method if they want to use a different time out.
393     *
394     * @return the lock time out in milliseconds.
395     */
396    protected long getLockTimeOut() {
397        return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000);
398    }
399
    /**
     * Indicate if the command requires locking.
     * <p/>
     * Subclasses must implement this method. When it returns <code>true</code>, an asynchronous {@link #call()} that
     * is not in interrupt mode acquires the lock before executing and releases it afterwards.
     *
     * @return <code>true</code> if the command requires a lock, <code>false</code> otherwise.
     */
    protected abstract boolean isLockRequired();
408
    /**
     * Return the entity key for the command.
     * <p/>
     * The entity key is the key on which the command lock is obtained and for which pending interrupts are looked
     * up. If it is <code>null</code>, no lock is requested.
     *
     * @return the entity key for the command.
     */
    public abstract String getEntityKey();
416
417    /**
418     * Indicate if the the command requires to requeue itself if the lock is not acquired.
419     * <p/>
420     * Subclasses should override this method if they don't want to requeue.
421     * <p/>
422     * Default is true.
423     *
424     * @return <code>true/false</code>
425     */
426    protected boolean isReQueueRequired() {
427        return true;
428    }
429
    /**
     * Load the necessary state to perform an eager precondition check.
     * <p/>
     * It is invoked by {@link #call()} before any lock is acquired, and only for calls that are not synchronous.
     * <p/>
     * This implementation does a NOP.
     * <p/>
     * Subclasses should override this method and load the state needed to do an eager precondition check.
     * <p/>
     * A trivial implementation is calling {@link #loadState}.
     *
     * @throws CommandException thrown if the state could not be loaded.
     */
    protected void eagerLoadState() throws CommandException{
        // NOP by default
    }
441
    /**
     * Verify the precondition for the command before obtaining a lock.
     * <p/>
     * It is invoked by {@link #call()} right after {@link #eagerLoadState}, and only for calls that are not
     * synchronous.
     * <p/>
     * This implementation does a NOP.
     * <p/>
     * A trivial implementation is calling {@link #verifyPrecondition}.
     *
     * @throws CommandException thrown if the precondition could not be verified.
     * @throws PreconditionException thrown if the precondition is not met; {@link #call()} then logs a warning and
     *         returns <code>null</code> without executing the command.
     */
    protected void eagerVerifyPrecondition() throws CommandException,PreconditionException {
        // NOP by default
    }
453
    /**
     * Load the necessary state to perform the precondition check and to execute the command.
     * <p/>
     * It is invoked by {@link #call()} just before {@link #verifyPrecondition}.
     * <p/>
     * Subclasses must implement this method and load the state needed to do the precondition check and execute the
     * command.
     *
     * @throws CommandException thrown if the state could not be loaded.
     */
    protected abstract void loadState() throws CommandException;
461
    /**
     * Verify the precondition for the command after a lock has been obtained, just before executing the command.
     * <p/>
     *
     * @throws CommandException thrown if the precondition could not be verified.
     * @throws PreconditionException thrown if the precondition is not met; {@link #call()} then logs a warning and
     *         returns <code>null</code> without executing the command.
     */
    protected abstract void verifyPrecondition() throws CommandException,PreconditionException;
469
    /**
     * Command execution body.
     * <p/>
     * This method will be invoked after the {@link #loadState} and {@link #verifyPrecondition} methods.
     * <p/>
     * If the command requires locking, this method will be invoked ONLY if the lock has been acquired, or if the
     * command is called synchronously or in interrupt mode.
     *
     * @return a return value from the execution of the command, only meaningful if the command is executed
     *         synchronously.
     * @throws CommandException thrown if the command execution failed.
     */
    protected abstract T execute() throws CommandException;
482
483
484    /**
485     * Return the {@link Instrumentation} instance in use.
486     *
487     * @return the {@link Instrumentation} instance in use.
488     */
489    protected Instrumentation getInstrumentation() {
490        return instrumentation;
491    }
492
493    /**
494     * @param used set false to the used
495     */
496    public void resetUsed() {
497        this.used.set(false);
498    }
499
500
501    /**
502     * Return the delay time for requeue
503     *
504     * @return delay time when requeue itself
505     */
506    protected Long getRequeueDelay() {
507        return DEFAULT_REQUEUE_DELAY;
508    }
509
510    /**
511     * Get command key
512     *
513     * @return command key
514     */
515    @Override
516    public String getKey(){
517        return this.key;
518    }
519
520    /**
521     * set the mode of execution for the callable. True if in interrupt, false
522     * if not
523     */
524    public void setInterruptMode(boolean mode) {
525        this.inInterrupt = mode;
526    }
527
528    /**
529     * @return the mode of execution. true if it is executed as an Interrupt,
530     *         false otherwise
531     */
532    public boolean inInterruptMode() {
533        return this.inInterrupt;
534    }
535
536    /**
537     * Get XLog log
538     *
539     * @return XLog
540     */
541    public XLog getLog() {
542        return LOG;
543    }
544
545    /**
546     * String for the command - key
547     * @return String
548     */
549    @Override
550    public String toString() {
551        return getKey();
552    }
553}