001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command;
020
021import org.apache.oozie.ErrorCode;
022import org.apache.oozie.FaultInjection;
023import org.apache.oozie.XException;
024import org.apache.oozie.service.CallableQueueService;
025import org.apache.oozie.service.ConfigurationService;
026import org.apache.oozie.service.EventHandlerService;
027import org.apache.oozie.service.InstrumentationService;
028import org.apache.oozie.service.MemoryLocksService;
029import org.apache.oozie.service.Services;
030import org.apache.oozie.util.Instrumentation;
031import org.apache.oozie.lock.LockToken;
032import org.apache.oozie.util.XCallable;
033import org.apache.oozie.util.XLog;
034
035import java.util.ArrayList;
036import java.util.HashMap;
037import java.util.List;
038import java.util.Map;
039import java.util.Set;
040import java.util.UUID;
041import java.util.concurrent.atomic.AtomicBoolean;
042
043/**
044 * Base class for synchronous and asynchronous commands.
045 * <p>
046 * It enables by API the following pattern:
047 * <p>
048 * <ul>
049 * <li>single execution: a command instance can be executed only once</li>
050 * <li>eager data loading: loads data for eager precondition check</li>
051 * <li>eager precondition check: verify precondition before obtaining lock</li>
052 * <li>data loading: loads data for precondition check and execution</li>
053 * <li>precondition check: verifies precondition for execution is still met</li>
054 * <li>locking: obtains exclusive lock on key before executing the command</li>
055 * <li>execution: command logic</li>
056 * </ul>
057 * <p>
058 * It has built in instrumentation and logging.
059 */
060public abstract class XCommand<T> implements XCallable<T> {
061    public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout";
062
063    public static final String INSTRUMENTATION_GROUP = "commands";
064
065    public static final String DEFAULT_REQUEUE_DELAY = "oozie.command.default.requeue.delay";
066
067    public XLog LOG = XLog.getLog(getClass());
068
069    private String key;
070    private String name;
071    private int priority;
072    private String type;
073    private long createdTime;
074    private LockToken lock;
075    private AtomicBoolean used = new AtomicBoolean(false);
076    private boolean inInterrupt = false;
077
078    private Map<Long, List<XCommand<?>>> commandQueue;
079    protected boolean dryrun = false;
080    protected Instrumentation instrumentation;
081
082    protected static EventHandlerService eventService;
083
084    /**
085     * Create a command.
086     *
087     * @param name command name.
088     * @param type command type.
089     * @param priority command priority.
090     */
091    public XCommand(String name, String type, int priority) {
092        this.name = name;
093        this.type = type;
094        this.priority = priority;
095        this.key = name + "_" + UUID.randomUUID();
096        createdTime = System.currentTimeMillis();
097        instrumentation = Services.get().get(InstrumentationService.class).get();
098        eventService = Services.get().get(EventHandlerService.class);
099    }
100
101    /**
102     * @param name command name.
103     * @param type command type.
104     * @param priority command priority.
105     * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without
106     *        really running the job
107     */
108    public XCommand(String name, String type, int priority, boolean dryrun) {
109        this(name, type, priority);
110        this.dryrun = dryrun;
111    }
112
113    /**
114     * Set the thread local logInfo with the context of this command and reset log prefix.
115     */
116    protected void setLogInfo() {
117    }
118
119    /**
120     * Return the command name.
121     *
122     * @return the command name.
123     */
124    @Override
125    public String getName() {
126        return name;
127    }
128
129    /**
130     * Return the callable type.
131     * <p>
132     * The command type is used for concurrency throttling in the {@link CallableQueueService}.
133     *
134     * @return the command type.
135     */
136    @Override
137    public String getType() {
138        return type;
139    }
140
141    /**
142     * Return the priority of the command.
143     *
144     * @return the command priority.
145     */
146    @Override
147    public int getPriority() {
148        return priority;
149    }
150
151    /**
152     * Returns the creation time of the command.
153     *
154     * @return the command creation time, in milliseconds.
155     */
156    @Override
157    public long getCreatedTime() {
158        return createdTime;
159    }
160
161    /**
162     * Queue a command for execution after the current command execution completes.
163     * <p>
164     * All commands queued during the execution of the current command will be queued for a single serial execution.
165     * <p>
166     * If the command execution throws an exception, no command will be effectively queued.
167     *
168     * @param command command to queue.
169     */
170    protected void queue(XCommand<?> command) {
171        queue(command, 0);
172    }
173
174    /**
175     * Queue a command for delayed execution after the current command execution completes.
176     * <p>
177     * All commands queued during the execution of the current command with the same delay will be queued for a single
178     * serial execution.
179     * <p>
180     * If the command execution throws an exception, no command will be effectively queued.
181     *
182     * @param command command to queue.
183     * @param msDelay delay in milliseconds.
184     */
185    protected void queue(XCommand<?> command, long msDelay) {
186        if (commandQueue == null) {
187            commandQueue = new HashMap<Long, List<XCommand<?>>>();
188        }
189        List<XCommand<?>> list = commandQueue.get(msDelay);
190        if (list == null) {
191            list = new ArrayList<XCommand<?>>();
192            commandQueue.put(msDelay, list);
193        }
194        list.add(command);
195    }
196
197    /**
198     * Obtain an exclusive lock on the {link #getEntityKey}.
199     * <p>
200     * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock.
201     *
202     * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
203     * @throws CommandException thrown i the lock could not be obtained.
204     */
205    private void acquireLock() throws InterruptedException, CommandException {
206        if (getEntityKey() == null) {
207            // no lock for null entity key
208            return;
209        }
210        lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
211        if (lock == null) {
212            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1);
213            if (isReQueueRequired()) {
214                // if not acquire the lock, re-queue itself with default delay
215                queue(this, getRequeueDelay());
216                LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(),
217                        getLockTimeOut(), getName());
218            }
219            else {
220                throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
221            }
222        }
223        else {
224            LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName());
225        }
226    }
227
228    /**
229     * Release the lock on the {link #getEntityKey}.
230     */
231    private void releaseLock() {
232        if (lock != null) {
233            lock.release();
234            LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName());
235        }
236    }
237
238    /**
239     * Implements the XCommand life-cycle.
240     *
241     * @return the {link #execute} return value.
242     * @throws CommandException thrown if the command could not be executed.
243     */
244    @Override
245    public final T call() throws CommandException {
246        setLogInfo();
247        CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
248        Set<String> interruptTypes = callableQueueService.getInterruptTypes();
249
250        if (interruptTypes.contains(this.getType()) && used.get()) {
251            LOG.debug("Command [{0}] key [{1}]  already used for [{2}]", getName(), getEntityKey(), this.toString());
252            return null;
253        }
254
255        commandQueue = null;
256        instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1);
257        Instrumentation.Cron callCron = new Instrumentation.Cron();
258        try {
259            callCron.start();
260            eagerLoadState();
261            eagerVerifyPrecondition();
262            try {
263                T ret = null;
264                if (isLockRequired() && !this.inInterruptMode()) {
265                    Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
266                    acquireLockCron.start();
267                    acquireLock();
268                    acquireLockCron.stop();
269                    instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
270                }
271                // executing interrupts only in case of the lock required commands
272                if (lock != null) {
273                    this.executeInterrupts();
274                }
275
276                if (!isLockRequired() || (lock != null) || this.inInterruptMode()) {
277                    if (interruptTypes.contains(this.getType())
278                            && !used.compareAndSet(false, true)) {
279                        LOG.debug("Command [{0}] key [{1}]  already executed for [{2}]", getName(), getEntityKey(),
280                                this.toString());
281                        return null;
282                    }
283                    LOG.trace("Load state for [{0}]", getEntityKey());
284                    loadState();
285                    LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey());
286                    verifyPrecondition();
287                    LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey());
288                    Instrumentation.Cron executeCron = new Instrumentation.Cron();
289                    executeCron.start();
290                    ret = execute();
291                    executeCron.stop();
292                    instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
293                }
294                if (commandQueue != null) {
295                    for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
296                        LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
297                        if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
298                            LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue()
299                                    .size(), entry.getKey());
300                        }
301                    }
302                }
303                return ret;
304            }
305            finally {
306                if (isLockRequired() && !this.inInterruptMode()) {
307                    releaseLock();
308                }
309            }
310        }
311        catch (PreconditionException pex) {
312            LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString());
313            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
314            return null;
315        }
316        catch (XException ex) {
317            LOG.error("XException, ", ex);
318            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1);
319            if (ex instanceof CommandException) {
320                throw (CommandException) ex;
321            }
322            else {
323                throw new CommandException(ex);
324            }
325        }
326        catch (Exception ex) {
327            LOG.error("Exception, ", ex);
328            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1);
329            throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex);
330        }
331        catch (Error er) {
332            LOG.error("Error, ", er);
333            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".errors", 1);
334            throw er;
335        }
336        finally {
337            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
338            callCron.stop();
339            instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron);
340        }
341    }
342
343    /**
344     * Check for the existence of interrupts for the same lock key
345     * Execute them if exist.
346     *
347     */
348    protected void executeInterrupts() {
349        CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
350        // getting all the list of interrupts to be executed
351        Set<XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey());
352
353        if (callables != null) {
354            // executing the list of interrupts in the given order of insertion
355            // in the list
356            for (XCallable<?> callable : callables) {
357                LOG.trace("executing interrupt callable [{0}]", callable.getName());
358                try {
359                    // executing the callable in interrupt mode
360                    callable.setInterruptMode(true);
361                    callable.call();
362                    LOG.trace("executed interrupt callable [{0}]", callable.getName());
363                }
364                catch (Exception ex) {
365                    LOG.warn("exception interrupt callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
366                }
367                finally {
368                    // reseting the interrupt mode to false after the command is
369                    // executed
370                    callable.setInterruptMode(false);
371                }
372            }
373        }
374    }
375
376    /**
377     * Return the time out when acquiring a lock.
378     * <p>
379     * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}.
380     * <p>
381     * Subclasses should override this method if they want to use a different time out.
382     *
383     * @return the lock time out in milliseconds.
384     */
385    protected long getLockTimeOut() {
386        return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000);
387    }
388
389    /**
390     * Indicate if the the command requires locking.
391     * <p>
392     * Subclasses should override this method if they require locking.
393     *
394     * @return <code>true/false</code>
395     */
396    protected abstract boolean isLockRequired();
397
398    /**
399     * Return the entity key for the command.
400     * <p>
401     *
402     * @return the entity key for the command.
403     */
404    public abstract String getEntityKey();
405
406    /**
407     * Indicate if the the command requires to requeue itself if the lock is not acquired.
408     * <p>
409     * Subclasses should override this method if they don't want to requeue.
410     * <p>
411     * Default is true.
412     *
413     * @return <code>true/false</code>
414     */
415    protected boolean isReQueueRequired() {
416        return true;
417    }
418
419    /**
420     * Load the necessary state to perform an eager precondition check.
421     * <p>
422     * This implementation does a NOP.
423     * <p>
424     * Subclasses should override this method and load the state needed to do an eager precondition check.
425     * <p>
426     * A trivial implementation is calling {link #loadState}.
427     */
428    protected void eagerLoadState() throws CommandException {
429    }
430
431    /**
432     * Verify the precondition for the command before obtaining a lock.
433     * <p>
434     * This implementation does a NOP.
435     * <p>
436     * A trivial implementation is calling {link #verifyPrecondition}.
437     *
438     * @throws CommandException thrown if the precondition is not met.
439     */
440    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
441    }
442
443    /**
444     * Load the necessary state to perform the precondition check and to execute the command.
445     * <p>
446     * Subclasses must implement this method and load the state needed to do the precondition check and execute the
447     * command.
448     */
449    protected abstract void loadState() throws CommandException;
450
451    /**
452     * Verify the precondition for the command after a lock has been obtain, just before executing the command.
453     * <p>
454     *
455     * @throws CommandException thrown if the precondition is not met.
456     */
457    protected abstract void verifyPrecondition() throws CommandException, PreconditionException;
458
459    /**
460     * Command execution body.
461     * <p>
462     * This method will be invoked after the {link #loadState} and {link #verifyPrecondition} methods.
463     * <p>
464     * If the command requires locking, this method will be invoked ONLY if the lock has been acquired.
465     *
466     * @return a return value from the execution of the command, only meaningful if the command is executed
467     *         synchronously.
468     * @throws CommandException thrown if the command execution failed.
469     */
470    protected abstract T execute() throws CommandException;
471
472    /**
473     * Return the {@link Instrumentation} instance in use.
474     *
475     * @return the {@link Instrumentation} instance in use.
476     */
477    protected Instrumentation getInstrumentation() {
478        return instrumentation;
479    }
480
481    /**
482     * Set false to the used
483     */
484    public void resetUsed() {
485        this.used.set(false);
486    }
487
488    /**
489     * Return the delay time for requeue
490     * <p>
491     * The value is loaded from the Oozie configuration, the property {link #DEFAULT_REQUEUE_DELAY}.
492     * <p>
493     * Subclasses should override this method if they want to use a different requeue delay time
494     *
495     * @return delay time when requeue itself
496     */
497    protected long getRequeueDelay() {
498        return ConfigurationService.getLong(DEFAULT_REQUEUE_DELAY);
499    }
500
501    /**
502     * Get command key
503     *
504     * @return command key
505     */
506    @Override
507    public String getKey() {
508        return this.key;
509    }
510
511    /**
512     * set the mode of execution for the callable. True if in interrupt, false
513     * if not
514     */
515    public void setInterruptMode(boolean mode) {
516        this.inInterrupt = mode;
517    }
518
519    /**
520     * @return the mode of execution. true if it is executed as an Interrupt,
521     *         false otherwise
522     */
523    public boolean inInterruptMode() {
524        return this.inInterrupt;
525    }
526
527    /**
528     * Get XLog log
529     *
530     * @return XLog
531     */
532    public XLog getLog() {
533        return LOG;
534    }
535
536    /**
537     * String for the command - key
538     * @return String
539     */
540    @Override
541    public String toString() {
542        return getKey();
543    }
544}