/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.DagELFunctions;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.SLAEvent.Status;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.db.SLADbXOperations;
import org.apache.oozie.workflow.WorkflowInstance;

/**
 * Command that ends a workflow action.
 * <p>
 * It invokes the action type's {@link ActionExecutor#end} and, on success, records the
 * SLA status event for the action. It then queues a {@link NotificationXCommand} and a
 * {@link SignalXCommand}, unless a user retry was scheduled instead. Executor failures
 * are routed to transient/non-transient/error/failed handling according to the
 * exception's error type.
 * <p>
 * The command requests a lock keyed on the owning workflow job id
 * (see {@link #getEntityKey()}).
 */
public class ActionEndXCommand extends ActionXCommand<Void> {
    /** Error code recorded on the action when the executor reports an ERROR while ending. */
    public static final String COULD_NOT_END = "COULD_NOT_END";
    /** Error code recorded when the executor returned from end() without calling setEndData(). */
    public static final String END_DATA_MISSING = "END_DATA_MISSING";

    private final String jobId;
    private final String actionId;
    private WorkflowJobBean wfJob = null;
    private WorkflowActionBean wfAction = null;
    private JPAService jpaService = null;
    private ActionExecutor executor = null;

    /**
     * Creates the command.
     *
     * @param actionId id of the workflow action to end; the owning job id is derived from it
     *                 through {@link UUIDService#getId(String)}
     * @param type     command type, passed through to the superclass
     */
    public ActionEndXCommand(String actionId, String type) {
        super("action.end", type, 0);
        this.actionId = actionId;
        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
    }

    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /** Returns the workflow job id, so the lock is taken at job granularity. */
    @Override
    protected String getEntityKey() {
        return this.jobId;
    }

    /**
     * Loads the workflow job and action beans through the JPA service and attaches their
     * ids to the log context.
     *
     * @throws CommandException E0610 if no JPA service is available, or wrapping any
     *         {@link XException} raised while loading
     */
    @Override
    protected void loadState() throws CommandException {
        try {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService != null) {
                this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
                this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
                LogUtils.setLogInfo(wfJob, logInfo);
                LogUtils.setLogInfo(wfAction, logInfo);
            }
            else {
                throw new CommandException(ErrorCode.E0610);
            }
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Checks that the action can be ended now.
     * <p>
     * The action must be pending and in DONE, END_RETRY or END_MANUAL; the job must be
     * RUNNING; and an executor must be registered for the action type.
     *
     * @throws PreconditionException E0604/E0605 if the job/action was not loaded, E0811 if the
     *         job is not RUNNING, E0812 if the action is not in an endable state
     * @throws CommandException E0802 if no executor is registered for the action type
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        if (wfJob == null) {
            throw new PreconditionException(ErrorCode.E0604, jobId);
        }
        if (wfAction == null) {
            throw new PreconditionException(ErrorCode.E0605, actionId);
        }
        if (wfAction.isPending()
                && (wfAction.getStatus() == WorkflowActionBean.Status.DONE
                || wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
                || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL)) {

            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
                throw new PreconditionException(ErrorCode.E0811, WorkflowJob.Status.RUNNING.toString());
            }
        }
        else {
            throw new PreconditionException(ErrorCode.E0812, wfAction.getPending(), wfAction.getStatusStr());
        }

        executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
        if (executor == null) {
            throw new CommandException(ErrorCode.E0802, wfAction.getType());
        }
    }

    /**
     * Ends the action through its executor and persists the resulting action and job state.
     * <p>
     * Retry settings from the job configuration ({@link OozieClient#ACTION_MAX_RETRIES},
     * {@link OozieClient#ACTION_RETRY_INTERVAL}) override the executor defaults. If the
     * executor does not mark the context as ended, the job is failed with
     * {@link #END_DATA_MISSING}.
     *
     * @return always {@code null}
     * @throws CommandException wrapping any JPA persistence failure
     */
    @Override
    protected Void execute() throws CommandException {
        LOG.debug("STARTED ActionEndXCommand for action " + actionId);

        Configuration conf = wfJob.getWorkflowInstance().getConf();
        int maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
        long retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
        executor.setMaxRetries(maxRetries);
        executor.setRetryInterval(retryInterval);

        // END_RETRY / END_MANUAL mean an earlier end() attempt did not complete.
        boolean isRetry = wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
                || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL;
        boolean isUserRetry = false;
        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
        try {

            LOG.debug(
                    "End, name [{0}] type [{1}] status[{2}] external status [{3}] signal value [{4}]",
                    wfAction.getName(), wfAction.getType(), wfAction.getStatus(), wfAction.getExternalStatus(),
                    wfAction.getSignalValue());

            Instrumentation.Cron cron = new Instrumentation.Cron();
            cron.start();
            executor.end(context, wfAction);
            cron.stop();
            addActionCron(wfAction.getType(), cron);

            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
            DagELFunctions.setActionInfo(wfInstance, wfAction);
            wfJob.setWorkflowInstance(wfInstance);
            incrActionCounter(wfAction.getType(), 1);

            if (!context.isEnded()) {
                LOG.warn(XLog.OPS, "Action Ended, ActionExecutor [{0}] must call setEndData()",
                        executor.getType());
                wfAction.setErrorInfo(END_DATA_MISSING, "Execution Ended, but End Data Missing from Action");
                failJob(context);
                persistActionAndJob();
                return null;
            }
            wfAction.setRetries(0);
            wfAction.setEndTime(new Date());

            WorkflowAction.Status endStatus = wfAction.getStatus();
            Status slaStatus = toSlaStatus(endStatus);
            // Every outcome other than OK or KILLED is a candidate for a user-configured retry.
            boolean shouldHandleUserRetry = endStatus != WorkflowAction.Status.OK
                    && endStatus != WorkflowAction.Status.KILLED;

            if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
                SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
                queue(new NotificationXCommand(wfJob, wfAction));
                LOG.debug(
                        "Queuing commands for action=" + actionId + ", status=" + wfAction.getStatus()
                        + ", Set pending=" + wfAction.getPending());
                queue(new SignalXCommand(jobId, actionId));
            }

            persistActionAndJob();
        }
        catch (ActionExecutorException ex) {
            LOG.warn(
                    "Error ending action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
                    wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage());
            wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
            wfAction.setEndTime(null);

            switch (ex.getErrorType()) {
                case TRANSIENT:
                    if (!handleTransient(context, executor, WorkflowAction.Status.END_RETRY)) {
                        handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
                        wfAction.setPendingAge(new Date());
                        wfAction.setRetries(0);
                    }
                    wfAction.setEndTime(null);
                    break;
                case NON_TRANSIENT:
                    handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
                    wfAction.setEndTime(null);
                    break;
                case ERROR:
                    handleError(context, executor, COULD_NOT_END, false, WorkflowAction.Status.ERROR);
                    queue(new SignalXCommand(jobId, actionId));
                    break;
                case FAILED:
                    failJob(context);
                    break;
            }

            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
            DagELFunctions.setActionInfo(wfInstance, wfAction);
            wfJob.setWorkflowInstance(wfInstance);

            // A sibling catch clause does not cover exceptions thrown from this handler,
            // so JPA failures here need their own wrapping.
            try {
                persistActionAndJob();
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }

        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }

        LOG.debug("ENDED ActionEndXCommand for action " + actionId);
        return null;
    }

    /**
     * Maps the status an action ended with to the SLA status recorded for it.
     * <p>
     * OK maps to SUCCEEDED and KILLED maps to KILLED. ERROR, FAILED and every other status
     * map to FAILED. ERROR was previously recorded as KILLED, which contradicted the log
     * message emitted for it.
     *
     * @param actionStatus status of the action after {@link ActionExecutor#end} returned
     * @return the SLA status to record
     */
    private Status toSlaStatus(WorkflowAction.Status actionStatus) {
        switch (actionStatus) {
            case OK:
                return Status.SUCCEEDED;
            case KILLED:
                return Status.KILLED;
            case ERROR:
                LOG.info("ERROR is considered as FAILED for SLA");
                return Status.FAILED;
            default:
                return Status.FAILED;
        }
    }

    /**
     * Persists the current action bean and then the job bean through the JPA service.
     *
     * @throws JPAExecutorException if either update fails
     */
    private void persistActionAndJob() throws JPAExecutorException {
        jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
    }
}