This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.util.List;
021
022 import org.apache.oozie.WorkflowActionBean;
023 import org.apache.oozie.WorkflowJobBean;
024 import org.apache.oozie.client.WorkflowJob;
025 import org.apache.oozie.command.CommandException;
026 import org.apache.oozie.store.StoreException;
027 import org.apache.oozie.store.WorkflowStore;
028 import org.apache.oozie.util.ParamChecker;
029 import org.apache.oozie.util.XLog;
030 import org.apache.oozie.workflow.WorkflowException;
031 import org.apache.oozie.workflow.WorkflowInstance;
032 import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
033
034 public class SuspendCommand extends WorkflowCommand<Void> {
035
036 private String id;
037
038 public SuspendCommand(String id) {
039 super("suspend", "suspend", 1, XLog.STD);
040 this.id = ParamChecker.notEmpty(id, "id");
041 }
042
043 @Override
044 protected Void call(WorkflowStore store) throws StoreException, CommandException {
045 try {
046 WorkflowJobBean workflow = store.getWorkflow(id, false);
047 setLogInfo(workflow);
048 if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
049 incrJobCounter(1);
050 suspendJob(store, workflow, id, null);
051 store.updateWorkflow(workflow);
052 queueCallable(new NotificationCommand(workflow));
053 }
054 return null;
055 }
056 catch (WorkflowException ex) {
057 throw new CommandException(ex);
058 }
059 }
060
061 /**
062 * Suspend the workflow job and pending flag to false for the actions that
063 * are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL
064 *
065 * @param store WorkflowStore
066 * @param workflow WorkflowJobBean
067 * @param id String
068 * @param actionId String
069 * @throws WorkflowException
070 * @throws StoreException
071 */
072 public static void suspendJob(WorkflowStore store, WorkflowJobBean workflow, String id, String actionId)
073 throws WorkflowException, StoreException {
074 if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
075 workflow.getWorkflowInstance().suspend();
076 WorkflowInstance wfInstance = workflow.getWorkflowInstance();
077 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED);
078 workflow.setStatus(WorkflowJob.Status.SUSPENDED);
079 workflow.setWorkflowInstance(wfInstance);
080
081 setPendingFalseForActions(store, id, actionId);
082 }
083 }
084
085 /**
086 * Set pending flag to false for the actions that are START_RETRY or
087 * START_MANUAL or END_RETRY or END_MANUAL
088 * <p/>
089 *
090 * @param store WorkflowStore
091 * @param id workflow id
092 * @param actionId workflow action id
093 * @throws StoreException
094 */
095 public static void setPendingFalseForActions(WorkflowStore store, String id, String actionId) throws StoreException {
096 List<WorkflowActionBean> actions = store.getRetryAndManualActions(id);
097 for (WorkflowActionBean action : actions) {
098 if (actionId != null && actionId.equals(action.getId())) {
099 // this action has been changed in handleNonTransient()
100 continue;
101 }
102 else {
103 action.resetPendingOnly();
104 }
105 store.updateAction(action);
106 }
107 }
108
109 @Override
110 protected Void execute(WorkflowStore store) throws CommandException, StoreException {
111 XLog.getLog(getClass()).debug("STARTED SuspendCommand for wf id=" + id);
112 try {
113 if (lock(id)) {
114 call(store);
115 }
116 else {
117 queueCallable(new SuspendCommand(id), LOCK_FAILURE_REQUEUE_INTERVAL);
118 XLog.getLog(getClass()).warn("Suspend lock was not acquired - failed {0}", id);
119 }
120 }
121 catch (InterruptedException e) {
122 queueCallable(new SuspendCommand(id), LOCK_FAILURE_REQUEUE_INTERVAL);
123 XLog.getLog(getClass()).warn("SuspendCommand lock was not acquired - interrupted exception failed {0}", id);
124 }
125 finally {
126 XLog.getLog(getClass()).debug("ENDED SuspendCommand for wf id=" + id);
127 }
128 return null;
129 }
130 }