001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.util.List;
021    
022    import org.apache.oozie.WorkflowActionBean;
023    import org.apache.oozie.WorkflowJobBean;
024    import org.apache.oozie.client.WorkflowJob;
025    import org.apache.oozie.command.CommandException;
026    import org.apache.oozie.store.StoreException;
027    import org.apache.oozie.store.WorkflowStore;
028    import org.apache.oozie.util.ParamChecker;
029    import org.apache.oozie.util.XLog;
030    import org.apache.oozie.workflow.WorkflowException;
031    import org.apache.oozie.workflow.WorkflowInstance;
032    import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
033    
034    public class SuspendCommand extends WorkflowCommand<Void> {
035    
036        private String id;
037    
038        public SuspendCommand(String id) {
039            super("suspend", "suspend", 1, XLog.STD);
040            this.id = ParamChecker.notEmpty(id, "id");
041        }
042    
043        @Override
044        protected Void call(WorkflowStore store) throws StoreException, CommandException {
045            try {
046                WorkflowJobBean workflow = store.getWorkflow(id, false);
047                setLogInfo(workflow);
048                if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
049                    incrJobCounter(1);
050                    suspendJob(store, workflow, id, null);
051                    store.updateWorkflow(workflow);
052                    queueCallable(new NotificationCommand(workflow));
053                }
054                return null;
055            }
056            catch (WorkflowException ex) {
057                throw new CommandException(ex);
058            }
059        }
060    
061        /**
062         * Suspend the workflow job and pending flag to false for the actions that
063         * are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL
064         *
065         * @param store WorkflowStore
066         * @param workflow WorkflowJobBean
067         * @param id String
068         * @param actionId String
069         * @throws WorkflowException
070         * @throws StoreException
071         */
072        public static void suspendJob(WorkflowStore store, WorkflowJobBean workflow, String id, String actionId)
073                throws WorkflowException, StoreException {
074            if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
075                workflow.getWorkflowInstance().suspend();
076                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
077                ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED);
078                workflow.setStatus(WorkflowJob.Status.SUSPENDED);
079                workflow.setWorkflowInstance(wfInstance);
080    
081                setPendingFalseForActions(store, id, actionId);
082            }
083        }
084    
085        /**
086         * Set pending flag to false for the actions that are START_RETRY or
087         * START_MANUAL or END_RETRY or END_MANUAL
088         * <p/>
089         *
090         * @param store WorkflowStore
091         * @param id workflow id
092         * @param actionId workflow action id
093         * @throws StoreException
094         */
095        public static void setPendingFalseForActions(WorkflowStore store, String id, String actionId) throws StoreException {
096            List<WorkflowActionBean> actions = store.getRetryAndManualActions(id);
097            for (WorkflowActionBean action : actions) {
098                if (actionId != null && actionId.equals(action.getId())) {
099                    // this action has been changed in handleNonTransient()
100                    continue;
101                }
102                else {
103                    action.resetPendingOnly();
104                }
105                store.updateAction(action);
106            }
107        }
108    
109        @Override
110        protected Void execute(WorkflowStore store) throws CommandException, StoreException {
111            XLog.getLog(getClass()).debug("STARTED SuspendCommand for wf id=" + id);
112            try {
113                if (lock(id)) {
114                    call(store);
115                }
116                else {
117                    queueCallable(new SuspendCommand(id), LOCK_FAILURE_REQUEUE_INTERVAL);
118                    XLog.getLog(getClass()).warn("Suspend lock was not acquired - failed {0}", id);
119                }
120            }
121            catch (InterruptedException e) {
122                queueCallable(new SuspendCommand(id), LOCK_FAILURE_REQUEUE_INTERVAL);
123                XLog.getLog(getClass()).warn("SuspendCommand lock was not acquired - interrupted exception failed {0}", id);
124            }
125            finally {
126                XLog.getLog(getClass()).debug("ENDED SuspendCommand for wf id=" + id);
127            }
128            return null;
129        }
130    }