001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.util.ArrayList; 021 import java.util.Date; 022 import java.util.List; 023 024 import org.apache.hadoop.conf.Configuration; 025 import org.apache.oozie.DagELFunctions; 026 import org.apache.oozie.ErrorCode; 027 import org.apache.oozie.SLAEventBean; 028 import org.apache.oozie.WorkflowActionBean; 029 import org.apache.oozie.WorkflowJobBean; 030 import org.apache.oozie.XException; 031 import org.apache.oozie.action.ActionExecutor; 032 import org.apache.oozie.action.ActionExecutorException; 033 import org.apache.oozie.action.control.ControlNodeActionExecutor; 034 import org.apache.oozie.client.OozieClient; 035 import org.apache.oozie.client.WorkflowAction; 036 import org.apache.oozie.client.WorkflowJob; 037 import org.apache.oozie.client.SLAEvent.SlaAppType; 038 import org.apache.oozie.client.SLAEvent.Status; 039 import org.apache.oozie.client.rest.JsonBean; 040 import org.apache.oozie.command.CommandException; 041 import org.apache.oozie.command.PreconditionException; 042 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 043 import org.apache.oozie.executor.jpa.JPAExecutorException; 044 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; 045 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 046 import org.apache.oozie.service.ActionService; 047 import org.apache.oozie.service.EventHandlerService; 048 import org.apache.oozie.service.JPAService; 049 import org.apache.oozie.service.Services; 050 import org.apache.oozie.service.UUIDService; 051 import org.apache.oozie.util.Instrumentation; 052 import org.apache.oozie.util.LogUtils; 053 import org.apache.oozie.util.XLog; 054 import org.apache.oozie.util.db.SLADbXOperations; 055 import org.apache.oozie.workflow.WorkflowInstance; 056 057 @SuppressWarnings("deprecation") 058 public class ActionEndXCommand extends ActionXCommand<Void> { 059 public static final String COULD_NOT_END = "COULD_NOT_END"; 060 public static final String END_DATA_MISSING = "END_DATA_MISSING"; 061 062 private String jobId = null; 063 private String actionId = null; 064 private WorkflowJobBean wfJob = null; 065 private WorkflowActionBean wfAction = null; 066 private JPAService jpaService = null; 067 private ActionExecutor executor = null; 068 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 069 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 070 071 public ActionEndXCommand(String actionId, String type) { 072 super("action.end", type, 0); 073 this.actionId = actionId; 074 this.jobId = Services.get().get(UUIDService.class).getId(actionId); 075 } 076 077 @Override 078 protected boolean isLockRequired() { 079 return true; 080 } 081 082 @Override 083 public String getEntityKey() { 084 return this.jobId; 085 } 086 087 @Override 088 public String getKey() { 089 return getName() + "_" + actionId; 090 } 091 092 @Override 093 protected void loadState() throws CommandException { 094 try { 095 jpaService = Services.get().get(JPAService.class); 096 if (jpaService != null) { 097 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId)); 098 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId)); 099 LogUtils.setLogInfo(wfJob, logInfo); 100 LogUtils.setLogInfo(wfAction, logInfo); 101 } 102 else { 103 throw new CommandException(ErrorCode.E0610); 104 } 105 } 106 catch (XException ex) { 107 throw new CommandException(ex); 108 } 109 } 110 111 @Override 112 protected void verifyPrecondition() throws CommandException, PreconditionException { 113 if (wfJob == null) { 114 throw new PreconditionException(ErrorCode.E0604, jobId); 115 } 116 if (wfAction == null) { 117 throw new PreconditionException(ErrorCode.E0605, actionId); 118 } 119 if (wfAction.isPending() 120 && (wfAction.getStatus() == WorkflowActionBean.Status.DONE 121 || wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL)) { 122 123 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) { 124 throw new PreconditionException(ErrorCode.E0811, WorkflowJob.Status.RUNNING.toString()); 125 } 126 } 127 else { 128 throw new PreconditionException(ErrorCode.E0812, wfAction.getPending(), wfAction.getStatusStr()); 129 } 130 131 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType()); 132 if (executor == null) { 133 throw new CommandException(ErrorCode.E0802, wfAction.getType()); 134 } 135 } 136 137 @Override 138 protected Void execute() throws CommandException { 139 LOG.debug("STARTED ActionEndXCommand for action " + actionId); 140 141 Configuration conf = wfJob.getWorkflowInstance().getConf(); 142 143 int maxRetries = 0; 144 long retryInterval = 0; 145 146 if (!(executor instanceof ControlNodeActionExecutor)) { 147 maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries()); 148 retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval()); 149 } 150 151 executor.setMaxRetries(maxRetries); 152 executor.setRetryInterval(retryInterval); 153 154 boolean isRetry = false; 155 if (wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY 156 || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) { 157 isRetry = true; 158 } 159 boolean isUserRetry = false; 160 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry); 161 try { 162 163 LOG.debug( 164 "End, name [{0}] type [{1}] status[{2}] external status [{3}] signal value [{4}]", 165 wfAction.getName(), wfAction.getType(), wfAction.getStatus(), wfAction.getExternalStatus(), 166 wfAction.getSignalValue()); 167 168 Instrumentation.Cron cron = new Instrumentation.Cron(); 169 cron.start(); 170 executor.end(context, wfAction); 171 cron.stop(); 172 addActionCron(wfAction.getType(), cron); 173 174 WorkflowInstance wfInstance = wfJob.getWorkflowInstance(); 175 DagELFunctions.setActionInfo(wfInstance, wfAction); 176 wfJob.setWorkflowInstance(wfInstance); 177 incrActionCounter(wfAction.getType(), 1); 178 179 if (!context.isEnded()) { 180 LOG.warn(XLog.OPS, "Action Ended, ActionExecutor [{0}] must call setEndData()", 181 executor.getType()); 182 wfAction.setErrorInfo(END_DATA_MISSING, "Execution Ended, but End Data Missing from Action"); 183 failJob(context); 184 } else { 185 wfAction.setRetries(0); 186 wfAction.setEndTime(new Date()); 187 188 boolean shouldHandleUserRetry = false; 189 Status slaStatus = null; 190 switch (wfAction.getStatus()) { 191 case OK: 192 slaStatus = Status.SUCCEEDED; 193 break; 194 case KILLED: 195 slaStatus = Status.KILLED; 196 break; 197 case FAILED: 198 slaStatus = Status.FAILED; 199 shouldHandleUserRetry = true; 200 break; 201 case ERROR: 202 LOG.info("ERROR is considered as FAILED for SLA"); 203 slaStatus = Status.KILLED; 204 shouldHandleUserRetry = true; 205 break; 206 default: 207 slaStatus = Status.FAILED; 208 shouldHandleUserRetry = true; 209 break; 210 } 211 if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) { 212 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION); 213 LOG.debug("Queuing commands for action=" + actionId + ", status=" + wfAction.getStatus() 214 + ", Set pending=" + wfAction.getPending()); 215 if(slaEvent != null) { 216 insertList.add(slaEvent); 217 } 218 queue(new SignalXCommand(jobId, actionId)); 219 } 220 } 221 updateList.add(wfAction); 222 wfJob.setLastModifiedTime(new Date()); 223 updateList.add(wfJob); 224 } 225 catch (ActionExecutorException ex) { 226 LOG.warn( 227 "Error ending action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]", 228 wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage()); 229 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage()); 230 wfAction.setEndTime(null); 231 232 switch (ex.getErrorType()) { 233 case TRANSIENT: 234 if (!handleTransient(context, executor, WorkflowAction.Status.END_RETRY)) { 235 handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL); 236 wfAction.setPendingAge(new Date()); 237 wfAction.setRetries(0); 238 } 239 wfAction.setEndTime(null); 240 break; 241 case NON_TRANSIENT: 242 handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL); 243 wfAction.setEndTime(null); 244 break; 245 case ERROR: 246 handleError(context, executor, COULD_NOT_END, false, WorkflowAction.Status.ERROR); 247 queue(new SignalXCommand(jobId, actionId)); 248 break; 249 case FAILED: 250 failJob(context); 251 break; 252 } 253 254 WorkflowInstance wfInstance = wfJob.getWorkflowInstance(); 255 DagELFunctions.setActionInfo(wfInstance, wfAction); 256 wfJob.setWorkflowInstance(wfInstance); 257 258 updateList.add(wfAction); 259 wfJob.setLastModifiedTime(new Date()); 260 updateList.add(wfJob); 261 } 262 finally { 263 try { 264 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList)); 265 if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) { 266 generateEvent(wfAction, wfJob.getUser()); 267 } 268 } 269 catch (JPAExecutorException e) { 270 throw new CommandException(e); 271 } 272 } 273 274 LOG.debug("ENDED ActionEndXCommand for action " + actionId); 275 return null; 276 } 277 278 }