001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.wf; 020 021import java.sql.Timestamp; 022import java.util.ArrayList; 023import java.util.Date; 024import java.util.List; 025 026import org.apache.oozie.ErrorCode; 027import org.apache.oozie.WorkflowActionBean; 028import org.apache.oozie.WorkflowJobBean; 029import org.apache.oozie.action.ActionExecutor; 030import org.apache.oozie.action.ActionExecutorException; 031import org.apache.oozie.client.WorkflowAction; 032import org.apache.oozie.client.WorkflowJob; 033import org.apache.oozie.command.CommandException; 034import org.apache.oozie.command.PreconditionException; 035import org.apache.oozie.executor.jpa.JPAExecutorException; 036import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; 037import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 038import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; 039import org.apache.oozie.executor.jpa.BatchQueryExecutor; 040import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 041import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 042import org.apache.oozie.service.ActionCheckerService; 043import org.apache.oozie.service.ActionService; 044import org.apache.oozie.service.EventHandlerService; 045import org.apache.oozie.service.Services; 046import org.apache.oozie.service.UUIDService; 047import org.apache.oozie.util.Instrumentation; 048import org.apache.oozie.util.LogUtils; 049import org.apache.oozie.util.XLog; 050 051/** 052 * Executes the check command for ActionHandlers. <p> Ensures the action is in 053 * RUNNING state before executing 054 * {@link ActionExecutor#check(org.apache.oozie.action.ActionExecutor.Context, org.apache.oozie.client.WorkflowAction)} 055 */ 056public class ActionCheckXCommand extends ActionXCommand<Void> { 057 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING"; 058 private String actionId; 059 private String jobId; 060 private int actionCheckDelay; 061 private WorkflowJobBean wfJob = null; 062 private WorkflowActionBean wfAction = null; 063 private ActionExecutor executor = null; 064 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 065 private boolean generateEvent = false; 066 067 public ActionCheckXCommand(String actionId) { 068 this(actionId, -1); 069 } 070 071 public ActionCheckXCommand(String actionId, int priority, int checkDelay) { 072 super("action.check", "action.check", priority); 073 this.actionId = actionId; 074 this.actionCheckDelay = checkDelay; 075 this.jobId = Services.get().get(UUIDService.class).getId(actionId); 076 } 077 078 public ActionCheckXCommand(String actionId, int checkDelay) { 079 this(actionId, 0, checkDelay); 080 } 081 082 @Override 083 protected void setLogInfo() { 084 LogUtils.setLogInfo(actionId); 085 } 086 087 @Override 088 protected void eagerLoadState() throws CommandException { 089 try { 090 this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, jobId); 091 this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_ID_TYPE_LASTCHECK, 092 actionId); 093 LogUtils.setLogInfo(wfJob); 094 LogUtils.setLogInfo(wfAction); 095 } 096 catch (JPAExecutorException ex) { 097 throw new CommandException(ex); 098 } 099 } 100 101 @Override 102 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 103 if (wfJob == null) { 104 throw new PreconditionException(ErrorCode.E0604, jobId); 105 } 106 if (wfAction == null) { 107 throw new PreconditionException(ErrorCode.E0605, actionId); 108 } 109 // if the action has been updated, quit this command 110 if (actionCheckDelay > 0) { 111 Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000); 112 Timestamp actionLmt = wfAction.getLastCheckTimestamp(); 113 if (actionLmt.after(actionCheckTs)) { 114 throw new PreconditionException(ErrorCode.E0817, actionId); 115 } 116 } 117 118 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType()); 119 if (executor == null) { 120 throw new CommandException(ErrorCode.E0802, wfAction.getType()); 121 } 122 } 123 124 @Override 125 protected boolean isLockRequired() { 126 return true; 127 } 128 129 @Override 130 public String getEntityKey() { 131 return this.jobId; 132 } 133 134 @Override 135 protected void loadState() throws CommandException { 136 try { 137 this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_ACTION_OP, jobId); 138 this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_CHECK, 139 actionId); 140 } 141 catch (JPAExecutorException e) { 142 throw new CommandException(e); 143 } 144 LogUtils.setLogInfo(wfJob); 145 LogUtils.setLogInfo(wfAction); 146 } 147 148 @Override 149 protected void verifyPrecondition() throws CommandException, PreconditionException { 150 if (!wfAction.isPending() || wfAction.getStatus() != WorkflowActionBean.Status.RUNNING) { 151 throw new PreconditionException(ErrorCode.E0815, wfAction.isPending(), wfAction.getStatusStr()); 152 } 153 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) { 154 wfAction.setLastCheckTime(new Date()); 155 try { 156 WorkflowActionQueryExecutor.getInstance().executeUpdate( 157 WorkflowActionQuery.UPDATE_ACTION_FOR_LAST_CHECKED_TIME, wfAction); 158 } 159 catch (JPAExecutorException e) { 160 throw new CommandException(e); 161 } 162 throw new PreconditionException(ErrorCode.E0818, wfAction.getId(), wfJob.getId(), wfJob.getStatus()); 163 } 164 } 165 166 @Override 167 protected Void execute() throws CommandException { 168 LOG.debug("STARTED ActionCheckXCommand for wf actionId=" + actionId + " priority =" + getPriority()); 169 ActionExecutorContext context = null; 170 boolean execSynchronous = false; 171 try { 172 boolean isRetry = false; 173 if (wfAction.getRetries() > 0) { 174 isRetry = true; 175 } 176 boolean isUserRetry = false; 177 context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry); 178 incrActionCounter(wfAction.getType(), 1); 179 180 Instrumentation.Cron cron = new Instrumentation.Cron(); 181 cron.start(); 182 executor.check(context, wfAction); 183 cron.stop(); 184 addActionCron(wfAction.getType(), cron); 185 186 if (wfAction.isExecutionComplete()) { 187 if (!context.isExecuted()) { 188 LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor 189 .getType()); 190 wfAction.setErrorInfo(EXEC_DATA_MISSING, 191 "Execution Complete, but Execution Data Missing from Action"); 192 failJob(context); 193 generateEvent = true; 194 } else { 195 wfAction.setPending(); 196 execSynchronous = true; 197 } 198 } 199 wfAction.setLastCheckTime(new Date()); 200 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction)); 201 wfJob.setLastModifiedTime(new Date()); 202 updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, 203 wfJob)); 204 } 205 catch (ActionExecutorException ex) { 206 LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex 207 .getMessage(), ex); 208 209 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage()); 210 switch (ex.getErrorType()) { 211 case ERROR: 212 // If allowed to retry, this will handle it; otherwise, we should fall through to FAILED 213 if (handleUserRetry(context, wfAction)) { 214 break; 215 } 216 case FAILED: 217 failJob(context, wfAction); 218 generateEvent = true; 219 break; 220 case TRANSIENT: // retry N times, then suspend workflow 221 if (!handleTransient(context, executor, WorkflowAction.Status.RUNNING)) { 222 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL); 223 generateEvent = true; 224 wfAction.setPendingAge(new Date()); 225 wfAction.setRetries(0); 226 wfAction.setStartTime(null); 227 } 228 break; 229 } 230 wfAction.setLastCheckTime(new Date()); 231 updateList = new ArrayList<UpdateEntry>(); 232 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction)); 233 wfJob.setLastModifiedTime(new Date()); 234 updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, 235 wfJob)); 236 } 237 finally { 238 try { 239 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 240 if (generateEvent && EventHandlerService.isEnabled()) { 241 generateEvent(wfAction, wfJob.getUser()); 242 } 243 if (execSynchronous) { 244 new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(); 245 } 246 } 247 catch (JPAExecutorException e) { 248 throw new CommandException(e); 249 } 250 } 251 252 LOG.debug("ENDED ActionCheckXCommand for wf actionId=" + actionId + ", jobId=" + jobId); 253 return null; 254 } 255 256 protected long getRetryInterval() { 257 return (executor != null) ? executor.getRetryInterval() : ActionExecutor.RETRY_INTERVAL; 258 } 259 260 @Override 261 public String getKey() { 262 return getName() + "_" + actionId; 263 } 264 265}