001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.io.IOException; 021 import org.apache.hadoop.conf.Configuration; 022 import org.apache.oozie.client.WorkflowJob; 023 import org.apache.oozie.client.SLAEvent.SlaAppType; 024 import org.apache.oozie.client.SLAEvent.Status; 025 import org.apache.oozie.client.rest.JsonBean; 026 import org.apache.oozie.SLAEventBean; 027 import org.apache.oozie.WorkflowActionBean; 028 import org.apache.oozie.WorkflowJobBean; 029 import org.apache.oozie.ErrorCode; 030 import org.apache.oozie.XException; 031 import org.apache.oozie.command.CommandException; 032 import org.apache.oozie.command.PreconditionException; 033 import org.apache.oozie.command.coord.CoordActionUpdateXCommand; 034 import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext; 035 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 036 import org.apache.oozie.executor.jpa.JPAExecutorException; 037 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; 038 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 039 import org.apache.oozie.service.ELService; 040 import org.apache.oozie.service.EventHandlerService; 041 import org.apache.oozie.service.JPAService; 042 import org.apache.oozie.service.Services; 043 import org.apache.oozie.service.UUIDService; 044 import org.apache.oozie.service.WorkflowStoreService; 045 import org.apache.oozie.workflow.WorkflowException; 046 import org.apache.oozie.workflow.WorkflowInstance; 047 import org.apache.oozie.workflow.lite.KillNodeDef; 048 import org.apache.oozie.workflow.lite.NodeDef; 049 import org.apache.oozie.util.ELEvaluator; 050 import org.apache.oozie.util.InstrumentUtils; 051 import org.apache.oozie.util.LogUtils; 052 import org.apache.oozie.util.XConfiguration; 053 import org.apache.oozie.util.ParamChecker; 054 import org.apache.oozie.util.XmlUtils; 055 import org.apache.oozie.util.db.SLADbXOperations; 056 import org.jdom.Element; 057 import java.io.StringReader; 058 import java.util.ArrayList; 059 import java.util.Date; 060 import java.util.List; 061 import java.util.Map; 062 import org.apache.oozie.client.OozieClient; 063 064 @SuppressWarnings("deprecation") 065 public class SignalXCommand extends WorkflowXCommand<Void> { 066 067 protected static final String INSTR_SUCCEEDED_JOBS_COUNTER_NAME = "succeeded"; 068 069 private JPAService jpaService = null; 070 private String jobId; 071 private String actionId; 072 private WorkflowJobBean wfJob; 073 private WorkflowActionBean wfAction; 074 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 075 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 076 private boolean generateEvent = false; 077 private String wfJobErrorCode; 078 private String wfJobErrorMsg; 079 080 081 public SignalXCommand(String name, int priority, String jobId) { 082 super(name, name, priority); 083 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 084 } 085 086 public SignalXCommand(String jobId, String actionId) { 087 this("signal", 1, jobId); 088 this.actionId = ParamChecker.notEmpty(actionId, "actionId"); 089 } 090 091 @Override 092 protected boolean isLockRequired() { 093 return true; 094 } 095 096 @Override 097 public String getEntityKey() { 098 return this.jobId; 099 } 100 101 @Override 102 public String getKey() { 103 return getName() + "_" + jobId + "_" + actionId; 104 } 105 106 @Override 107 protected void loadState() throws CommandException { 108 try { 109 jpaService = Services.get().get(JPAService.class); 110 if (jpaService != null) { 111 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId)); 112 LogUtils.setLogInfo(wfJob, logInfo); 113 if (actionId != null) { 114 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId)); 115 LogUtils.setLogInfo(wfAction, logInfo); 116 } 117 } 118 else { 119 throw new CommandException(ErrorCode.E0610); 120 } 121 } 122 catch (XException ex) { 123 throw new CommandException(ex); 124 } 125 } 126 127 @Override 128 protected void verifyPrecondition() throws CommandException, PreconditionException { 129 if ((wfAction == null) || (wfAction.isComplete() && wfAction.isPending())) { 130 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.PREP) { 131 throw new PreconditionException(ErrorCode.E0813, wfJob.getStatusStr()); 132 } 133 } 134 else { 135 throw new PreconditionException(ErrorCode.E0814, actionId, wfAction.getStatusStr(), wfAction.isPending()); 136 } 137 } 138 139 @Override 140 protected Void execute() throws CommandException { 141 LOG.debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId); 142 WorkflowInstance workflowInstance = wfJob.getWorkflowInstance(); 143 workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob); 144 boolean completed = false; 145 boolean skipAction = false; 146 if (wfAction == null) { 147 if (wfJob.getStatus() == WorkflowJob.Status.PREP) { 148 try { 149 completed = workflowInstance.start(); 150 } 151 catch (WorkflowException e) { 152 throw new CommandException(e); 153 } 154 wfJob.setStatus(WorkflowJob.Status.RUNNING); 155 wfJob.setStartTime(new Date()); 156 wfJob.setWorkflowInstance(workflowInstance); 157 generateEvent = true; 158 // 1. Add SLA status event for WF-JOB with status STARTED 159 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId, 160 Status.STARTED, SlaAppType.WORKFLOW_JOB); 161 if(slaEvent != null) { 162 insertList.add(slaEvent); 163 } 164 // 2. Add SLA registration events for all WF_ACTIONS 165 createSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(), wfJob 166 .getGroup(), wfJob.getConf()); 167 queue(new NotificationXCommand(wfJob)); 168 } 169 else { 170 throw new CommandException(ErrorCode.E0801, wfJob.getId()); 171 } 172 } 173 else { 174 WorkflowInstance.Status initialStatus = workflowInstance.getStatus(); 175 String skipVar = workflowInstance.getVar(wfAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR 176 + ReRunXCommand.TO_SKIP); 177 if (skipVar != null) { 178 skipAction = skipVar.equals("true"); 179 } 180 try { 181 completed = workflowInstance.signal(wfAction.getExecutionPath(), wfAction.getSignalValue()); 182 } 183 catch (WorkflowException e) { 184 throw new CommandException(e); 185 } 186 wfJob.setWorkflowInstance(workflowInstance); 187 wfAction.resetPending(); 188 if (!skipAction) { 189 wfAction.setTransition(workflowInstance.getTransition(wfAction.getName())); 190 queue(new NotificationXCommand(wfJob, wfAction)); 191 } 192 updateList.add(wfAction); 193 WorkflowInstance.Status endStatus = workflowInstance.getStatus(); 194 if (endStatus != initialStatus) { 195 generateEvent = true; 196 } 197 } 198 199 if (completed) { 200 try { 201 for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) { 202 WorkflowActionBean actionToKill; 203 204 actionToKill = jpaService.execute(new WorkflowActionGetJPAExecutor(actionToKillId)); 205 206 actionToKill.setPending(); 207 actionToKill.setStatus(WorkflowActionBean.Status.KILLED); 208 updateList.add(actionToKill); 209 queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType())); 210 } 211 212 for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) { 213 WorkflowActionBean actionToFail = jpaService.execute(new WorkflowActionGetJPAExecutor( 214 actionToFailId)); 215 actionToFail.resetPending(); 216 actionToFail.setStatus(WorkflowActionBean.Status.FAILED); 217 if (wfJobErrorCode != null) { 218 wfJobErrorCode = actionToFail.getErrorCode(); 219 wfJobErrorMsg = actionToFail.getErrorMessage(); 220 } 221 queue(new NotificationXCommand(wfJob, actionToFail)); 222 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), 223 Status.FAILED, SlaAppType.WORKFLOW_ACTION); 224 if(slaEvent != null) { 225 insertList.add(slaEvent); 226 } 227 updateList.add(actionToFail); 228 } 229 } 230 catch (JPAExecutorException je) { 231 throw new CommandException(je); 232 } 233 234 wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString())); 235 wfJob.setEndTime(new Date()); 236 wfJob.setWorkflowInstance(workflowInstance); 237 Status slaStatus = Status.SUCCEEDED; 238 switch (wfJob.getStatus()) { 239 case SUCCEEDED: 240 slaStatus = Status.SUCCEEDED; 241 break; 242 case KILLED: 243 slaStatus = Status.KILLED; 244 break; 245 case FAILED: 246 slaStatus = Status.FAILED; 247 break; 248 default: // TODO SUSPENDED 249 break; 250 } 251 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId, 252 slaStatus, SlaAppType.WORKFLOW_JOB); 253 if(slaEvent != null) { 254 insertList.add(slaEvent); 255 } 256 queue(new NotificationXCommand(wfJob)); 257 if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) { 258 InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation()); 259 } 260 261 // output message for Kill node 262 if (wfAction != null) { // wfAction could be a no-op job 263 NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath()); 264 if (nodeDef != null && nodeDef instanceof KillNodeDef) { 265 boolean isRetry = false; 266 boolean isUserRetry = false; 267 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, 268 isUserRetry); 269 try { 270 String tmpNodeConf = nodeDef.getConf(); 271 String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class); 272 LOG.debug("Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], after resolve [{3}]", 273 jobId, actionId, tmpNodeConf, actionConf); 274 if (wfAction.getErrorCode() != null) { 275 wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf); 276 } 277 else { 278 wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf); 279 } 280 updateList.add(wfAction); 281 } 282 catch (Exception ex) { 283 LOG.warn("Exception in SignalXCommand ", ex.getMessage(), ex); 284 throw new CommandException(ErrorCode.E0729, wfAction.getName(), ex); 285 } 286 } 287 } 288 289 } 290 else { 291 for (WorkflowActionBean newAction : WorkflowStoreService.getStartedActions(workflowInstance)) { 292 String skipVar = workflowInstance.getVar(newAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR 293 + ReRunXCommand.TO_SKIP); 294 boolean skipNewAction = false; 295 if (skipVar != null) { 296 skipNewAction = skipVar.equals("true"); 297 } 298 try { 299 if (skipNewAction) { 300 WorkflowActionBean oldAction; 301 302 oldAction = jpaService.execute(new WorkflowActionGetJPAExecutor(newAction.getId())); 303 304 oldAction.setPending(); 305 updateList.add(oldAction); 306 307 queue(new SignalXCommand(jobId, oldAction.getId())); 308 } 309 else { 310 checkForSuspendNode(newAction); 311 newAction.setPending(); 312 String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp() 313 .getDefinition(), wfJob.getConf()); 314 newAction.setSlaXml(actionSlaXml); 315 insertList.add(newAction); 316 LOG.debug("SignalXCommand: Name: "+ newAction.getName() + ", Id: " +newAction.getId() + ", Authcode:" + newAction.getCred()); 317 queue(new ActionStartXCommand(newAction.getId(), newAction.getType())); 318 } 319 } 320 catch (JPAExecutorException je) { 321 throw new CommandException(je); 322 } 323 } 324 } 325 326 try { 327 wfJob.setLastModifiedTime(new Date()); 328 updateList.add(wfJob); 329 // call JPAExecutor to do the bulk writes 330 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList)); 331 if (generateEvent && EventHandlerService.isEnabled()) { 332 generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg); 333 } 334 } 335 catch (JPAExecutorException je) { 336 throw new CommandException(je); 337 } 338 LOG.debug( 339 "Updated the workflow status to " + wfJob.getId() + " status =" + wfJob.getStatusStr()); 340 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) { 341 // update coordinator action 342 new CoordActionUpdateXCommand(wfJob).call(); //Note: Called even if wf is not necessarily instantiated by coordinator 343 new WfEndXCommand(wfJob).call(); //To delete the WF temp dir 344 } 345 LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId); 346 return null; 347 } 348 349 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) { 350 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group); 351 for (Map.Entry<String, String> entry : conf) { 352 eval.setVariable(entry.getKey(), entry.getValue()); 353 } 354 return eval; 355 } 356 357 @SuppressWarnings("unchecked") 358 private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException { 359 String slaXml = null; 360 try { 361 Element eWfJob = XmlUtils.parseXml(wfXml); 362 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) { 363 if (action.getAttributeValue("name").equals(actionName) == false) { 364 continue; 365 } 366 Element eSla = XmlUtils.getSLAElement(action); 367 if (eSla != null) { 368 slaXml = XmlUtils.prettyPrint(eSla).toString(); 369 break; 370 } 371 } 372 } 373 catch (Exception e) { 374 throw new CommandException(ErrorCode.E1004, e.getMessage(), e); 375 } 376 return slaXml; 377 } 378 379 private String resolveSla(Element eSla, Configuration conf) throws CommandException { 380 String slaXml = null; 381 try { 382 ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit"); 383 slaXml = SubmitXCommand.resolveSla(eSla, evalSla); 384 } 385 catch (Exception e) { 386 throw new CommandException(ErrorCode.E1004, e.getMessage(), e); 387 } 388 return slaXml; 389 } 390 391 @SuppressWarnings("unchecked") 392 private void createSLARegistrationForAllActions(String wfXml, String user, String group, String strConf) 393 throws CommandException { 394 try { 395 Element eWfJob = XmlUtils.parseXml(wfXml); 396 Configuration conf = new XConfiguration(new StringReader(strConf)); 397 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) { 398 Element eSla = XmlUtils.getSLAElement(action); 399 if (eSla != null) { 400 String slaXml = resolveSla(eSla, conf); 401 eSla = XmlUtils.parseXml(slaXml); 402 String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, 403 action.getAttributeValue("name") + ""); 404 SLAEventBean slaEvent = SLADbXOperations.createSlaRegistrationEvent(eSla, actionId, 405 SlaAppType.WORKFLOW_ACTION, user, group); 406 if(slaEvent != null) { 407 insertList.add(slaEvent); 408 } 409 } 410 } 411 } 412 catch (Exception e) { 413 throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e.getMessage(), e); 414 } 415 416 } 417 418 private void checkForSuspendNode(WorkflowActionBean newAction) { 419 try { 420 XConfiguration wfjobConf = new XConfiguration(new StringReader(wfJob.getConf())); 421 String[] values = wfjobConf.getTrimmedStrings(OozieClient.OOZIE_SUSPEND_ON_NODES); 422 if (values != null) { 423 if (values.length == 1 && values[0].equals("*")) { 424 LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(), wfJob.getId()); 425 queue(new SuspendXCommand(jobId)); 426 } 427 else { 428 for (String suspendPoint : values) { 429 if (suspendPoint.equals(newAction.getName())) { 430 LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(), 431 wfJob.getId()); 432 queue(new SuspendXCommand(jobId)); 433 break; 434 } 435 } 436 } 437 } 438 } 439 catch (IOException ex) { 440 LOG.warn("Error reading " + OozieClient.OOZIE_SUSPEND_ON_NODES + ", ignoring [{0}]", ex.getMessage()); 441 } 442 } 443 444 }