001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.util.ArrayList; 021 import java.util.Date; 022 import java.util.List; 023 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.WorkflowActionBean; 026 import org.apache.oozie.WorkflowJobBean; 027 import org.apache.oozie.client.WorkflowJob; 028 import org.apache.oozie.client.rest.JsonBean; 029 import org.apache.oozie.command.CommandException; 030 import org.apache.oozie.command.PreconditionException; 031 import org.apache.oozie.command.coord.CoordActionUpdateXCommand; 032 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 033 import org.apache.oozie.executor.jpa.JPAExecutorException; 034 import org.apache.oozie.executor.jpa.WorkflowActionRetryManualGetJPAExecutor; 035 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 036 import org.apache.oozie.service.EventHandlerService; 037 import org.apache.oozie.service.JPAService; 038 import org.apache.oozie.service.Services; 039 import org.apache.oozie.util.InstrumentUtils; 040 import org.apache.oozie.util.LogUtils; 041 import org.apache.oozie.util.ParamChecker; 042 import org.apache.oozie.workflow.WorkflowException; 043 import org.apache.oozie.workflow.WorkflowInstance; 044 import org.apache.oozie.workflow.lite.LiteWorkflowInstance; 045 046 public class SuspendXCommand extends WorkflowXCommand<Void> { 047 private final String wfid; 048 private WorkflowJobBean wfJobBean; 049 private JPAService jpaService; 050 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 051 052 public SuspendXCommand(String id) { 053 super("suspend", "suspend", 1); 054 this.wfid = ParamChecker.notEmpty(id, "wfid"); 055 } 056 057 @Override 058 protected Void execute() throws CommandException { 059 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 060 try { 061 suspendJob(this.jpaService, this.wfJobBean, this.wfid, null, updateList); 062 this.wfJobBean.setLastModifiedTime(new Date()); 063 updateList.add(this.wfJobBean); 064 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null)); 065 queue(new NotificationXCommand(this.wfJobBean)); 066 } 067 catch (WorkflowException e) { 068 throw new CommandException(e); 069 } 070 catch (JPAExecutorException je) { 071 throw new CommandException(je); 072 } 073 finally { 074 // update coordinator action 075 new CoordActionUpdateXCommand(wfJobBean).call(); 076 } 077 return null; 078 } 079 080 /** 081 * Suspend the workflow job and pending flag to false for the actions that are START_RETRY or START_MANUAL or 082 * END_RETRY or END_MANUAL 083 * 084 * @param jpaService jpa service 085 * @param workflow workflow job 086 * @param id workflow job id 087 * @param actionId workflow action id 088 * @throws WorkflowException thrown if failed to suspend workflow instance 089 * @throws CommandException thrown if unable set pending false for actions 090 */ 091 public static void suspendJob(JPAService jpaService, WorkflowJobBean workflow, String id, 092 String actionId, List<JsonBean> updateList) throws WorkflowException, CommandException { 093 if (workflow.getStatus() == WorkflowJob.Status.RUNNING) { 094 workflow.getWorkflowInstance().suspend(); 095 WorkflowInstance wfInstance = workflow.getWorkflowInstance(); 096 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED); 097 workflow.setStatus(WorkflowJob.Status.SUSPENDED); 098 workflow.setWorkflowInstance(wfInstance); 099 100 setPendingFalseForActions(jpaService, id, actionId, updateList); 101 if (EventHandlerService.isEnabled()) { 102 generateEvent(workflow); 103 } 104 } 105 } 106 107 /** 108 * Set pending flag to false for the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL 109 * <p/> 110 * 111 * @param jpaService jpa service 112 * @param id workflow job id 113 * @param actionId workflow action id 114 * @throws CommandException thrown if failed to update workflow action 115 */ 116 private static void setPendingFalseForActions(JPAService jpaService, String id, String actionId, 117 List<JsonBean> updateList) throws CommandException { 118 List<WorkflowActionBean> actions; 119 try { 120 actions = jpaService.execute(new WorkflowActionRetryManualGetJPAExecutor(id)); 121 122 for (WorkflowActionBean action : actions) { 123 if (actionId != null && actionId.equals(action.getId())) { 124 // this action has been changed in handleNonTransient() 125 continue; 126 } 127 else { 128 action.resetPendingOnly(); 129 } 130 if (updateList != null) { // will be null when suspendJob 131 // invoked statically via 132 // handleNonTransient() 133 updateList.add(action); 134 } 135 } 136 } 137 catch (JPAExecutorException je) { 138 throw new CommandException(je); 139 } 140 } 141 142 @Override 143 protected void eagerLoadState() throws CommandException { 144 super.eagerLoadState(); 145 try { 146 jpaService = Services.get().get(JPAService.class); 147 if (jpaService != null) { 148 this.wfJobBean = jpaService.execute(new WorkflowJobGetJPAExecutor(this.wfid)); 149 } 150 else { 151 throw new CommandException(ErrorCode.E0610); 152 } 153 } 154 catch (Exception ex) { 155 throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex); 156 } 157 LogUtils.setLogInfo(this.wfJobBean, logInfo); 158 } 159 160 @Override 161 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 162 super.eagerVerifyPrecondition(); 163 if (this.wfJobBean.getStatus() != WorkflowJob.Status.RUNNING) { 164 throw new PreconditionException(ErrorCode.E0727, this.wfJobBean.getId(), this.wfJobBean.getStatus()); 165 } 166 } 167 168 @Override 169 public String getEntityKey() { 170 return this.wfid; 171 } 172 173 @Override 174 public String getKey() { 175 return getName() + "_" + this.wfid; 176 } 177 178 @Override 179 protected boolean isLockRequired() { 180 return true; 181 } 182 183 @Override 184 protected void loadState() throws CommandException { 185 eagerLoadState(); 186 } 187 188 @Override 189 protected void verifyPrecondition() throws CommandException, PreconditionException { 190 eagerVerifyPrecondition(); 191 } 192 }