This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.util.ArrayList;
021 import java.util.Date;
022 import java.util.List;
023
024 import org.apache.oozie.ErrorCode;
025 import org.apache.oozie.WorkflowActionBean;
026 import org.apache.oozie.WorkflowJobBean;
027 import org.apache.oozie.client.WorkflowJob;
028 import org.apache.oozie.client.rest.JsonBean;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.command.PreconditionException;
031 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
032 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
033 import org.apache.oozie.executor.jpa.JPAExecutorException;
034 import org.apache.oozie.executor.jpa.WorkflowActionRetryManualGetJPAExecutor;
035 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
036 import org.apache.oozie.service.JPAService;
037 import org.apache.oozie.service.Services;
038 import org.apache.oozie.util.InstrumentUtils;
039 import org.apache.oozie.util.LogUtils;
040 import org.apache.oozie.util.ParamChecker;
041 import org.apache.oozie.workflow.WorkflowException;
042 import org.apache.oozie.workflow.WorkflowInstance;
043 import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
044
045 public class SuspendXCommand extends WorkflowXCommand<Void> {
046 private final String wfid;
047 private WorkflowJobBean wfJobBean;
048 private JPAService jpaService;
049 private List<JsonBean> updateList = new ArrayList<JsonBean>();
050
051 public SuspendXCommand(String id) {
052 super("suspend", "suspend", 1);
053 this.wfid = ParamChecker.notEmpty(id, "wfid");
054 }
055
056 /* (non-Javadoc)
057 * @see org.apache.oozie.command.XCommand#execute()
058 */
059 @Override
060 protected Void execute() throws CommandException {
061 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
062 try {
063 suspendJob(this.jpaService, this.wfJobBean, this.wfid, null, updateList);
064 this.wfJobBean.setLastModifiedTime(new Date());
065 updateList.add(this.wfJobBean);
066 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
067 queue(new NotificationXCommand(this.wfJobBean));
068 }
069 catch (WorkflowException e) {
070 throw new CommandException(e);
071 }
072 catch (JPAExecutorException je) {
073 throw new CommandException(je);
074 }
075 finally {
076 // update coordinator action
077 new CoordActionUpdateXCommand(wfJobBean).call();
078 }
079 return null;
080 }
081
082 /**
083 * Suspend the workflow job and pending flag to false for the actions that are START_RETRY or START_MANUAL or
084 * END_RETRY or END_MANUAL
085 *
086 * @param jpaService jpa service
087 * @param workflow workflow job
088 * @param id workflow job id
089 * @param actionId workflow action id
090 * @throws WorkflowException thrown if failed to suspend workflow instance
091 * @throws CommandException thrown if unable set pending false for actions
092 */
093 public static void suspendJob(JPAService jpaService, WorkflowJobBean workflow, String id,
094 String actionId, List<JsonBean> updateList) throws WorkflowException, CommandException {
095 if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
096 workflow.getWorkflowInstance().suspend();
097 WorkflowInstance wfInstance = workflow.getWorkflowInstance();
098 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED);
099 workflow.setStatus(WorkflowJob.Status.SUSPENDED);
100 workflow.setWorkflowInstance(wfInstance);
101
102 setPendingFalseForActions(jpaService, id, actionId, updateList);
103 }
104 }
105
106 /**
107 * Set pending flag to false for the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL
108 * <p/>
109 *
110 * @param jpaService jpa service
111 * @param id workflow job id
112 * @param actionId workflow action id
113 * @throws CommandException thrown if failed to update workflow action
114 */
115 private static void setPendingFalseForActions(JPAService jpaService, String id, String actionId,
116 List<JsonBean> updateList) throws CommandException {
117 List<WorkflowActionBean> actions;
118 try {
119 actions = jpaService.execute(new WorkflowActionRetryManualGetJPAExecutor(id));
120
121 for (WorkflowActionBean action : actions) {
122 if (actionId != null && actionId.equals(action.getId())) {
123 // this action has been changed in handleNonTransient()
124 continue;
125 }
126 else {
127 action.resetPendingOnly();
128 }
129 if (updateList != null) { // will be null when suspendJob
130 // invoked statically via
131 // handleNonTransient()
132 updateList.add(action);
133 }
134 }
135 }
136 catch (JPAExecutorException je) {
137 throw new CommandException(je);
138 }
139 }
140
141 /* (non-Javadoc)
142 * @see org.apache.oozie.command.XCommand#eagerLoadState()
143 */
144 @Override
145 protected void eagerLoadState() throws CommandException {
146 super.eagerLoadState();
147 try {
148 jpaService = Services.get().get(JPAService.class);
149 if (jpaService != null) {
150 this.wfJobBean = jpaService.execute(new WorkflowJobGetJPAExecutor(this.wfid));
151 }
152 else {
153 throw new CommandException(ErrorCode.E0610);
154 }
155 }
156 catch (Exception ex) {
157 throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
158 }
159 LogUtils.setLogInfo(this.wfJobBean, logInfo);
160 }
161
162 /* (non-Javadoc)
163 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
164 */
165 @Override
166 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
167 super.eagerVerifyPrecondition();
168 if (this.wfJobBean.getStatus() != WorkflowJob.Status.RUNNING) {
169 throw new PreconditionException(ErrorCode.E0727, this.wfJobBean.getId(), this.wfJobBean.getStatus());
170 }
171 }
172
173 /* (non-Javadoc)
174 * @see org.apache.oozie.command.XCommand#getEntityKey()
175 */
176 @Override
177 public String getEntityKey() {
178 return this.wfid;
179 }
180
181 /* (non-Javadoc)
182 * @see org.apache.oozie.command.XCommand#isLockRequired()
183 */
184 @Override
185 protected boolean isLockRequired() {
186 return true;
187 }
188
189 /* (non-Javadoc)
190 * @see org.apache.oozie.command.XCommand#loadState()
191 */
192 @Override
193 protected void loadState() throws CommandException {
194
195 }
196
197 /* (non-Javadoc)
198 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
199 */
200 @Override
201 protected void verifyPrecondition() throws CommandException, PreconditionException {
202 }
203 }