This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.util.ArrayList;
021 import java.util.Date;
022 import java.util.List;
023
024 import org.apache.oozie.ErrorCode;
025 import org.apache.oozie.WorkflowActionBean;
026 import org.apache.oozie.WorkflowJobBean;
027 import org.apache.oozie.client.WorkflowJob;
028 import org.apache.oozie.client.rest.JsonBean;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.command.PreconditionException;
031 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
032 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
033 import org.apache.oozie.executor.jpa.JPAExecutorException;
034 import org.apache.oozie.executor.jpa.WorkflowActionRetryManualGetJPAExecutor;
035 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
036 import org.apache.oozie.service.EventHandlerService;
037 import org.apache.oozie.service.JPAService;
038 import org.apache.oozie.service.Services;
039 import org.apache.oozie.util.InstrumentUtils;
040 import org.apache.oozie.util.LogUtils;
041 import org.apache.oozie.util.ParamChecker;
042 import org.apache.oozie.workflow.WorkflowException;
043 import org.apache.oozie.workflow.WorkflowInstance;
044 import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
045
046 public class SuspendXCommand extends WorkflowXCommand<Void> {
047 private final String wfid;
048 private WorkflowJobBean wfJobBean;
049 private JPAService jpaService;
050 private List<JsonBean> updateList = new ArrayList<JsonBean>();
051
052 public SuspendXCommand(String id) {
053 super("suspend", "suspend", 1);
054 this.wfid = ParamChecker.notEmpty(id, "wfid");
055 }
056
057 @Override
058 protected Void execute() throws CommandException {
059 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
060 try {
061 suspendJob(this.jpaService, this.wfJobBean, this.wfid, null, updateList);
062 this.wfJobBean.setLastModifiedTime(new Date());
063 updateList.add(this.wfJobBean);
064 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
065 queue(new NotificationXCommand(this.wfJobBean));
066 }
067 catch (WorkflowException e) {
068 throw new CommandException(e);
069 }
070 catch (JPAExecutorException je) {
071 throw new CommandException(je);
072 }
073 finally {
074 // update coordinator action
075 new CoordActionUpdateXCommand(wfJobBean).call();
076 }
077 return null;
078 }
079
080 /**
081 * Suspend the workflow job and pending flag to false for the actions that are START_RETRY or START_MANUAL or
082 * END_RETRY or END_MANUAL
083 *
084 * @param jpaService jpa service
085 * @param workflow workflow job
086 * @param id workflow job id
087 * @param actionId workflow action id
088 * @throws WorkflowException thrown if failed to suspend workflow instance
089 * @throws CommandException thrown if unable set pending false for actions
090 */
091 public static void suspendJob(JPAService jpaService, WorkflowJobBean workflow, String id,
092 String actionId, List<JsonBean> updateList) throws WorkflowException, CommandException {
093 if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
094 workflow.getWorkflowInstance().suspend();
095 WorkflowInstance wfInstance = workflow.getWorkflowInstance();
096 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED);
097 workflow.setStatus(WorkflowJob.Status.SUSPENDED);
098 workflow.setWorkflowInstance(wfInstance);
099
100 setPendingFalseForActions(jpaService, id, actionId, updateList);
101 if (EventHandlerService.isEnabled()) {
102 generateEvent(workflow);
103 }
104 }
105 }
106
107 /**
108 * Set pending flag to false for the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL
109 * <p/>
110 *
111 * @param jpaService jpa service
112 * @param id workflow job id
113 * @param actionId workflow action id
114 * @throws CommandException thrown if failed to update workflow action
115 */
116 private static void setPendingFalseForActions(JPAService jpaService, String id, String actionId,
117 List<JsonBean> updateList) throws CommandException {
118 List<WorkflowActionBean> actions;
119 try {
120 actions = jpaService.execute(new WorkflowActionRetryManualGetJPAExecutor(id));
121
122 for (WorkflowActionBean action : actions) {
123 if (actionId != null && actionId.equals(action.getId())) {
124 // this action has been changed in handleNonTransient()
125 continue;
126 }
127 else {
128 action.resetPendingOnly();
129 }
130 if (updateList != null) { // will be null when suspendJob
131 // invoked statically via
132 // handleNonTransient()
133 updateList.add(action);
134 }
135 }
136 }
137 catch (JPAExecutorException je) {
138 throw new CommandException(je);
139 }
140 }
141
142 @Override
143 protected void eagerLoadState() throws CommandException {
144 super.eagerLoadState();
145 try {
146 jpaService = Services.get().get(JPAService.class);
147 if (jpaService != null) {
148 this.wfJobBean = jpaService.execute(new WorkflowJobGetJPAExecutor(this.wfid));
149 }
150 else {
151 throw new CommandException(ErrorCode.E0610);
152 }
153 }
154 catch (Exception ex) {
155 throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
156 }
157 LogUtils.setLogInfo(this.wfJobBean, logInfo);
158 }
159
160 @Override
161 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
162 super.eagerVerifyPrecondition();
163 if (this.wfJobBean.getStatus() != WorkflowJob.Status.RUNNING) {
164 throw new PreconditionException(ErrorCode.E0727, this.wfJobBean.getId(), this.wfJobBean.getStatus());
165 }
166 }
167
168 @Override
169 public String getEntityKey() {
170 return this.wfid;
171 }
172
173 @Override
174 public String getKey() {
175 return getName() + "_" + this.wfid;
176 }
177
178 @Override
179 protected boolean isLockRequired() {
180 return true;
181 }
182
183 @Override
184 protected void loadState() throws CommandException {
185 eagerLoadState();
186 }
187
188 @Override
189 protected void verifyPrecondition() throws CommandException, PreconditionException {
190 eagerVerifyPrecondition();
191 }
192 }