001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command;
019    
020    import org.apache.oozie.ErrorCode;
021    import org.apache.oozie.FaultInjection;
022    import org.apache.oozie.XException;
023    import org.apache.oozie.service.CallableQueueService;
024    import org.apache.oozie.service.EventHandlerService;
025    import org.apache.oozie.service.InstrumentationService;
026    import org.apache.oozie.service.MemoryLocksService;
027    import org.apache.oozie.service.Services;
028    import org.apache.oozie.util.Instrumentation;
029    import org.apache.oozie.util.MemoryLocks;
030    import org.apache.oozie.util.XCallable;
031    import org.apache.oozie.util.XLog;
032    
033    import java.util.ArrayList;
034    import java.util.HashMap;
035    import java.util.List;
036    import java.util.Map;
037    import java.util.Set;
038    import java.util.UUID;
039    import java.util.concurrent.atomic.AtomicBoolean;
040    
041    /**
042     * Base class for synchronous and asynchronous commands.
043     * <p/>
044     * It enables by API the following pattern:
045     * <p/>
046     * <ul>
047     * <li>single execution: a command instance can be executed only once</li>
048     * <li>eager data loading: loads data for eager precondition check</li>
049     * <li>eager precondition check: verify precondition before obtaining lock</li>
050     * <li>data loading: loads data for precondition check and execution</li>
051     * <li>precondition check: verifies precondition for execution is still met</li>
052     * <li>locking: obtains exclusive lock on key before executing the command</li>
053     * <li>execution: command logic</li>
054     * </ul>
055     * <p/>
056     * It has built in instrumentation and logging.
057     */
058    public abstract class XCommand<T> implements XCallable<T> {
059        public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout";
060    
061        public static final String INSTRUMENTATION_GROUP = "commands";
062    
063        public static final Long DEFAULT_REQUEUE_DELAY = 10L;
064    
065        public XLog LOG = XLog.getLog(getClass());
066    
067        private String key;
068        private String name;
069        private int priority;
070        private String type;
071        private long createdTime;
072        private MemoryLocks.LockToken lock;
073        private AtomicBoolean used = new AtomicBoolean(false);
074        private boolean inInterrupt = false;
075    
076        private Map<Long, List<XCommand<?>>> commandQueue;
077        protected boolean dryrun = false;
078        protected Instrumentation instrumentation;
079    
080        protected XLog.Info logInfo;
081        protected static EventHandlerService eventService;
082    
083        /**
084         * Create a command.
085         *
086         * @param name command name.
087         * @param type command type.
088         * @param priority command priority.
089         */
090        public XCommand(String name, String type, int priority) {
091            this.name = name;
092            this.type = type;
093            this.priority = priority;
094            this.key = name + "_" + UUID.randomUUID();
095            createdTime = System.currentTimeMillis();
096            logInfo = new XLog.Info();
097            instrumentation = Services.get().get(InstrumentationService.class).get();
098            eventService = Services.get().get(EventHandlerService.class);
099        }
100    
101        /**
102         * @param name command name.
103         * @param type command type.
104         * @param priority command priority.
105         * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without
106         *        really running the job
107         */
108        public XCommand(String name, String type, int priority, boolean dryrun) {
109            this(name, type, priority);
110            this.dryrun = dryrun;
111        }
112    
113        /**
114         * Return the command name.
115         *
116         * @return the command name.
117         */
118        @Override
119        public String getName() {
120            return name;
121        }
122    
123        /**
124         * Return the callable type.
125         * <p/>
126         * The command type is used for concurrency throttling in the {@link CallableQueueService}.
127         *
128         * @return the command type.
129         */
130        @Override
131        public String getType() {
132            return type;
133        }
134    
135        /**
136         * Return the priority of the command.
137         *
138         * @return the command priority.
139         */
140        @Override
141        public int getPriority() {
142            return priority;
143        }
144    
145        /**
146         * Returns the creation time of the command.
147         *
148         * @return the command creation time, in milliseconds.
149         */
150        @Override
151        public long getCreatedTime() {
152            return createdTime;
153        }
154    
155        /**
156         * Queue a command for execution after the current command execution completes.
157         * <p/>
158         * All commands queued during the execution of the current command will be queued for a single serial execution.
159         * <p/>
160         * If the command execution throws an exception, no command will be effectively queued.
161         *
162         * @param command command to queue.
163         */
164        protected void queue(XCommand<?> command) {
165            queue(command, 0);
166        }
167    
168        /**
169         * Queue a command for delayed execution after the current command execution completes.
170         * <p/>
171         * All commands queued during the execution of the current command with the same delay will be queued for a single
172         * serial execution.
173         * <p/>
174         * If the command execution throws an exception, no command will be effectively queued.
175         *
176         * @param command command to queue.
177         * @param msDelay delay in milliseconds.
178         */
179        protected void queue(XCommand<?> command, long msDelay) {
180            if (commandQueue == null) {
181                commandQueue = new HashMap<Long, List<XCommand<?>>>();
182            }
183            List<XCommand<?>> list = commandQueue.get(msDelay);
184            if (list == null) {
185                list = new ArrayList<XCommand<?>>();
186                commandQueue.put(msDelay, list);
187            }
188            list.add(command);
189        }
190    
191        /**
192         * Obtain an exclusive lock on the {link #getEntityKey}.
193         * <p/>
194         * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock.
195         *
196         * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
197         * @throws CommandException thrown i the lock could not be obtained.
198         */
199        private void acquireLock() throws InterruptedException, CommandException {
200            if (getEntityKey() == null) {
201                // no lock for null entity key
202                return;
203            }
204            lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
205            if (lock == null) {
206                Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get();
207                instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1);
208                if (isReQueueRequired()) {
209                    //if not acquire the lock, re-queue itself with default delay
210                    queue(this, getRequeueDelay());
211                    LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName());
212                } else {
213                    throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
214                }
215            } else {
216                LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName());
217            }
218        }
219    
220        /**
221         * Release the lock on the {link #getEntityKey}.
222         */
223        private void releaseLock() {
224            if (lock != null) {
225                lock.release();
226                LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName());
227            }
228        }
229    
230        /**
231         * Implements the XCommand life-cycle.
232         *
233         * @return the {link #execute} return value.
234         * @throws Exception thrown if the command could not be executed.
235         */
236        @Override
237        public final T call() throws CommandException {
238            if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) {
239                LOG.debug("Command [{0}] key [{1}]  already used for [{2}]", getName(), getEntityKey(), this.toString());
240                return null;
241            }
242    
243            commandQueue = null;
244            Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get();
245            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1);
246            Instrumentation.Cron callCron = new Instrumentation.Cron();
247            try {
248                callCron.start();
249                eagerLoadState();
250                LOG = XLog.resetPrefix(LOG);
251                eagerVerifyPrecondition();
252                try {
253                    T ret = null;
254                    if (isLockRequired() && !this.inInterruptMode()) {
255                        Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
256                        acquireLockCron.start();
257                        acquireLock();
258                        acquireLockCron.stop();
259                        instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
260                    }
261                    // executing interrupts only in case of the lock required commands
262                    if (lock != null) {
263                        this.executeInterrupts();
264                    }
265    
266                    if (!isLockRequired() || (lock != null) || this.inInterruptMode()) {
267                        if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
268                                && !used.compareAndSet(false, true)) {
269                            LOG.debug("Command [{0}] key [{1}]  already executed for [{2}]", getName(), getEntityKey(), this.toString());
270                            return null;
271                        }
272                        LOG.trace("Load state for [{0}]", getEntityKey());
273                        loadState();
274                        LOG = XLog.resetPrefix(LOG);
275                        LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey());
276                        verifyPrecondition();
277                        LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey());
278                        Instrumentation.Cron executeCron = new Instrumentation.Cron();
279                        executeCron.start();
280                        ret = execute();
281                        executeCron.stop();
282                        instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
283                    }
284                    if (commandQueue != null) {
285                        CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
286                        for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
287                            LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
288                            if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
289                                LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue()
290                                        .size(), entry.getKey());
291                            }
292                        }
293                    }
294                    return ret;
295                }
296                finally {
297                    if (isLockRequired() && !this.inInterruptMode()) {
298                        releaseLock();
299                    }
300                }
301            }
302            catch(PreconditionException pex){
303                LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString());
304                instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
305                return null;
306            }
307            catch (XException ex) {
308                LOG.error("XException, ", ex);
309                instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1);
310                if (ex instanceof CommandException) {
311                    throw (CommandException) ex;
312                }
313                else {
314                    throw new CommandException(ex);
315                }
316            }
317            catch (Exception ex) {
318                LOG.error("Exception, ", ex);
319                instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1);
320                throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex);
321            }
322            catch (Error er) {
323                LOG.error("Error, ", er);
324                throw er;
325            }
326            finally {
327                FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
328                callCron.stop();
329                instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron);
330            }
331        }
332    
333        /**
334         * Check for the existence of interrupts for the same lock key
335         * Execute them if exist.
336         *
337         */
338        protected void executeInterrupts() {
339            CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
340            // getting all the list of interrupts to be executed
341            Set<XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey());
342    
343            if (callables != null) {
344                // executing the list of interrupts in the given order of insertion
345                // in the list
346                for (XCallable<?> callable : callables) {
347                    LOG.trace("executing interrupt callable [{0}]", callable.getName());
348                    try {
349                        // executing the callable in interrupt mode
350                        callable.setInterruptMode(true);
351                        callable.call();
352                        LOG.trace("executed interrupt callable [{0}]", callable.getName());
353                    }
354                    catch (Exception ex) {
355                        LOG.warn("exception interrupt callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
356                    }
357                    finally {
358                        // reseting the interrupt mode to false after the command is
359                        // executed
360                        callable.setInterruptMode(false);
361                    }
362                }
363            }
364        }
365    
366        /**
367         * Return the time out when acquiring a lock.
368         * <p/>
369         * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}.
370         * <p/>
371         * Subclasses should override this method if they want to use a different time out.
372         *
373         * @return the lock time out in milliseconds.
374         */
375        protected long getLockTimeOut() {
376            return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000);
377        }
378    
379        /**
380         * Indicate if the the command requires locking.
381         * <p/>
382         * Subclasses should override this method if they require locking.
383         *
384         * @return <code>true/false</code>
385         */
386        protected abstract boolean isLockRequired();
387    
388        /**
389         * Return the entity key for the command.
390         * <p/>
391         *
392         * @return the entity key for the command.
393         */
394        public abstract String getEntityKey();
395    
396        /**
397         * Indicate if the the command requires to requeue itself if the lock is not acquired.
398         * <p/>
399         * Subclasses should override this method if they don't want to requeue.
400         * <p/>
401         * Default is true.
402         *
403         * @return <code>true/false</code>
404         */
405        protected boolean isReQueueRequired() {
406            return true;
407        }
408    
409        /**
410         * Load the necessary state to perform an eager precondition check.
411         * <p/>
412         * This implementation does a NOP.
413         * <p/>
414         * Subclasses should override this method and load the state needed to do an eager precondition check.
415         * <p/>
416         * A trivial implementation is calling {link #loadState}.
417         */
418        protected void eagerLoadState() throws CommandException{
419        }
420    
421        /**
422         * Verify the precondition for the command before obtaining a lock.
423         * <p/>
424         * This implementation does a NOP.
425         * <p/>
426         * A trivial implementation is calling {link #verifyPrecondition}.
427         *
428         * @throws CommandException thrown if the precondition is not met.
429         */
430        protected void eagerVerifyPrecondition() throws CommandException,PreconditionException {
431        }
432    
433        /**
434         * Load the necessary state to perform the precondition check and to execute the command.
435         * <p/>
436         * Subclasses must implement this method and load the state needed to do the precondition check and execute the
437         * command.
438         */
439        protected abstract void loadState() throws CommandException;
440    
441        /**
442         * Verify the precondition for the command after a lock has been obtain, just before executing the command.
443         * <p/>
444         *
445         * @throws CommandException thrown if the precondition is not met.
446         */
447        protected abstract void verifyPrecondition() throws CommandException,PreconditionException;
448    
449        /**
450         * Command execution body.
451         * <p/>
452         * This method will be invoked after the {link #loadState} and {link #verifyPrecondition} methods.
453         * <p/>
454         * If the command requires locking, this method will be invoked ONLY if the lock has been acquired.
455         *
456         * @return a return value from the execution of the command, only meaningful if the command is executed
457         *         synchronously.
458         * @throws CommandException thrown if the command execution failed.
459         */
460        protected abstract T execute() throws CommandException;
461    
462    
463        /**
464         * Return the {@link Instrumentation} instance in use.
465         *
466         * @return the {@link Instrumentation} instance in use.
467         */
468        protected Instrumentation getInstrumentation() {
469            return instrumentation;
470        }
471    
472        /**
473         * @param used set false to the used
474         */
475        public void resetUsed() {
476            this.used.set(false);
477        }
478    
479    
480        /**
481         * Return the delay time for requeue
482         *
483         * @return delay time when requeue itself
484         */
485        protected Long getRequeueDelay() {
486            return DEFAULT_REQUEUE_DELAY;
487        }
488    
489        /**
490         * Get command key
491         *
492         * @return command key
493         */
494        @Override
495        public String getKey(){
496            return this.key;
497        }
498    
499        /**
500         * set the mode of execution for the callable. True if in interrupt, false
501         * if not
502         */
503        public void setInterruptMode(boolean mode) {
504            this.inInterrupt = mode;
505        }
506    
507        /**
508         * @return the mode of execution. true if it is executed as an Interrupt,
509         *         false otherwise
510         */
511        public boolean inInterruptMode() {
512            return this.inInterrupt;
513        }
514    
515        /**
516         * Get XLog log
517         *
518         * @return XLog
519         */
520        public XLog getLog() {
521            return LOG;
522        }
523    
524    }