001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.wf; 020 021import java.io.IOException; 022 023import org.apache.hadoop.conf.Configuration; 024import org.apache.oozie.action.ActionExecutor; 025import org.apache.oozie.action.control.ForkActionExecutor; 026import org.apache.oozie.action.control.StartActionExecutor; 027import org.apache.oozie.action.oozie.SubWorkflowActionExecutor; 028import org.apache.oozie.client.Job; 029import org.apache.oozie.client.WorkflowAction; 030import org.apache.oozie.client.WorkflowJob; 031import org.apache.oozie.client.SLAEvent.SlaAppType; 032import org.apache.oozie.client.SLAEvent.Status; 033import org.apache.oozie.client.rest.JsonBean; 034import org.apache.oozie.SLAEventBean; 035import org.apache.oozie.WorkflowActionBean; 036import org.apache.oozie.WorkflowJobBean; 037import org.apache.oozie.ErrorCode; 038import org.apache.oozie.XException; 039import org.apache.oozie.command.CommandException; 040import org.apache.oozie.command.PreconditionException; 041import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext; 042import org.apache.oozie.command.wf.ActionXCommand.ForkedActionExecutorContext; 043import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 044import org.apache.oozie.executor.jpa.BatchQueryExecutor; 045import org.apache.oozie.executor.jpa.JPAExecutorException; 046import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; 047import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; 048import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 049import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 050import org.apache.oozie.service.ActionService; 051import org.apache.oozie.service.CallableQueueService; 052import org.apache.oozie.service.CallableQueueService.CallableWrapper; 053import org.apache.oozie.service.ConfigurationService; 054import org.apache.oozie.service.ELService; 055import org.apache.oozie.service.EventHandlerService; 056import org.apache.oozie.service.JPAService; 057import org.apache.oozie.service.Services; 058import org.apache.oozie.service.UUIDService; 059import org.apache.oozie.service.WorkflowStoreService; 060import org.apache.oozie.workflow.WorkflowException; 061import org.apache.oozie.workflow.WorkflowInstance; 062import org.apache.oozie.workflow.lite.KillNodeDef; 063import org.apache.oozie.workflow.lite.NodeDef; 064import org.apache.oozie.util.ELEvaluator; 065import org.apache.oozie.util.InstrumentUtils; 066import org.apache.oozie.util.LogUtils; 067import org.apache.oozie.util.XConfiguration; 068import org.apache.oozie.util.ParamChecker; 069import org.apache.oozie.util.XmlUtils; 070import org.apache.oozie.util.db.SLADbXOperations; 071import org.jdom.Element; 072 073import java.io.StringReader; 074import java.util.ArrayList; 075import java.util.Date; 076import java.util.List; 077import java.util.Map; 078import java.util.concurrent.Future; 079 080import org.apache.oozie.client.OozieClient; 081 082@SuppressWarnings("deprecation") 083public class SignalXCommand extends WorkflowXCommand<Void> { 084 085 private JPAService jpaService = null; 086 private String jobId; 087 private String actionId; 088 private WorkflowJobBean wfJob; 089 private WorkflowActionBean wfAction; 090 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 091 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 092 private boolean generateEvent = false; 093 private String wfJobErrorCode; 094 private String wfJobErrorMsg; 095 public final static String FORK_PARALLEL_JOBSUBMISSION = "oozie.workflow.parallel.fork.action.start"; 096 097 public SignalXCommand(String name, int priority, String jobId) { 098 super(name, name, priority); 099 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 100 } 101 102 public SignalXCommand(String jobId, String actionId) { 103 this("signal", 1, jobId); 104 this.actionId = ParamChecker.notEmpty(actionId, "actionId"); 105 } 106 107 @Override 108 protected void setLogInfo() { 109 if (jobId != null) { 110 LogUtils.setLogInfo(jobId); 111 } 112 else if (actionId !=null) { 113 LogUtils.setLogInfo(actionId); 114 } 115 } 116 117 @Override 118 protected boolean isLockRequired() { 119 return true; 120 } 121 122 @Override 123 public String getEntityKey() { 124 return this.jobId; 125 } 126 127 @Override 128 public String getKey() { 129 return getName() + "_" + jobId + "_" + actionId; 130 } 131 132 @Override 133 protected void loadState() throws CommandException { 134 try { 135 jpaService = Services.get().get(JPAService.class); 136 if (jpaService != null) { 137 this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId); 138 LogUtils.setLogInfo(wfJob); 139 if (actionId != null) { 140 this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_SIGNAL, actionId); 141 LogUtils.setLogInfo(wfAction); 142 } 143 } 144 else { 145 throw new CommandException(ErrorCode.E0610); 146 } 147 } 148 catch (XException ex) { 149 throw new CommandException(ex); 150 } 151 } 152 153 @Override 154 protected void verifyPrecondition() throws CommandException, PreconditionException { 155 if ((wfAction == null) || (wfAction.isComplete() && wfAction.isPending())) { 156 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.PREP) { 157 throw new PreconditionException(ErrorCode.E0813, wfJob.getStatusStr()); 158 } 159 } 160 else { 161 throw new PreconditionException(ErrorCode.E0814, actionId, wfAction.getStatusStr(), wfAction.isPending()); 162 } 163 } 164 165 @Override 166 protected Void execute() throws CommandException { 167 168 LOG.debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId); 169 WorkflowInstance workflowInstance = wfJob.getWorkflowInstance(); 170 workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob); 171 WorkflowJob.Status prevStatus = wfJob.getStatus(); 172 boolean completed = false, skipAction = false; 173 WorkflowActionBean syncAction = null; 174 List<WorkflowActionBean> workflowActionBeanListForForked = new ArrayList<WorkflowActionBean>(); 175 176 177 if (wfAction == null) { 178 if (wfJob.getStatus() == WorkflowJob.Status.PREP) { 179 try { 180 completed = workflowInstance.start(); 181 } 182 catch (WorkflowException e) { 183 throw new CommandException(e); 184 } 185 wfJob.setStatus(WorkflowJob.Status.RUNNING); 186 wfJob.setStartTime(new Date()); 187 wfJob.setWorkflowInstance(workflowInstance); 188 generateEvent = true; 189 // 1. Add SLA status event for WF-JOB with status STARTED 190 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId, Status.STARTED, 191 SlaAppType.WORKFLOW_JOB); 192 if (slaEvent != null) { 193 insertList.add(slaEvent); 194 } 195 // 2. Add SLA registration events for all WF_ACTIONS 196 createSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(), 197 wfJob.getGroup(), wfJob.getConf()); 198 queue(new WorkflowNotificationXCommand(wfJob)); 199 } 200 else { 201 throw new CommandException(ErrorCode.E0801, wfJob.getId()); 202 } 203 } 204 else { 205 WorkflowInstance.Status initialStatus = workflowInstance.getStatus(); 206 String skipVar = workflowInstance.getVar(wfAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR 207 + ReRunXCommand.TO_SKIP); 208 if (skipVar != null) { 209 skipAction = skipVar.equals("true"); 210 } 211 try { 212 completed = workflowInstance.signal(wfAction.getExecutionPath(), wfAction.getSignalValue()); 213 } 214 catch (WorkflowException e) { 215 LOG.error("Workflow action failed : " + e.getMessage(), e); 216 wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString())); 217 completed = true; 218 } 219 wfJob.setWorkflowInstance(workflowInstance); 220 wfAction.resetPending(); 221 if (!skipAction) { 222 wfAction.setTransition(workflowInstance.getTransition(wfAction.getName())); 223 queue(new WorkflowNotificationXCommand(wfJob, wfAction)); 224 } 225 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_PENDING_TRANS, 226 wfAction)); 227 WorkflowInstance.Status endStatus = workflowInstance.getStatus(); 228 if (endStatus != initialStatus) { 229 generateEvent = true; 230 } 231 } 232 233 if (completed) { 234 try { 235 for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) { 236 WorkflowActionBean actionToKill; 237 238 actionToKill = WorkflowActionQueryExecutor.getInstance().get( 239 WorkflowActionQuery.GET_ACTION_ID_TYPE_LASTCHECK, actionToKillId); 240 241 actionToKill.setPending(); 242 actionToKill.setStatus(WorkflowActionBean.Status.KILLED); 243 updateList.add(new UpdateEntry<WorkflowActionQuery>( 244 WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, actionToKill)); 245 queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType())); 246 } 247 248 for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) { 249 WorkflowActionBean actionToFail = WorkflowActionQueryExecutor.getInstance().get( 250 WorkflowActionQuery.GET_ACTION_FAIL, actionToFailId); 251 actionToFail.resetPending(); 252 actionToFail.setStatus(WorkflowActionBean.Status.FAILED); 253 if (wfJobErrorCode != null) { 254 wfJobErrorCode = actionToFail.getErrorCode(); 255 wfJobErrorMsg = actionToFail.getErrorMessage(); 256 } 257 queue(new WorkflowNotificationXCommand(wfJob, actionToFail)); 258 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), 259 Status.FAILED, SlaAppType.WORKFLOW_ACTION); 260 if (slaEvent != null) { 261 insertList.add(slaEvent); 262 } 263 updateList.add(new UpdateEntry<WorkflowActionQuery>( 264 WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, actionToFail)); 265 } 266 } 267 catch (JPAExecutorException je) { 268 throw new CommandException(je); 269 } 270 271 wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString())); 272 wfJob.setEndTime(new Date()); 273 wfJob.setWorkflowInstance(workflowInstance); 274 Status slaStatus = Status.SUCCEEDED; 275 switch (wfJob.getStatus()) { 276 case SUCCEEDED: 277 slaStatus = Status.SUCCEEDED; 278 break; 279 case KILLED: 280 slaStatus = Status.KILLED; 281 break; 282 case FAILED: 283 slaStatus = Status.FAILED; 284 break; 285 default: // TODO SUSPENDED 286 break; 287 } 288 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId, slaStatus, 289 SlaAppType.WORKFLOW_JOB); 290 if (slaEvent != null) { 291 insertList.add(slaEvent); 292 } 293 queue(new WorkflowNotificationXCommand(wfJob)); 294 if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) { 295 InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation()); 296 } 297 298 // output message for Kill node 299 if (wfAction != null) { // wfAction could be a no-op job 300 NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath()); 301 if (nodeDef != null && nodeDef instanceof KillNodeDef) { 302 boolean isRetry = false; 303 boolean isUserRetry = false; 304 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, 305 isUserRetry); 306 InstrumentUtils.incrJobCounter(INSTR_KILLED_JOBS_COUNTER_NAME, 1, getInstrumentation()); 307 try { 308 String tmpNodeConf = nodeDef.getConf(); 309 String message = context.getELEvaluator().evaluate(tmpNodeConf, String.class); 310 LOG.debug( 311 "Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], " 312 + "after resolve [{3}]", jobId, actionId, tmpNodeConf, message); 313 if (wfAction.getErrorCode() != null) { 314 wfAction.setErrorInfo(wfAction.getErrorCode(), message); 315 } 316 else { 317 wfAction.setErrorInfo(ErrorCode.E0729.toString(), message); 318 } 319 } 320 catch (Exception ex) { 321 LOG.warn("Exception in SignalXCommand when processing Kill node message: {0}", ex.getMessage(), ex); 322 wfAction.setErrorInfo(ErrorCode.E0756.toString(), ErrorCode.E0756.format(ex.getMessage())); 323 wfAction.setStatus(WorkflowAction.Status.ERROR); 324 } 325 updateList.add(new UpdateEntry<WorkflowActionQuery>( 326 WorkflowActionQuery.UPDATE_ACTION_PENDING_TRANS_ERROR, wfAction)); 327 } 328 } 329 330 } 331 else { 332 for (WorkflowActionBean newAction : WorkflowStoreService.getActionsToStart(workflowInstance)) { 333 boolean isOldWFAction = false; 334 335 // In case of subworkflow rerun when failed option have been provided, rerun command do not delete 336 // old action. To avoid twice entry for same action, Checking in Db if the workflow action already exist. 337 if(SubWorkflowActionExecutor.ACTION_TYPE.equals(newAction.getType())) { 338 try { 339 WorkflowActionBean oldAction = WorkflowActionQueryExecutor.getInstance() 340 .get(WorkflowActionQuery.GET_ACTION_CHECK, 341 newAction.getId()); 342 newAction.setExternalId(oldAction.getExternalId()); 343 newAction.setCreatedTime(oldAction.getCreatedTime()); 344 isOldWFAction = true; 345 } catch (JPAExecutorException e) { 346 if(e.getErrorCode() != ErrorCode.E0605) { 347 throw new CommandException(e); 348 } 349 } 350 } 351 352 String skipVar = workflowInstance.getVar(newAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR 353 + ReRunXCommand.TO_SKIP); 354 boolean skipNewAction = false, suspendNewAction = false; 355 if (skipVar != null) { 356 skipNewAction = skipVar.equals("true"); 357 } 358 359 if (skipNewAction) { 360 WorkflowActionBean oldAction = new WorkflowActionBean(); 361 oldAction.setId(newAction.getId()); 362 oldAction.setPending(); 363 oldAction.setExecutionPath(newAction.getExecutionPath()); 364 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_PENDING, 365 oldAction)); 366 queue(new SignalXCommand(jobId, oldAction.getId())); 367 } 368 else { 369 if(!skipAction) { 370 try { 371 // Make sure that transition node for a forked action 372 // is inserted only once 373 WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_ID_TYPE_LASTCHECK, 374 newAction.getId()); 375 376 continue; 377 } catch (JPAExecutorException jee) { 378 } 379 } 380 suspendNewAction = checkForSuspendNode(newAction); 381 newAction.setPending(); 382 String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp() 383 .getDefinition(), wfJob.getConf()); 384 newAction.setSlaXml(actionSlaXml); 385 if(!isOldWFAction) { 386 newAction.setCreatedTime(new Date()); 387 insertList.add(newAction); 388 } else { 389 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, 390 newAction)); 391 } 392 LOG.debug("SignalXCommand: Name: " + newAction.getName() + ", Id: " + newAction.getId() 393 + ", Authcode:" + newAction.getCred()); 394 if (wfAction != null) { // null during wf job submit 395 ActionService as = Services.get().get(ActionService.class); 396 ActionExecutor current = as.getExecutor(wfAction.getType()); 397 LOG.trace("Current Action Type:" + current.getClass()); 398 if (!suspendNewAction) { 399 if (current instanceof StartActionExecutor) { 400 // Excluding :start: here from executing first action synchronously since it 401 // blocks the consumer thread till the action is submitted to Hadoop, 402 // in turn reducing the number of new submissions the threads can accept. 403 // Would also be susceptible to longer delays in case Hadoop cluster is busy. 404 queue(new ActionStartXCommand(newAction.getId(), newAction.getType())); 405 } 406 else if (current instanceof ForkActionExecutor) { 407 if (ConfigurationService.getBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION)) { 408 workflowActionBeanListForForked.add(newAction); 409 } 410 else { 411 queue(new ActionStartXCommand(newAction.getId(), newAction.getType())); 412 413 } 414 } 415 else { 416 syncAction = newAction; 417 } 418 } 419 else { 420 // suspend check will happen later... where if one of action is suspended all forked action 421 // will be ignored. 422 if (ConfigurationService.getBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION)) { 423 workflowActionBeanListForForked.add(newAction); 424 } 425 } 426 } 427 else { 428 syncAction = newAction; // first action after wf submit should always be sync 429 } 430 } 431 } 432 } 433 434 try { 435 wfJob.setLastModifiedTime(new Date()); 436 updateList.add(new UpdateEntry<WorkflowJobQuery>( 437 WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob)); 438 // call JPAExecutor to do the bulk writes 439 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 440 if (prevStatus != wfJob.getStatus()) { 441 LOG.debug("Updated the workflow status to " + wfJob.getId() + " status =" + wfJob.getStatusStr()); 442 } 443 if (generateEvent && EventHandlerService.isEnabled()) { 444 generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg); 445 } 446 } 447 catch (JPAExecutorException je) { 448 throw new CommandException(je); 449 } 450 // Changing to synchronous call from asynchronous queuing to prevent 451 // undue delay from between end of previous and start of next action 452 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) { 453 // only for asynchronous actions, parent coord action's external id will 454 // persisted and following update will succeed. 455 updateParentIfNecessary(wfJob); 456 new WfEndXCommand(wfJob).call(); // To delete the WF temp dir 457 } 458 else if (syncAction != null) { 459 new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call(); 460 } 461 else if (!workflowActionBeanListForForked.isEmpty() && !checkForSuspendNode(workflowActionBeanListForForked)) { 462 startForkedActions(workflowActionBeanListForForked); 463 } 464 LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId); 465 return null; 466 } 467 468 public void startForkedActions(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException { 469 470 List<CallableWrapper<ActionExecutorContext>> tasks = new ArrayList<CallableWrapper<ActionExecutorContext>>(); 471 List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 472 List<JsonBean> insertList = new ArrayList<JsonBean>(); 473 474 boolean endWorkflow = false; 475 boolean submitJobByQueuing = false; 476 for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) { 477 LOG.debug("Starting forked actions parallely : " + workflowActionBean.getId()); 478 tasks.add(Services.get().get(CallableQueueService.class).new CallableWrapper<ActionExecutorContext>( 479 new ForkedActionStartXCommand(wfJob, workflowActionBean.getId(), workflowActionBean.getType()), 0)); 480 } 481 482 try { 483 List<Future<ActionExecutorContext>> futures = Services.get().get(CallableQueueService.class) 484 .invokeAll(tasks); 485 for (Future<ActionExecutorContext> result : futures) { 486 if (result == null) { 487 submitJobByQueuing = true; 488 continue; 489 } 490 ActionExecutorContext context = result.get(); 491 Map<String, String> contextVariableMap = ((ForkedActionExecutorContext) context).getContextMap(); 492 LOG.debug("contextVariableMap size of action " + context.getAction().getId() + " is " + contextVariableMap.size()); 493 for (String key : contextVariableMap.keySet()) { 494 context.setVarToWorkflow(key, contextVariableMap.get(key)); 495 } 496 if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.FAILED)) { 497 LOG.warn("Action has failed, failing job" + context.getAction().getId()); 498 new ActionStartXCommand(context.getAction().getId(), null).failJob(context); 499 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, 500 (WorkflowActionBean) context.getAction())); 501 if (context.isShouldEndWF()) { 502 endWorkflow = true; 503 } 504 } 505 if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.SUSPENDED)) { 506 LOG.warn("Action has failed, failing job" + context.getAction().getId()); 507 new ActionStartXCommand(context.getAction().getId(), null).handleNonTransient(context, null, 508 WorkflowAction.Status.START_MANUAL); 509 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, 510 (WorkflowActionBean) context.getAction())); 511 if (context.isShouldEndWF()) { 512 endWorkflow = true; 513 } 514 } 515 } 516 if (endWorkflow) { 517 endWF(insertList); 518 } 519 520 } 521 catch (Exception e) { 522 LOG.error("Error running forked jobs parallely", e); 523 startForkedActionsByQueuing(workflowActionBeanListForForked); 524 submitJobByQueuing = false; 525 } 526 if (submitJobByQueuing && !endWorkflow) { 527 LOG.error("There is error in running forked jobs parallely"); 528 startForkedActionsByQueuing(workflowActionBeanListForForked); 529 } 530 wfJob.setLastModifiedTime(new Date()); 531 updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, 532 wfJob)); 533 try { 534 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 535 } 536 catch (JPAExecutorException e) { 537 throw new CommandException(e); 538 } 539 540 LOG.debug("forked actions submitted parallely"); 541 } 542 543 public void startForkedActionsByQueuing(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException { 544 //queuing all jobs, submitted job will fail in precondition 545 for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) { 546 LOG.debug("Queuing fork action " + workflowActionBean.getId()); 547 queue(new ActionStartXCommand(workflowActionBean.getId(), workflowActionBean.getType())); 548 } 549 } 550 551 private void endWF(List<JsonBean> insertList) throws CommandException { 552 updateParentIfNecessary(wfJob, 3); 553 new WfEndXCommand(wfJob).call(); // To delete the WF temp dir 554 SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED, 555 SlaAppType.WORKFLOW_JOB); 556 if (slaEvent2 != null) { 557 insertList.add(slaEvent2); 558 } 559 } 560 561 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) { 562 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group); 563 for (Map.Entry<String, String> entry : conf) { 564 eval.setVariable(entry.getKey(), entry.getValue()); 565 } 566 return eval; 567 } 568 569 @SuppressWarnings("unchecked") 570 private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException { 571 String slaXml = null; 572 try { 573 Element eWfJob = XmlUtils.parseXml(wfXml); 574 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) { 575 if (action.getAttributeValue("name").equals(actionName) == false) { 576 continue; 577 } 578 Element eSla = XmlUtils.getSLAElement(action); 579 if (eSla != null) { 580 slaXml = XmlUtils.prettyPrint(eSla).toString(); 581 break; 582 } 583 } 584 } 585 catch (Exception e) { 586 throw new CommandException(ErrorCode.E1004, e.getMessage(), e); 587 } 588 return slaXml; 589 } 590 591 private String resolveSla(Element eSla, Configuration conf) throws CommandException { 592 String slaXml = null; 593 try { 594 ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit"); 595 slaXml = SubmitXCommand.resolveSla(eSla, evalSla); 596 } 597 catch (Exception e) { 598 throw new CommandException(ErrorCode.E1004, e.getMessage(), e); 599 } 600 return slaXml; 601 } 602 603 @SuppressWarnings("unchecked") 604 private void createSLARegistrationForAllActions(String wfXml, String user, String group, String strConf) 605 throws CommandException { 606 try { 607 Element eWfJob = XmlUtils.parseXml(wfXml); 608 Configuration conf = new XConfiguration(new StringReader(strConf)); 609 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) { 610 Element eSla = XmlUtils.getSLAElement(action); 611 if (eSla != null) { 612 String slaXml = resolveSla(eSla, conf); 613 eSla = XmlUtils.parseXml(slaXml); 614 String actionId = Services.get().get(UUIDService.class) 615 .generateChildId(jobId, action.getAttributeValue("name") + ""); 616 SLAEventBean slaEvent = SLADbXOperations.createSlaRegistrationEvent(eSla, actionId, 617 SlaAppType.WORKFLOW_ACTION, user, group); 618 if (slaEvent != null) { 619 insertList.add(slaEvent); 620 } 621 } 622 } 623 } 624 catch (Exception e) { 625 throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e.getMessage(), e); 626 } 627 628 } 629 630 private boolean checkForSuspendNode(WorkflowActionBean newAction) { 631 boolean suspendNewAction = false; 632 try { 633 XConfiguration wfjobConf = new XConfiguration(new StringReader(wfJob.getConf())); 634 String[] values = wfjobConf.getTrimmedStrings(OozieClient.OOZIE_SUSPEND_ON_NODES); 635 if (values != null) { 636 if (values.length == 1 && values[0].equals("*")) { 637 LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(), 638 wfJob.getId()); 639 queue(new SuspendXCommand(jobId)); 640 suspendNewAction = true; 641 } 642 else { 643 for (String suspendPoint : values) { 644 if (suspendPoint.equals(newAction.getName())) { 645 LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(), 646 wfJob.getId()); 647 queue(new SuspendXCommand(jobId)); 648 suspendNewAction = true; 649 break; 650 } 651 } 652 } 653 } 654 } 655 catch (IOException ex) { 656 LOG.warn("Error reading " + OozieClient.OOZIE_SUSPEND_ON_NODES + ", ignoring [{0}]", ex.getMessage()); 657 } 658 return suspendNewAction; 659 } 660 661 662 663private boolean checkForSuspendNode(List<WorkflowActionBean> workflowActionBeanListForForked) { 664 for(WorkflowActionBean bean :workflowActionBeanListForForked) 665 if(checkForSuspendNode(bean)){ 666 return true; 667 } 668 return false; 669} 670 671 672}