001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.List;
023
024import org.apache.oozie.ErrorCode;
025import org.apache.oozie.WorkflowActionBean;
026import org.apache.oozie.WorkflowJobBean;
027import org.apache.oozie.client.WorkflowJob;
028import org.apache.oozie.command.CommandException;
029import org.apache.oozie.command.PreconditionException;
030import org.apache.oozie.executor.jpa.BatchQueryExecutor;
031import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
032import org.apache.oozie.executor.jpa.JPAExecutorException;
033import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
034import org.apache.oozie.executor.jpa.WorkflowActionRetryManualGetJPAExecutor;
035import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
036import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
037import org.apache.oozie.service.EventHandlerService;
038import org.apache.oozie.service.JPAService;
039import org.apache.oozie.service.Services;
040import org.apache.oozie.util.InstrumentUtils;
041import org.apache.oozie.util.LogUtils;
042import org.apache.oozie.util.ParamChecker;
043import org.apache.oozie.workflow.WorkflowException;
044import org.apache.oozie.workflow.WorkflowInstance;
045import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
046
047public class SuspendXCommand extends WorkflowXCommand<Void> {
048    private final String wfid;
049    private WorkflowJobBean wfJobBean;
050    private JPAService jpaService;
051    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
052
053    public SuspendXCommand(String id) {
054        super("suspend", "suspend", 1);
055        this.wfid = ParamChecker.notEmpty(id, "wfid");
056    }
057
058    @Override
059    protected Void execute() throws CommandException {
060        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
061        try {
062            suspendJob(this.jpaService, this.wfJobBean, this.wfid, null, updateList);
063            this.wfJobBean.setLastModifiedTime(new Date());
064            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
065                    this.wfJobBean));
066            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
067            queue(new NotificationXCommand(this.wfJobBean));
068        }
069        catch (WorkflowException e) {
070            throw new CommandException(e);
071        }
072        catch (JPAExecutorException je) {
073            throw new CommandException(je);
074        }
075        finally {
076            updateParentIfNecessary(wfJobBean);
077        }
078        return null;
079    }
080
081    /**
082     * Suspend the workflow job and pending flag to false for the actions that are START_RETRY or START_MANUAL or
083     * END_RETRY or END_MANUAL
084     *
085     * @param jpaService jpa service
086     * @param workflow workflow job
087     * @param id workflow job id
088     * @param actionId workflow action id
089     * @throws WorkflowException thrown if failed to suspend workflow instance
090     * @throws CommandException thrown if unable set pending false for actions
091     */
092    public static void suspendJob(JPAService jpaService, WorkflowJobBean workflow, String id,
093            String actionId, List<UpdateEntry> updateList) throws WorkflowException, CommandException {
094        if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
095            workflow.getWorkflowInstance().suspend();
096            WorkflowInstance wfInstance = workflow.getWorkflowInstance();
097            ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED);
098            workflow.setStatus(WorkflowJob.Status.SUSPENDED);
099            workflow.setWorkflowInstance(wfInstance);
100
101            setPendingFalseForActions(jpaService, id, actionId, updateList);
102            if (EventHandlerService.isEnabled()) {
103                generateEvent(workflow);
104            }
105        }
106    }
107
108    /**
109     * Set pending flag to false for the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL
110     * <p/>
111     *
112     * @param jpaService jpa service
113     * @param id workflow job id
114     * @param actionId workflow action id
115     * @throws CommandException thrown if failed to update workflow action
116     */
117    private static void setPendingFalseForActions(JPAService jpaService, String id, String actionId,
118            List<UpdateEntry> updateList) throws CommandException {
119        List<WorkflowActionBean> actions;
120        try {
121            actions = jpaService.execute(new WorkflowActionRetryManualGetJPAExecutor(id));
122
123            for (WorkflowActionBean action : actions) {
124                if (actionId != null && actionId.equals(action.getId())) {
125                    // this action has been changed in handleNonTransient()
126                    continue;
127                }
128                else {
129                    action.resetPendingOnly();
130                }
131                if (updateList != null) { // will be null when suspendJob
132                                          // invoked statically via
133                                          // handleNonTransient()
134                    updateList.add(new UpdateEntry<WorkflowActionQuery>(
135                            WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));
136                }
137            }
138        }
139        catch (JPAExecutorException je) {
140            throw new CommandException(je);
141        }
142    }
143
144    @Override
145    protected void eagerLoadState() throws CommandException {
146        try {
147            jpaService = Services.get().get(JPAService.class);
148            this.wfJobBean = WorkflowJobQueryExecutor.getInstance()
149                    .get(WorkflowJobQuery.GET_WORKFLOW_STATUS, this.wfid);
150        }
151        catch (Exception ex) {
152            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
153        }
154        LogUtils.setLogInfo(this.wfJobBean, logInfo);
155    }
156
157    @Override
158    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
159        if (this.wfJobBean.getStatus() != WorkflowJob.Status.RUNNING) {
160            throw new PreconditionException(ErrorCode.E0727, this.wfJobBean.getId(), this.wfJobBean.getStatus());
161        }
162    }
163
164    @Override
165    public String getEntityKey() {
166        return this.wfid;
167    }
168
169    @Override
170    public String getKey() {
171        return getName() + "_" + this.wfid;
172    }
173
174    @Override
175    protected boolean isLockRequired() {
176        return true;
177    }
178
179    @Override
180    protected void loadState() throws CommandException {
181        try {
182            this.wfJobBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_SUSPEND,
183                    this.wfid);
184        }
185        catch (Exception ex) {
186            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
187        }
188    }
189
190    @Override
191    protected void verifyPrecondition() throws CommandException, PreconditionException {
192        eagerVerifyPrecondition();
193    }
194}