001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command;
019    
020    import java.util.ArrayList;
021    import java.util.List;
022    import java.util.UUID;
023    
024    import org.apache.oozie.CoordinatorActionBean;
025    import org.apache.oozie.CoordinatorJobBean;
026    import org.apache.oozie.ErrorCode;
027    import org.apache.oozie.FaultInjection;
028    import org.apache.oozie.WorkflowActionBean;
029    import org.apache.oozie.WorkflowJobBean;
030    import org.apache.oozie.XException;
031    import org.apache.oozie.service.CallableQueueService;
032    import org.apache.oozie.service.DagXLogInfoService;
033    import org.apache.oozie.service.InstrumentationService;
034    import org.apache.oozie.service.MemoryLocksService;
035    import org.apache.oozie.service.Services;
036    import org.apache.oozie.service.StoreService;
037    import org.apache.oozie.service.XLogService;
038    import org.apache.oozie.store.Store;
039    import org.apache.oozie.store.StoreException;
040    import org.apache.oozie.store.WorkflowStore;
041    import org.apache.oozie.util.Instrumentation;
042    import org.apache.oozie.util.ParamChecker;
043    import org.apache.oozie.util.XCallable;
044    import org.apache.oozie.util.XLog;
045    import org.apache.oozie.util.MemoryLocks.LockToken;
046    
047    /**
048     * Base class for all synchronous and asynchronous DagEngine commands.
049     */
050    public abstract class Command<T, S extends Store> implements XCallable<T> {
051        /**
052         * The instrumentation group used for Commands.
053         */
054        private static final String INSTRUMENTATION_GROUP = "commands";
055    
056        private final long createdTime;
057    
058        /**
059         * The instrumentation group used for Jobs.
060         */
061        private static final String INSTRUMENTATION_JOB_GROUP = "jobs";
062    
063        private static final long LOCK_TIMEOUT = 1000;
064        protected static final long LOCK_FAILURE_REQUEUE_INTERVAL = 30000;
065    
066        protected Instrumentation instrumentation;
067        private List<XCallable<Void>> callables;
068        private List<XCallable<Void>> delayedCallables;
069        private long delay = 0;
070        private List<XCallable<Void>> exceptionCallables;
071        private String name;
072        private String type;
073        private String key;
074        private int priority;
075        private int logMask;
076        private boolean withStore;
077        protected boolean dryrun = false;
078        private ArrayList<LockToken> locks = null;
079    
080        /**
081         * This variable is package private for testing purposes only.
082         */
083        XLog.Info logInfo;
084    
085        /**
086         * Create a command that uses a {@link WorkflowStore} instance. <p/> The current {@link XLog.Info} values are
087         * captured for execution.
088         *
089         * @param name command name.
090         * @param type command type.
091         * @param priority priority of the command, used when queuing for asynchronous execution.
092         * @param logMask log mask for the command logging calls.
093         */
094        public Command(String name, String type, int priority, int logMask) {
095            this(name, type, priority, logMask, true);
096        }
097    
098        /**
099         * Create a command. <p/> The current {@link XLog.Info} values are captured for execution.
100         *
101         * @param name command name.
102         * @param type command type.
103         * @param priority priority of the command, used when queuing for asynchronous execution.
104         * @param logMask log mask for the command logging calls.
105         * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
106         */
107        public Command(String name, String type, int priority, int logMask, boolean withStore) {
108            this.name = ParamChecker.notEmpty(name, "name");
109            this.type = ParamChecker.notEmpty(type, "type");
110            this.key = name + "_" + UUID.randomUUID();
111            this.priority = priority;
112            this.withStore = withStore;
113            this.logMask = logMask;
114            instrumentation = Services.get().get(InstrumentationService.class).get();
115            logInfo = new XLog.Info(XLog.Info.get());
116            createdTime = System.currentTimeMillis();
117            locks = new ArrayList<LockToken>();
118        }
119    
120        /**
121         * Create a command. <p/> The current {@link XLog.Info} values are captured for execution.
122         *
123         * @param name command name.
124         * @param type command type.
125         * @param priority priority of the command, used when queuing for asynchronous execution.
126         * @param logMask log mask for the command logging calls.
127         * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
128         * @param dryrun indicates if dryrun option is enabled. if enabled coordinator will show a diagnostic output without
129         * really submitting the job
130         */
131        public Command(String name, String type, int priority, int logMask, boolean withStore, boolean dryrun) {
132            this(name, type, priority, logMask, withStore);
133            this.dryrun = dryrun;
134        }
135    
136        /**
137         * Return the name of the command.
138         *
139         * @return the name of the command.
140         */
141        @Override
142        public String getName() {
143            return name;
144        }
145    
146        /**
147         * Return the callable type. <p/> The callable type is used for concurrency throttling in the {@link
148         * org.apache.oozie.service.CallableQueueService}.
149         *
150         * @return the callable type.
151         */
152        @Override
153        public String getType() {
154            return type;
155        }
156    
157        /**
158         * Return the priority of the command.
159         *
160         * @return the priority of the command.
161         */
162        @Override
163        public int getPriority() {
164            return priority;
165        }
166    
167        /**
168         * Returns the createdTime of the callable in milliseconds
169         *
170         * @return the callable createdTime
171         */
172        @Override
173        public long getCreatedTime() {
174            return createdTime;
175        }
176    
177        /**
178         * Execute the command {@link #call(WorkflowStore)} setting all the necessary context. <p/> The {@link XLog.Info} is
179         * set to the values at instance creation time. <p/> The command execution is logged and instrumented. <p/> If a
180         * {@link WorkflowStore} is used, a fresh instance will be passed and it will be commited after the {@link
181         * #call(WorkflowStore)} execution. It will be closed without committing if an exception is thrown. <p/> Commands
182         * queued via the DagCommand queue methods are queued for execution after the workflow store has been committed.
183         * <p/> If an exception happends the queued commands will not be effectively queued for execution. Instead, the the
184         * commands queued for exception will be effectively queued fro execution..
185         *
186         * @throws CommandException thrown if the command could not be executed successfully, the workflow store is closed
187         * without committing, thus doing a rollback.
188         */
189        @SuppressWarnings({"ThrowFromFinallyBlock", "unchecked"})
190        public final T call() throws CommandException {
191            XLog.Info.get().setParameters(logInfo);
192            XLog log = XLog.getLog(getClass());
193            log.trace(logMask, "Start");
194            Instrumentation.Cron cron = new Instrumentation.Cron();
195            cron.start();
196            callables = new ArrayList<XCallable<Void>>();
197            delayedCallables = new ArrayList<XCallable<Void>>();
198            exceptionCallables = new ArrayList<XCallable<Void>>();
199            delay = 0;
200            S store = null;
201            boolean exception = false;
202    
203            try {
204                if (withStore) {
205                    store = (S) Services.get().get(StoreService.class).getStore(getStoreClass());
206                    store.beginTrx();
207                }
208                T result = execute(store);
209                /*
210                 *
211                 * if (store != null && log != null) { log.info(XLog.STD,
212                 * "connection log from store Flush Mode {0} ",
213                 * store.getFlushMode()); }
214                 */
215                if (withStore) {
216                    if (store == null) {
217                        throw new IllegalStateException("WorkflowStore should not be null");
218                    }
219                    if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
220                        throw new RuntimeException("Skipping Commit for Failover Testing");
221                    }
222                    store.commitTrx();
223                }
224    
225                // TODO figure out the reject due to concurrency problems and remove
226                // the delayed queuing for callables.
227                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables, 10);
228                if (ret == false) {
229                    logQueueCallableFalse(callables);
230                }
231    
232                ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, delay);
233                if (ret == false) {
234                    logQueueCallableFalse(delayedCallables);
235                }
236    
237                return result;
238            }
239            catch (XException ex) {
240                log.error(logMask | XLog.OPS, "XException, {0}", ex);
241                if (store != null) {
242                    log.info(XLog.STD, "XException - connection logs from store {0}, {1}", store.getConnection(), store
243                            .isClosed());
244                }
245                exception = true;
246                if (store != null && store.isActive()) {
247                    try {
248                        store.rollbackTrx();
249                    }
250                    catch (RuntimeException rex) {
251                        log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
252                    }
253                }
254    
255                // TODO figure out the reject due to concurrency problems and remove
256                // the delayed queuing for callables.
257                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(exceptionCallables, 10);
258                if (ret == false) {
259                    logQueueCallableFalse(exceptionCallables);
260                }
261                if (ex instanceof CommandException) {
262                    throw (CommandException) ex;
263                }
264                else {
265                    throw new CommandException(ex);
266                }
267            }
268            catch (Exception ex) {
269                log.error(logMask | XLog.OPS, "Exception, {0}", ex);
270                exception = true;
271                if (store != null && store.isActive()) {
272                    try {
273                        store.rollbackTrx();
274                    }
275                    catch (RuntimeException rex) {
276                        log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
277                    }
278                }
279                throw new CommandException(ErrorCode.E0607, ex);
280            }
281            catch (Error er) {
282                log.error(logMask | XLog.OPS, "Error, {0}", er);
283                exception = true;
284                if (store != null && store.isActive()) {
285                    try {
286                        store.rollbackTrx();
287                    }
288                    catch (RuntimeException rex) {
289                        log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
290                    }
291                }
292                throw er;
293            }
294            finally {
295                FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
296                cron.stop();
297                instrumentation.addCron(INSTRUMENTATION_GROUP, name, cron);
298                incrCommandCounter(1);
299                log.trace(logMask, "End");
300                if (locks != null) {
301                    for (LockToken lock : locks) {
302                        lock.release();
303                    }
304                    locks.clear();
305                }
306                if (store != null) {
307                    if (!store.isActive()) {
308                        try {
309                            store.closeTrx();
310                        }
311                        catch (RuntimeException rex) {
312                            if (exception) {
313                                log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
314                            }
315                            else {
316                                throw rex;
317                            }
318                        }
319                    }
320                    else {
321                        log.warn(logMask | XLog.OPS, "transaction is not committed or rolled back before closing entitymanager.");
322                    }
323                }
324            }
325        }
326    
327        /**
328         * Queue a callable for execution after the current callable call invocation completes and the {@link WorkflowStore}
329         * transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are queued for a
330         * single serial execution. <p/> If the call invocation throws an exception all queued callables are discarded, they
331         * are not queued for execution.
332         *
333         * @param callable callable to queue for execution.
334         */
335        protected void queueCallable(XCallable<Void> callable) {
336            callables.add(callable);
337        }
338    
339        /**
340         * Queue a list of callables for execution after the current callable call invocation completes and the {@link
341         * WorkflowStore} transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are
342         * queued for a single serial execution. <p/> If the call invocation throws an exception all queued callables are
343         * discarded, they are not queued for execution.
344         *
345         * @param callables list of callables to queue for execution.
346         */
347        protected void queueCallable(List<? extends XCallable<Void>> callables) {
348            this.callables.addAll(callables);
349        }
350    
351        /**
352         * Queue a callable for delayed execution after the current callable call invocation completes and the {@link
353         * WorkflowStore} transaction commits. <p/> All queued delayed callables, regardless of the number of delay queue
354         * invocations, are queued for a single serial delayed execution with the highest delay of all queued callables.
355         * <p/> If the call invocation throws an exception all queued callables are discarded, they are not queued for
356         * execution.
357         *
358         * @param callable callable to queue for delayed execution.
359         * @param delay the queue delay in milliseconds
360         */
361        protected void queueCallable(XCallable<Void> callable, long delay) {
362            this.delayedCallables.add(callable);
363            this.delay = Math.max(this.delay, delay);
364        }
365    
366        /**
367         * Queue a callable for execution only in the event of an exception being thrown during the call invocation. <p/> If
368         * an exception does not happen, all the callables queued by this method are discarded, they are not queued for
369         * execution. <p/> All queued callables, regardless of the number of queue invocations, are queued for a single
370         * serial execution.
371         *
372         * @param callable callable to queue for execution in the case of an exception.
373         */
374        protected void queueCallableForException(XCallable<Void> callable) {
375            exceptionCallables.add(callable);
376        }
377    
378        /**
379         * Logging the info if failed to queue the callables.
380         *
381         * @param callables
382         */
383        protected void logQueueCallableFalse(List<? extends XCallable<Void>> callables) {
384            StringBuilder sb = new StringBuilder(
385                    "Unable to queue the callables, delayedQueue is full or system is in SAFEMODE - failed to queue:[");
386            int size = callables.size();
387            for (int i = 0; i < size; i++) {
388                XCallable<Void> callable = callables.get(i);
389                sb.append(callable.getName());
390                if (i < size - 1) {
391                    sb.append(", ");
392                }
393                else {
394                    sb.append("]");
395                }
396            }
397            XLog.getLog(getClass()).warn(sb.toString());
398        }
399    
400        /**
401         * DagCallable subclasses must implement this method to perform their task. <p/> The workflow store works in
402         * transactional mode. The transaction is committed only if this method ends successfully. Otherwise the transaction
403         * is rolledback.
404         *
405         * @param store the workflow store instance for the callable, <code>null</code> if the callable does not use a
406         * store.
407         * @return the return value of the callable.
408         * @throws StoreException thrown if the workflow store could not perform an operation.
409         * @throws CommandException thrown if the command could not perform its operation.
410         */
411        protected abstract T call(S store) throws StoreException, CommandException;
412    
413        // to do
414        // need to implement on all sub commands and break down the transactions
415    
416        // protected abstract T execute(String id) throws CommandException;
417    
418        /**
419         * Command subclasses must implement this method correct Store can be passed to call(store);
420         *
421         * @return the Store class for use by Callable
422         * @throws CommandException thrown if the command could not perform its operation.
423         */
424        protected abstract Class<? extends Store> getStoreClass();
425    
426        /**
427         * Set the log info with the context of the given coordinator bean.
428         *
429         * @param cBean coordinator bean.
430         */
431        protected void setLogInfo(CoordinatorJobBean cBean) {
432            if (logInfo.getParameter(XLogService.GROUP) == null) {
433                logInfo.setParameter(XLogService.GROUP, cBean.getGroup());
434            }
435            if (logInfo.getParameter(XLogService.USER) == null) {
436                logInfo.setParameter(XLogService.USER, cBean.getUser());
437            }
438            logInfo.setParameter(DagXLogInfoService.JOB, cBean.getId());
439            logInfo.setParameter(DagXLogInfoService.TOKEN, "");
440            logInfo.setParameter(DagXLogInfoService.APP, cBean.getAppName());
441            XLog.Info.get().setParameters(logInfo);
442        }
443    
444        /**
445         * Set the log info with the context of the given coordinator action bean.
446         *
447         * @param action action bean.
448         */
449        protected void setLogInfo(CoordinatorActionBean action) {
450            logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId());
451            // logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
452            logInfo.setParameter(DagXLogInfoService.ACTION, action.getId());
453            XLog.Info.get().setParameters(logInfo);
454        }
455    
456        /**
457         * Set the log info with the context of the given workflow bean.
458         *
459         * @param workflow workflow bean.
460         */
461        protected void setLogInfo(WorkflowJobBean workflow) {
462            if (logInfo.getParameter(XLogService.GROUP) == null) {
463                logInfo.setParameter(XLogService.GROUP, workflow.getGroup());
464            }
465            if (logInfo.getParameter(XLogService.USER) == null) {
466                logInfo.setParameter(XLogService.USER, workflow.getUser());
467            }
468            logInfo.setParameter(DagXLogInfoService.JOB, workflow.getId());
469            logInfo.setParameter(DagXLogInfoService.TOKEN, workflow.getLogToken());
470            logInfo.setParameter(DagXLogInfoService.APP, workflow.getAppName());
471            XLog.Info.get().setParameters(logInfo);
472        }
473    
474        /**
475         * Set the log info with the context of the given action bean.
476         *
477         * @param action action bean.
478         */
479        protected void setLogInfo(WorkflowActionBean action) {
480            logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId());
481            logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
482            logInfo.setParameter(DagXLogInfoService.ACTION, action.getId());
483            XLog.Info.get().setParameters(logInfo);
484        }
485    
486        /**
487         * Reset the action bean information from the log info.
488         */
489        // TODO check if they are used, else delete
490        protected void resetLogInfoAction() {
491            logInfo.clearParameter(DagXLogInfoService.ACTION);
492            XLog.Info.get().clearParameter(DagXLogInfoService.ACTION);
493        }
494    
495        /**
496         * Reset the workflow bean information from the log info.
497         */
498        // TODO check if they are used, else delete
499        protected void resetLogInfoWorkflow() {
500            logInfo.clearParameter(DagXLogInfoService.JOB);
501            logInfo.clearParameter(DagXLogInfoService.APP);
502            logInfo.clearParameter(DagXLogInfoService.TOKEN);
503            XLog.Info.get().clearParameter(DagXLogInfoService.JOB);
504            XLog.Info.get().clearParameter(DagXLogInfoService.APP);
505            XLog.Info.get().clearParameter(DagXLogInfoService.TOKEN);
506        }
507    
508        /**
509         * Convenience method to increment counters.
510         *
511         * @param group the group name.
512         * @param name the counter name.
513         * @param count increment count.
514         */
515        private void incrCounter(String group, String name, int count) {
516            if (instrumentation != null) {
517                instrumentation.incr(group, name, count);
518            }
519        }
520    
521        /**
522         * Used to increment command counters.
523         *
524         * @param count the increment count.
525         */
526        protected void incrCommandCounter(int count) {
527            incrCounter(INSTRUMENTATION_GROUP, name, count);
528        }
529    
530        /**
531         * Used to increment job counters. The counter name s the same as the command name.
532         *
533         * @param count the increment count.
534         */
535        protected void incrJobCounter(int count) {
536            incrJobCounter(name, count);
537        }
538    
539        /**
540         * Used to increment job counters.
541         *
542         * @param name the job name.
543         * @param count the increment count.
544         */
545        protected void incrJobCounter(String name, int count) {
546            incrCounter(INSTRUMENTATION_JOB_GROUP, name, count);
547        }
548    
549        /**
550         * Return the {@link Instrumentation} instance in use.
551         *
552         * @return the {@link Instrumentation} instance in use.
553         */
554        protected Instrumentation getInstrumentation() {
555            return instrumentation;
556        }
557    
558        /**
559         * Return the identity.
560         *
561         * @return the identity.
562         */
563        @Override
564        public String toString() {
565            StringBuilder sb = new StringBuilder();
566            sb.append(getType());
567            sb.append(",").append(getPriority());
568            return sb.toString();
569        }
570    
571        protected boolean lock(String id) throws InterruptedException {
572            if (id == null || id.length() == 0) {
573                XLog.getLog(getClass()).warn("lock(): Id is null or empty :" + id + ":");
574                return false;
575            }
576            LockToken token = Services.get().get(MemoryLocksService.class).getWriteLock(id, LOCK_TIMEOUT);
577            if (token != null) {
578                locks.add(token);
579                return true;
580            }
581            else {
582                return false;
583            }
584        }
585    
586        /*
587         * TODO - remove store coupling to EM. Store will only contain queries
588         * protected EntityManager getEntityManager() { return
589         * store.getEntityManager(); }
590         */
591        protected T execute(S store) throws CommandException, StoreException {
592            T result = call(store);
593            return result;
594        }
595    
596        /**
597         * Get command key
598         *
599         * @return command key
600         */
601        @Override
602        public String getKey(){
603            return this.key;
604        }
605    
606    }