001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command;
019
020import java.util.ArrayList;
021import java.util.List;
022import java.util.UUID;
023
024import org.apache.oozie.CoordinatorActionBean;
025import org.apache.oozie.CoordinatorJobBean;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.FaultInjection;
028import org.apache.oozie.WorkflowActionBean;
029import org.apache.oozie.WorkflowJobBean;
030import org.apache.oozie.XException;
031import org.apache.oozie.service.CallableQueueService;
032import org.apache.oozie.service.DagXLogInfoService;
033import org.apache.oozie.service.InstrumentationService;
034import org.apache.oozie.service.MemoryLocksService;
035import org.apache.oozie.service.Services;
036import org.apache.oozie.service.StoreService;
037import org.apache.oozie.service.XLogService;
038import org.apache.oozie.store.Store;
039import org.apache.oozie.store.StoreException;
040import org.apache.oozie.store.WorkflowStore;
041import org.apache.oozie.util.InstrumentUtils;
042import org.apache.oozie.util.Instrumentation;
043import org.apache.oozie.util.ParamChecker;
044import org.apache.oozie.util.XCallable;
045import org.apache.oozie.util.XLog;
046import org.apache.oozie.lock.LockToken;
047
048/**
049 * Base class for all synchronous and asynchronous DagEngine commands.
050 */
051public abstract class Command<T, S extends Store> implements XCallable<T> {
052    /**
053     * The instrumentation group used for Commands.
054     */
055    private static final String INSTRUMENTATION_GROUP = "commands";
056
057    private final long createdTime;
058
059    /**
060     * The instrumentation group used for Jobs.
061     */
062    private static final String INSTRUMENTATION_JOB_GROUP = "jobs";
063
064    private static final long LOCK_TIMEOUT = 1000;
065    protected static final long LOCK_FAILURE_REQUEUE_INTERVAL = 30000;
066
067    protected Instrumentation instrumentation;
068    private List<XCallable<Void>> callables;
069    private List<XCallable<Void>> delayedCallables;
070    private long delay = 0;
071    private List<XCallable<Void>> exceptionCallables;
072    private String name;
073    private String type;
074    private String key;
075    private int priority;
076    private int logMask;
077    private boolean withStore;
078    protected boolean dryrun = false;
079    private ArrayList<LockToken> locks = null;
080
081    /**
082     * This variable is package private for testing purposes only.
083     */
084    XLog.Info logInfo;
085
086    /**
087     * Create a command that uses a {@link WorkflowStore} instance. <p/> The current {@link XLog.Info} values are
088     * captured for execution.
089     *
090     * @param name command name.
091     * @param type command type.
092     * @param priority priority of the command, used when queuing for asynchronous execution.
093     * @param logMask log mask for the command logging calls.
094     */
095    public Command(String name, String type, int priority, int logMask) {
096        this(name, type, priority, logMask, true);
097    }
098
099    /**
100     * Create a command. <p/> The current {@link XLog.Info} values are captured for execution.
101     *
102     * @param name command name.
103     * @param type command type.
104     * @param priority priority of the command, used when queuing for asynchronous execution.
105     * @param logMask log mask for the command logging calls.
106     * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
107     */
108    public Command(String name, String type, int priority, int logMask, boolean withStore) {
109        this.name = ParamChecker.notEmpty(name, "name");
110        this.type = ParamChecker.notEmpty(type, "type");
111        this.key = name + "_" + UUID.randomUUID();
112        this.priority = priority;
113        this.withStore = withStore;
114        this.logMask = logMask;
115        instrumentation = Services.get().get(InstrumentationService.class).get();
116        logInfo = new XLog.Info(XLog.Info.get());
117        createdTime = System.currentTimeMillis();
118        locks = new ArrayList<LockToken>();
119    }
120
121    /**
122     * Create a command. <p/> The current {@link XLog.Info} values are captured for execution.
123     *
124     * @param name command name.
125     * @param type command type.
126     * @param priority priority of the command, used when queuing for asynchronous execution.
127     * @param logMask log mask for the command logging calls.
128     * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
129     * @param dryrun indicates if dryrun option is enabled. if enabled coordinator will show a diagnostic output without
130     * really submitting the job
131     */
132    public Command(String name, String type, int priority, int logMask, boolean withStore, boolean dryrun) {
133        this(name, type, priority, logMask, withStore);
134        this.dryrun = dryrun;
135    }
136
137    /**
138     * Return the name of the command.
139     *
140     * @return the name of the command.
141     */
142    @Override
143    public String getName() {
144        return name;
145    }
146
147    /**
148     * Return the callable type. <p/> The callable type is used for concurrency throttling in the {@link
149     * org.apache.oozie.service.CallableQueueService}.
150     *
151     * @return the callable type.
152     */
153    @Override
154    public String getType() {
155        return type;
156    }
157
158    /**
159     * Return the priority of the command.
160     *
161     * @return the priority of the command.
162     */
163    @Override
164    public int getPriority() {
165        return priority;
166    }
167
168    /**
169     * Returns the createdTime of the callable in milliseconds
170     *
171     * @return the callable createdTime
172     */
173    @Override
174    public long getCreatedTime() {
175        return createdTime;
176    }
177
178    /**
179     * Execute the command {@link #call(WorkflowStore)} setting all the necessary context. <p/> The {@link XLog.Info} is
180     * set to the values at instance creation time. <p/> The command execution is logged and instrumented. <p/> If a
181     * {@link WorkflowStore} is used, a fresh instance will be passed and it will be commited after the {@link
182     * #call(WorkflowStore)} execution. It will be closed without committing if an exception is thrown. <p/> Commands
183     * queued via the DagCommand queue methods are queued for execution after the workflow store has been committed.
184     * <p/> If an exception happends the queued commands will not be effectively queued for execution. Instead, the the
185     * commands queued for exception will be effectively queued fro execution..
186     *
187     * @throws CommandException thrown if the command could not be executed successfully, the workflow store is closed
188     * without committing, thus doing a rollback.
189     */
190    @SuppressWarnings({"ThrowFromFinallyBlock", "unchecked"})
191    public final T call() throws CommandException {
192        XLog.Info.get().setParameters(logInfo);
193        XLog log = XLog.getLog(getClass());
194        log.trace(logMask, "Start");
195        Instrumentation.Cron cron = new Instrumentation.Cron();
196        cron.start();
197        callables = new ArrayList<XCallable<Void>>();
198        delayedCallables = new ArrayList<XCallable<Void>>();
199        exceptionCallables = new ArrayList<XCallable<Void>>();
200        delay = 0;
201        S store = null;
202        boolean exception = false;
203
204        try {
205            if (withStore) {
206                store = (S) Services.get().get(StoreService.class).getStore(getStoreClass());
207                store.beginTrx();
208            }
209            T result = execute(store);
210            /*
211             *
212             * if (store != null && log != null) { log.info(XLog.STD,
213             * "connection log from store Flush Mode {0} ",
214             * store.getFlushMode()); }
215             */
216            if (withStore) {
217                if (store == null) {
218                    throw new IllegalStateException("WorkflowStore should not be null");
219                }
220                if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
221                    throw new RuntimeException("Skipping Commit for Failover Testing");
222                }
223                store.commitTrx();
224            }
225
226            // TODO figure out the reject due to concurrency problems and remove
227            // the delayed queuing for callables.
228            boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables, 10);
229            if (ret == false) {
230                logQueueCallableFalse(callables);
231            }
232
233            ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, delay);
234            if (ret == false) {
235                logQueueCallableFalse(delayedCallables);
236            }
237
238            return result;
239        }
240        catch (XException ex) {
241            log.error(logMask | XLog.OPS, "XException, {0}", ex.getMessage(), ex);
242            if (store != null) {
243                log.info(XLog.STD, "XException - connection logs from store {0}, {1}", store.getConnection(), store
244                        .isClosed());
245            }
246            exception = true;
247            if (store != null && store.isActive()) {
248                try {
249                    store.rollbackTrx();
250                }
251                catch (RuntimeException rex) {
252                    log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
253                }
254            }
255
256            // TODO figure out the reject due to concurrency problems and remove
257            // the delayed queuing for callables.
258            boolean ret = Services.get().get(CallableQueueService.class).queueSerial(exceptionCallables, 10);
259            if (ret == false) {
260                logQueueCallableFalse(exceptionCallables);
261            }
262            if (ex instanceof CommandException) {
263                throw (CommandException) ex;
264            }
265            else {
266                throw new CommandException(ex);
267            }
268        }
269        catch (Exception ex) {
270            log.error(logMask | XLog.OPS, "Exception, {0}", ex.getMessage(), ex);
271            exception = true;
272            if (store != null && store.isActive()) {
273                try {
274                    store.rollbackTrx();
275                }
276                catch (RuntimeException rex) {
277                    log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
278                }
279            }
280            throw new CommandException(ErrorCode.E0607, name, ex.getMessage(), ex);
281        }
282        catch (Error er) {
283            log.error(logMask | XLog.OPS, "Error, {0}", er.getMessage(), er);
284            exception = true;
285            if (store != null && store.isActive()) {
286                try {
287                    store.rollbackTrx();
288                }
289                catch (RuntimeException rex) {
290                    log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
291                }
292            }
293            throw er;
294        }
295        finally {
296            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
297            cron.stop();
298            instrumentation.addCron(INSTRUMENTATION_GROUP, name, cron);
299            InstrumentUtils.incrCommandCounter(name, 1, instrumentation);
300            log.trace(logMask, "End");
301            if (locks != null) {
302                for (LockToken lock : locks) {
303                    lock.release();
304                }
305                locks.clear();
306            }
307            if (store != null) {
308                if (!store.isActive()) {
309                    try {
310                        store.closeTrx();
311                    }
312                    catch (RuntimeException rex) {
313                        if (exception) {
314                            log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
315                        }
316                        else {
317                            throw rex;
318                        }
319                    }
320                }
321                else {
322                    log.warn(logMask | XLog.OPS, "transaction is not committed or rolled back before closing entitymanager.");
323                }
324            }
325        }
326    }
327
328    /**
329     * Queue a callable for execution after the current callable call invocation completes and the {@link WorkflowStore}
330     * transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are queued for a
331     * single serial execution. <p/> If the call invocation throws an exception all queued callables are discarded, they
332     * are not queued for execution.
333     *
334     * @param callable callable to queue for execution.
335     */
336    protected void queueCallable(XCallable<Void> callable) {
337        callables.add(callable);
338    }
339
340    /**
341     * Queue a list of callables for execution after the current callable call invocation completes and the {@link
342     * WorkflowStore} transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are
343     * queued for a single serial execution. <p/> If the call invocation throws an exception all queued callables are
344     * discarded, they are not queued for execution.
345     *
346     * @param callables list of callables to queue for execution.
347     */
348    protected void queueCallable(List<? extends XCallable<Void>> callables) {
349        this.callables.addAll(callables);
350    }
351
352    /**
353     * Queue a callable for delayed execution after the current callable call invocation completes and the {@link
354     * WorkflowStore} transaction commits. <p/> All queued delayed callables, regardless of the number of delay queue
355     * invocations, are queued for a single serial delayed execution with the highest delay of all queued callables.
356     * <p/> If the call invocation throws an exception all queued callables are discarded, they are not queued for
357     * execution.
358     *
359     * @param callable callable to queue for delayed execution.
360     * @param delay the queue delay in milliseconds
361     */
362    protected void queueCallable(XCallable<Void> callable, long delay) {
363        this.delayedCallables.add(callable);
364        this.delay = Math.max(this.delay, delay);
365    }
366
367    /**
368     * Queue a callable for execution only in the event of an exception being thrown during the call invocation. <p/> If
369     * an exception does not happen, all the callables queued by this method are discarded, they are not queued for
370     * execution. <p/> All queued callables, regardless of the number of queue invocations, are queued for a single
371     * serial execution.
372     *
373     * @param callable callable to queue for execution in the case of an exception.
374     */
375    protected void queueCallableForException(XCallable<Void> callable) {
376        exceptionCallables.add(callable);
377    }
378
379    /**
380     * Logging the info if failed to queue the callables.
381     *
382     * @param callables
383     */
384    protected void logQueueCallableFalse(List<? extends XCallable<Void>> callables) {
385        StringBuilder sb = new StringBuilder(
386                "Unable to queue the callables, delayedQueue is full or system is in SAFEMODE - failed to queue:[");
387        int size = callables.size();
388        for (int i = 0; i < size; i++) {
389            XCallable<Void> callable = callables.get(i);
390            sb.append(callable.getName());
391            if (i < size - 1) {
392                sb.append(", ");
393            }
394            else {
395                sb.append("]");
396            }
397        }
398        XLog.getLog(getClass()).warn(sb.toString());
399    }
400
401    /**
402     * DagCallable subclasses must implement this method to perform their task. <p/> The workflow store works in
403     * transactional mode. The transaction is committed only if this method ends successfully. Otherwise the transaction
404     * is rolledback.
405     *
406     * @param store the workflow store instance for the callable, <code>null</code> if the callable does not use a
407     * store.
408     * @return the return value of the callable.
409     * @throws StoreException thrown if the workflow store could not perform an operation.
410     * @throws CommandException thrown if the command could not perform its operation.
411     */
412    protected abstract T call(S store) throws StoreException, CommandException;
413
414    // to do
415    // need to implement on all sub commands and break down the transactions
416
417    // protected abstract T execute(String id) throws CommandException;
418
419    /**
420     * Command subclasses must implement this method correct Store can be passed to call(store);
421     *
422     * @return the Store class for use by Callable
423     * @throws CommandException thrown if the command could not perform its operation.
424     */
425    protected abstract Class<? extends Store> getStoreClass();
426
427    /**
428     * Set the log info with the context of the given coordinator bean.
429     *
430     * @param cBean coordinator bean.
431     */
432    protected void setLogInfo(CoordinatorJobBean cBean) {
433        if (logInfo.getParameter(XLogService.GROUP) == null) {
434            logInfo.setParameter(XLogService.GROUP, cBean.getGroup());
435        }
436        if (logInfo.getParameter(XLogService.USER) == null) {
437            logInfo.setParameter(XLogService.USER, cBean.getUser());
438        }
439        logInfo.setParameter(DagXLogInfoService.JOB, cBean.getId());
440        logInfo.setParameter(DagXLogInfoService.TOKEN, "");
441        logInfo.setParameter(DagXLogInfoService.APP, cBean.getAppName());
442        XLog.Info.get().setParameters(logInfo);
443    }
444
445    /**
446     * Set the log info with the context of the given coordinator action bean.
447     *
448     * @param action action bean.
449     */
450    protected void setLogInfo(CoordinatorActionBean action) {
451        logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId());
452        // logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
453        logInfo.setParameter(DagXLogInfoService.ACTION, action.getId());
454        XLog.Info.get().setParameters(logInfo);
455    }
456
457    /**
458     * Set the log info with the context of the given workflow bean.
459     *
460     * @param workflow workflow bean.
461     */
462    protected void setLogInfo(WorkflowJobBean workflow) {
463        if (logInfo.getParameter(XLogService.GROUP) == null) {
464            logInfo.setParameter(XLogService.GROUP, workflow.getGroup());
465        }
466        if (logInfo.getParameter(XLogService.USER) == null) {
467            logInfo.setParameter(XLogService.USER, workflow.getUser());
468        }
469        logInfo.setParameter(DagXLogInfoService.JOB, workflow.getId());
470        logInfo.setParameter(DagXLogInfoService.TOKEN, workflow.getLogToken());
471        logInfo.setParameter(DagXLogInfoService.APP, workflow.getAppName());
472        XLog.Info.get().setParameters(logInfo);
473    }
474
475    /**
476     * Set the log info with the context of the given action bean.
477     *
478     * @param action action bean.
479     */
480    protected void setLogInfo(WorkflowActionBean action) {
481        logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId());
482        logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
483        logInfo.setParameter(DagXLogInfoService.ACTION, action.getId());
484        XLog.Info.get().setParameters(logInfo);
485    }
486
487    /**
488     * Reset the action bean information from the log info.
489     */
490    // TODO check if they are used, else delete
491    protected void resetLogInfoAction() {
492        logInfo.clearParameter(DagXLogInfoService.ACTION);
493        XLog.Info.get().clearParameter(DagXLogInfoService.ACTION);
494    }
495
496    /**
497     * Reset the workflow bean information from the log info.
498     */
499    // TODO check if they are used, else delete
500    protected void resetLogInfoWorkflow() {
501        logInfo.clearParameter(DagXLogInfoService.JOB);
502        logInfo.clearParameter(DagXLogInfoService.APP);
503        logInfo.clearParameter(DagXLogInfoService.TOKEN);
504        XLog.Info.get().clearParameter(DagXLogInfoService.JOB);
505        XLog.Info.get().clearParameter(DagXLogInfoService.APP);
506        XLog.Info.get().clearParameter(DagXLogInfoService.TOKEN);
507    }
508
509    /**
510     * Return the {@link Instrumentation} instance in use.
511     *
512     * @return the {@link Instrumentation} instance in use.
513     */
514    protected Instrumentation getInstrumentation() {
515        return instrumentation;
516    }
517
518    /**
519     * Return the identity.
520     *
521     * @return the identity.
522     */
523    @Override
524    public String toString() {
525        StringBuilder sb = new StringBuilder();
526        sb.append(getType());
527        sb.append(",").append(getPriority());
528        return sb.toString();
529    }
530
531    protected boolean lock(String id) throws InterruptedException {
532        if (id == null || id.length() == 0) {
533            XLog.getLog(getClass()).warn("lock(): Id is null or empty :" + id + ":");
534            return false;
535        }
536        LockToken token = Services.get().get(MemoryLocksService.class).getWriteLock(id, LOCK_TIMEOUT);
537        if (token != null) {
538            locks.add(token);
539            return true;
540        }
541        else {
542            return false;
543        }
544    }
545
546    /*
547     * TODO - remove store coupling to EM. Store will only contain queries
548     * protected EntityManager getEntityManager() { return
549     * store.getEntityManager(); }
550     */
551    protected T execute(S store) throws CommandException, StoreException {
552        T result = call(store);
553        return result;
554    }
555
556    /**
557     * Get command key
558     *
559     * @return command key
560     */
561    @Override
562    public String getKey(){
563        return this.key;
564    }
565
566    /**
567     * Get command lock key returning the key as an entity key, [not used] Just
568     * to be able to implement XCallable [to be deprecated]
569     *
570     * @return key
571     */
572    @Override
573    public String getEntityKey() {
574        return this.key;
575    }
576
577    /**
578     * set the mode of execution for the callable. True if in interrupt, false
579     * if not [to be deprecated]
580     */
581    public void setInterruptMode(boolean mode) {
582    }
583
584    /**
585     * [to be deprecated]
586     *
587     * @return the mode of execution. true if it is executed as an Interrupt,
588     *         false otherwise
589     */
590    public boolean inInterruptMode() {
591        return false;
592    }
593
594}