/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.ActionCheckerService;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;

/**
 * Executes the check command for ActionHandlers.
 * <p>
 * Ensures the action is pending and in RUNNING state before executing
 * {@link ActionExecutor#check(org.apache.oozie.action.ActionExecutor.Context, org.apache.oozie.client.WorkflowAction)}
 */
public class ActionCheckXCommand extends ActionXCommand<Void> {
    /** Error code set on the action when it completes without execution data. */
    public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";

    private final String actionId;
    private final String jobId;
    /** Minimum age, in seconds, of the last check before this command runs; values &lt;= 0 disable the test. */
    private final int actionCheckDelay;
    private WorkflowJobBean wfJob = null;
    private WorkflowActionBean wfAction = null;
    private ActionExecutor executor = null;
    // Raw element type kept as in the original: the batch executor's parameter type is not visible here.
    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
    private boolean generateEvent = false;

    /**
     * Creates a check command with priority 0 and a check delay of -1, i.e. the
     * "recently checked" test in {@link #eagerVerifyPrecondition()} is skipped.
     *
     * @param actionId id of the workflow action to check
     */
    public ActionCheckXCommand(String actionId) {
        this(actionId, -1);
    }

    /**
     * Creates a check command.
     *
     * @param actionId id of the workflow action to check
     * @param priority command priority
     * @param checkDelay minimum number of seconds since the action's last check; if the action was
     *        checked more recently the command quits. Values &lt;= 0 disable this test.
     */
    public ActionCheckXCommand(String actionId, int priority, int checkDelay) {
        super("action.check", "action.check", priority);
        this.actionId = actionId;
        this.actionCheckDelay = checkDelay;
        // The owning job id is obtained from the action id through the UUID service.
        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
    }

    /**
     * Creates a check command with priority 0.
     *
     * @param actionId id of the workflow action to check
     * @param checkDelay minimum number of seconds since the action's last check; values &lt;= 0
     *        disable the test
     */
    public ActionCheckXCommand(String actionId, int checkDelay) {
        this(actionId, 0, checkDelay);
    }

    /**
     * Loads the job and the action with the GET_WORKFLOW_STATUS and GET_ACTION_ID_TYPE_LASTCHECK
     * queries so that {@link #eagerVerifyPrecondition()} can run.
     *
     * @throws CommandException if a query fails
     */
    @Override
    protected void eagerLoadState() throws CommandException {
        try {
            this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, jobId);
            this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_ID_TYPE_LASTCHECK,
                    actionId);
            LogUtils.setLogInfo(wfJob, logInfo);
            LogUtils.setLogInfo(wfAction, logInfo);
        }
        catch (JPAExecutorException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Verifies that the job and the action were loaded, that the action has not been checked within
     * the configured delay, and resolves the {@link ActionExecutor} for the action type.
     *
     * @throws PreconditionException with E0604/E0605 if the job/action is missing, or E0817 if the
     *         action was checked more recently than the check delay allows
     * @throws CommandException with E0802 if no executor is registered for the action type
     */
    @Override
    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
        if (wfJob == null) {
            throw new PreconditionException(ErrorCode.E0604, jobId);
        }
        if (wfAction == null) {
            throw new PreconditionException(ErrorCode.E0605, actionId);
        }
        // if the action has been updated, quit this command
        if (actionCheckDelay > 0) {
            // 1000L forces long arithmetic; int multiplication would overflow for large delays.
            Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000L);
            Timestamp actionLmt = wfAction.getLastCheckTimestamp();
            // A missing last-check timestamp is treated as "not recently checked" rather than
            // failing with a NullPointerException.
            if (actionLmt != null && actionLmt.after(actionCheckTs)) {
                throw new PreconditionException(ErrorCode.E0817, actionId);
            }
        }

        executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
        if (executor == null) {
            throw new CommandException(ErrorCode.E0802, wfAction.getType());
        }
    }

    /**
     * @return always {@code true}
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * @return the id of the workflow job that owns the action
     */
    @Override
    public String getEntityKey() {
        return this.jobId;
    }

    /**
     * Reloads the job and the action with the GET_WORKFLOW_ACTION_OP and GET_ACTION_CHECK queries,
     * replacing the beans loaded by {@link #eagerLoadState()}.
     *
     * @throws CommandException if a query fails
     */
    @Override
    protected void loadState() throws CommandException {
        try {
            this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_ACTION_OP, jobId);
            this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_CHECK,
                    actionId);
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
        LogUtils.setLogInfo(wfJob, logInfo);
        LogUtils.setLogInfo(wfAction, logInfo);
    }

    /**
     * Verifies that the action is pending and RUNNING and that the job is RUNNING or SUSPENDED.
     * <p>
     * When the job is in any other state the action's last check time is updated and persisted
     * before the precondition failure is raised.
     *
     * @throws PreconditionException with E0815 if the action is not pending and RUNNING, or E0818 if
     *         the job is neither RUNNING nor SUSPENDED
     * @throws CommandException if persisting the last check time fails
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        if (!wfAction.isPending() || wfAction.getStatus() != WorkflowActionBean.Status.RUNNING) {
            throw new PreconditionException(ErrorCode.E0815, wfAction.isPending(), wfAction.getStatusStr());
        }
        if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
            wfAction.setLastCheckTime(new Date());
            try {
                WorkflowActionQueryExecutor.getInstance().executeUpdate(
                        WorkflowActionQuery.UPDATE_ACTION_FOR_LAST_CHECKED_TIME, wfAction);
            }
            catch (JPAExecutorException e) {
                throw new CommandException(e);
            }
            throw new PreconditionException(ErrorCode.E0818, wfAction.getId(), wfJob.getId(), wfJob.getStatus());
        }
    }

    /**
     * Runs the executor's check for the action, records the outcome on the action and job beans,
     * persists both in one batch and, if the action completed with execution data, synchronously
     * runs {@link ActionEndXCommand}.
     *
     * @return always {@code null}
     * @throws CommandException if the batch update fails or a called helper raises it
     */
    @Override
    protected Void execute() throws CommandException {
        LOG.debug("STARTED ActionCheckXCommand for wf actionId=" + actionId + " priority =" + getPriority());

        // The configured check interval, when present, overrides the executor's own retry interval.
        long retryInterval = Services.get().getConf().getLong(ActionCheckerService.CONF_ACTION_CHECK_INTERVAL, executor
                .getRetryInterval());
        executor.setRetryInterval(retryInterval);

        ActionExecutorContext context = null;
        boolean execSynchronous = false;
        try {
            boolean isRetry = wfAction.getRetries() > 0;
            boolean isUserRetry = false;
            context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
            incrActionCounter(wfAction.getType(), 1);

            Instrumentation.Cron cron = new Instrumentation.Cron();
            cron.start();
            executor.check(context, wfAction);
            cron.stop();
            addActionCron(wfAction.getType(), cron);

            if (wfAction.isExecutionComplete()) {
                if (!context.isExecuted()) {
                    // Completed without the executor reporting execution data: fail the job.
                    LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
                            .getType());
                    wfAction.setErrorInfo(EXEC_DATA_MISSING,
                            "Execution Complete, but Execution Data Missing from Action");
                    failJob(context);
                    generateEvent = true;
                }
                else {
                    wfAction.setPending();
                    execSynchronous = true;
                }
            }
            wfAction.setLastCheckTime(new Date());
            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction));
            wfJob.setLastModifiedTime(new Date());
            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
                    wfJob));
        }
        catch (ActionExecutorException ex) {
            LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex
                    .getMessage(), ex);

            wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
            switch (ex.getErrorType()) {
                case FAILED:
                    failJob(context, wfAction);
                    generateEvent = true;
                    break;
                case ERROR:
                    handleUserRetry(wfAction);
                    break;
                case TRANSIENT: // retry N times, then suspend workflow
                    if (!handleTransient(context, executor, WorkflowAction.Status.RUNNING)) {
                        handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
                        generateEvent = true;
                        wfAction.setPendingAge(new Date());
                        wfAction.setRetries(0);
                        wfAction.setStartTime(null);
                    }
                    break;
                default:
                    // NOTE(review): any other error type only has its error info recorded above --
                    // confirm this is intended.
                    break;
            }
            wfAction.setLastCheckTime(new Date());
            // Start from a fresh list so nothing queued before the failure is written twice.
            updateList = new ArrayList<UpdateEntry>();
            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction));
            wfJob.setLastModifiedTime(new Date());
            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
                    wfJob));
        }
        finally {
            // Runs on every exit path, so whatever was queued in updateList is persisted even when
            // the try block exits with an exception other than ActionExecutorException.
            try {
                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
                if (generateEvent && EventHandlerService.isEnabled()) {
                    generateEvent(wfAction, wfJob.getUser());
                }
                if (execSynchronous) {
                    new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey());
                }
            }
            catch (JPAExecutorException e) {
                throw new CommandException(e);
            }
        }

        LOG.debug("ENDED ActionCheckXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
        return null;
    }

    /**
     * @return the resolved executor's retry interval, or {@link ActionExecutor#RETRY_INTERVAL} when
     *         no executor has been resolved yet
     */
    protected long getRetryInterval() {
        return (executor != null) ? executor.getRetryInterval() : ActionExecutor.RETRY_INTERVAL;
    }

    /**
     * @return the command name and the action id joined by an underscore
     */
    @Override
    public String getKey() {
        return getName() + "_" + actionId;
    }

}