This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.io.IOException;
021 import java.net.URISyntaxException;
022 import java.util.ArrayList;
023 import java.util.Date;
024 import java.util.List;
025
026 import org.apache.oozie.ErrorCode;
027 import org.apache.oozie.WorkflowActionBean;
028 import org.apache.oozie.WorkflowJobBean;
029 import org.apache.oozie.action.control.EndActionExecutor;
030 import org.apache.oozie.action.control.ForkActionExecutor;
031 import org.apache.oozie.action.control.JoinActionExecutor;
032 import org.apache.oozie.action.control.KillActionExecutor;
033 import org.apache.oozie.action.control.StartActionExecutor;
034 import org.apache.oozie.client.WorkflowJob;
035 import org.apache.oozie.client.rest.JsonBean;
036 import org.apache.oozie.command.CommandException;
037 import org.apache.oozie.command.PreconditionException;
038 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
039 import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
040 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
041 import org.apache.oozie.executor.jpa.JPAExecutorException;
042 import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor;
043 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
044 import org.apache.oozie.service.HadoopAccessorException;
045 import org.apache.oozie.service.JPAService;
046 import org.apache.oozie.service.Services;
047 import org.apache.oozie.util.InstrumentUtils;
048 import org.apache.oozie.util.LogUtils;
049 import org.apache.oozie.util.ParamChecker;
050 import org.apache.oozie.workflow.WorkflowException;
051 import org.apache.oozie.workflow.WorkflowInstance;
052 import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
053
054 public class ResumeXCommand extends WorkflowXCommand<Void> {
055
056 private String id;
057 private JPAService jpaService = null;
058 private WorkflowJobBean workflow = null;
059 private List<JsonBean> updateList = new ArrayList<JsonBean>();
060
061 public ResumeXCommand(String id) {
062 super("resume", "resume", 1);
063 this.id = ParamChecker.notEmpty(id, "id");
064 }
065
066 @Override
067 protected Void execute() throws CommandException {
068 try {
069 if (workflow.getStatus() == WorkflowJob.Status.SUSPENDED) {
070 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
071 workflow.getWorkflowInstance().resume();
072 WorkflowInstance wfInstance = workflow.getWorkflowInstance();
073 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.RUNNING);
074 workflow.setWorkflowInstance(wfInstance);
075 workflow.setStatus(WorkflowJob.Status.RUNNING);
076
077
078 //for (WorkflowActionBean action : store.getActionsForWorkflow(id, false)) {
079 for (WorkflowActionBean action : jpaService.execute(new WorkflowJobGetActionsJPAExecutor(id))) {
080
081 // Set pending flag to true for the actions that are START_RETRY or
082 // START_MANUAL or END_RETRY or END_MANUAL
083 if (action.isRetryOrManual()) {
084 action.setPendingOnly();
085 updateList.add(action);
086 }
087
088 if (action.isPending()) {
089 if (action.getStatus() == WorkflowActionBean.Status.PREP
090 || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
091 // When resuming a workflow that was programatically suspended (via ActionCheckXCommand) because of
092 // a repeated transient error, we have to clean up the action dir
093 if (!action.getType().equals(StartActionExecutor.TYPE) && // The control actions have invalid
094 !action.getType().equals(ForkActionExecutor.TYPE) && // action dir paths because they
095 !action.getType().equals(JoinActionExecutor.TYPE) && // contain ":" (colons)
096 !action.getType().equals(KillActionExecutor.TYPE) &&
097 !action.getType().equals(EndActionExecutor.TYPE)) {
098 ActionExecutorContext context =
099 new ActionXCommand.ActionExecutorContext(workflow, action, false, false);
100 if (context.getAppFileSystem().exists(context.getActionDir())) {
101 context.getAppFileSystem().delete(context.getActionDir(), true);
102 }
103 }
104 queue(new ActionStartXCommand(action.getId(), action.getType()));
105 }
106 else {
107 if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
108 Date nextRunTime = action.getPendingAge();
109 queue(new ActionStartXCommand(action.getId(), action.getType()),
110 nextRunTime.getTime() - System.currentTimeMillis());
111 }
112 else {
113 if (action.getStatus() == WorkflowActionBean.Status.DONE
114 || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
115 queue(new ActionEndXCommand(action.getId(), action.getType()));
116 }
117 else {
118 if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
119 Date nextRunTime = action.getPendingAge();
120 queue(new ActionEndXCommand(action.getId(), action.getType()),
121 nextRunTime.getTime() - System.currentTimeMillis());
122 }
123 }
124 }
125 }
126
127 }
128 }
129
130 workflow.setLastModifiedTime(new Date());
131 updateList.add(workflow);
132 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
133 queue(new NotificationXCommand(workflow));
134 }
135 return null;
136 }
137 catch (WorkflowException ex) {
138 throw new CommandException(ex);
139 }
140 catch (JPAExecutorException e) {
141 throw new CommandException(e);
142 }
143 catch (HadoopAccessorException e) {
144 throw new CommandException(e);
145 }
146 catch (IOException e) {
147 throw new CommandException(ErrorCode.E0902, e.getMessage(), e);
148 }
149 catch (URISyntaxException e) {
150 throw new CommandException(ErrorCode.E0902, e.getMessage(), e);
151 }
152 finally {
153 // update coordinator action
154 new CoordActionUpdateXCommand(workflow).call();
155 }
156 }
157
158 @Override
159 public String getEntityKey() {
160 return id;
161 }
162
163 @Override
164 protected boolean isLockRequired() {
165 return true;
166 }
167
168 @Override
169 protected void loadState() throws CommandException {
170 jpaService = Services.get().get(JPAService.class);
171 if (jpaService == null) {
172 throw new CommandException(ErrorCode.E0610);
173 }
174 try {
175 workflow = jpaService.execute(new WorkflowJobGetJPAExecutor(id));
176 }
177 catch (JPAExecutorException e) {
178 throw new CommandException(e);
179 }
180 LogUtils.setLogInfo(workflow, logInfo);
181 }
182
183 @Override
184 protected void verifyPrecondition() throws CommandException, PreconditionException {
185 if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED) {
186 throw new PreconditionException(ErrorCode.E1100, "workflow's status is " + workflow.getStatusStr() + " is not SUSPENDED");
187 }
188 }
189 }