This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.util.Date;
021
022 import org.apache.oozie.ErrorCode;
023 import org.apache.oozie.WorkflowActionBean;
024 import org.apache.oozie.WorkflowJobBean;
025 import org.apache.oozie.client.WorkflowJob;
026 import org.apache.oozie.command.CommandException;
027 import org.apache.oozie.command.PreconditionException;
028 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
029 import org.apache.oozie.executor.jpa.JPAExecutorException;
030 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
031 import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor;
032 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
033 import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
034 import org.apache.oozie.service.JPAService;
035 import org.apache.oozie.service.Services;
036 import org.apache.oozie.util.InstrumentUtils;
037 import org.apache.oozie.util.LogUtils;
038 import org.apache.oozie.util.ParamChecker;
039 import org.apache.oozie.workflow.WorkflowException;
040 import org.apache.oozie.workflow.WorkflowInstance;
041 import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
042
043 public class ResumeXCommand extends WorkflowXCommand<Void> {
044
045 private String id;
046 private JPAService jpaService = null;
047 private WorkflowJobBean workflow = null;
048
049 public ResumeXCommand(String id) {
050 super("resume", "resume", 1);
051 this.id = ParamChecker.notEmpty(id, "id");
052 }
053
054 @Override
055 protected Void execute() throws CommandException {
056 try {
057 if (workflow.getStatus() == WorkflowJob.Status.SUSPENDED) {
058 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
059 workflow.getWorkflowInstance().resume();
060 WorkflowInstance wfInstance = workflow.getWorkflowInstance();
061 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.RUNNING);
062 workflow.setWorkflowInstance(wfInstance);
063 workflow.setStatus(WorkflowJob.Status.RUNNING);
064
065
066 //for (WorkflowActionBean action : store.getActionsForWorkflow(id, false)) {
067 for (WorkflowActionBean action : jpaService.execute(new WorkflowJobGetActionsJPAExecutor(id))) {
068
069 // Set pending flag to true for the actions that are START_RETRY or
070 // START_MANUAL or END_RETRY or END_MANUAL
071 if (action.isRetryOrManual()) {
072 action.setPendingOnly();
073 jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
074 }
075
076 if (action.isPending()) {
077 if (action.getStatus() == WorkflowActionBean.Status.PREP
078 || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
079 queue(new ActionStartXCommand(action.getId(), action.getType()));
080 }
081 else {
082 if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
083 Date nextRunTime = action.getPendingAge();
084 queue(new ActionStartXCommand(action.getId(), action.getType()),
085 nextRunTime.getTime() - System.currentTimeMillis());
086 }
087 else {
088 if (action.getStatus() == WorkflowActionBean.Status.DONE
089 || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
090 queue(new ActionEndXCommand(action.getId(), action.getType()));
091 }
092 else {
093 if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
094 Date nextRunTime = action.getPendingAge();
095 queue(new ActionEndXCommand(action.getId(), action.getType()),
096 nextRunTime.getTime() - System.currentTimeMillis());
097 }
098 }
099 }
100 }
101
102 }
103 }
104
105 jpaService.execute(new WorkflowJobUpdateJPAExecutor(workflow));
106 queue(new NotificationXCommand(workflow));
107 }
108 return null;
109 }
110 catch (WorkflowException ex) {
111 throw new CommandException(ex);
112 }
113 catch (JPAExecutorException e) {
114 throw new CommandException(e);
115 }
116 finally {
117 // update coordinator action
118 new CoordActionUpdateXCommand(workflow).call();
119 }
120 }
121
122 @Override
123 public String getEntityKey() {
124 return id;
125 }
126
127 @Override
128 protected boolean isLockRequired() {
129 return true;
130 }
131
132 @Override
133 protected void loadState() throws CommandException {
134 jpaService = Services.get().get(JPAService.class);
135 if (jpaService == null) {
136 throw new CommandException(ErrorCode.E0610);
137 }
138 try {
139 workflow = jpaService.execute(new WorkflowJobGetJPAExecutor(id));
140 }
141 catch (JPAExecutorException e) {
142 throw new CommandException(e);
143 }
144 LogUtils.setLogInfo(workflow, logInfo);
145 }
146
147 @Override
148 protected void verifyPrecondition() throws CommandException, PreconditionException {
149 if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED) {
150 throw new PreconditionException(ErrorCode.E1100, "workflow's status is " + workflow.getStatusStr() + " is not SUSPENDED");
151 }
152 }
153 }