/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.control.ForkActionExecutor;
import org.apache.oozie.action.control.StartActionExecutor;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.SLAEvent.Status;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.KillNodeDef;
import org.apache.oozie.workflow.lite.NodeDef;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.db.SLADbXOperations;
import org.jdom.Element;

import java.io.StringReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.oozie.client.OozieClient;

/**
 * Command that signals a workflow instance: either starts a workflow job that is in PREP status
 * (when no action id is given) or signals the completion of a workflow action so the workflow
 * instance can transition. All resulting inserts/updates are collected in {@link #insertList} and
 * {@link #updateList} and written in a single batch at the end of {@link #execute()}.
 */
@SuppressWarnings("deprecation")
public class SignalXCommand extends WorkflowXCommand<Void> {

    private JPAService jpaService = null;
    private String jobId;
    private String actionId;
    private WorkflowJobBean wfJob;
    private WorkflowActionBean wfAction;
    // Pending DB updates, flushed in one batch at the end of execute()
    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
    // Pending DB inserts (new actions, SLA events), flushed in one batch at the end of execute()
    private List<JsonBean> insertList = new ArrayList<JsonBean>();
    // Set when the job starts or the workflow instance status changes during this signal
    private boolean generateEvent = false;
    // Error info of the first failed action seen by execute(); passed to generateEvent(...)
    private String wfJobErrorCode;
    private String wfJobErrorMsg;

    /**
     * Creates a signal command for a workflow job without an associated action.
     *
     * @param name command name (also used as the command type)
     * @param priority command priority
     * @param jobId workflow job id; must not be empty
     */
    public SignalXCommand(String name, int priority, String jobId) {
        super(name, name, priority);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
    }

    /**
     * Creates a "signal" command (priority 1) for a specific action of a workflow job.
     *
     * @param jobId workflow job id; must not be empty
     * @param actionId workflow action id; must not be empty
     */
    public SignalXCommand(String jobId, String actionId) {
        this("signal", 1, jobId);
        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
    }

    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * @return the workflow job id, used as the entity key of this command
     */
    @Override
    public String getEntityKey() {
        return this.jobId;
    }

    /**
     * @return a key built from the command name, job id and action id (the action id part is the
     *         literal "null" when no action id was given)
     */
    @Override
    public String getKey() {
        return getName() + "_" + jobId + "_" + actionId;
    }

    /**
     * Loads the workflow job and, when an action id was given, the workflow action.
     *
     * @throws CommandException with {@link ErrorCode#E0610} if the {@link JPAService} is not
     *         available, or wrapping any {@link XException} raised while loading
     */
    @Override
    protected void loadState() throws CommandException {
        try {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService != null) {
                this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
                LogUtils.setLogInfo(wfJob, logInfo);
                if (actionId != null) {
                    this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_SIGNAL,
                            actionId);
                    LogUtils.setLogInfo(wfAction, logInfo);
                }
            }
            else {
                throw new CommandException(ErrorCode.E0610);
            }
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Verifies that the command may run: either there is no action, or the action is complete and
     * still pending; in both cases the job must be RUNNING or PREP.
     *
     * @throws PreconditionException with {@link ErrorCode#E0813} if the job is neither RUNNING nor
     *         PREP, or {@link ErrorCode#E0814} if the action is not both complete and pending
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        if ((wfAction == null) || (wfAction.isComplete() && wfAction.isPending())) {
            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.PREP) {
                throw new PreconditionException(ErrorCode.E0813, wfJob.getStatusStr());
            }
        }
        else {
            throw new PreconditionException(ErrorCode.E0814, actionId, wfAction.getStatusStr(), wfAction.isPending());
        }
    }

    /**
     * Starts or signals the workflow instance, collects the resulting action/job/SLA changes,
     * writes them in one batch, and then either ends the workflow or starts the next action.
     *
     * @return always {@code null}
     * @throws CommandException if the workflow cannot be started/signalled, if the job is not in
     *         PREP when no action is given ({@link ErrorCode#E0801}), if the kill node message
     *         cannot be resolved ({@link ErrorCode#E0729}), or on persistence errors
     */
    @Override
    protected Void execute() throws CommandException {

        LOG.debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
        WorkflowInstance workflowInstance = wfJob.getWorkflowInstance();
        workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob);
        WorkflowJob.Status prevStatus = wfJob.getStatus();
        boolean completed = false, skipAction = false;
        WorkflowActionBean syncAction = null;

        if (wfAction == null) {
            // No action: this signal starts the workflow job, which must still be in PREP
            if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
                try {
                    completed = workflowInstance.start();
                }
                catch (WorkflowException e) {
                    throw new CommandException(e);
                }
                wfJob.setStatus(WorkflowJob.Status.RUNNING);
                wfJob.setStartTime(new Date());
                wfJob.setWorkflowInstance(workflowInstance);
                generateEvent = true;
                // 1. Add SLA status event for WF-JOB with status STARTED
                SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId, Status.STARTED,
                        SlaAppType.WORKFLOW_JOB);
                if (slaEvent != null) {
                    insertList.add(slaEvent);
                }
                // 2. Add SLA registration events for all WF_ACTIONS
                createSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(),
                        wfJob.getGroup(), wfJob.getConf());
                queue(new NotificationXCommand(wfJob));
            }
            else {
                throw new CommandException(ErrorCode.E0801, wfJob.getId());
            }
        }
        else {
            // An action completed: signal the workflow instance along the action's execution path
            WorkflowInstance.Status initialStatus = workflowInstance.getStatus();
            String skipVar = workflowInstance.getVar(wfAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
                    + ReRunXCommand.TO_SKIP);
            if (skipVar != null) {
                skipAction = skipVar.equals("true");
            }
            try {
                completed = workflowInstance.signal(wfAction.getExecutionPath(), wfAction.getSignalValue());
            }
            catch (WorkflowException e) {
                // A failed signal is not rethrown: the job takes the instance status and is
                // treated as completed so the end-of-workflow handling below runs.
                LOG.error("Workflow action failed : " + e.getMessage(), e);
                wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
                completed = true;
            }
            wfJob.setWorkflowInstance(workflowInstance);
            wfAction.resetPending();
            if (!skipAction) {
                wfAction.setTransition(workflowInstance.getTransition(wfAction.getName()));
                queue(new NotificationXCommand(wfJob, wfAction));
            }
            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_PENDING_TRANS,
                    wfAction));
            WorkflowInstance.Status endStatus = workflowInstance.getStatus();
            if (endStatus != initialStatus) {
                generateEvent = true;
            }
        }

        if (completed) {
            try {
                for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) {
                    WorkflowActionBean actionToKill;

                    actionToKill = WorkflowActionQueryExecutor.getInstance().get(
                            WorkflowActionQuery.GET_ACTION_ID_TYPE_LASTCHECK, actionToKillId);

                    actionToKill.setPending();
                    actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
                    updateList.add(new UpdateEntry<WorkflowActionQuery>(
                            WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, actionToKill));
                    queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType()));
                }

                for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) {
                    WorkflowActionBean actionToFail = WorkflowActionQueryExecutor.getInstance().get(
                            WorkflowActionQuery.GET_ACTION_FAIL, actionToFailId);
                    actionToFail.resetPending();
                    actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
                    // Record the error info of the first failed action only. The fields start as
                    // null, so the guard must test for null; testing for non-null made this
                    // branch unreachable and the job event never carried error details.
                    if (wfJobErrorCode == null) {
                        wfJobErrorCode = actionToFail.getErrorCode();
                        wfJobErrorMsg = actionToFail.getErrorMessage();
                    }
                    queue(new NotificationXCommand(wfJob, actionToFail));
                    // NOTE(review): the SLA event is built from wfAction (the signalling action),
                    // not from actionToFail -- confirm whether that is intended.
                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(),
                            Status.FAILED, SlaAppType.WORKFLOW_ACTION);
                    if (slaEvent != null) {
                        insertList.add(slaEvent);
                    }
                    updateList.add(new UpdateEntry<WorkflowActionQuery>(
                            WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, actionToFail));
                }
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }

            wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
            wfJob.setEndTime(new Date());
            wfJob.setWorkflowInstance(workflowInstance);
            // NOTE(review): any job status not listed below falls through with SUCCEEDED
            Status slaStatus = Status.SUCCEEDED;
            switch (wfJob.getStatus()) {
                case SUCCEEDED:
                    slaStatus = Status.SUCCEEDED;
                    break;
                case KILLED:
                    slaStatus = Status.KILLED;
                    break;
                case FAILED:
                    slaStatus = Status.FAILED;
                    break;
                default: // TODO SUSPENDED
                    break;
            }
            SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId, slaStatus,
                    SlaAppType.WORKFLOW_JOB);
            if (slaEvent != null) {
                insertList.add(slaEvent);
            }
            queue(new NotificationXCommand(wfJob));
            if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
                InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation());
            }

            // output message for Kill node
            if (wfAction != null) { // wfAction could be a no-op job
                NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath());
                // instanceof is false for null, so no separate null check is needed
                if (nodeDef instanceof KillNodeDef) {
                    boolean isRetry = false;
                    boolean isUserRetry = false;
                    ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry,
                            isUserRetry);
                    InstrumentUtils.incrJobCounter(INSTR_KILLED_JOBS_COUNTER_NAME, 1, getInstrumentation());
                    try {
                        String tmpNodeConf = nodeDef.getConf();
                        String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
                        LOG.debug(
                                "Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], "
                                        + "after resolve [{3}]", jobId, actionId, tmpNodeConf, actionConf);
                        // Keep an existing error code; otherwise fall back to E0729
                        if (wfAction.getErrorCode() != null) {
                            wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf);
                        }
                        else {
                            wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf);
                        }
                        updateList.add(new UpdateEntry<WorkflowActionQuery>(
                                WorkflowActionQuery.UPDATE_ACTION_PENDING_TRANS_ERROR, wfAction));
                    }
                    catch (Exception ex) {
                        // The template needs a placeholder, otherwise the message argument is dropped
                        LOG.warn("Exception in SignalXCommand [{0}]", ex.getMessage(), ex);
                        throw new CommandException(ErrorCode.E0729, wfAction.getName(), ex);
                    }
                }
            }

        }
        else {
            for (WorkflowActionBean newAction : WorkflowStoreService.getActionsToStart(workflowInstance)) {
                String skipVar = workflowInstance.getVar(newAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
                        + ReRunXCommand.TO_SKIP);
                boolean skipNewAction = false, suspendNewAction = false;
                if (skipVar != null) {
                    skipNewAction = skipVar.equals("true");
                }

                if (skipNewAction) {
                    // Skipped action: mark the existing record pending and signal it right away
                    WorkflowActionBean oldAction = new WorkflowActionBean();
                    oldAction.setId(newAction.getId());
                    oldAction.setPending();
                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_PENDING,
                            oldAction));
                    queue(new SignalXCommand(jobId, oldAction.getId()));
                }
                else {
                    try {
                        // Make sure that transition node for a forked action
                        // is inserted only once
                        WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_ID_TYPE_LASTCHECK,
                                newAction.getId());

                        continue;
                    }
                    catch (JPAExecutorException ignored) {
                        // Deliberately ignored: a failed lookup is taken to mean the action has
                        // not been inserted yet, so fall through and create it below.
                    }
                    suspendNewAction = checkForSuspendNode(newAction);
                    newAction.setPending();
                    String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp()
                            .getDefinition(), wfJob.getConf());
                    newAction.setSlaXml(actionSlaXml);
                    newAction.setCreatedTime(new Date());
                    insertList.add(newAction);
                    LOG.debug("SignalXCommand: Name: " + newAction.getName() + ", Id: " + newAction.getId()
                            + ", Authcode:" + newAction.getCred());
                    if (wfAction != null) { // null during wf job submit
                        ActionService as = Services.get().get(ActionService.class);
                        ActionExecutor current = as.getExecutor(wfAction.getType());
                        LOG.trace("Current Action Type:" + current.getClass());
                        if (!suspendNewAction) {
                            if (!(current instanceof ForkActionExecutor) && !(current instanceof StartActionExecutor)) {
                                // Excluding :start: here from executing first action synchronously since it
                                // blocks the consumer thread till the action is submitted to Hadoop,
                                // in turn reducing the number of new submissions the threads can accept.
                                // Would also be susceptible to longer delays in case Hadoop cluster is busy.
                                syncAction = newAction;
                            }
                            else {
                                queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
                            }
                        }
                    }
                    else {
                        syncAction = newAction; // first action after wf submit should always be sync
                    }
                }
            }
        }

        try {
            wfJob.setLastModifiedTime(new Date());
            updateList.add(new UpdateEntry<WorkflowJobQuery>(
                    WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob));
            // call JPAExecutor to do the bulk writes
            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
            if (prevStatus != wfJob.getStatus()) {
                LOG.debug("Updated the workflow status to " + wfJob.getId() + "  status =" + wfJob.getStatusStr());
            }
            if (generateEvent && EventHandlerService.isEnabled()) {
                generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg);
            }
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
        // Changing to synchronous call from asynchronous queuing to prevent
        // undue delay from between end of previous and start of next action
        if (wfJob.getStatus() != WorkflowJob.Status.RUNNING
                && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
            // only for asynchronous actions, parent coord action's external id will
            // persisted and following update will succeed.
            updateParentIfNecessary(wfJob);
            new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
        }
        else if (syncAction != null) {
            new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call(getEntityKey());
        }
        LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
        return null;
    }

    /**
     * Creates an EL evaluator for the given group and exposes every configuration entry as an EL
     * variable.
     *
     * @param conf configuration whose entries become EL variables
     * @param group EL group name passed to {@link ELService#createEvaluator(String)}
     * @return the populated evaluator
     */
    public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
        ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
        for (Map.Entry<String, String> entry : conf) {
            eval.setVariable(entry.getKey(), entry.getValue());
        }
        return eval;
    }

    /**
     * Looks up the SLA element of the named action in the workflow definition.
     *
     * @param actionName name of the action to look for
     * @param wfXml workflow definition XML
     * @param wfConf workflow configuration (currently unused by this method)
     * @return pretty-printed SLA XML of the first matching action that has an SLA element, or
     *         {@code null} if there is none
     * @throws CommandException with {@link ErrorCode#E1004} on any parsing/processing error
     */
    @SuppressWarnings("unchecked")
    private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException {
        String slaXml = null;
        try {
            Element eWfJob = XmlUtils.parseXml(wfXml);
            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
                if (!action.getAttributeValue("name").equals(actionName)) {
                    continue;
                }
                Element eSla = XmlUtils.getSLAElement(action);
                if (eSla != null) {
                    slaXml = XmlUtils.prettyPrint(eSla).toString();
                    break;
                }
            }
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
        }
        return slaXml;
    }

    /**
     * Resolves an SLA element using an EL evaluator of the "wf-sla-submit" group.
     *
     * @param eSla SLA element to resolve
     * @param conf configuration supplying the EL variables
     * @return the resolved SLA XML
     * @throws CommandException with {@link ErrorCode#E1004} on any resolution error
     */
    private String resolveSla(Element eSla, Configuration conf) throws CommandException {
        String slaXml = null;
        try {
            ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
            slaXml = SubmitXCommand.resolveSla(eSla, evalSla);
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
        }
        return slaXml;
    }

    /**
     * Queues an SLA registration event (into {@link #insertList}) for every action of the
     * workflow definition that declares an SLA element.
     *
     * @param wfXml workflow definition XML
     * @param user user passed to the SLA registration event
     * @param group group passed to the SLA registration event
     * @param strConf workflow job configuration as an XML string
     * @throws CommandException with {@link ErrorCode#E1007} on any error
     */
    @SuppressWarnings("unchecked")
    private void createSLARegistrationForAllActions(String wfXml, String user, String group, String strConf)
            throws CommandException {
        try {
            Element eWfJob = XmlUtils.parseXml(wfXml);
            Configuration conf = new XConfiguration(new StringReader(strConf));
            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
                Element eSla = XmlUtils.getSLAElement(action);
                if (eSla != null) {
                    String slaXml = resolveSla(eSla, conf);
                    eSla = XmlUtils.parseXml(slaXml);
                    String actionId = Services.get().get(UUIDService.class)
                            .generateChildId(jobId, action.getAttributeValue("name") + "");
                    SLAEventBean slaEvent = SLADbXOperations.createSlaRegistrationEvent(eSla, actionId,
                            SlaAppType.WORKFLOW_ACTION, user, group);
                    if (slaEvent != null) {
                        insertList.add(slaEvent);
                    }
                }
            }
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e.getMessage(), e);
        }

    }

    /**
     * Checks whether the job configuration ({@link OozieClient#OOZIE_SUSPEND_ON_NODES}) asks for
     * the workflow to be suspended at the given action, and queues a {@link SuspendXCommand} if so.
     * A single value of "*" matches every action; otherwise the action name must equal one of the
     * configured values.
     *
     * @param newAction action about to be started
     * @return {@code true} if a suspend was queued for this action; {@code false} otherwise,
     *         including when the configuration cannot be read (a warning is logged)
     */
    private boolean checkForSuspendNode(WorkflowActionBean newAction) {
        boolean suspendNewAction = false;
        try {
            XConfiguration wfjobConf = new XConfiguration(new StringReader(wfJob.getConf()));
            String[] values = wfjobConf.getTrimmedStrings(OozieClient.OOZIE_SUSPEND_ON_NODES);
            if (values != null) {
                if (values.length == 1 && values[0].equals("*")) {
                    suspendNewAction = true;
                }
                else {
                    for (String suspendPoint : values) {
                        if (suspendPoint.equals(newAction.getName())) {
                            suspendNewAction = true;
                            break;
                        }
                    }
                }
                if (suspendNewAction) {
                    LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(),
                            wfJob.getId());
                    queue(new SuspendXCommand(jobId));
                }
            }
        }
        catch (IOException ex) {
            LOG.warn("Error reading " + OozieClient.OOZIE_SUSPEND_ON_NODES + ", ignoring [{0}]", ex.getMessage());
        }
        return suspendNewAction;
    }

}