001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.sql.Timestamp; 021 import java.util.ArrayList; 022 import java.util.Date; 023 import java.util.List; 024 025 import org.apache.oozie.ErrorCode; 026 import org.apache.oozie.WorkflowActionBean; 027 import org.apache.oozie.WorkflowJobBean; 028 import org.apache.oozie.XException; 029 import org.apache.oozie.action.ActionExecutor; 030 import org.apache.oozie.action.ActionExecutorException; 031 import org.apache.oozie.client.WorkflowAction; 032 import org.apache.oozie.client.WorkflowJob; 033 import org.apache.oozie.client.WorkflowAction.Status; 034 import org.apache.oozie.client.rest.JsonBean; 035 import org.apache.oozie.command.CommandException; 036 import org.apache.oozie.command.PreconditionException; 037 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 038 import org.apache.oozie.executor.jpa.JPAExecutorException; 039 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; 040 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor; 041 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 042 import org.apache.oozie.service.ActionService; 043 import org.apache.oozie.service.JPAService; 044 import org.apache.oozie.service.Services; 045 import org.apache.oozie.service.UUIDService; 046 import org.apache.oozie.util.InstrumentUtils; 047 import org.apache.oozie.util.Instrumentation; 048 import org.apache.oozie.util.LogUtils; 049 import org.apache.oozie.util.XLog; 050 051 /** 052 * Executes the check command for ActionHandlers. </p> Ensures the action is in 053 * RUNNING state before executing 054 * {@link ActionExecutor#check(org.apache.oozie.action.ActionExecutor.Context, org.apache.oozie.client.WorkflowAction)} 055 */ 056 public class ActionCheckXCommand extends ActionXCommand<Void> { 057 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING"; 058 private String actionId; 059 private String jobId; 060 private int actionCheckDelay; 061 private WorkflowJobBean wfJob = null; 062 private WorkflowActionBean wfAction = null; 063 private JPAService jpaService = null; 064 private ActionExecutor executor = null; 065 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 066 067 public ActionCheckXCommand(String actionId) { 068 this(actionId, -1); 069 } 070 071 public ActionCheckXCommand(String actionId, int priority, int checkDelay) { 072 super("action.check", "action.check", priority); 073 this.actionId = actionId; 074 this.actionCheckDelay = checkDelay; 075 this.jobId = Services.get().get(UUIDService.class).getId(actionId); 076 } 077 078 public ActionCheckXCommand(String actionId, int checkDelay) { 079 this(actionId, 0, checkDelay); 080 } 081 082 @Override 083 protected void eagerLoadState() throws CommandException { 084 try { 085 jpaService = Services.get().get(JPAService.class); 086 if (jpaService != null) { 087 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId)); 088 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId)); 089 LogUtils.setLogInfo(wfJob, logInfo); 090 LogUtils.setLogInfo(wfAction, logInfo); 091 } 092 else { 093 throw new CommandException(ErrorCode.E0610); 094 } 095 } 096 catch (XException ex) { 097 throw new CommandException(ex); 098 } 099 } 100 101 @Override 102 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 103 if (wfJob == null) { 104 throw new PreconditionException(ErrorCode.E0604, jobId); 105 } 106 if (wfAction == null) { 107 throw new PreconditionException(ErrorCode.E0605, actionId); 108 } 109 // if the action has been updated, quit this command 110 if (actionCheckDelay > 0) { 111 Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000); 112 Timestamp actionLmt = wfAction.getLastCheckTimestamp(); 113 if (actionLmt.after(actionCheckTs)) { 114 throw new PreconditionException(ErrorCode.E0817, actionId); 115 } 116 } 117 118 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType()); 119 if (executor == null) { 120 throw new CommandException(ErrorCode.E0802, wfAction.getType()); 121 } 122 } 123 124 @Override 125 protected boolean isLockRequired() { 126 return true; 127 } 128 129 @Override 130 public String getEntityKey() { 131 return this.jobId; 132 } 133 134 @Override 135 protected void loadState() throws CommandException { 136 } 137 138 @Override 139 protected void verifyPrecondition() throws CommandException, PreconditionException { 140 if (!wfAction.isPending() || wfAction.getStatus() != WorkflowActionBean.Status.RUNNING) { 141 throw new PreconditionException(ErrorCode.E0815, wfAction.getPending(), wfAction.getStatusStr()); 142 } 143 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) { 144 wfAction.setLastCheckTime(new Date()); 145 try { 146 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction)); 147 } 148 catch (JPAExecutorException e) { 149 throw new CommandException(e); 150 } 151 throw new PreconditionException(ErrorCode.E0818, wfAction.getId(), wfJob.getId(), wfJob.getStatus()); 152 } 153 } 154 155 @Override 156 protected Void execute() throws CommandException { 157 LOG.debug("STARTED ActionCheckXCommand for wf actionId=" + actionId + " priority =" + getPriority()); 158 159 ActionExecutorContext context = null; 160 try { 161 boolean isRetry = false; 162 if (wfAction.getRetries() > 0) { 163 isRetry = true; 164 } 165 boolean isUserRetry = false; 166 context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry); 167 incrActionCounter(wfAction.getType(), 1); 168 169 Instrumentation.Cron cron = new Instrumentation.Cron(); 170 cron.start(); 171 executor.check(context, wfAction); 172 cron.stop(); 173 addActionCron(wfAction.getType(), cron); 174 175 if (wfAction.isExecutionComplete()) { 176 if (!context.isExecuted()) { 177 LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor 178 .getType()); 179 wfAction.setErrorInfo(EXEC_DATA_MISSING, 180 "Execution Complete, but Execution Data Missing from Action"); 181 failJob(context); 182 } else { 183 wfAction.setPending(); 184 queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType())); 185 } 186 } 187 wfAction.setLastCheckTime(new Date()); 188 updateList.add(wfAction); 189 wfJob.setLastModifiedTime(new Date()); 190 updateList.add(wfJob); 191 } 192 catch (ActionExecutorException ex) { 193 LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex 194 .getMessage(), ex); 195 196 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage()); 197 198 switch (ex.getErrorType()) { 199 case FAILED: 200 failAction(wfJob, wfAction); 201 break; 202 case ERROR: 203 handleUserRetry(wfAction); 204 break; 205 case TRANSIENT: // retry N times, then suspend workflow 206 if (!handleTransient(context, executor, WorkflowAction.Status.RUNNING)) { 207 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL); 208 wfAction.setPendingAge(new Date()); 209 wfAction.setRetries(0); 210 wfAction.setStartTime(null); 211 } 212 break; 213 } 214 wfAction.setLastCheckTime(new Date()); 215 updateList = new ArrayList<JsonBean>(); 216 updateList.add(wfAction); 217 wfJob.setLastModifiedTime(new Date()); 218 updateList.add(wfJob); 219 } 220 finally { 221 try { 222 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null)); 223 } 224 catch (JPAExecutorException e) { 225 throw new CommandException(e); 226 } 227 } 228 229 LOG.debug("ENDED ActionCheckXCommand for wf actionId=" + actionId + ", jobId=" + jobId); 230 return null; 231 } 232 233 private void failAction(WorkflowJobBean workflow, WorkflowActionBean action) throws CommandException { 234 if (!handleUserRetry(action)) { 235 LOG.warn("Failing Job [{0}] due to failed action [{1}]", workflow.getId(), action.getId()); 236 action.resetPending(); 237 action.setStatus(Status.FAILED); 238 workflow.setStatus(WorkflowJob.Status.FAILED); 239 InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation()); 240 } 241 } 242 }