/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.SLAEvent.Status;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.KillNodeDef;
import org.apache.oozie.workflow.lite.NodeDef;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.db.SLADbXOperations;
import org.jdom.Element;
import org.jdom.Namespace;

import java.io.StringReader;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * Command that advances a workflow job: it either starts the workflow instance of a job that is still in
 * <code>PREP</code> (no action id given) or signals the workflow instance with the execution path and signal
 * value of a given action, and then persists the resulting job and action state.
 */
public class SignalXCommand extends WorkflowXCommand<Void> {

    protected static final String INSTR_SUCCEEDED_JOBS_COUNTER_NAME = "succeeded";

    private JPAService jpaService = null;
    private String jobId;
    private String actionId;
    private WorkflowJobBean wfJob;
    private WorkflowActionBean wfAction;

    /**
     * Create a signal command for a job without an associated action.
     *
     * @param name command name, also passed as the command type to the super constructor.
     * @param priority command priority.
     * @param jobId workflow job id, must not be empty.
     */
    public SignalXCommand(String name, int priority, String jobId) {
        super(name, name, priority);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
    }

    /**
     * Create a "signal" command with priority 1 for the given action of a job.
     *
     * @param jobId workflow job id, must not be empty.
     * @param actionId workflow action id, must not be empty.
     */
    public SignalXCommand(String jobId, String actionId) {
        this("signal", 1, jobId);
        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
    }

    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Return the entity key of this command, which is the workflow job id.
     *
     * @return the workflow job id.
     */
    @Override
    public String getEntityKey() {
        return this.jobId;
    }

    /**
     * Load the workflow job bean and, when an action id was given, the workflow action bean.
     *
     * @throws CommandException with {@link ErrorCode#E0610} if the {@link JPAService} is not available, or
     *         wrapping any {@link XException} raised while loading the beans.
     */
    @Override
    protected void loadState() throws CommandException {
        try {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService != null) {
                this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
                LogUtils.setLogInfo(wfJob, logInfo);
                if (actionId != null) {
                    this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
                    LogUtils.setLogInfo(wfAction, logInfo);
                }
            }
            else {
                throw new CommandException(ErrorCode.E0610);
            }
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Verify that the command may run: either no action was loaded, or the loaded action is both complete and
     * pending; in addition the job must be in <code>RUNNING</code> or <code>PREP</code> status.
     *
     * @throws PreconditionException with {@link ErrorCode#E0813} if the job status is neither RUNNING nor PREP,
     *         or with {@link ErrorCode#E0814} if the action is not complete-and-pending.
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        if ((wfAction == null) || (wfAction.isComplete() && wfAction.isPending())) {
            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.PREP) {
                throw new PreconditionException(ErrorCode.E0813, wfJob.getStatusStr());
            }
        }
        else {
            throw new PreconditionException(ErrorCode.E0814, actionId, wfAction.getStatusStr(), wfAction.isPending());
        }
    }

    /**
     * Start or signal the workflow instance and persist the outcome.
     * <p/>
     * Without an action the job must be in PREP and its workflow instance is started; with an action the
     * workflow instance is signalled using the action's execution path and signal value. If the instance
     * reports completion, actions to kill/fail are updated, the job gets its final status and end time, and the
     * kill node message (if the signalled action sits on a kill node) is resolved. Otherwise every newly started
     * action is either re-signalled (when marked to be skipped) or inserted and queued for start. Finally the job
     * bean is updated and, when the job is neither RUNNING nor SUSPENDED, the coordinator-update and
     * workflow-end commands are called.
     *
     * @return always <code>null</code>.
     * @throws CommandException if the workflow engine or the persistence layer fails, with
     *         {@link ErrorCode#E0801} if there is no action and the job is not in PREP, or with
     *         {@link ErrorCode#E0729} if the kill node message cannot be resolved.
     */
    @Override
    protected Void execute() throws CommandException {
        LOG.debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
        WorkflowInstance workflowInstance = wfJob.getWorkflowInstance();
        workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob);
        boolean completed = false;
        boolean skipAction = false;
        if (wfAction == null) {
            if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
                try {
                    completed = workflowInstance.start();
                }
                catch (WorkflowException e) {
                    throw new CommandException(e);
                }
                wfJob.setStatus(WorkflowJob.Status.RUNNING);
                wfJob.setStartTime(new Date());
                wfJob.setWorkflowInstance(workflowInstance);
                // 1. Add SLA status event for WF-JOB with status STARTED
                // 2. Add SLA registration events for all WF_ACTIONS
                SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), jobId, Status.STARTED, SlaAppType.WORKFLOW_JOB);
                writeSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(), wfJob
                        .getGroup(), wfJob.getConf());
                queue(new NotificationXCommand(wfJob));
            }
            else {
                throw new CommandException(ErrorCode.E0801, wfJob.getId());
            }
        }
        else {
            String skipVar = workflowInstance.getVar(wfAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
                    + ReRunXCommand.TO_SKIP);
            // null-safe: a missing variable means the action is not skipped
            skipAction = "true".equals(skipVar);
            try {
                completed = workflowInstance.signal(wfAction.getExecutionPath(), wfAction.getSignalValue());
            }
            catch (WorkflowException e) {
                throw new CommandException(e);
            }
            wfJob.setWorkflowInstance(workflowInstance);
            wfAction.resetPending();
            if (!skipAction) {
                wfAction.setTransition(workflowInstance.getTransition(wfAction.getName()));
                queue(new NotificationXCommand(wfJob, wfAction));
            }
            try {
                jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }
        }

        if (completed) {
            try {
                for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) {
                    WorkflowActionBean actionToKill;

                    actionToKill = jpaService.execute(new WorkflowActionGetJPAExecutor(actionToKillId));

                    actionToKill.setPending();
                    actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(actionToKill));
                    queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType()));
                }

                for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) {
                    WorkflowActionBean actionToFail = jpaService.execute(new WorkflowActionGetJPAExecutor(
                            actionToFailId));
                    actionToFail.resetPending();
                    actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
                    queue(new NotificationXCommand(wfJob, actionToFail));
                    // The FAILED SLA event belongs to the action being failed in this iteration, not to the
                    // signalled action (which may even be null when the job completed right at start).
                    SLADbXOperations.writeStausEvent(actionToFail.getSlaXml(), actionToFail.getId(), Status.FAILED,
                            SlaAppType.WORKFLOW_ACTION);
                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(actionToFail));
                }
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }

            wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
            wfJob.setEndTime(new Date());
            wfJob.setWorkflowInstance(workflowInstance);
            // Any status not handled below keeps the SUCCEEDED default.
            Status slaStatus = Status.SUCCEEDED;
            switch (wfJob.getStatus()) {
                case SUCCEEDED:
                    slaStatus = Status.SUCCEEDED;
                    break;
                case KILLED:
                    slaStatus = Status.KILLED;
                    break;
                case FAILED:
                    slaStatus = Status.FAILED;
                    break;
                default: // TODO SUSPENDED
                    break;
            }
            SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), jobId, slaStatus, SlaAppType.WORKFLOW_JOB);
            queue(new NotificationXCommand(wfJob));
            if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
                InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation());
            }

            // output message for Kill node
            if (wfAction != null) { // wfAction could be a no-op job
                NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath());
                if (nodeDef instanceof KillNodeDef) {
                    boolean isRetry = false;
                    boolean isUserRetry = false;
                    ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry,
                            isUserRetry);
                    try {
                        String tmpNodeConf = nodeDef.getConf();
                        String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
                        LOG.debug("Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], after resolve [{3}]",
                                jobId, actionId, tmpNodeConf, actionConf);
                        // keep an already recorded error code, otherwise fall back to E0729
                        if (wfAction.getErrorCode() != null) {
                            wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf);
                        }
                        else {
                            wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf);
                        }
                        jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
                    }
                    catch (JPAExecutorException je) {
                        throw new CommandException(je);
                    }
                    catch (Exception ex) {
                        // {0} placeholder so the exception message is part of the log line
                        LOG.warn("Exception in SignalXCommand {0}", ex.getMessage(), ex);
                        throw new CommandException(ErrorCode.E0729, wfAction.getName(), ex);
                    }
                }
            }

        }
        else {
            for (WorkflowActionBean newAction : WorkflowStoreService.getStartedActions(workflowInstance)) {
                String skipVar = workflowInstance.getVar(newAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
                        + ReRunXCommand.TO_SKIP);
                boolean skipNewAction = "true".equals(skipVar);
                try {
                    if (skipNewAction) {
                        // Skipped action: reuse the stored action, mark it pending and signal it again
                        // instead of starting it.
                        WorkflowActionBean oldAction;

                        oldAction = jpaService.execute(new WorkflowActionGetJPAExecutor(newAction.getId()));

                        oldAction.setPending();
                        jpaService.execute(new WorkflowActionUpdateJPAExecutor(oldAction));

                        queue(new SignalXCommand(jobId, oldAction.getId()));
                    }
                    else {
                        newAction.setPending();
                        String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp()
                                .getDefinition(), wfJob.getConf());
                        newAction.setSlaXml(actionSlaXml);
                        jpaService.execute(new WorkflowActionInsertJPAExecutor(newAction));
                        LOG.debug("SignalXCommand: Name: "+ newAction.getName() + ", Id: " +newAction.getId() + ", Authcode:" + newAction.getCred());
                        queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
                    }
                }
                catch (JPAExecutorException je) {
                    throw new CommandException(je);
                }
            }
        }

        try {
            jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
        LOG.debug(
                "Updated the workflow status to " + wfJob.getId() + "  status =" + wfJob.getStatusStr());
        if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
            // update coordinator action
            new CoordActionUpdateXCommand(wfJob).call(); //Note: Called even if wf is not necessarily instantiated by coordinator
            new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
        }
        LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
        return null;
    }

    /**
     * Create an EL evaluator for the given group and set every entry of the configuration as a variable on it.
     *
     * @param conf configuration whose entries become evaluator variables.
     * @param group EL group name passed to the {@link ELService}.
     * @return the populated evaluator.
     */
    public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
        ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
        for (Map.Entry<String, String> entry : conf) {
            eval.setVariable(entry.getKey(), entry.getValue());
        }
        return eval;
    }

    /**
     * Extract the SLA <code>info</code> element of the named action from the workflow definition.
     *
     * @param actionName name of the action to look up.
     * @param wfXml workflow definition XML.
     * @param wfConf workflow configuration (currently not used by this method).
     * @return the pretty-printed SLA element, or <code>null</code> if no matching action carries one.
     * @throws CommandException with {@link ErrorCode#E1004} if parsing or lookup fails.
     */
    @SuppressWarnings("unchecked")
    private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException {
        String slaXml = null;
        try {
            Element eWfJob = XmlUtils.parseXml(wfXml);
            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
                if (!action.getAttributeValue("name").equals(actionName)) {
                    continue;
                }
                Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
                if (eSla != null) {
                    slaXml = XmlUtils.prettyPrint(eSla).toString();
                    break;
                }
            }
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
        }
        return slaXml;
    }

    /**
     * Resolve an SLA element using an evaluator of the <code>wf-sla-submit</code> group built from the
     * configuration.
     *
     * @param eSla SLA element to resolve.
     * @param conf configuration used to populate the evaluator.
     * @return the resolved SLA XML.
     * @throws CommandException with {@link ErrorCode#E1004} if resolution fails.
     */
    private String resolveSla(Element eSla, Configuration conf) throws CommandException {
        String slaXml = null;
        try {
            ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
            slaXml = SubmitXCommand.resolveSla(eSla, evalSla);
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
        }
        return slaXml;
    }

    /**
     * Write an SLA registration event for every action of the workflow definition that carries an SLA
     * <code>info</code> element.
     *
     * @param wfXml workflow definition XML.
     * @param user user passed to the registration event.
     * @param group group passed to the registration event.
     * @param strConf workflow configuration as XML string.
     * @throws CommandException with {@link ErrorCode#E1007} if anything fails while parsing, resolving or
     *         writing the events.
     */
    @SuppressWarnings("unchecked")
    private void writeSLARegistrationForAllActions(String wfXml, String user, String group, String strConf)
            throws CommandException {
        try {
            Element eWfJob = XmlUtils.parseXml(wfXml);
            Configuration conf = new XConfiguration(new StringReader(strConf));
            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
                Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
                if (eSla != null) {
                    String slaXml = resolveSla(eSla, conf);
                    eSla = XmlUtils.parseXml(slaXml);
                    // the child id is generated from the job id and the action name
                    String actionId = Services.get().get(UUIDService.class).generateChildId(jobId,
                            action.getAttributeValue("name") + "");
                    SLADbXOperations.writeSlaRegistrationEvent(eSla, actionId, SlaAppType.WORKFLOW_ACTION, user, group);
                }
            }
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e);
        }

    }

}