/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.SLAEvent.Status;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.KillNodeDef;
import org.apache.oozie.workflow.lite.NodeDef;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.db.SLADbXOperations;
import org.jdom.Element;
import org.jdom.Namespace;

import java.io.StringReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * Command that advances a workflow instance.
 * <p>
 * Without an action id it starts a job that is in {@code PREP} status; with an action id it delivers that
 * action's signal value to the workflow instance. Every bean touched along the way is collected in
 * {@link #updateList} / {@link #insertList} and written with a single {@link BulkUpdateInsertJPAExecutor}
 * call at the end of {@link #execute()}.
 */
public class SignalXCommand extends WorkflowXCommand<Void> {

    protected static final String INSTR_SUCCEEDED_JOBS_COUNTER_NAME = "succeeded";

    private JPAService jpaService = null;
    private final String jobId;
    // Only set by the (jobId, actionId) constructor; null means "start the job".
    private String actionId;
    private WorkflowJobBean wfJob;
    private WorkflowActionBean wfAction;
    // Beans to be updated / inserted by the bulk write at the end of execute().
    private final List<JsonBean> updateList = new ArrayList<JsonBean>();
    private final List<JsonBean> insertList = new ArrayList<JsonBean>();

    /**
     * Creates a signal command that is not bound to a particular action.
     *
     * @param name command name, also passed as the command type to the super constructor
     * @param priority command priority
     * @param jobId workflow job id, must not be empty
     */
    public SignalXCommand(String name, int priority, String jobId) {
        super(name, name, priority);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
    }

    /**
     * Creates a "signal" command (priority 1) for one action of a workflow job.
     *
     * @param jobId workflow job id, must not be empty
     * @param actionId id of the action whose signal is delivered, must not be empty
     */
    public SignalXCommand(String jobId, String actionId) {
        this("signal", 1, jobId);
        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
    }

    /** @return always {@code true} */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /** @return the workflow job id this command was created for */
    @Override
    public String getEntityKey() {
        return this.jobId;
    }

    /**
     * Loads the workflow job bean and, when an action id was supplied, the action bean.
     *
     * @throws CommandException with {@code E0610} when no {@link JPAService} is registered, or wrapping any
     *         {@link XException} raised while loading
     */
    @Override
    protected void loadState() throws CommandException {
        try {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService == null) {
                throw new CommandException(ErrorCode.E0610);
            }
            this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
            LogUtils.setLogInfo(wfJob, logInfo);
            if (actionId != null) {
                this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
                LogUtils.setLogInfo(wfAction, logInfo);
            }
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Checks that the command may run: the action (if any) must be both complete and pending, and the job
     * must be {@code RUNNING} or {@code PREP}.
     *
     * @throws PreconditionException {@code E0814} for an action in the wrong state, {@code E0813} for a job in
     *         the wrong state
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        boolean actionReady = (wfAction == null) || (wfAction.isComplete() && wfAction.isPending());
        if (!actionReady) {
            throw new PreconditionException(ErrorCode.E0814, actionId, wfAction.getStatusStr(), wfAction.isPending());
        }
        if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.PREP) {
            throw new PreconditionException(ErrorCode.E0813, wfJob.getStatusStr());
        }
    }

    /**
     * Starts or signals the workflow instance, handles completion or newly started actions, persists all
     * collected changes in one bulk write and, once the job is neither {@code RUNNING} nor {@code SUSPENDED},
     * runs the coordinator-update and workflow-end commands.
     *
     * @return always {@code null}
     * @throws CommandException if the workflow library, the persistence layer or SLA/kill-message resolution
     *         fails
     */
    @Override
    protected Void execute() throws CommandException {
        LOG.debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
        WorkflowInstance workflowInstance = wfJob.getWorkflowInstance();
        workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob);

        boolean completed = (wfAction == null) ? startWorkflow(workflowInstance) : signalAction(workflowInstance);

        if (completed) {
            completeWorkflow(workflowInstance);
        }
        else {
            startNewActions(workflowInstance);
        }

        try {
            wfJob.setLastModifiedTime(new Date());
            updateList.add(wfJob);
            // call JPAExecutor to do the bulk writes
            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
        LOG.debug(
                "Updated the workflow status to " + wfJob.getId() + "  status =" + wfJob.getStatusStr());
        if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
            // update coordinator action
            new CoordActionUpdateXCommand(wfJob).call(); //Note: Called even if wf is not necessarily instantiated by coordinator
            new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
        }
        LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
        return null;
    }

    /**
     * Starts a job that is in PREP status, marks it RUNNING and records the job/action SLA events.
     *
     * @return the value returned by {@link WorkflowInstance#start()}
     * @throws CommandException {@code E0801} when the job is not in PREP, or wrapping a workflow failure
     */
    private boolean startWorkflow(WorkflowInstance workflowInstance) throws CommandException {
        if (wfJob.getStatus() != WorkflowJob.Status.PREP) {
            throw new CommandException(ErrorCode.E0801, wfJob.getId());
        }
        boolean completed;
        try {
            completed = workflowInstance.start();
        }
        catch (WorkflowException e) {
            throw new CommandException(e);
        }
        wfJob.setStatus(WorkflowJob.Status.RUNNING);
        wfJob.setStartTime(new Date());
        wfJob.setWorkflowInstance(workflowInstance);
        // 1. Add SLA status event for WF-JOB with status STARTED
        SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
                Status.STARTED, SlaAppType.WORKFLOW_JOB);
        if (slaEvent != null) {
            insertList.add(slaEvent);
        }
        // 2. Add SLA registration events for all WF_ACTIONS
        createSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(),
                wfJob.getGroup(), wfJob.getConf());
        queue(new NotificationXCommand(wfJob));
        return completed;
    }

    /**
     * Delivers the loaded action's signal value to the workflow instance and clears the action's pending flag.
     *
     * @return the value returned by {@link WorkflowInstance#signal(String, String)}
     * @throws CommandException wrapping a workflow failure
     */
    private boolean signalAction(WorkflowInstance workflowInstance) throws CommandException {
        // The skip marker is read before signalling, as the workflow instance is mutated by signal().
        boolean skipAction = isMarkedToSkip(workflowInstance, wfAction.getName());
        boolean completed;
        try {
            completed = workflowInstance.signal(wfAction.getExecutionPath(), wfAction.getSignalValue());
        }
        catch (WorkflowException e) {
            throw new CommandException(e);
        }
        wfJob.setWorkflowInstance(workflowInstance);
        wfAction.resetPending();
        if (!skipAction) {
            wfAction.setTransition(workflowInstance.getTransition(wfAction.getName()));
            queue(new NotificationXCommand(wfJob, wfAction));
        }
        updateList.add(wfAction);
        return completed;
    }

    /**
     * Finalizes a workflow instance that reported completion: terminates leftover actions, copies the final
     * status onto the job, records the job SLA event and resolves a kill-node message if applicable.
     */
    private void completeWorkflow(WorkflowInstance workflowInstance) throws CommandException {
        terminateRemainingActions(workflowInstance);

        wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
        wfJob.setEndTime(new Date());
        wfJob.setWorkflowInstance(workflowInstance);

        SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
                toSlaStatus(wfJob.getStatus()), SlaAppType.WORKFLOW_JOB);
        if (slaEvent != null) {
            insertList.add(slaEvent);
        }
        queue(new NotificationXCommand(wfJob));
        if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
            InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation());
        }

        // output message for Kill node
        if (wfAction != null) { // wfAction could be a no-op job
            resolveKillNodeMessage(workflowInstance);
        }
    }

    /**
     * Marks the actions the workflow instance lists "to kill" as KILLED (queuing a kill command for each) and
     * the ones listed "to fail" as FAILED (queuing a notification and an SLA FAILED event for each).
     */
    private void terminateRemainingActions(WorkflowInstance workflowInstance) throws CommandException {
        try {
            for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) {
                WorkflowActionBean actionToKill = jpaService.execute(new WorkflowActionGetJPAExecutor(
                        actionToKillId));
                actionToKill.setPending();
                actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
                updateList.add(actionToKill);
                queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType()));
            }

            for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) {
                WorkflowActionBean actionToFail = jpaService.execute(new WorkflowActionGetJPAExecutor(
                        actionToFailId));
                actionToFail.resetPending();
                actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
                queue(new NotificationXCommand(wfJob, actionToFail));
                // The SLA event must describe the action being failed in this iteration, not the action
                // that delivered the signal (which may also be null).
                SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(actionToFail.getSlaXml(),
                        actionToFail.getId(), Status.FAILED, SlaAppType.WORKFLOW_ACTION);
                if (slaEvent != null) {
                    insertList.add(slaEvent);
                }
                updateList.add(actionToFail);
            }
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
    }

    /**
     * When the signalled action's execution path maps to a kill node, evaluates the node's configured message
     * and stores it as the action's error message.
     *
     * @throws CommandException {@code E0729} when evaluating or storing the message fails
     */
    private void resolveKillNodeMessage(WorkflowInstance workflowInstance) throws CommandException {
        NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath());
        if (!(nodeDef instanceof KillNodeDef)) {
            return;
        }
        boolean isRetry = false;
        boolean isUserRetry = false;
        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry,
                isUserRetry);
        try {
            String tmpNodeConf = nodeDef.getConf();
            String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
            LOG.debug("Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], after resolve [{3}]",
                    jobId, actionId, tmpNodeConf, actionConf);
            // Keep an error code the action already carries; otherwise fall back to E0729.
            if (wfAction.getErrorCode() != null) {
                wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf);
            }
            else {
                wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf);
            }
            updateList.add(wfAction);
        }
        catch (Exception ex) {
            // Placeholder added so the exception message argument is actually rendered.
            LOG.warn("Exception in SignalXCommand [{0}]", ex.getMessage(), ex);
            throw new CommandException(ErrorCode.E0729, wfAction.getName(), ex);
        }
    }

    /**
     * Handles the actions the workflow instance reports as started: actions marked to skip are re-signalled
     * from their stored bean, all others are inserted as pending and queued for start.
     */
    private void startNewActions(WorkflowInstance workflowInstance) throws CommandException {
        for (WorkflowActionBean newAction : WorkflowStoreService.getStartedActions(workflowInstance)) {
            boolean skipNewAction = isMarkedToSkip(workflowInstance, newAction.getName());
            try {
                if (skipNewAction) {
                    WorkflowActionBean oldAction = jpaService.execute(new WorkflowActionGetJPAExecutor(
                            newAction.getId()));
                    oldAction.setPending();
                    updateList.add(oldAction);

                    queue(new SignalXCommand(jobId, oldAction.getId()));
                }
                else {
                    newAction.setPending();
                    String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp()
                            .getDefinition(), wfJob.getConf());
                    newAction.setSlaXml(actionSlaXml);
                    insertList.add(newAction);
                    LOG.debug("SignalXCommand: Name: "+ newAction.getName() + ", Id: " +newAction.getId() + ", Authcode:" + newAction.getCred());
                    queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
                }
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }
        }
    }

    /** Returns whether the instance variable {@code <nodeName><separator><TO_SKIP>} equals "true". */
    private static boolean isMarkedToSkip(WorkflowInstance workflowInstance, String nodeName) {
        String skipVar = workflowInstance.getVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR
                + ReRunXCommand.TO_SKIP);
        return "true".equals(skipVar);
    }

    /** Maps a final job status to the SLA status to report; anything unmapped is reported as SUCCEEDED. */
    private static Status toSlaStatus(WorkflowJob.Status jobStatus) {
        switch (jobStatus) {
            case KILLED:
                return Status.KILLED;
            case FAILED:
                return Status.FAILED;
            case SUCCEEDED:
            default: // TODO SUSPENDED
                return Status.SUCCEEDED;
        }
    }

    /**
     * Creates an EL evaluator for the given group and exposes every configuration entry as a variable.
     *
     * @param conf configuration whose entries become evaluator variables
     * @param group evaluator group name passed to {@link ELService}
     * @return the populated evaluator
     */
    public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
        ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
        for (Map.Entry<String, String> entry : conf) {
            eval.setVariable(entry.getKey(), entry.getValue());
        }
        return eval;
    }

    /**
     * Extracts the SLA {@code info} element of one action from the workflow definition.
     *
     * @param actionName value of the action's {@code name} attribute
     * @param wfXml workflow definition XML
     * @param wfConf workflow configuration; currently not read by this method
     * @return pretty-printed SLA XML of the first matching action that has one, or {@code null}
     * @throws CommandException {@code E1004} on any parsing failure
     */
    @SuppressWarnings("unchecked")
    private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException {
        String slaXml = null;
        try {
            Element eWfJob = XmlUtils.parseXml(wfXml);
            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
                if (!action.getAttributeValue("name").equals(actionName)) {
                    continue;
                }
                Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
                if (eSla != null) {
                    slaXml = XmlUtils.prettyPrint(eSla).toString();
                    break;
                }
            }
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
        }
        return slaXml;
    }

    /**
     * Resolves an SLA element against the configuration using the "wf-sla-submit" evaluator group.
     *
     * @throws CommandException {@code E1004} on any failure
     */
    private String resolveSla(Element eSla, Configuration conf) throws CommandException {
        String slaXml = null;
        try {
            ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
            slaXml = SubmitXCommand.resolveSla(eSla, evalSla);
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
        }
        return slaXml;
    }

    /**
     * Queues an SLA registration event for every {@code action} element of the workflow definition that
     * carries an SLA {@code info} child.
     *
     * @param wfXml workflow definition XML
     * @param user user recorded on the SLA event
     * @param group group recorded on the SLA event
     * @param strConf workflow configuration as XML text
     * @throws CommandException {@code E1007} on any failure
     */
    @SuppressWarnings("unchecked")
    private void createSLARegistrationForAllActions(String wfXml, String user, String group, String strConf)
            throws CommandException {
        try {
            Element eWfJob = XmlUtils.parseXml(wfXml);
            Configuration conf = new XConfiguration(new StringReader(strConf));
            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
                Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
                if (eSla == null) {
                    continue;
                }
                String slaXml = resolveSla(eSla, conf);
                eSla = XmlUtils.parseXml(slaXml);
                String actionId = Services.get().get(UUIDService.class).generateChildId(jobId,
                        action.getAttributeValue("name") + "");
                SLAEventBean slaEvent = SLADbXOperations.createSlaRegistrationEvent(eSla, actionId,
                        SlaAppType.WORKFLOW_ACTION, user, group);
                if (slaEvent != null) {
                    insertList.add(slaEvent);
                }
            }
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e.getMessage(), e);
        }

    }

}