/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.command.wf;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.executor.jpa.WorkflowActionRetryManualGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Command that suspends a workflow job.
 * <p>
 * The job must be in {@link WorkflowJob.Status#RUNNING} state, otherwise the precondition check fails with
 * {@link ErrorCode#E0727}. On execution the job and its workflow instance are marked as SUSPENDED, the pending flag
 * of its retry/manual actions is reset, the changes are written in one batch, a notification command is queued and
 * the suspend is propagated to the parent (see {@link #suspendParentWorkFlow()}).
 */
public class SuspendXCommand extends WorkflowXCommand<Void> {
    /** Id of the workflow job to suspend. */
    private final String wfid;
    /** Workflow job bean, loaded in {@link #eagerLoadState()} and reloaded in {@link #loadState()}. */
    private WorkflowJobBean wfJobBean;
    /** JPA service, looked up in {@link #eagerLoadState()}. */
    private JPAService jpaService;
    /** Entries collected during {@link #execute()} and written in a single batch. */
    private final List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();

    /**
     * Create a suspend command for the given workflow job.
     *
     * @param id workflow job id; checked with {@link ParamChecker#notEmpty(String, String)}
     */
    public SuspendXCommand(String id) {
        super("suspend", "suspend", 1);
        this.wfid = ParamChecker.notEmpty(id, "wfid");
    }

    @Override
    protected void setLogInfo() {
        LogUtils.setLogInfo(wfid);
    }

    /**
     * Suspend the workflow job, persist the change, queue the notification and handle the parent.
     *
     * @return always {@code null}
     * @throws CommandException wraps any {@link WorkflowException} or {@link JPAExecutorException} raised while
     *         suspending or persisting
     */
    @Override
    protected Void execute() throws CommandException {
        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
        try {
            suspendJob(this.jpaService, this.wfJobBean, this.wfid, null, updateList);
            this.wfJobBean.setLastModifiedTime(new Date());
            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
                    this.wfJobBean));
            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
            queue(new WorkflowNotificationXCommand(this.wfJobBean));
            // Calling suspend recursively to handle parent workflow
            suspendParentWorkFlow();
        }
        catch (WorkflowException e) {
            throw new CommandException(e);
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
        finally {
            // Runs on success and on failure.
            // NOTE(review): when the parent is not a workflow, suspendParentWorkFlow() has already made this same
            // call on the success path, so it happens twice there - confirm whether that is intended.
            updateParentIfNecessary(wfJobBean);
        }
        return null;
    }

    /**
     * Propagate the suspend to the parent of this workflow job.
     * <p>
     * If the parent id is set and contains {@code "-W"}, a new {@link SuspendXCommand} is run synchronously for the
     * parent; otherwise {@code updateParentIfNecessary} is invoked for this job.
     *
     * @throws CommandException if suspending the parent workflow or updating the parent fails
     */
    private void suspendParentWorkFlow() throws CommandException {
        final String parentId = this.wfJobBean.getParentId();
        if (parentId != null && parentId.contains("-W")) {
            new SuspendXCommand(parentId).call();
        }
        else {
            // update the action of the parent workflow if it is launched by coordinator
            updateParentIfNecessary(wfJobBean);
        }
    }

    /**
     * Suspend the workflow job and set the pending flag to false for the actions that are START_RETRY or
     * START_MANUAL or END_RETRY or END_MANUAL.
     * <p>
     * Nothing is done unless the job status is {@link WorkflowJob.Status#RUNNING}. The bean is only modified in
     * memory here; persisting the job (and the entries added to {@code updateList}) is left to the caller.
     *
     * @param jpaService jpa service
     * @param workflow workflow job
     * @param id workflow job id
     * @param actionId workflow action id to leave untouched, or {@code null} to process every returned action
     * @param updateList list that receives one update entry per action whose pending flag was reset; may be
     *        {@code null}, in which case the actions are still reset in memory but no entries are recorded
     * @throws WorkflowException thrown if failed to suspend workflow instance
     * @throws CommandException thrown if unable set pending false for actions
     */
    public static void suspendJob(JPAService jpaService, WorkflowJobBean workflow, String id,
            String actionId, List<UpdateEntry> updateList) throws WorkflowException, CommandException {
        if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
            workflow.getWorkflowInstance().suspend();
            // The instance is fetched again, marked SUSPENDED explicitly and stored back on the bean.
            // NOTE(review): presumably needed because getWorkflowInstance() may not return the same object
            // each time - confirm against WorkflowJobBean before simplifying.
            WorkflowInstance wfInstance = workflow.getWorkflowInstance();
            ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED);
            workflow.setStatus(WorkflowJob.Status.SUSPENDED);
            workflow.setWorkflowInstance(wfInstance);

            setPendingFalseForActions(jpaService, id, actionId, updateList);
            if (EventHandlerService.isEnabled()) {
                generateEvent(workflow);
            }
        }
    }

    /**
     * Set pending flag to false for the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL.
     *
     * @param jpaService jpa service
     * @param id workflow job id
     * @param actionId workflow action id to skip, or {@code null} to process every returned action
     * @param updateList list that receives an update entry for each processed action; may be {@code null}
     * @throws CommandException thrown if failed to update workflow action
     */
    private static void setPendingFalseForActions(JPAService jpaService, String id, String actionId,
            List<UpdateEntry> updateList) throws CommandException {
        try {
            List<WorkflowActionBean> actions = jpaService.execute(new WorkflowActionRetryManualGetJPAExecutor(id));

            for (WorkflowActionBean action : actions) {
                if (actionId != null && actionId.equals(action.getId())) {
                    // this action has been changed in handleNonTransient()
                    continue;
                }
                action.resetPendingOnly();
                // updateList will be null when suspendJob is invoked statically via handleNonTransient()
                if (updateList != null) {
                    updateList.add(new UpdateEntry<WorkflowActionQuery>(
                            WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));
                }
            }
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
    }

    /**
     * Look up the JPA service and load the job with the {@code GET_WORKFLOW_STATUS} query.
     *
     * @throws CommandException with {@link ErrorCode#E0603} if the lookup or the query fails
     */
    @Override
    protected void eagerLoadState() throws CommandException {
        try {
            jpaService = Services.get().get(JPAService.class);
            this.wfJobBean = WorkflowJobQueryExecutor.getInstance()
                    .get(WorkflowJobQuery.GET_WORKFLOW_STATUS, this.wfid);
        }
        catch (Exception ex) {
            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
        }
        LogUtils.setLogInfo(this.wfJobBean);
    }

    /**
     * Require the job to be RUNNING.
     *
     * @throws PreconditionException with {@link ErrorCode#E0727} if the job is in any other status
     */
    @Override
    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
        if (this.wfJobBean.getStatus() != WorkflowJob.Status.RUNNING) {
            throw new PreconditionException(ErrorCode.E0727, this.wfJobBean.getId(), this.wfJobBean.getStatus());
        }
    }

    @Override
    public String getEntityKey() {
        return this.wfid;
    }

    @Override
    public String getKey() {
        return getName() + "_" + this.wfid;
    }

    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Reload the job with the {@code GET_WORKFLOW_SUSPEND} query.
     *
     * @throws CommandException with {@link ErrorCode#E0603} if the query fails
     */
    @Override
    protected void loadState() throws CommandException {
        try {
            this.wfJobBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_SUSPEND,
                    this.wfid);
        }
        catch (Exception ex) {
            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
        }
        LogUtils.setLogInfo(wfJobBean);
    }

    /**
     * Apply the same RUNNING-status check as {@link #eagerVerifyPrecondition()} to the reloaded job.
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        eagerVerifyPrecondition();
    }
}