001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.wf; 019 020import java.util.ArrayList; 021import java.util.Date; 022import java.util.List; 023 024import org.apache.oozie.ErrorCode; 025import org.apache.oozie.WorkflowActionBean; 026import org.apache.oozie.WorkflowJobBean; 027import org.apache.oozie.client.WorkflowJob; 028import org.apache.oozie.command.CommandException; 029import org.apache.oozie.command.PreconditionException; 030import org.apache.oozie.executor.jpa.BatchQueryExecutor; 031import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 032import org.apache.oozie.executor.jpa.JPAExecutorException; 033import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; 034import org.apache.oozie.executor.jpa.WorkflowActionRetryManualGetJPAExecutor; 035import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 036import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 037import org.apache.oozie.service.EventHandlerService; 038import org.apache.oozie.service.JPAService; 039import org.apache.oozie.service.Services; 040import org.apache.oozie.util.InstrumentUtils; 041import org.apache.oozie.util.LogUtils; 042import org.apache.oozie.util.ParamChecker; 043import org.apache.oozie.workflow.WorkflowException; 044import org.apache.oozie.workflow.WorkflowInstance; 045import org.apache.oozie.workflow.lite.LiteWorkflowInstance; 046 047public class SuspendXCommand extends WorkflowXCommand<Void> { 048 private final String wfid; 049 private WorkflowJobBean wfJobBean; 050 private JPAService jpaService; 051 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 052 053 public SuspendXCommand(String id) { 054 super("suspend", "suspend", 1); 055 this.wfid = ParamChecker.notEmpty(id, "wfid"); 056 } 057 058 @Override 059 protected Void execute() throws CommandException { 060 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 061 try { 062 suspendJob(this.jpaService, this.wfJobBean, this.wfid, null, updateList); 063 this.wfJobBean.setLastModifiedTime(new Date()); 064 updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, 065 this.wfJobBean)); 066 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 067 queue(new NotificationXCommand(this.wfJobBean)); 068 } 069 catch (WorkflowException e) { 070 throw new CommandException(e); 071 } 072 catch (JPAExecutorException je) { 073 throw new CommandException(je); 074 } 075 finally { 076 updateParentIfNecessary(wfJobBean); 077 } 078 return null; 079 } 080 081 /** 082 * Suspend the workflow job and pending flag to false for the actions that are START_RETRY or START_MANUAL or 083 * END_RETRY or END_MANUAL 084 * 085 * @param jpaService jpa service 086 * @param workflow workflow job 087 * @param id workflow job id 088 * @param actionId workflow action id 089 * @throws WorkflowException thrown if failed to suspend workflow instance 090 * @throws CommandException thrown if unable set pending false for actions 091 */ 092 public static void suspendJob(JPAService jpaService, WorkflowJobBean workflow, String id, 093 String actionId, List<UpdateEntry> updateList) throws WorkflowException, CommandException { 094 if (workflow.getStatus() == WorkflowJob.Status.RUNNING) { 095 workflow.getWorkflowInstance().suspend(); 096 WorkflowInstance wfInstance = workflow.getWorkflowInstance(); 097 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED); 098 workflow.setStatus(WorkflowJob.Status.SUSPENDED); 099 workflow.setWorkflowInstance(wfInstance); 100 101 setPendingFalseForActions(jpaService, id, actionId, updateList); 102 if (EventHandlerService.isEnabled()) { 103 generateEvent(workflow); 104 } 105 } 106 } 107 108 /** 109 * Set pending flag to false for the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL 110 * <p/> 111 * 112 * @param jpaService jpa service 113 * @param id workflow job id 114 * @param actionId workflow action id 115 * @throws CommandException thrown if failed to update workflow action 116 */ 117 private static void setPendingFalseForActions(JPAService jpaService, String id, String actionId, 118 List<UpdateEntry> updateList) throws CommandException { 119 List<WorkflowActionBean> actions; 120 try { 121 actions = jpaService.execute(new WorkflowActionRetryManualGetJPAExecutor(id)); 122 123 for (WorkflowActionBean action : actions) { 124 if (actionId != null && actionId.equals(action.getId())) { 125 // this action has been changed in handleNonTransient() 126 continue; 127 } 128 else { 129 action.resetPendingOnly(); 130 } 131 if (updateList != null) { // will be null when suspendJob 132 // invoked statically via 133 // handleNonTransient() 134 updateList.add(new UpdateEntry<WorkflowActionQuery>( 135 WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action)); 136 } 137 } 138 } 139 catch (JPAExecutorException je) { 140 throw new CommandException(je); 141 } 142 } 143 144 @Override 145 protected void eagerLoadState() throws CommandException { 146 try { 147 jpaService = Services.get().get(JPAService.class); 148 this.wfJobBean = WorkflowJobQueryExecutor.getInstance() 149 .get(WorkflowJobQuery.GET_WORKFLOW_STATUS, this.wfid); 150 } 151 catch (Exception ex) { 152 throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex); 153 } 154 LogUtils.setLogInfo(this.wfJobBean, logInfo); 155 } 156 157 @Override 158 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 159 if (this.wfJobBean.getStatus() != WorkflowJob.Status.RUNNING) { 160 throw new PreconditionException(ErrorCode.E0727, this.wfJobBean.getId(), this.wfJobBean.getStatus()); 161 } 162 } 163 164 @Override 165 public String getEntityKey() { 166 return this.wfid; 167 } 168 169 @Override 170 public String getKey() { 171 return getName() + "_" + this.wfid; 172 } 173 174 @Override 175 protected boolean isLockRequired() { 176 return true; 177 } 178 179 @Override 180 protected void loadState() throws CommandException { 181 try { 182 this.wfJobBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_SUSPEND, 183 this.wfid); 184 } 185 catch (Exception ex) { 186 throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex); 187 } 188 } 189 190 @Override 191 protected void verifyPrecondition() throws CommandException, PreconditionException { 192 eagerVerifyPrecondition(); 193 } 194}