001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.sql.Timestamp; 021 import java.util.ArrayList; 022 import java.util.Date; 023 import java.util.List; 024 025 import org.apache.oozie.ErrorCode; 026 import org.apache.oozie.WorkflowActionBean; 027 import org.apache.oozie.WorkflowJobBean; 028 import org.apache.oozie.XException; 029 import org.apache.oozie.action.ActionExecutor; 030 import org.apache.oozie.action.ActionExecutorException; 031 import org.apache.oozie.client.WorkflowAction; 032 import org.apache.oozie.client.WorkflowJob; 033 import org.apache.oozie.client.WorkflowAction.Status; 034 import org.apache.oozie.client.rest.JsonBean; 035 import org.apache.oozie.command.CommandException; 036 import org.apache.oozie.command.PreconditionException; 037 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 038 import org.apache.oozie.executor.jpa.JPAExecutorException; 039 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; 040 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor; 041 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 042 import org.apache.oozie.service.ActionCheckerService; 043 import org.apache.oozie.service.ActionService; 044 import org.apache.oozie.service.JPAService; 045 import org.apache.oozie.service.Services; 046 import org.apache.oozie.service.UUIDService; 047 import org.apache.oozie.util.InstrumentUtils; 048 import org.apache.oozie.util.Instrumentation; 049 import org.apache.oozie.util.LogUtils; 050 import org.apache.oozie.util.XLog; 051 052 /** 053 * Executes the check command for ActionHandlers. </p> Ensures the action is in 054 * RUNNING state before executing 055 * {@link ActionExecutor#check(org.apache.oozie.action.ActionExecutor.Context, org.apache.oozie.client.WorkflowAction)} 056 */ 057 public class ActionCheckXCommand extends ActionXCommand<Void> { 058 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING"; 059 private String actionId; 060 private String jobId; 061 private int actionCheckDelay; 062 private WorkflowJobBean wfJob = null; 063 private WorkflowActionBean wfAction = null; 064 private JPAService jpaService = null; 065 private ActionExecutor executor = null; 066 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 067 068 public ActionCheckXCommand(String actionId) { 069 this(actionId, -1); 070 } 071 072 public ActionCheckXCommand(String actionId, int priority, int checkDelay) { 073 super("action.check", "action.check", priority); 074 this.actionId = actionId; 075 this.actionCheckDelay = checkDelay; 076 this.jobId = Services.get().get(UUIDService.class).getId(actionId); 077 } 078 079 public ActionCheckXCommand(String actionId, int checkDelay) { 080 this(actionId, 0, checkDelay); 081 } 082 083 @Override 084 protected void eagerLoadState() throws CommandException { 085 try { 086 jpaService = Services.get().get(JPAService.class); 087 if (jpaService != null) { 088 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId)); 089 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId)); 090 LogUtils.setLogInfo(wfJob, logInfo); 091 LogUtils.setLogInfo(wfAction, logInfo); 092 } 093 else { 094 throw new CommandException(ErrorCode.E0610); 095 } 096 } 097 catch (XException ex) { 098 throw new CommandException(ex); 099 } 100 } 101 102 @Override 103 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 104 if (wfJob == null) { 105 throw new PreconditionException(ErrorCode.E0604, jobId); 106 } 107 if (wfAction == null) { 108 throw new PreconditionException(ErrorCode.E0605, actionId); 109 } 110 // if the action has been updated, quit this command 111 if (actionCheckDelay > 0) { 112 Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000); 113 Timestamp actionLmt = wfAction.getLastCheckTimestamp(); 114 if (actionLmt.after(actionCheckTs)) { 115 throw new PreconditionException(ErrorCode.E0817, actionId); 116 } 117 } 118 119 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType()); 120 if (executor == null) { 121 throw new CommandException(ErrorCode.E0802, wfAction.getType()); 122 } 123 } 124 125 @Override 126 protected boolean isLockRequired() { 127 return true; 128 } 129 130 @Override 131 public String getEntityKey() { 132 return this.jobId; 133 } 134 135 @Override 136 protected void loadState() throws CommandException { 137 } 138 139 @Override 140 protected void verifyPrecondition() throws CommandException, PreconditionException { 141 if (!wfAction.isPending() || wfAction.getStatus() != WorkflowActionBean.Status.RUNNING) { 142 throw new PreconditionException(ErrorCode.E0815, wfAction.getPending(), wfAction.getStatusStr()); 143 } 144 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) { 145 wfAction.setLastCheckTime(new Date()); 146 try { 147 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction)); 148 } 149 catch (JPAExecutorException e) { 150 throw new CommandException(e); 151 } 152 throw new PreconditionException(ErrorCode.E0818, wfAction.getId(), wfJob.getId(), wfJob.getStatus()); 153 } 154 } 155 156 @Override 157 protected Void execute() throws CommandException { 158 LOG.debug("STARTED ActionCheckXCommand for wf actionId=" + actionId + " priority =" + getPriority()); 159 160 long retryInterval = Services.get().getConf().getLong(ActionCheckerService.CONF_ACTION_CHECK_INTERVAL, executor 161 .getRetryInterval()); 162 executor.setRetryInterval(retryInterval); 163 164 ActionExecutorContext context = null; 165 try { 166 boolean isRetry = false; 167 if (wfAction.getRetries() > 0) { 168 isRetry = true; 169 } 170 boolean isUserRetry = false; 171 context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry); 172 incrActionCounter(wfAction.getType(), 1); 173 174 Instrumentation.Cron cron = new Instrumentation.Cron(); 175 cron.start(); 176 executor.check(context, wfAction); 177 cron.stop(); 178 addActionCron(wfAction.getType(), cron); 179 180 if (wfAction.isExecutionComplete()) { 181 if (!context.isExecuted()) { 182 LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor 183 .getType()); 184 wfAction.setErrorInfo(EXEC_DATA_MISSING, 185 "Execution Complete, but Execution Data Missing from Action"); 186 failJob(context); 187 } else { 188 wfAction.setPending(); 189 queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType())); 190 } 191 } 192 wfAction.setLastCheckTime(new Date()); 193 updateList.add(wfAction); 194 wfJob.setLastModifiedTime(new Date()); 195 updateList.add(wfJob); 196 } 197 catch (ActionExecutorException ex) { 198 LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex 199 .getMessage(), ex); 200 201 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage()); 202 203 switch (ex.getErrorType()) { 204 case FAILED: 205 failAction(wfJob, wfAction); 206 break; 207 case ERROR: 208 handleUserRetry(wfAction); 209 break; 210 case TRANSIENT: // retry N times, then suspend workflow 211 if (!handleTransient(context, executor, WorkflowAction.Status.RUNNING)) { 212 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL); 213 wfAction.setPendingAge(new Date()); 214 wfAction.setRetries(0); 215 wfAction.setStartTime(null); 216 } 217 break; 218 } 219 wfAction.setLastCheckTime(new Date()); 220 updateList = new ArrayList<JsonBean>(); 221 updateList.add(wfAction); 222 wfJob.setLastModifiedTime(new Date()); 223 updateList.add(wfJob); 224 } 225 finally { 226 try { 227 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null)); 228 } 229 catch (JPAExecutorException e) { 230 throw new CommandException(e); 231 } 232 } 233 234 LOG.debug("ENDED ActionCheckXCommand for wf actionId=" + actionId + ", jobId=" + jobId); 235 return null; 236 } 237 238 private void failAction(WorkflowJobBean workflow, WorkflowActionBean action) throws CommandException { 239 if (!handleUserRetry(action)) { 240 LOG.warn("Failing Job [{0}] due to failed action [{1}]", workflow.getId(), action.getId()); 241 action.resetPending(); 242 action.setStatus(Status.FAILED); 243 workflow.setStatus(WorkflowJob.Status.FAILED); 244 InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation()); 245 } 246 } 247 248 protected long getRetryInterval() { 249 return (executor != null) ? executor.getRetryInterval() : ActionExecutor.RETRY_INTERVAL; 250 } 251 252 }