001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.util.ArrayList; 021 import java.util.Date; 022 import java.util.List; 023 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.WorkflowActionBean; 026 import org.apache.oozie.WorkflowJobBean; 027 import org.apache.oozie.client.WorkflowJob; 028 import org.apache.oozie.client.rest.JsonBean; 029 import org.apache.oozie.command.CommandException; 030 import org.apache.oozie.command.PreconditionException; 031 import org.apache.oozie.command.coord.CoordActionUpdateXCommand; 032 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 033 import org.apache.oozie.executor.jpa.JPAExecutorException; 034 import org.apache.oozie.executor.jpa.WorkflowActionRetryManualGetJPAExecutor; 035 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 036 import org.apache.oozie.service.JPAService; 037 import org.apache.oozie.service.Services; 038 import org.apache.oozie.util.InstrumentUtils; 039 import org.apache.oozie.util.LogUtils; 040 import org.apache.oozie.util.ParamChecker; 041 import org.apache.oozie.workflow.WorkflowException; 042 import org.apache.oozie.workflow.WorkflowInstance; 043 import org.apache.oozie.workflow.lite.LiteWorkflowInstance; 044 045 public class SuspendXCommand extends WorkflowXCommand<Void> { 046 private final String wfid; 047 private WorkflowJobBean wfJobBean; 048 private JPAService jpaService; 049 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 050 051 public SuspendXCommand(String id) { 052 super("suspend", "suspend", 1); 053 this.wfid = ParamChecker.notEmpty(id, "wfid"); 054 } 055 056 /* (non-Javadoc) 057 * @see org.apache.oozie.command.XCommand#execute() 058 */ 059 @Override 060 protected Void execute() throws CommandException { 061 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 062 try { 063 suspendJob(this.jpaService, this.wfJobBean, this.wfid, null, updateList); 064 this.wfJobBean.setLastModifiedTime(new Date()); 065 updateList.add(this.wfJobBean); 066 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null)); 067 queue(new NotificationXCommand(this.wfJobBean)); 068 } 069 catch (WorkflowException e) { 070 throw new CommandException(e); 071 } 072 catch (JPAExecutorException je) { 073 throw new CommandException(je); 074 } 075 finally { 076 // update coordinator action 077 new CoordActionUpdateXCommand(wfJobBean).call(); 078 } 079 return null; 080 } 081 082 /** 083 * Suspend the workflow job and pending flag to false for the actions that are START_RETRY or START_MANUAL or 084 * END_RETRY or END_MANUAL 085 * 086 * @param jpaService jpa service 087 * @param workflow workflow job 088 * @param id workflow job id 089 * @param actionId workflow action id 090 * @throws WorkflowException thrown if failed to suspend workflow instance 091 * @throws CommandException thrown if unable set pending false for actions 092 */ 093 public static void suspendJob(JPAService jpaService, WorkflowJobBean workflow, String id, 094 String actionId, List<JsonBean> updateList) throws WorkflowException, CommandException { 095 if (workflow.getStatus() == WorkflowJob.Status.RUNNING) { 096 workflow.getWorkflowInstance().suspend(); 097 WorkflowInstance wfInstance = workflow.getWorkflowInstance(); 098 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED); 099 workflow.setStatus(WorkflowJob.Status.SUSPENDED); 100 workflow.setWorkflowInstance(wfInstance); 101 102 setPendingFalseForActions(jpaService, id, actionId, updateList); 103 } 104 } 105 106 /** 107 * Set pending flag to false for the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL 108 * <p/> 109 * 110 * @param jpaService jpa service 111 * @param id workflow job id 112 * @param actionId workflow action id 113 * @throws CommandException thrown if failed to update workflow action 114 */ 115 private static void setPendingFalseForActions(JPAService jpaService, String id, String actionId, 116 List<JsonBean> updateList) throws CommandException { 117 List<WorkflowActionBean> actions; 118 try { 119 actions = jpaService.execute(new WorkflowActionRetryManualGetJPAExecutor(id)); 120 121 for (WorkflowActionBean action : actions) { 122 if (actionId != null && actionId.equals(action.getId())) { 123 // this action has been changed in handleNonTransient() 124 continue; 125 } 126 else { 127 action.resetPendingOnly(); 128 } 129 if (updateList != null) { // will be null when suspendJob 130 // invoked statically via 131 // handleNonTransient() 132 updateList.add(action); 133 } 134 } 135 } 136 catch (JPAExecutorException je) { 137 throw new CommandException(je); 138 } 139 } 140 141 /* (non-Javadoc) 142 * @see org.apache.oozie.command.XCommand#eagerLoadState() 143 */ 144 @Override 145 protected void eagerLoadState() throws CommandException { 146 super.eagerLoadState(); 147 try { 148 jpaService = Services.get().get(JPAService.class); 149 if (jpaService != null) { 150 this.wfJobBean = jpaService.execute(new WorkflowJobGetJPAExecutor(this.wfid)); 151 } 152 else { 153 throw new CommandException(ErrorCode.E0610); 154 } 155 } 156 catch (Exception ex) { 157 throw new CommandException(ErrorCode.E0603, ex); 158 } 159 LogUtils.setLogInfo(this.wfJobBean, logInfo); 160 } 161 162 /* (non-Javadoc) 163 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition() 164 */ 165 @Override 166 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 167 super.eagerVerifyPrecondition(); 168 if (this.wfJobBean.getStatus() != WorkflowJob.Status.RUNNING) { 169 throw new PreconditionException(ErrorCode.E0727, this.wfJobBean.getStatus()); 170 } 171 } 172 173 /* (non-Javadoc) 174 * @see org.apache.oozie.command.XCommand#getEntityKey() 175 */ 176 @Override 177 public String getEntityKey() { 178 return this.wfid; 179 } 180 181 /* (non-Javadoc) 182 * @see org.apache.oozie.command.XCommand#isLockRequired() 183 */ 184 @Override 185 protected boolean isLockRequired() { 186 return true; 187 } 188 189 /* (non-Javadoc) 190 * @see org.apache.oozie.command.XCommand#loadState() 191 */ 192 @Override 193 protected void loadState() throws CommandException { 194 195 } 196 197 /* (non-Javadoc) 198 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 199 */ 200 @Override 201 protected void verifyPrecondition() throws CommandException, PreconditionException { 202 } 203 }