001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.sql.Timestamp; 021 import java.util.Date; 022 import org.apache.oozie.ErrorCode; 023 import org.apache.oozie.WorkflowActionBean; 024 import org.apache.oozie.WorkflowJobBean; 025 import org.apache.oozie.XException; 026 import org.apache.oozie.action.ActionExecutor; 027 import org.apache.oozie.action.ActionExecutorException; 028 import org.apache.oozie.client.WorkflowJob; 029 import org.apache.oozie.client.WorkflowAction.Status; 030 import org.apache.oozie.command.CommandException; 031 import org.apache.oozie.command.PreconditionException; 032 import org.apache.oozie.executor.jpa.JPAExecutorException; 033 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; 034 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor; 035 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 036 import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor; 037 import org.apache.oozie.service.ActionService; 038 import org.apache.oozie.service.JPAService; 039 import org.apache.oozie.service.Services; 040 import org.apache.oozie.service.UUIDService; 041 import org.apache.oozie.util.InstrumentUtils; 042 import org.apache.oozie.util.Instrumentation; 043 import org.apache.oozie.util.LogUtils; 044 import org.apache.oozie.util.XLog; 045 046 /** 047 * Executes the check command for ActionHandlers. </p> Ensures the action is in 048 * RUNNING state before executing 049 * {@link ActionExecutor#check(org.apache.oozie.action.ActionExecutor.Context, org.apache.oozie.client.WorkflowAction)} 050 */ 051 public class ActionCheckXCommand extends ActionXCommand<Void> { 052 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING"; 053 private String actionId; 054 private String jobId; 055 private int actionCheckDelay; 056 private WorkflowJobBean wfJob = null; 057 private WorkflowActionBean wfAction = null; 058 private JPAService jpaService = null; 059 private ActionExecutor executor = null; 060 061 public ActionCheckXCommand(String actionId) { 062 this(actionId, -1); 063 } 064 065 public ActionCheckXCommand(String actionId, int priority, int checkDelay) { 066 super("action.check", "action.check", priority); 067 this.actionId = actionId; 068 this.actionCheckDelay = checkDelay; 069 this.jobId = Services.get().get(UUIDService.class).getId(actionId); 070 } 071 072 public ActionCheckXCommand(String actionId, int checkDelay) { 073 this(actionId, 0, checkDelay); 074 } 075 076 @Override 077 protected void eagerLoadState() throws CommandException { 078 try { 079 jpaService = Services.get().get(JPAService.class); 080 if (jpaService != null) { 081 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId)); 082 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId)); 083 LogUtils.setLogInfo(wfJob, logInfo); 084 LogUtils.setLogInfo(wfAction, logInfo); 085 } 086 else { 087 throw new CommandException(ErrorCode.E0610); 088 } 089 } 090 catch (XException ex) { 091 throw new CommandException(ex); 092 } 093 } 094 095 @Override 096 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 097 if (wfJob == null) { 098 throw new PreconditionException(ErrorCode.E0604, jobId); 099 } 100 if (wfAction == null) { 101 throw new PreconditionException(ErrorCode.E0605, actionId); 102 } 103 // if the action has been updated, quit this command 104 if (actionCheckDelay > 0) { 105 Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000); 106 Timestamp actionLmt = wfAction.getLastCheckTimestamp(); 107 if (actionLmt.after(actionCheckTs)) { 108 throw new PreconditionException(ErrorCode.E0817, actionId); 109 } 110 } 111 112 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType()); 113 if (executor == null) { 114 throw new CommandException(ErrorCode.E0802, wfAction.getType()); 115 } 116 } 117 118 @Override 119 protected boolean isLockRequired() { 120 return true; 121 } 122 123 @Override 124 protected String getEntityKey() { 125 return this.jobId; 126 } 127 128 @Override 129 protected void loadState() throws CommandException { 130 } 131 132 @Override 133 protected void verifyPrecondition() throws CommandException, PreconditionException { 134 if (!wfAction.isPending() || wfAction.getStatus() != WorkflowActionBean.Status.RUNNING) { 135 throw new PreconditionException(ErrorCode.E0815, wfAction.getPending(), wfAction.getStatusStr()); 136 } 137 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) { 138 wfAction.setLastCheckTime(new Date()); 139 try { 140 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction)); 141 } 142 catch (JPAExecutorException e) { 143 throw new CommandException(e); 144 } 145 throw new PreconditionException(ErrorCode.E0818, wfAction.getId(), wfJob.getId(), wfJob.getStatus()); 146 } 147 } 148 149 @Override 150 protected Void execute() throws CommandException { 151 LOG.debug("STARTED ActionCheckXCommand for wf actionId=" + actionId + " priority =" + getPriority()); 152 153 ActionExecutorContext context = null; 154 try { 155 boolean isRetry = false; 156 boolean isUserRetry = false; 157 context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry); 158 incrActionCounter(wfAction.getType(), 1); 159 160 Instrumentation.Cron cron = new Instrumentation.Cron(); 161 cron.start(); 162 executor.check(context, wfAction); 163 cron.stop(); 164 addActionCron(wfAction.getType(), cron); 165 166 if (wfAction.isExecutionComplete()) { 167 if (!context.isExecuted()) { 168 LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor 169 .getType()); 170 wfAction.setErrorInfo(EXEC_DATA_MISSING, 171 "Execution Complete, but Execution Data Missing from Action"); 172 failJob(context); 173 wfAction.setLastCheckTime(new Date()); 174 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction)); 175 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob)); 176 return null; 177 } 178 wfAction.setPending(); 179 queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType())); 180 } 181 wfAction.setLastCheckTime(new Date()); 182 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction)); 183 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob)); 184 } 185 catch (ActionExecutorException ex) { 186 LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex 187 .getMessage(), ex); 188 189 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage()); 190 191 switch (ex.getErrorType()) { 192 case FAILED: 193 failAction(wfJob, wfAction); 194 break; 195 case ERROR: 196 handleUserRetry(wfAction); 197 break; 198 } 199 wfAction.setLastCheckTime(new Date()); 200 try { 201 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction)); 202 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob)); 203 } 204 catch (JPAExecutorException e) { 205 throw new CommandException(e); 206 } 207 return null; 208 } 209 catch (JPAExecutorException e) { 210 throw new CommandException(e); 211 } 212 213 LOG.debug("ENDED ActionCheckXCommand for wf actionId=" + actionId + ", jobId=" + jobId); 214 return null; 215 } 216 217 private void failAction(WorkflowJobBean workflow, WorkflowActionBean action) throws CommandException { 218 if (!handleUserRetry(action)) { 219 LOG.warn("Failing Job [{0}] due to failed action [{1}]", workflow.getId(), action.getId()); 220 action.resetPending(); 221 action.setStatus(Status.FAILED); 222 workflow.setStatus(WorkflowJob.Status.FAILED); 223 InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation()); 224 } 225 } 226 }