001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command;
019    
020    import org.apache.oozie.ErrorCode;
021    import org.apache.oozie.FaultInjection;
022    import org.apache.oozie.XException;
023    import org.apache.oozie.service.CallableQueueService;
024    import org.apache.oozie.service.InstrumentationService;
025    import org.apache.oozie.service.MemoryLocksService;
026    import org.apache.oozie.service.Services;
027    import org.apache.oozie.util.Instrumentation;
028    import org.apache.oozie.util.MemoryLocks;
029    import org.apache.oozie.util.XCallable;
030    import org.apache.oozie.util.XLog;
031    
032    import java.util.ArrayList;
033    import java.util.HashMap;
034    import java.util.List;
035    import java.util.Map;
036    import java.util.Set;
037    import java.util.UUID;
038    import java.util.concurrent.atomic.AtomicBoolean;
039    
040    /**
041     * Base class for synchronous and asynchronous commands.
042     * <p/>
043     * It enables by API the following pattern:
044     * <p/>
045     * <ul>
046     * <li>single execution: a command instance can be executed only once</li>
047     * <li>eager data loading: loads data for eager precondition check</li>
048     * <li>eager precondition check: verify precondition before obtaining lock</li>
049     * <li>data loading: loads data for precondition check and execution</li>
050     * <li>precondition check: verifies precondition for execution is still met</li>
051     * <li>locking: obtains exclusive lock on key before executing the command</li>
052     * <li>execution: command logic</li>
053     * </ul>
054     * <p/>
055     * It has built in instrumentation and logging.
056     */
057    public abstract class XCommand<T> implements XCallable<T> {
058        public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout";
059    
060        public static final String INSTRUMENTATION_GROUP = "commands";
061    
062        public static final Long DEFAULT_REQUEUE_DELAY = 10L;
063    
064        public XLog LOG = XLog.getLog(getClass());
065    
066        private String key;
067        private String name;
068        private int priority;
069        private String type;
070        private long createdTime;
071        private MemoryLocks.LockToken lock;
072        private AtomicBoolean used = new AtomicBoolean(false);
073        private boolean inInterrupt = false;
074    
075        private Map<Long, List<XCommand<?>>> commandQueue;
076        protected boolean dryrun = false;
077        protected Instrumentation instrumentation;
078    
079        protected XLog.Info logInfo;
080    
081        /**
082         * Create a command.
083         *
084         * @param name command name.
085         * @param type command type.
086         * @param priority command priority.
087         */
088        public XCommand(String name, String type, int priority) {
089            this.name = name;
090            this.type = type;
091            this.priority = priority;
092            this.key = name + "_" + UUID.randomUUID();
093            createdTime = System.currentTimeMillis();
094            logInfo = new XLog.Info();
095            instrumentation = Services.get().get(InstrumentationService.class).get();
096        }
097    
098        /**
099         * @param name command name.
100         * @param type command type.
101         * @param priority command priority.
102         * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without
103         *        really running the job
104         */
105        public XCommand(String name, String type, int priority, boolean dryrun) {
106            this(name, type, priority);
107            this.dryrun = dryrun;
108        }
109    
110        /**
111         * Return the command name.
112         *
113         * @return the command name.
114         */
115        @Override
116        public String getName() {
117            return name;
118        }
119    
120        /**
121         * Return the callable type.
122         * <p/>
123         * The command type is used for concurrency throttling in the {@link CallableQueueService}.
124         *
125         * @return the command type.
126         */
127        @Override
128        public String getType() {
129            return type;
130        }
131    
132        /**
133         * Return the priority of the command.
134         *
135         * @return the command priority.
136         */
137        @Override
138        public int getPriority() {
139            return priority;
140        }
141    
142        /**
143         * Returns the creation time of the command.
144         *
145         * @return the command creation time, in milliseconds.
146         */
147        @Override
148        public long getCreatedTime() {
149            return createdTime;
150        }
151    
152        /**
153         * Queue a command for execution after the current command execution completes.
154         * <p/>
155         * All commands queued during the execution of the current command will be queued for a single serial execution.
156         * <p/>
157         * If the command execution throws an exception, no command will be effectively queued.
158         *
159         * @param command command to queue.
160         */
161        protected void queue(XCommand<?> command) {
162            queue(command, 0);
163        }
164    
165        /**
166         * Queue a command for delayed execution after the current command execution completes.
167         * <p/>
168         * All commands queued during the execution of the current command with the same delay will be queued for a single
169         * serial execution.
170         * <p/>
171         * If the command execution throws an exception, no command will be effectively queued.
172         *
173         * @param command command to queue.
174         * @param msDelay delay in milliseconds.
175         */
176        protected void queue(XCommand<?> command, long msDelay) {
177            if (commandQueue == null) {
178                commandQueue = new HashMap<Long, List<XCommand<?>>>();
179            }
180            List<XCommand<?>> list = commandQueue.get(msDelay);
181            if (list == null) {
182                list = new ArrayList<XCommand<?>>();
183                commandQueue.put(msDelay, list);
184            }
185            list.add(command);
186        }
187    
188        /**
189         * Obtain an exclusive lock on the {link #getEntityKey}.
190         * <p/>
191         * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock.
192         *
193         * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
194         * @throws CommandException thrown i the lock could not be obtained.
195         */
196        private void acquireLock() throws InterruptedException, CommandException {
197            if (getEntityKey() == null) {
198                // no lock for null entity key
199                return;
200            }
201            lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
202            if (lock == null) {
203                Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get();
204                instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1);
205                if (isReQueueRequired()) {
206                    //if not acquire the lock, re-queue itself with default delay
207                    queue(this, getRequeueDelay());
208                    LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName());
209                } else {
210                    throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
211                }
212            } else {
213                LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName());
214            }
215        }
216    
217        /**
218         * Release the lock on the {link #getEntityKey}.
219         */
220        private void releaseLock() {
221            if (lock != null) {
222                lock.release();
223                LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName());
224            }
225        }
226    
227        /**
228         * Implements the XCommand life-cycle.
229         *
230         * @return the {link #execute} return value.
231         * @throws Exception thrown if the command could not be executed.
232         */
233        @Override
234        public final T call() throws CommandException {
235            if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) {
236                LOG.debug("Command [{0}] key [{1}]  already used for [{2}]", getName(), getEntityKey(), this.toString());
237                return null;
238            }
239    
240            commandQueue = null;
241            Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get();
242            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1);
243            Instrumentation.Cron callCron = new Instrumentation.Cron();
244            try {
245                callCron.start();
246                eagerLoadState();
247                LOG = XLog.resetPrefix(LOG);
248                eagerVerifyPrecondition();
249                try {
250                    T ret = null;
251                    if (isLockRequired() && !this.inInterruptMode()) {
252                        Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
253                        acquireLockCron.start();
254                        acquireLock();
255                        acquireLockCron.stop();
256                        instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
257                    }
258                    // executing interrupts only in case of the lock required commands
259                    if (lock != null) {
260                        this.executeInterrupts();
261                    }
262    
263                    if (!isLockRequired() || (lock != null) || this.inInterruptMode()) {
264                        if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
265                                && !used.compareAndSet(false, true)) {
266                            LOG.debug("Command [{0}] key [{1}]  already executed for [{2}]", getName(), getEntityKey(), this.toString());
267                            return null;
268                        }
269                        LOG.trace("Load state for [{0}]", getEntityKey());
270                        loadState();
271                        LOG = XLog.resetPrefix(LOG);
272                        LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey());
273                        verifyPrecondition();
274                        LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey());
275                        Instrumentation.Cron executeCron = new Instrumentation.Cron();
276                        executeCron.start();
277                        ret = execute();
278                        executeCron.stop();
279                        instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
280                    }
281                    if (commandQueue != null) {
282                        CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
283                        for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
284                            LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
285                            if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
286                                LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue()
287                                        .size(), entry.getKey());
288                            }
289                        }
290                    }
291                    return ret;
292                }
293                finally {
294                    if (isLockRequired() && !this.inInterruptMode()) {
295                        releaseLock();
296                    }
297                }
298            }
299            catch(PreconditionException pex){
300                LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString());
301                instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
302                return null;
303            }
304            catch (XException ex) {
305                LOG.error("XException, ", ex);
306                instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1);
307                if (ex instanceof CommandException) {
308                    throw (CommandException) ex;
309                }
310                else {
311                    throw new CommandException(ex);
312                }
313            }
314            catch (Exception ex) {
315                LOG.error("Exception, ", ex);
316                instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1);
317                throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex);
318            }
319            catch (Error er) {
320                LOG.error("Error, ", er);
321                throw er;
322            }
323            finally {
324                FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
325                callCron.stop();
326                instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron);
327            }
328        }
329    
330        /**
331         * Check for the existence of interrupts for the same lock key
332         * Execute them if exist.
333         *
334         */
335        protected void executeInterrupts() {
336            CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
337            // getting all the list of interrupts to be executed
338            Set<XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey());
339    
340            if (callables != null) {
341                // executing the list of interrupts in the given order of insertion
342                // in the list
343                for (XCallable<?> callable : callables) {
344                    LOG.trace("executing interrupt callable [{0}]", callable.getName());
345                    try {
346                        // executing the callable in interrupt mode
347                        callable.setInterruptMode(true);
348                        callable.call();
349                        LOG.trace("executed interrupt callable [{0}]", callable.getName());
350                    }
351                    catch (Exception ex) {
352                        LOG.warn("exception interrupt callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
353                    }
354                    finally {
355                        // reseting the interrupt mode to false after the command is
356                        // executed
357                        callable.setInterruptMode(false);
358                    }
359                }
360            }
361        }
362    
363        /**
364         * Return the time out when acquiring a lock.
365         * <p/>
366         * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}.
367         * <p/>
368         * Subclasses should override this method if they want to use a different time out.
369         *
370         * @return the lock time out in milliseconds.
371         */
372        protected long getLockTimeOut() {
373            return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000);
374        }
375    
376        /**
377         * Indicate if the the command requires locking.
378         * <p/>
379         * Subclasses should override this method if they require locking.
380         *
381         * @return <code>true/false</code>
382         */
383        protected abstract boolean isLockRequired();
384    
385        /**
386         * Return the entity key for the command.
387         * <p/>
388         *
389         * @return the entity key for the command.
390         */
391        public abstract String getEntityKey();
392    
393        /**
394         * Indicate if the the command requires to requeue itself if the lock is not acquired.
395         * <p/>
396         * Subclasses should override this method if they don't want to requeue.
397         * <p/>
398         * Default is true.
399         *
400         * @return <code>true/false</code>
401         */
402        protected boolean isReQueueRequired() {
403            return true;
404        }
405    
406        /**
407         * Load the necessary state to perform an eager precondition check.
408         * <p/>
409         * This implementation does a NOP.
410         * <p/>
411         * Subclasses should override this method and load the state needed to do an eager precondition check.
412         * <p/>
413         * A trivial implementation is calling {link #loadState}.
414         */
415        protected void eagerLoadState() throws CommandException{
416        }
417    
418        /**
419         * Verify the precondition for the command before obtaining a lock.
420         * <p/>
421         * This implementation does a NOP.
422         * <p/>
423         * A trivial implementation is calling {link #verifyPrecondition}.
424         *
425         * @throws CommandException thrown if the precondition is not met.
426         */
427        protected void eagerVerifyPrecondition() throws CommandException,PreconditionException {
428        }
429    
430        /**
431         * Load the necessary state to perform the precondition check and to execute the command.
432         * <p/>
433         * Subclasses must implement this method and load the state needed to do the precondition check and execute the
434         * command.
435         */
436        protected abstract void loadState() throws CommandException;
437    
438        /**
439         * Verify the precondition for the command after a lock has been obtain, just before executing the command.
440         * <p/>
441         *
442         * @throws CommandException thrown if the precondition is not met.
443         */
444        protected abstract void verifyPrecondition() throws CommandException,PreconditionException;
445    
446        /**
447         * Command execution body.
448         * <p/>
449         * This method will be invoked after the {link #loadState} and {link #verifyPrecondition} methods.
450         * <p/>
451         * If the command requires locking, this method will be invoked ONLY if the lock has been acquired.
452         *
453         * @return a return value from the execution of the command, only meaningful if the command is executed
454         *         synchronously.
455         * @throws CommandException thrown if the command execution failed.
456         */
457        protected abstract T execute() throws CommandException;
458    
459    
460        /**
461         * Return the {@link Instrumentation} instance in use.
462         *
463         * @return the {@link Instrumentation} instance in use.
464         */
465        protected Instrumentation getInstrumentation() {
466            return instrumentation;
467        }
468    
469        /**
470         * @param used set false to the used
471         */
472        public void resetUsed() {
473            this.used.set(false);
474        }
475    
476    
477        /**
478         * Return the delay time for requeue
479         *
480         * @return delay time when requeue itself
481         */
482        protected Long getRequeueDelay() {
483            return DEFAULT_REQUEUE_DELAY;
484        }
485    
486        /**
487         * Get command key
488         *
489         * @return command key
490         */
491        @Override
492        public String getKey(){
493            return this.key;
494        }
495    
496        /**
497         * set the mode of execution for the callable. True if in interrupt, false
498         * if not
499         */
500        public void setInterruptMode(boolean mode) {
501            this.inInterrupt = mode;
502        }
503    
504        /**
505         * @return the mode of execution. true if it is executed as an Interrupt,
506         *         false otherwise
507         */
508        public boolean inInterruptMode() {
509            return this.inInterrupt;
510        }
511    
512        /**
513         * Get XLog log
514         *
515         * @return XLog
516         */
517        public XLog getLog() {
518            return LOG;
519        }
520    
521    }