001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.oozie.client.WorkflowJob; 022 import org.apache.oozie.client.SLAEvent.SlaAppType; 023 import org.apache.oozie.client.SLAEvent.Status; 024 import org.apache.oozie.WorkflowActionBean; 025 import org.apache.oozie.WorkflowJobBean; 026 import org.apache.oozie.ErrorCode; 027 import org.apache.oozie.XException; 028 import org.apache.oozie.command.CommandException; 029 import org.apache.oozie.command.PreconditionException; 030 import org.apache.oozie.command.coord.CoordActionUpdateXCommand; 031 import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext; 032 import org.apache.oozie.executor.jpa.JPAExecutorException; 033 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; 034 import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor; 035 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor; 036 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 037 import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor; 038 import org.apache.oozie.service.ELService; 039 import org.apache.oozie.service.JPAService; 040 import org.apache.oozie.service.SchemaService; 041 import org.apache.oozie.service.Services; 042 import org.apache.oozie.service.UUIDService; 043 import org.apache.oozie.service.WorkflowStoreService; 044 import org.apache.oozie.workflow.WorkflowException; 045 import org.apache.oozie.workflow.WorkflowInstance; 046 import org.apache.oozie.workflow.lite.KillNodeDef; 047 import org.apache.oozie.workflow.lite.NodeDef; 048 import org.apache.oozie.util.ELEvaluator; 049 import org.apache.oozie.util.InstrumentUtils; 050 import org.apache.oozie.util.LogUtils; 051 import org.apache.oozie.util.XConfiguration; 052 import org.apache.oozie.util.ParamChecker; 053 import org.apache.oozie.util.XmlUtils; 054 import org.apache.oozie.util.db.SLADbXOperations; 055 import org.jdom.Element; 056 import org.jdom.Namespace; 057 058 import java.io.StringReader; 059 import java.util.Date; 060 import java.util.List; 061 import java.util.Map; 062 063 public class SignalXCommand extends WorkflowXCommand<Void> { 064 065 protected static final String INSTR_SUCCEEDED_JOBS_COUNTER_NAME = "succeeded"; 066 067 private JPAService jpaService = null; 068 private String jobId; 069 private String actionId; 070 private WorkflowJobBean wfJob; 071 private WorkflowActionBean wfAction; 072 073 public SignalXCommand(String name, int priority, String jobId) { 074 super(name, name, priority); 075 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 076 } 077 078 public SignalXCommand(String jobId, String actionId) { 079 this("signal", 1, jobId); 080 this.actionId = ParamChecker.notEmpty(actionId, "actionId"); 081 } 082 083 @Override 084 protected boolean isLockRequired() { 085 return true; 086 } 087 088 @Override 089 protected String getEntityKey() { 090 return this.jobId; 091 } 092 093 @Override 094 protected void loadState() throws CommandException { 095 try { 096 jpaService = Services.get().get(JPAService.class); 097 if (jpaService != null) { 098 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId)); 099 LogUtils.setLogInfo(wfJob, logInfo); 100 if (actionId != null) { 101 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId)); 102 LogUtils.setLogInfo(wfAction, logInfo); 103 } 104 } 105 else { 106 throw new CommandException(ErrorCode.E0610); 107 } 108 } 109 catch (XException ex) { 110 throw new CommandException(ex); 111 } 112 } 113 114 @Override 115 protected void verifyPrecondition() throws CommandException, PreconditionException { 116 if ((wfAction == null) || (wfAction.isComplete() && wfAction.isPending())) { 117 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.PREP) { 118 throw new PreconditionException(ErrorCode.E0813, wfJob.getStatusStr()); 119 } 120 } 121 else { 122 throw new PreconditionException(ErrorCode.E0814, actionId, wfAction.getStatusStr(), wfAction.isPending()); 123 } 124 } 125 126 @Override 127 protected Void execute() throws CommandException { 128 LOG.debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId); 129 WorkflowInstance workflowInstance = wfJob.getWorkflowInstance(); 130 workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob); 131 boolean completed = false; 132 boolean skipAction = false; 133 if (wfAction == null) { 134 if (wfJob.getStatus() == WorkflowJob.Status.PREP) { 135 try { 136 completed = workflowInstance.start(); 137 } 138 catch (WorkflowException e) { 139 throw new CommandException(e); 140 } 141 wfJob.setStatus(WorkflowJob.Status.RUNNING); 142 wfJob.setStartTime(new Date()); 143 wfJob.setWorkflowInstance(workflowInstance); 144 // 1. Add SLA status event for WF-JOB with status STARTED 145 // 2. Add SLA registration events for all WF_ACTIONS 146 SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), jobId, Status.STARTED, SlaAppType.WORKFLOW_JOB); 147 writeSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(), wfJob 148 .getGroup(), wfJob.getConf()); 149 queue(new NotificationXCommand(wfJob)); 150 } 151 else { 152 throw new CommandException(ErrorCode.E0801, wfJob.getId()); 153 } 154 } 155 else { 156 String skipVar = workflowInstance.getVar(wfAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR 157 + ReRunCommand.TO_SKIP); 158 if (skipVar != null) { 159 skipAction = skipVar.equals("true"); 160 } 161 try { 162 completed = workflowInstance.signal(wfAction.getExecutionPath(), wfAction.getSignalValue()); 163 } 164 catch (WorkflowException e) { 165 throw new CommandException(e); 166 } 167 wfJob.setWorkflowInstance(workflowInstance); 168 wfAction.resetPending(); 169 if (!skipAction) { 170 wfAction.setTransition(workflowInstance.getTransition(wfAction.getName())); 171 } 172 try { 173 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction)); 174 } 175 catch (JPAExecutorException je) { 176 throw new CommandException(je); 177 } 178 } 179 180 if (completed) { 181 try { 182 for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) { 183 WorkflowActionBean actionToKill; 184 185 actionToKill = jpaService.execute(new WorkflowActionGetJPAExecutor(actionToKillId)); 186 187 actionToKill.setPending(); 188 actionToKill.setStatus(WorkflowActionBean.Status.KILLED); 189 jpaService.execute(new WorkflowActionUpdateJPAExecutor(actionToKill)); 190 queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType())); 191 } 192 193 for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) { 194 WorkflowActionBean actionToFail = jpaService.execute(new WorkflowActionGetJPAExecutor( 195 actionToFailId)); 196 actionToFail.resetPending(); 197 actionToFail.setStatus(WorkflowActionBean.Status.FAILED); 198 SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED, 199 SlaAppType.WORKFLOW_ACTION); 200 jpaService.execute(new WorkflowActionUpdateJPAExecutor(actionToFail)); 201 } 202 } 203 catch (JPAExecutorException je) { 204 throw new CommandException(je); 205 } 206 207 wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString())); 208 wfJob.setEndTime(new Date()); 209 wfJob.setWorkflowInstance(workflowInstance); 210 Status slaStatus = Status.SUCCEEDED; 211 switch (wfJob.getStatus()) { 212 case SUCCEEDED: 213 slaStatus = Status.SUCCEEDED; 214 break; 215 case KILLED: 216 slaStatus = Status.KILLED; 217 break; 218 case FAILED: 219 slaStatus = Status.FAILED; 220 break; 221 default: // TODO SUSPENDED 222 break; 223 } 224 SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), jobId, slaStatus, SlaAppType.WORKFLOW_JOB); 225 queue(new NotificationXCommand(wfJob)); 226 if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) { 227 InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation()); 228 } 229 230 // output message for Kill node 231 if (wfAction != null) { // wfAction could be a no-op job 232 NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath()); 233 if (nodeDef != null && nodeDef instanceof KillNodeDef) { 234 boolean isRetry = false; 235 boolean isUserRetry = false; 236 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, 237 isUserRetry); 238 try { 239 String tmpNodeConf = nodeDef.getConf(); 240 String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class); 241 LOG.debug("Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], after resolve [{3}]", 242 jobId, actionId, tmpNodeConf, actionConf); 243 if (wfAction.getErrorCode() != null) { 244 wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf); 245 } 246 else { 247 wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf); 248 } 249 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction)); 250 } 251 catch (JPAExecutorException je) { 252 throw new CommandException(je); 253 } 254 catch (Exception ex) { 255 LOG.warn("Exception in SignalXCommand ", ex.getMessage(), ex); 256 throw new CommandException(ErrorCode.E0729, wfAction.getName(), ex); 257 } 258 } 259 } 260 261 } 262 else { 263 for (WorkflowActionBean newAction : WorkflowStoreService.getStartedActions(workflowInstance)) { 264 String skipVar = workflowInstance.getVar(newAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR 265 + ReRunCommand.TO_SKIP); 266 boolean skipNewAction = false; 267 if (skipVar != null) { 268 skipNewAction = skipVar.equals("true"); 269 } 270 try { 271 if (skipNewAction) { 272 WorkflowActionBean oldAction; 273 274 oldAction = jpaService.execute(new WorkflowActionGetJPAExecutor(newAction.getId())); 275 276 oldAction.setPending(); 277 jpaService.execute(new WorkflowActionUpdateJPAExecutor(oldAction)); 278 279 queue(new SignalXCommand(jobId, oldAction.getId())); 280 } 281 else { 282 newAction.setPending(); 283 String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp() 284 .getDefinition(), wfJob.getConf()); 285 newAction.setSlaXml(actionSlaXml); 286 jpaService.execute(new WorkflowActionInsertJPAExecutor(newAction)); 287 LOG.debug("SignalXCommand: Name: "+ newAction.getName() + ", Id: " +newAction.getId() + ", Authcode:" + newAction.getCred()); 288 queue(new ActionStartXCommand(newAction.getId(), newAction.getType())); 289 } 290 } 291 catch (JPAExecutorException je) { 292 throw new CommandException(je); 293 } 294 } 295 } 296 297 try { 298 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob)); 299 } 300 catch (JPAExecutorException je) { 301 throw new CommandException(je); 302 } 303 LOG.debug( 304 "Updated the workflow status to " + wfJob.getId() + " status =" + wfJob.getStatusStr()); 305 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) { 306 // update coordinator action 307 new CoordActionUpdateXCommand(wfJob).call(); //Note: Called even if wf is not necessarily instantiated by coordinator 308 new WfEndXCommand(wfJob).call(); //To delete the WF temp dir 309 } 310 LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId); 311 return null; 312 } 313 314 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) { 315 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group); 316 for (Map.Entry<String, String> entry : conf) { 317 eval.setVariable(entry.getKey(), entry.getValue()); 318 } 319 return eval; 320 } 321 322 @SuppressWarnings("unchecked") 323 private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException { 324 String slaXml = null; 325 try { 326 Element eWfJob = XmlUtils.parseXml(wfXml); 327 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) { 328 if (action.getAttributeValue("name").equals(actionName) == false) { 329 continue; 330 } 331 Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI)); 332 if (eSla != null) { 333 slaXml = XmlUtils.prettyPrint(eSla).toString(); 334 break; 335 } 336 } 337 } 338 catch (Exception e) { 339 throw new CommandException(ErrorCode.E1004, e.getMessage(), e); 340 } 341 return slaXml; 342 } 343 344 private String resolveSla(Element eSla, Configuration conf) throws CommandException { 345 String slaXml = null; 346 try { 347 ELEvaluator evalSla = SubmitCommand.createELEvaluatorForGroup(conf, "wf-sla-submit"); 348 slaXml = SubmitCommand.resolveSla(eSla, evalSla); 349 } 350 catch (Exception e) { 351 throw new CommandException(ErrorCode.E1004, e.getMessage(), e); 352 } 353 return slaXml; 354 } 355 356 @SuppressWarnings("unchecked") 357 private void writeSLARegistrationForAllActions(String wfXml, String user, String group, String strConf) 358 throws CommandException { 359 try { 360 Element eWfJob = XmlUtils.parseXml(wfXml); 361 Configuration conf = new XConfiguration(new StringReader(strConf)); 362 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) { 363 Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI)); 364 if (eSla != null) { 365 String slaXml = resolveSla(eSla, conf); 366 eSla = XmlUtils.parseXml(slaXml); 367 String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, 368 action.getAttributeValue("name") + ""); 369 SLADbXOperations.writeSlaRegistrationEvent(eSla, actionId, SlaAppType.WORKFLOW_ACTION, user, group); 370 } 371 } 372 } 373 catch (Exception e) { 374 throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e); 375 } 376 377 } 378 379 }