001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.util.Date; 021 022 import org.apache.hadoop.conf.Configuration; 023 import org.apache.oozie.DagELFunctions; 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.WorkflowActionBean; 026 import org.apache.oozie.WorkflowJobBean; 027 import org.apache.oozie.XException; 028 import org.apache.oozie.action.ActionExecutor; 029 import org.apache.oozie.action.ActionExecutorException; 030 import org.apache.oozie.client.OozieClient; 031 import org.apache.oozie.client.WorkflowAction; 032 import org.apache.oozie.client.WorkflowJob; 033 import org.apache.oozie.client.SLAEvent.SlaAppType; 034 import org.apache.oozie.client.SLAEvent.Status; 035 import org.apache.oozie.command.CommandException; 036 import org.apache.oozie.command.PreconditionException; 037 import org.apache.oozie.executor.jpa.JPAExecutorException; 038 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; 039 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor; 040 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 041 import 
org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.db.SLADbXOperations;
import org.apache.oozie.workflow.WorkflowInstance;

/**
 * Ends a workflow action by delegating to the {@link ActionExecutor} registered for the action's type.
 * <p>
 * When the executor ends the action normally:
 * <ul>
 * <li>the action's end time is recorded;</li>
 * <li>an SLA status event is written;</li>
 * <li>a {@link SignalXCommand} is queued for the owning job, unless a user retry takes over.</li>
 * </ul>
 * When the executor throws an {@link ActionExecutorException}, its error type selects one of:
 * automatic retry, manual intervention, error handling, or failing the job.
 * <p>
 * The command requires a lock whose entity key is the owning workflow job id.
 */
public class ActionEndXCommand extends ActionXCommand<Void> {
    /** Error code passed to {@code handleError} when the executor fails with error type ERROR. */
    public static final String COULD_NOT_END = "COULD_NOT_END";
    /** Error code recorded when the executor returns without calling {@code setEndData()}. */
    public static final String END_DATA_MISSING = "END_DATA_MISSING";

    private final String jobId;
    private final String actionId;
    private WorkflowJobBean wfJob = null;
    private WorkflowActionBean wfAction = null;
    private JPAService jpaService = null;
    private ActionExecutor executor = null;

    /**
     * Creates the command for a single action.
     *
     * @param actionId id of the workflow action to end
     * @param type command type, passed through to {@link ActionXCommand}
     */
    public ActionEndXCommand(String actionId, String type) {
        // NOTE(review): the trailing 0 is presumably the command priority -- confirm against XCommand.
        super("action.end", type, 0);
        this.actionId = actionId;
        // The owning job id is derived from the action id; it doubles as the lock key.
        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
    }

    /** Always locks: ending an action mutates both the action and its workflow job. */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /** @return the id of the workflow job that owns the action */
    @Override
    public String getEntityKey() {
        return this.jobId;
    }

    /**
     * Loads the workflow job and action beans and attaches their ids to the log context.
     *
     * @throws CommandException E0610 if the JPA service is unavailable, or wrapping any
     *         {@link XException} raised while loading
     */
    @Override
    protected void loadState() throws CommandException {
        try {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService == null) {
                throw new CommandException(ErrorCode.E0610);
            }
            this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
            this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
            LogUtils.setLogInfo(wfJob, logInfo);
            LogUtils.setLogInfo(wfAction, logInfo);
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Checks that the action can be ended now and resolves its executor.
     *
     * @throws PreconditionException E0604 or E0605 if the job or action was not found;
     *         E0812 if the action is not pending in DONE, END_RETRY or END_MANUAL;
     *         E0811 if the job is not RUNNING
     * @throws CommandException E0802 if no executor is registered for the action type
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        if (wfJob == null) {
            throw new PreconditionException(ErrorCode.E0604, jobId);
        }
        if (wfAction == null) {
            throw new PreconditionException(ErrorCode.E0605, actionId);
        }

        WorkflowAction.Status status = wfAction.getStatus();
        boolean endable = wfAction.isPending()
                && (status == WorkflowActionBean.Status.DONE
                || status == WorkflowActionBean.Status.END_RETRY
                || status == WorkflowActionBean.Status.END_MANUAL);
        if (!endable) {
            throw new PreconditionException(ErrorCode.E0812, wfAction.getPending(), wfAction.getStatusStr());
        }
        if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
            throw new PreconditionException(ErrorCode.E0811, WorkflowJob.Status.RUNNING.toString());
        }

        executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
        if (executor == null) {
            throw new CommandException(ErrorCode.E0802, wfAction.getType());
        }
    }

    /**
     * Invokes the executor's {@code end()} and persists the resulting action and job state.
     *
     * @return always {@code null}
     * @throws CommandException if persisting fails or one of the inherited handlers fails
     */
    @Override
    protected Void execute() throws CommandException {
        LOG.debug("STARTED ActionEndXCommand for action " + actionId);

        // Job configuration may override the executor's default retry policy.
        Configuration conf = wfJob.getWorkflowInstance().getConf();
        int maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
        long retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
        executor.setMaxRetries(maxRetries);
        executor.setRetryInterval(retryInterval);

        // END_RETRY / END_MANUAL mean an earlier end attempt did not complete.
        boolean isRetry = wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
                || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL;
        boolean isUserRetry = false;
        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
        try {
            LOG.debug(
                    "End, name [{0}] type [{1}] status[{2}] external status [{3}] signal value [{4}]",
                    wfAction.getName(), wfAction.getType(), wfAction.getStatus(), wfAction.getExternalStatus(),
                    wfAction.getSignalValue());

            Instrumentation.Cron cron = new Instrumentation.Cron();
            cron.start();
            executor.end(context, wfAction);
            cron.stop();
            addActionCron(wfAction.getType(), cron);

            refreshActionInfo();
            incrActionCounter(wfAction.getType(), 1);

            if (!context.isEnded()) {
                // The executor contract requires setEndData(); treat its absence as a job failure.
                LOG.warn(XLog.OPS, "Action Ended, ActionExecutor [{0}] must call setEndData()",
                        executor.getType());
                wfAction.setErrorInfo(END_DATA_MISSING, "Execution Ended, but End Data Missing from Action");
                failJob(context);
                updateActionAndJob();
                return null;
            }
            wfAction.setRetries(0);
            wfAction.setEndTime(new Date());

            boolean shouldHandleUserRetry = false;
            Status slaStatus;
            switch (wfAction.getStatus()) {
                case OK:
                    slaStatus = Status.SUCCEEDED;
                    break;
                case KILLED:
                    slaStatus = Status.KILLED;
                    break;
                case FAILED:
                    slaStatus = Status.FAILED;
                    shouldHandleUserRetry = true;
                    break;
                case ERROR:
                    LOG.info("ERROR is considered as FAILED for SLA");
                    // Record FAILED, as logged above (the SLA event used to say KILLED, contradicting it).
                    slaStatus = Status.FAILED;
                    shouldHandleUserRetry = true;
                    break;
                default:
                    slaStatus = Status.FAILED;
                    shouldHandleUserRetry = true;
                    break;
            }
            // When a user retry is scheduled, skip the SLA event and the signal for this attempt.
            if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
                SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
                LOG.debug(
                        "Queuing commands for action=" + actionId + ", status=" + wfAction.getStatus()
                                + ", Set pending=" + wfAction.getPending());
                queue(new SignalXCommand(jobId, actionId));
            }

            updateActionAndJob();
        }
        catch (ActionExecutorException ex) {
            handleEndException(context, ex);
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }

        LOG.debug("ENDED ActionEndXCommand for action " + actionId);
        return null;
    }

    /**
     * Applies retry, manual, error or fail handling for an executor failure, then persists the state.
     *
     * @param context executor context for the current end attempt
     * @param ex failure raised by {@code executor.end()}
     * @throws CommandException if persisting fails or one of the inherited handlers fails
     */
    private void handleEndException(ActionExecutorContext context, ActionExecutorException ex)
            throws CommandException {
        LOG.warn(
                "Error ending action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
                wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage());
        wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
        wfAction.setEndTime(null);

        switch (ex.getErrorType()) {
            case TRANSIENT:
                // Retry automatically; once retries are exhausted, fall back to manual handling.
                if (!handleTransient(context, executor, WorkflowAction.Status.END_RETRY)) {
                    handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
                    wfAction.setPendingAge(new Date());
                    wfAction.setRetries(0);
                }
                // Cleared again after the handlers run. NOTE(review): possibly redundant -- confirm.
                wfAction.setEndTime(null);
                break;
            case NON_TRANSIENT:
                handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
                wfAction.setEndTime(null);
                break;
            case ERROR:
                handleError(context, executor, COULD_NOT_END, false, WorkflowAction.Status.ERROR);
                queue(new SignalXCommand(jobId, actionId));
                break;
            case FAILED:
                failJob(context);
                break;
        }

        refreshActionInfo();
        try {
            updateActionAndJob();
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
    }

    /** Copies the current action state into the workflow instance so EL functions can see it. */
    private void refreshActionInfo() {
        WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
        DagELFunctions.setActionInfo(wfInstance, wfAction);
        wfJob.setWorkflowInstance(wfInstance);
    }

    /** Persists the action first, then the job. */
    private void updateActionAndJob() throws JPAExecutorException {
        jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
        jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
    }

}