001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.wf;
020
021import org.apache.oozie.ErrorCode;
022import org.apache.oozie.WorkflowActionBean;
023import org.apache.oozie.WorkflowJobBean;
024import org.apache.oozie.client.WorkflowJob;
025import org.apache.oozie.command.CommandException;
026import org.apache.oozie.command.PreconditionException;
027import org.apache.oozie.executor.jpa.BatchQueryExecutor;
028import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
029import org.apache.oozie.executor.jpa.JPAExecutorException;
030import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
031import org.apache.oozie.executor.jpa.WorkflowActionRetryManualGetJPAExecutor;
032import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
033import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
034import org.apache.oozie.service.EventHandlerService;
035import org.apache.oozie.service.JPAService;
036import org.apache.oozie.service.Services;
037import org.apache.oozie.util.InstrumentUtils;
038import org.apache.oozie.util.LogUtils;
039import org.apache.oozie.util.ParamChecker;
040import org.apache.oozie.workflow.WorkflowException;
041import org.apache.oozie.workflow.WorkflowInstance;
042import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
043
044import java.util.ArrayList;
045import java.util.Date;
046import java.util.List;
047
048public class SuspendXCommand extends WorkflowXCommand<Void> {
049    private final String wfid;
050    private WorkflowJobBean wfJobBean;
051    private JPAService jpaService;
052    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
053
054    public SuspendXCommand(String id) {
055        super("suspend", "suspend", 1);
056        this.wfid = ParamChecker.notEmpty(id, "wfid");
057    }
058
059    @Override
060    protected void setLogInfo() {
061        LogUtils.setLogInfo(wfid);
062    }
063
064    @Override
065    protected Void execute() throws CommandException {
066        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
067        try {
068            suspendJob(this.jpaService, this.wfJobBean, this.wfid, null, updateList);
069            this.wfJobBean.setLastModifiedTime(new Date());
070            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
071                    this.wfJobBean));
072            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
073            queue(new WorkflowNotificationXCommand(this.wfJobBean));
074            //Calling suspend recursively to handle parent workflow
075            suspendParentWorkFlow();
076        }
077        catch (WorkflowException e) {
078            throw new CommandException(e);
079        }
080        catch (JPAExecutorException je) {
081            throw new CommandException(je);
082        }
083        finally {
084            updateParentIfNecessary(wfJobBean);
085        }
086        return null;
087    }
088
089    /**
090     * It will suspend the parent workflow
091     * @throws CommandException
092     */
093    private void suspendParentWorkFlow() throws CommandException {
094        if (this.wfJobBean.getParentId() != null && this.wfJobBean.getParentId().contains("-W")) {
095            new SuspendXCommand(this.wfJobBean.getParentId()).call();
096        } else {
097            // update the action of the parent workflow if it is launched by coordinator
098            updateParentIfNecessary(wfJobBean);
099        }
100    }
101
102    /**
103     * Suspend the workflow job and pending flag to false for the actions that are START_RETRY or START_MANUAL or
104     * END_RETRY or END_MANUAL
105     *
106     * @param jpaService jpa service
107     * @param workflow workflow job
108     * @param id workflow job id
109     * @param actionId workflow action id
110     * @throws WorkflowException thrown if failed to suspend workflow instance
111     * @throws CommandException thrown if unable set pending false for actions
112     */
113    public static void suspendJob(JPAService jpaService, WorkflowJobBean workflow, String id,
114            String actionId, List<UpdateEntry> updateList) throws WorkflowException, CommandException {
115        if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
116            workflow.getWorkflowInstance().suspend();
117            WorkflowInstance wfInstance = workflow.getWorkflowInstance();
118            ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED);
119            workflow.setStatus(WorkflowJob.Status.SUSPENDED);
120            workflow.setWorkflowInstance(wfInstance);
121
122            setPendingFalseForActions(jpaService, id, actionId, updateList);
123            if (EventHandlerService.isEnabled()) {
124                generateEvent(workflow);
125            }
126        }
127    }
128
129    /**
130     * Set pending flag to false for the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL
131     * <p/>
132     *
133     * @param jpaService jpa service
134     * @param id workflow job id
135     * @param actionId workflow action id
136     * @throws CommandException thrown if failed to update workflow action
137     */
138    private static void setPendingFalseForActions(JPAService jpaService, String id, String actionId,
139            List<UpdateEntry> updateList) throws CommandException {
140        List<WorkflowActionBean> actions;
141        try {
142            actions = jpaService.execute(new WorkflowActionRetryManualGetJPAExecutor(id));
143
144            for (WorkflowActionBean action : actions) {
145                if (actionId != null && actionId.equals(action.getId())) {
146                    // this action has been changed in handleNonTransient()
147                    continue;
148                }
149                else {
150                    action.resetPendingOnly();
151                }
152                if (updateList != null) { // will be null when suspendJob
153                                          // invoked statically via
154                                          // handleNonTransient()
155                    updateList.add(new UpdateEntry<WorkflowActionQuery>(
156                            WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));
157                }
158            }
159        }
160        catch (JPAExecutorException je) {
161            throw new CommandException(je);
162        }
163    }
164
165    @Override
166    protected void eagerLoadState() throws CommandException {
167        try {
168            jpaService = Services.get().get(JPAService.class);
169            this.wfJobBean = WorkflowJobQueryExecutor.getInstance()
170                    .get(WorkflowJobQuery.GET_WORKFLOW_STATUS, this.wfid);
171        }
172        catch (Exception ex) {
173            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
174        }
175        LogUtils.setLogInfo(this.wfJobBean);
176    }
177
178    @Override
179    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
180        if (this.wfJobBean.getStatus() != WorkflowJob.Status.RUNNING) {
181            throw new PreconditionException(ErrorCode.E0727, this.wfJobBean.getId(), this.wfJobBean.getStatus());
182        }
183    }
184
185    @Override
186    public String getEntityKey() {
187        return this.wfid;
188    }
189
190    @Override
191    public String getKey() {
192        return getName() + "_" + this.wfid;
193    }
194
195    @Override
196    protected boolean isLockRequired() {
197        return true;
198    }
199
200    @Override
201    protected void loadState() throws CommandException {
202        try {
203            this.wfJobBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_SUSPEND,
204                    this.wfid);
205        }
206        catch (Exception ex) {
207            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
208        }
209        LogUtils.setLogInfo(wfJobBean);
210    }
211
212    @Override
213    protected void verifyPrecondition() throws CommandException, PreconditionException {
214        eagerVerifyPrecondition();
215    }
216}