001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command;
019    
020    import org.apache.oozie.ErrorCode;
021    import org.apache.oozie.FaultInjection;
022    import org.apache.oozie.XException;
023    import org.apache.oozie.service.CallableQueueService;
024    import org.apache.oozie.service.InstrumentationService;
025    import org.apache.oozie.service.MemoryLocksService;
026    import org.apache.oozie.service.Services;
027    import org.apache.oozie.util.Instrumentation;
028    import org.apache.oozie.util.MemoryLocks;
029    import org.apache.oozie.util.XCallable;
030    import org.apache.oozie.util.XLog;
031    
032    import java.util.ArrayList;
033    import java.util.HashMap;
034    import java.util.List;
035    import java.util.Map;
036    import java.util.UUID;
037    
038    /**
039     * Base class for synchronous and asynchronous commands.
040     * <p/>
041     * It enables by API the following pattern:
042     * <p/>
043     * <ul>
044     * <li>single execution: a command instance can be executed only once</li>
045     * <li>eager data loading: loads data for eager precondition check</li>
046     * <li>eager precondition check: verify precondition before obtaining lock</li>
047     * <li>data loading: loads data for precondition check and execution</li>
048     * <li>precondition check: verifies precondition for execution is still met</li>
049     * <li>locking: obtains exclusive lock on key before executing the command</li>
050     * <li>execution: command logic</li>
051     * </ul>
052     * <p/>
053     * It has built in instrumentation and logging.
054     */
055    public abstract class XCommand<T> implements XCallable<T> {
056        public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout";
057    
058        public static final String INSTRUMENTATION_GROUP = "commands";
059    
060        public static final Long DEFAULT_REQUEUE_DELAY = 10L;
061    
062        public XLog LOG = XLog.getLog(getClass());
063    
064        private String key;
065        private String name;
066        private int priority;
067        private String type;
068        private long createdTime;
069        private MemoryLocks.LockToken lock;
070        private boolean used = false;
071    
072        private Map<Long, List<XCommand<?>>> commandQueue;
073        protected boolean dryrun = false;
074        protected Instrumentation instrumentation;
075    
076        protected XLog.Info logInfo;
077    
078        /**
079         * Create a command.
080         *
081         * @param name command name.
082         * @param type command type.
083         * @param priority command priority.
084         */
085        public XCommand(String name, String type, int priority) {
086            this.name = name;
087            this.type = type;
088            this.priority = priority;
089            this.key = name + "_" + UUID.randomUUID();
090            createdTime = System.currentTimeMillis();
091            logInfo = new XLog.Info();
092            instrumentation = Services.get().get(InstrumentationService.class).get();
093        }
094    
095        /**
096         * @param name command name.
097         * @param type command type.
098         * @param priority command priority.
099         * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without
100         *        really running the job
101         */
102        public XCommand(String name, String type, int priority, boolean dryrun) {
103            this(name, type, priority);
104            this.dryrun = dryrun;
105        }
106    
107        /**
108         * Return the command name.
109         *
110         * @return the command name.
111         */
112        @Override
113        public String getName() {
114            return name;
115        }
116    
117        /**
118         * Return the callable type.
119         * <p/>
120         * The command type is used for concurrency throttling in the {@link CallableQueueService}.
121         *
122         * @return the command type.
123         */
124        @Override
125        public String getType() {
126            return type;
127        }
128    
129        /**
130         * Return the priority of the command.
131         *
132         * @return the command priority.
133         */
134        @Override
135        public int getPriority() {
136            return priority;
137        }
138    
139        /**
140         * Returns the creation time of the command.
141         *
142         * @return the command creation time, in milliseconds.
143         */
144        @Override
145        public long getCreatedTime() {
146            return createdTime;
147        }
148    
149        /**
150         * Queue a command for execution after the current command execution completes.
151         * <p/>
152         * All commands queued during the execution of the current command will be queued for a single serial execution.
153         * <p/>
154         * If the command execution throws an exception, no command will be effectively queued.
155         *
156         * @param command command to queue.
157         */
158        protected void queue(XCommand<?> command) {
159            queue(command, 0);
160        }
161    
162        /**
163         * Queue a command for delayed execution after the current command execution completes.
164         * <p/>
165         * All commands queued during the execution of the current command with the same delay will be queued for a single
166         * serial execution.
167         * <p/>
168         * If the command execution throws an exception, no command will be effectively queued.
169         *
170         * @param command command to queue.
171         * @param msDelay delay in milliseconds.
172         */
173        protected void queue(XCommand<?> command, long msDelay) {
174            if (commandQueue == null) {
175                commandQueue = new HashMap<Long, List<XCommand<?>>>();
176            }
177            List<XCommand<?>> list = commandQueue.get(msDelay);
178            if (list == null) {
179                list = new ArrayList<XCommand<?>>();
180                commandQueue.put(msDelay, list);
181            }
182            list.add(command);
183        }
184    
185        /**
186         * Obtain an exclusive lock on the {link #getEntityKey}.
187         * <p/>
188         * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock.
189         *
190         * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
191         * @throws CommandException thrown i the lock could not be obtained.
192         */
193        private void acquireLock() throws InterruptedException, CommandException {
194            lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
195            if (lock == null) {
196                Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get();
197                instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1);
198                if (isReQueueRequired()) {
199                    //if not acquire the lock, re-queue itself with default delay
200                    resetUsed();
201                    queue(this, getRequeueDelay());
202                    LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName());
203                } else {
204                    throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
205                }
206            } else {
207                LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName());
208            }
209        }
210    
211        /**
212         * Release the lock on the {link #getEntityKey}.
213         */
214        private void releaseLock() {
215            if (lock != null) {
216                lock.release();
217                LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName());
218            }
219        }
220    
221        /**
222         * Implements the XCommand life-cycle.
223         *
224         * @return the {link #execute} return value.
225         * @throws Exception thrown if the command could not be executed.
226         */
227        @Override
228        public final T call() throws CommandException {
229            if (used) {
230                throw new IllegalStateException(this.getClass().getSimpleName() + " already used.");
231            }
232            used = true;
233            commandQueue = null;
234            Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get();
235            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1);
236            Instrumentation.Cron callCron = new Instrumentation.Cron();
237            try {
238                callCron.start();
239                eagerLoadState();
240                LOG = XLog.resetPrefix(LOG);
241                eagerVerifyPrecondition();
242                try {
243                    T ret = null;
244                    if (isLockRequired()) {
245                        Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
246                        acquireLockCron.start();
247                        acquireLock();
248                        acquireLockCron.stop();
249                        instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
250                    }
251                    if (!isLockRequired() || (isLockRequired() && lock != null)) {
252                        LOG.debug("Load state for [{0}]", getEntityKey());
253                        loadState();
254                        LOG = XLog.resetPrefix(LOG);
255                        LOG.debug("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey());
256                        verifyPrecondition();
257                        LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey());
258                        Instrumentation.Cron executeCron = new Instrumentation.Cron();
259                        executeCron.start();
260                        ret = execute();
261                        executeCron.stop();
262                        instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
263                    }
264                    if (commandQueue != null) {
265                        CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
266                        for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
267                            LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
268                            if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
269                                LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue()
270                                        .size(), entry.getKey());
271                            }
272                        }
273                    }
274                    return ret;
275                }
276                finally {
277                    if (isLockRequired()) {
278                        releaseLock();
279                    }
280                }
281            }
282            catch(PreconditionException pex){
283                LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString());
284                instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
285                return null;
286            }
287            catch (XException ex) {
288                LOG.error("XException, ", ex);
289                instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1);
290                if (ex instanceof CommandException) {
291                    throw (CommandException) ex;
292                }
293                else {
294                    throw new CommandException(ex);
295                }
296            }
297            catch (Exception ex) {
298                LOG.error("Exception, ", ex);
299                instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1);
300                throw new CommandException(ErrorCode.E0607, ex);
301            }
302            finally {
303                FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
304                callCron.stop();
305                instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron);
306            }
307        }
308    
309        /**
310         * Return the time out when acquiring a lock.
311         * <p/>
312         * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}.
313         * <p/>
314         * Subclasses should override this method if they want to use a different time out.
315         *
316         * @return the lock time out in milliseconds.
317         */
318        protected long getLockTimeOut() {
319            return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000);
320        }
321    
322        /**
323         * Indicate if the the command requires locking.
324         * <p/>
325         * Subclasses should override this method if they require locking.
326         *
327         * @return <code>true/false</code>
328         */
329        protected abstract boolean isLockRequired();
330    
331        /**
332         * Return the entity key for the command.
333         * <p/>
334         *
335         * @return the entity key for the command.
336         */
337        protected abstract String getEntityKey();
338    
339        /**
340         * Indicate if the the command requires to requeue itself if the lock is not acquired.
341         * <p/>
342         * Subclasses should override this method if they don't want to requeue.
343         * <p/>
344         * Default is true.
345         *
346         * @return <code>true/false</code>
347         */
348        protected boolean isReQueueRequired() {
349            return true;
350        }
351    
352        /**
353         * Load the necessary state to perform an eager precondition check.
354         * <p/>
355         * This implementation does a NOP.
356         * <p/>
357         * Subclasses should override this method and load the state needed to do an eager precondition check.
358         * <p/>
359         * A trivial implementation is calling {link #loadState}.
360         */
361        protected void eagerLoadState() throws CommandException{
362        }
363    
364        /**
365         * Verify the precondition for the command before obtaining a lock.
366         * <p/>
367         * This implementation does a NOP.
368         * <p/>
369         * A trivial implementation is calling {link #verifyPrecondition}.
370         *
371         * @throws CommandException thrown if the precondition is not met.
372         */
373        protected void eagerVerifyPrecondition() throws CommandException,PreconditionException {
374        }
375    
376        /**
377         * Load the necessary state to perform the precondition check and to execute the command.
378         * <p/>
379         * Subclasses must implement this method and load the state needed to do the precondition check and execute the
380         * command.
381         */
382        protected abstract void loadState() throws CommandException;
383    
384        /**
385         * Verify the precondition for the command after a lock has been obtain, just before executing the command.
386         * <p/>
387         *
388         * @throws CommandException thrown if the precondition is not met.
389         */
390        protected abstract void verifyPrecondition() throws CommandException,PreconditionException;
391    
392        /**
393         * Command execution body.
394         * <p/>
395         * This method will be invoked after the {link #loadState} and {link #verifyPrecondition} methods.
396         * <p/>
397         * If the command requires locking, this method will be invoked ONLY if the lock has been acquired.
398         *
399         * @return a return value from the execution of the command, only meaningful if the command is executed
400         *         synchronously.
401         * @throws CommandException thrown if the command execution failed.
402         */
403        protected abstract T execute() throws CommandException;
404    
405    
406        /**
407         * Return the {@link Instrumentation} instance in use.
408         *
409         * @return the {@link Instrumentation} instance in use.
410         */
411        protected Instrumentation getInstrumentation() {
412            return instrumentation;
413        }
414    
415        /**
416         * @param used set false to the used
417         */
418        public void resetUsed() {
419            this.used = false;
420        }
421    
422    
423        /**
424         * Return the delay time for requeue
425         *
426         * @return delay time when requeue itself
427         */
428        protected Long getRequeueDelay() {
429            return DEFAULT_REQUEUE_DELAY;
430        }
431    
432        /**
433         * Get command key
434         *
435         * @return command key
436         */
437        @Override
438        public String getKey(){
439            return this.key;
440        }
441    
442        /**
443         * Get XLog log
444         *
445         * @return XLog
446         */
447        public XLog getLog() {
448            return LOG;
449        }
450    
451    }