001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.util.Date;
021    
022    import org.apache.oozie.WorkflowActionBean;
023    import org.apache.oozie.WorkflowJobBean;
024    import org.apache.oozie.client.WorkflowJob;
025    import org.apache.oozie.command.CommandException;
026    import org.apache.oozie.store.StoreException;
027    import org.apache.oozie.store.WorkflowStore;
028    import org.apache.oozie.util.ParamChecker;
029    import org.apache.oozie.util.XLog;
030    import org.apache.oozie.workflow.WorkflowException;
031    import org.apache.oozie.workflow.WorkflowInstance;
032    import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
033    
034    public class ResumeCommand extends WorkflowCommand<Void> {
035    
036        private String id;
037    
038        public ResumeCommand(String id) {
039            super("resume", "resume", 1, XLog.STD);
040            this.id = ParamChecker.notEmpty(id, "id");
041        }
042    
043        @Override
044        protected Void call(WorkflowStore store) throws StoreException, CommandException {
045            try {
046                WorkflowJobBean workflow = store.getWorkflow(id, false);
047                setLogInfo(workflow);
048                if (workflow.getStatus() == WorkflowJob.Status.SUSPENDED) {
049                    incrJobCounter(1);
050                    workflow.getWorkflowInstance().resume();
051                    WorkflowInstance wfInstance = workflow.getWorkflowInstance();
052                    ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.RUNNING);
053                    workflow.setWorkflowInstance(wfInstance);
054                    workflow.setStatus(WorkflowJob.Status.RUNNING);
055    
056                    for (WorkflowActionBean action : store.getActionsForWorkflow(id, false)) {
057    
058                        // Set pending flag to true for the actions that are START_RETRY or
059                        // START_MANUAL or END_RETRY or END_MANUAL
060                        if (action.isRetryOrManual()) {
061                            action.setPendingOnly();
062                            store.updateAction(action);
063                        }
064    
065                        if (action.isPending()) {
066                            if (action.getStatus() == WorkflowActionBean.Status.PREP
067                                    || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
068                                queueCallable(new ActionStartCommand(action.getId(), action.getType()));
069                            }
070                            else {
071                                if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
072                                    Date nextRunTime = action.getPendingAge();
073                                    queueCallable(new ActionStartCommand(action.getId(), action.getType()),
074                                                  nextRunTime.getTime() - System.currentTimeMillis());
075                                }
076                                else {
077                                    if (action.getStatus() == WorkflowActionBean.Status.DONE
078                                            || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
079                                        queueCallable(new ActionEndCommand(action.getId(), action.getType()));
080                                    }
081                                    else {
082                                        if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
083                                            Date nextRunTime = action.getPendingAge();
084                                            queueCallable(new ActionEndCommand(action.getId(), action.getType()),
085                                                          nextRunTime.getTime() - System.currentTimeMillis());
086                                        }
087                                    }
088                                }
089                            }
090    
091                        }
092                    }
093    
094                    store.updateWorkflow(workflow);
095                    queueCallable(new NotificationCommand(workflow));
096                }
097                return null;
098            }
099            catch (WorkflowException ex) {
100                throw new CommandException(ex);
101            }
102        }
103    
104        @Override
105        protected Void execute(WorkflowStore store) throws CommandException, StoreException {
106            XLog.getLog(getClass()).debug("STARTED ResumeCommand for action " + id);
107            try {
108                if (lock(id)) {
109                    call(store);
110                }
111                else {
112                    queueCallable(new KillCommand(id), LOCK_FAILURE_REQUEUE_INTERVAL);
113                    XLog.getLog(getClass()).warn("Resume lock was not acquired - failed {0}", id);
114                }
115            }
116            catch (InterruptedException e) {
117                queueCallable(new KillCommand(id), LOCK_FAILURE_REQUEUE_INTERVAL);
118                XLog.getLog(getClass()).warn("ResumeCommand lock was not acquired - interrupted exception failed {0}", id);
119            }
120            finally {
121                XLog.getLog(getClass()).debug("ENDED ResumeCommand for action " + id);
122            }
123            return null;
124        }
125    }