/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.SLAEvent.Status;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionReadyCommand;
import org.apache.oozie.command.coord.CoordActionUpdateCommand;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.coord.CoordinatorJobException;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.StoreService;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.store.CoordinatorStore;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.store.WorkflowStore;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.db.SLADbOperations;
import org.apache.openjpa.lib.log.Log;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;

import java.io.StringReader;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * Command that advances a workflow job.
 * <p/>
 * When created without an action id it starts a job that is in PREP status; when created with an action id it
 * signals the workflow instance with that action's execution path and signal value. In both cases the resulting
 * job/action state is written back through the {@link WorkflowStore} and follow-up commands are queued.
 */
public class SignalCommand extends WorkflowCommand<Void> {

    protected static final String INSTR_SUCCEEDED_JOBS_COUNTER_NAME = "succeeded";

    private final String jobId;
    // Stays null when the command is built through the protected (job-only) constructor.
    private final String actionId;

    /**
     * Creates a signal-type command for a job, without an action.
     *
     * @param name command name, also used as the command type.
     * @param priority command priority.
     * @param jobId workflow job id, validated with {@link ParamChecker#notEmpty}.
     */
    protected SignalCommand(String name, int priority, String jobId) {
        super(name, name, priority, XLog.STD);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
        this.actionId = null;
    }

    /**
     * Creates a command that signals the workflow job on behalf of an action.
     *
     * @param jobId workflow job id, validated with {@link ParamChecker#notEmpty}.
     * @param actionId workflow action id, validated with {@link ParamChecker#notEmpty}.
     */
    public SignalCommand(String jobId, String actionId) {
        super("signal", "signal", 1, XLog.STD);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
    }

    /**
     * Starts or signals the workflow instance and persists the outcome.
     * <p/>
     * Nothing is changed (a warning is logged) when the action is not both complete and pending, or when the
     * job is neither RUNNING nor PREP.
     *
     * @param store workflow store used to load and update the job and its actions.
     * @return always <code>null</code>.
     * @throws CommandException with {@link ErrorCode#E0801} if there is no action and the job is not in PREP
     *         status, or wrapping any {@link WorkflowException} raised while driving the workflow instance.
     * @throws StoreException propagated from the store.
     */
    @Override
    protected Void call(WorkflowStore store) throws CommandException, StoreException {
        WorkflowJobBean workflow = store.getWorkflow(jobId, false);
        setLogInfo(workflow);
        WorkflowActionBean action = null;
        if (actionId != null) {
            action = store.getAction(actionId, false);
            setLogInfo(action);
        }

        // An action is only signalled once: it must be complete and still flagged as pending.
        if (action != null && !(action.isComplete() && action.isPending())) {
            XLog.getLog(getClass()).warn(
                    "SignalCommand for action id :" + actionId + " is already processed. status="
                            + action.getStatus() + ", Pending=" + action.isPending());
            return null;
        }

        try {
            if (workflow.getStatus() != WorkflowJob.Status.RUNNING
                    && workflow.getStatus() != WorkflowJob.Status.PREP) {
                XLog.getLog(getClass()).warn("Workflow not RUNNING, current status [{0}]", workflow.getStatus());
                return null;
            }

            WorkflowInstance workflowInstance = workflow.getWorkflowInstance();
            workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, workflow);

            boolean completed;
            if (action == null) {
                completed = startPreparedWorkflow(store, workflow, workflowInstance);
            }
            else {
                completed = signalCompletedAction(store, workflow, workflowInstance, action);
            }

            if (completed) {
                finalizeCompletedWorkflow(store, workflow, workflowInstance);
            }
            else {
                dispatchStartedActions(store, workflow, workflowInstance);
            }

            store.updateWorkflow(workflow);
            XLog.getLog(getClass()).debug(
                    "Updated the workflow status to " + workflow.getId() + " status ="
                            + workflow.getStatusStr());
            if (workflow.getStatus() != WorkflowJob.Status.RUNNING
                    && workflow.getStatus() != WorkflowJob.Status.SUSPENDED) {
                queueCallable(new CoordActionUpdateCommand(workflow));
            }
        }
        catch (WorkflowException ex) {
            throw new CommandException(ex);
        }
        return null;
    }

    /**
     * Starts a job that is in PREP status: starts the instance, marks the job RUNNING, writes the SLA STARTED
     * event and the SLA registrations of all actions, and queues a job notification.
     *
     * @return the value returned by {@link WorkflowInstance#start()}.
     * @throws CommandException with {@link ErrorCode#E0801} if the job is not in PREP status.
     */
    private boolean startPreparedWorkflow(WorkflowStore store, WorkflowJobBean workflow,
                                          WorkflowInstance workflowInstance)
            throws CommandException, StoreException, WorkflowException {
        if (workflow.getStatus() != WorkflowJob.Status.PREP) {
            throw new CommandException(ErrorCode.E0801, workflow.getId());
        }
        boolean completed = workflowInstance.start();
        workflow.setStatus(WorkflowJob.Status.RUNNING);
        workflow.setStartTime(new Date());
        workflow.setWorkflowInstance(workflowInstance);
        // 1. Add SLA status event for WF-JOB with status STARTED
        // 2. Add SLA registration events for all WF_ACTIONS
        SLADbOperations.writeStausEvent(workflow.getSlaXml(), jobId, store, Status.STARTED,
                                        SlaAppType.WORKFLOW_JOB);
        writeSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), workflow.getUser(),
                                          workflow.getGroup(), workflow.getConf(), store);
        queueCallable(new NotificationCommand(workflow));
        return completed;
    }

    /**
     * Signals the workflow instance for the given action, clears the action's pending flag, records its
     * transition (unless the action is marked to be skipped) and updates the action in the store.
     *
     * @return the value returned by {@link WorkflowInstance#signal}.
     */
    private boolean signalCompletedAction(WorkflowStore store, WorkflowJobBean workflow,
                                          WorkflowInstance workflowInstance, WorkflowActionBean action)
            throws CommandException, StoreException, WorkflowException {
        // The skip flag is read before signalling, as the original flow did.
        boolean skipAction = isMarkedToSkip(workflowInstance, action.getName());
        boolean completed = workflowInstance.signal(action.getExecutionPath(), action.getSignalValue());
        workflow.setWorkflowInstance(workflowInstance);
        action.resetPending();
        if (!skipAction) {
            action.setTransition(workflowInstance.getTransition(action.getName()));
        }
        store.updateAction(action);
        return completed;
    }

    /**
     * Handles a workflow instance that reported completion: kills/fails the actions the instance lists for
     * that, copies the instance status and an end time onto the job, writes the job SLA status event, queues
     * a job notification and bumps the succeeded counter when the job SUCCEEDED.
     */
    private void finalizeCompletedWorkflow(WorkflowStore store, WorkflowJobBean workflow,
                                           WorkflowInstance workflowInstance)
            throws CommandException, StoreException, WorkflowException {
        for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) {
            WorkflowActionBean actionToKill = store.getAction(actionToKillId, false);
            actionToKill.setPending();
            actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
            store.updateAction(actionToKill);
            queueCallable(new ActionKillCommand(actionToKill.getId(), actionToKill.getType()));
        }

        for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) {
            WorkflowActionBean actionToFail = store.getAction(actionToFailId, false);
            actionToFail.resetPending();
            actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
            // The FAILED event belongs to the action being failed. Using the signalled action here would
            // record the event against the wrong action and dereference null when no action was signalled.
            SLADbOperations.writeStausEvent(actionToFail.getSlaXml(), actionToFail.getId(), store, Status.FAILED,
                                            SlaAppType.WORKFLOW_ACTION);
            store.updateAction(actionToFail);
        }

        workflow.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
        workflow.setEndTime(new Date());
        workflow.setWorkflowInstance(workflowInstance);

        // Any status other than the three below keeps the SUCCEEDED default.
        Status slaStatus = Status.SUCCEEDED;
        switch (workflow.getStatus()) {
            case SUCCEEDED:
                slaStatus = Status.SUCCEEDED;
                break;
            case KILLED:
                slaStatus = Status.KILLED;
                break;
            case FAILED:
                slaStatus = Status.FAILED;
                break;
            default: // TODO about SUSPENDED

        }
        SLADbOperations.writeStausEvent(workflow.getSlaXml(), jobId, store, slaStatus, SlaAppType.WORKFLOW_JOB);
        queueCallable(new NotificationCommand(workflow));
        if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) {
            incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1);
        }
    }

    /**
     * Handles a workflow instance that is still running: for every action the instance reports as started,
     * either re-signals the already stored action (when it is marked to be skipped) or inserts the new action
     * and queues its start.
     */
    private void dispatchStartedActions(WorkflowStore store, WorkflowJobBean workflow,
                                        WorkflowInstance workflowInstance)
            throws CommandException, StoreException, WorkflowException {
        for (WorkflowActionBean newAction : WorkflowStoreService.getStartedActions(workflowInstance)) {
            if (isMarkedToSkip(workflowInstance, newAction.getName())) {
                // Skipped action: reuse the stored bean and signal straight past it.
                WorkflowActionBean oldAction = store.getAction(newAction.getId(), false);
                oldAction.setPending();
                store.updateAction(oldAction);
                queueCallable(new SignalCommand(jobId, oldAction.getId()));
            }
            else {
                newAction.setPending();
                String actionSlaXml = getActionSLAXml(newAction.getName(),
                                                      workflowInstance.getApp().getDefinition(),
                                                      workflow.getConf());
                newAction.setSlaXml(actionSlaXml);
                XLog.getLog(getClass()).debug("SignalCOmmand: Name: "+ newAction.getName() +"Id: " +newAction.getId()+ " Authcode:" + newAction.getCred());
                store.insertAction(newAction);
                queueCallable(new ActionStartCommand(newAction.getId(), newAction.getType()));
            }
        }
    }

    /**
     * Tells whether the workflow instance carries the {@link ReRunCommand#TO_SKIP} variable with the value
     * <code>"true"</code> for the given node name.
     */
    private static boolean isMarkedToSkip(WorkflowInstance workflowInstance, String nodeName)
            throws WorkflowException {
        String skipVar = workflowInstance.getVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR
                + ReRunCommand.TO_SKIP);
        return "true".equals(skipVar);
    }

    /**
     * Creates an EL evaluator for the given group and exposes every configuration entry as a variable.
     *
     * @param conf configuration whose entries become evaluator variables.
     * @param group EL group name passed to the {@link ELService}.
     * @return the populated evaluator.
     */
    public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
        ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
        for (Map.Entry<String, String> entry : conf) {
            eval.setVariable(entry.getKey(), entry.getValue());
        }
        return eval;
    }

    /**
     * Returns the pretty-printed SLA <code>info</code> element of the first action in the workflow definition
     * that has the given name and declares one.
     *
     * @param actionName name of the action to look up.
     * @param wfXml workflow definition XML.
     * @param wfConf workflow configuration; currently unused.
     * @return the SLA XML, or <code>null</code> if no matching action declares SLA information.
     * @throws CommandException with {@link ErrorCode#E1004} on any failure while reading the definition.
     */
    private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException {
        String slaXml = null;
        // TODO need to fill-out the code
        // Get the appropriate action:slaXml and resolve that.
        try {
            Element eWfJob = XmlUtils.parseXml(wfXml);
            @SuppressWarnings("unchecked") // JDOM hands back an untyped list of Element
            List<Element> actions = (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace());
            for (Element action : actions) {
                if (!action.getAttributeValue("name").equals(actionName)) {
                    continue;
                }
                Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
                if (eSla != null) {
                    // Could use any non-null string
                    slaXml = XmlUtils.prettyPrint(eSla).toString();
                    break;
                }
            }
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
        }
        return slaXml;
    }

    /**
     * Resolves an SLA element through {@link SubmitCommand#resolveSla} using an evaluator of the
     * <code>wf-sla-submit</code> group.
     *
     * @param eSla SLA element to resolve.
     * @param conf configuration used to build the evaluator.
     * @return the resolved SLA XML.
     * @throws CommandException with {@link ErrorCode#E1004} on any failure.
     */
    private String resolveSla(Element eSla, Configuration conf) throws CommandException {
        String slaXml = null;
        try {
            ELEvaluator evalSla = SubmitCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
            slaXml = SubmitCommand.resolveSla(eSla, evalSla);
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
        }
        return slaXml;
    }

    /**
     * Writes an SLA registration event for every action of the workflow definition that declares an SLA
     * <code>info</code> element. The action id is generated as a child id of this job from the action name.
     *
     * @param wfXml workflow definition XML.
     * @param user user passed to the registration event.
     * @param group group passed to the registration event.
     * @param strConf workflow configuration as XML, used to resolve the SLA elements.
     * @param store store the events are written through.
     * @throws CommandException with {@link ErrorCode#E1007} on any failure.
     */
    private void writeSLARegistrationForAllActions(String wfXml, String user, String group, String strConf,
                                                   WorkflowStore store) throws CommandException {
        try {
            Element eWfJob = XmlUtils.parseXml(wfXml);
            Configuration conf = new XConfiguration(new StringReader(strConf));
            @SuppressWarnings("unchecked") // JDOM hands back an untyped list of Element
            List<Element> actions = (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace());
            for (Element action : actions) {
                Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
                if (eSla != null) {
                    String slaXml = resolveSla(eSla, conf);
                    eSla = XmlUtils.parseXml(slaXml);
                    String actionId = Services.get().get(UUIDService.class).generateChildId(jobId,
                            action.getAttributeValue("name") + "");
                    SLADbOperations.writeSlaRegistrationEvent(eSla, store, actionId, SlaAppType.WORKFLOW_ACTION,
                                                              user, group);
                }
            }
        }
        catch (Exception e) {
            throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e);
        }

    }

    /**
     * Runs {@link #call(WorkflowStore)} under the job lock. If the lock is not acquired, or acquiring it is
     * interrupted, a new {@link SignalCommand} is queued with <code>LOCK_FAILURE_REQUEUE_INTERVAL</code>.
     *
     * @param store workflow store handed to {@link #call(WorkflowStore)}.
     * @return always <code>null</code>.
     * @throws CommandException propagated from {@link #call(WorkflowStore)}.
     * @throws StoreException propagated from {@link #call(WorkflowStore)}.
     */
    @Override
    protected Void execute(WorkflowStore store) throws CommandException, StoreException {
        XLog.getLog(getClass()).debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
        try {
            if (lock(jobId)) {
                call(store);
            }
            else {
                // NOTE(review): actionId is null for commands built via the protected constructor; confirm
                // such subclasses never reach this re-queue, since the public constructor validates actionId.
                queueCallable(new SignalCommand(jobId, actionId), LOCK_FAILURE_REQUEUE_INTERVAL);
                XLog.getLog(getClass()).warn("SignalCommand lock was not acquired - failed {0}", jobId);
            }
        }
        catch (InterruptedException e) {
            // NOTE(review): the thread's interrupt status is not restored here; confirm whether the caller
            // relies on it before adding Thread.currentThread().interrupt().
            queueCallable(new SignalCommand(jobId, actionId), LOCK_FAILURE_REQUEUE_INTERVAL);
            XLog.getLog(getClass()).warn("SignalCommand lock not acquired - interrupted exception failed {0}", jobId);
        }
        XLog.getLog(getClass()).debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
        return null;
    }
}