This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.io.IOException;
021 import org.apache.hadoop.conf.Configuration;
022 import org.apache.oozie.client.WorkflowJob;
023 import org.apache.oozie.client.SLAEvent.SlaAppType;
024 import org.apache.oozie.client.SLAEvent.Status;
025 import org.apache.oozie.client.rest.JsonBean;
026 import org.apache.oozie.SLAEventBean;
027 import org.apache.oozie.WorkflowActionBean;
028 import org.apache.oozie.WorkflowJobBean;
029 import org.apache.oozie.ErrorCode;
030 import org.apache.oozie.XException;
031 import org.apache.oozie.command.CommandException;
032 import org.apache.oozie.command.PreconditionException;
033 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
034 import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
035 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
036 import org.apache.oozie.executor.jpa.JPAExecutorException;
037 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
038 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
039 import org.apache.oozie.service.ELService;
040 import org.apache.oozie.service.EventHandlerService;
041 import org.apache.oozie.service.JPAService;
042 import org.apache.oozie.service.Services;
043 import org.apache.oozie.service.UUIDService;
044 import org.apache.oozie.service.WorkflowStoreService;
045 import org.apache.oozie.workflow.WorkflowException;
046 import org.apache.oozie.workflow.WorkflowInstance;
047 import org.apache.oozie.workflow.lite.KillNodeDef;
048 import org.apache.oozie.workflow.lite.NodeDef;
049 import org.apache.oozie.util.ELEvaluator;
050 import org.apache.oozie.util.InstrumentUtils;
051 import org.apache.oozie.util.LogUtils;
052 import org.apache.oozie.util.XConfiguration;
053 import org.apache.oozie.util.ParamChecker;
054 import org.apache.oozie.util.XmlUtils;
055 import org.apache.oozie.util.db.SLADbXOperations;
056 import org.jdom.Element;
057 import java.io.StringReader;
058 import java.util.ArrayList;
059 import java.util.Date;
060 import java.util.List;
061 import java.util.Map;
062 import org.apache.oozie.client.OozieClient;
063
064 @SuppressWarnings("deprecation")
065 public class SignalXCommand extends WorkflowXCommand<Void> {
066
067 protected static final String INSTR_SUCCEEDED_JOBS_COUNTER_NAME = "succeeded";
068
069 private JPAService jpaService = null;
070 private String jobId;
071 private String actionId;
072 private WorkflowJobBean wfJob;
073 private WorkflowActionBean wfAction;
074 private List<JsonBean> updateList = new ArrayList<JsonBean>();
075 private List<JsonBean> insertList = new ArrayList<JsonBean>();
076 private boolean generateEvent = false;
077 private String wfJobErrorCode;
078 private String wfJobErrorMsg;
079
080
081 public SignalXCommand(String name, int priority, String jobId) {
082 super(name, name, priority);
083 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
084 }
085
086 public SignalXCommand(String jobId, String actionId) {
087 this("signal", 1, jobId);
088 this.actionId = ParamChecker.notEmpty(actionId, "actionId");
089 }
090
091 @Override
092 protected boolean isLockRequired() {
093 return true;
094 }
095
096 @Override
097 public String getEntityKey() {
098 return this.jobId;
099 }
100
101 @Override
102 public String getKey() {
103 return getName() + "_" + jobId + "_" + actionId;
104 }
105
106 @Override
107 protected void loadState() throws CommandException {
108 try {
109 jpaService = Services.get().get(JPAService.class);
110 if (jpaService != null) {
111 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
112 LogUtils.setLogInfo(wfJob, logInfo);
113 if (actionId != null) {
114 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
115 LogUtils.setLogInfo(wfAction, logInfo);
116 }
117 }
118 else {
119 throw new CommandException(ErrorCode.E0610);
120 }
121 }
122 catch (XException ex) {
123 throw new CommandException(ex);
124 }
125 }
126
127 @Override
128 protected void verifyPrecondition() throws CommandException, PreconditionException {
129 if ((wfAction == null) || (wfAction.isComplete() && wfAction.isPending())) {
130 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.PREP) {
131 throw new PreconditionException(ErrorCode.E0813, wfJob.getStatusStr());
132 }
133 }
134 else {
135 throw new PreconditionException(ErrorCode.E0814, actionId, wfAction.getStatusStr(), wfAction.isPending());
136 }
137 }
138
139 @Override
140 protected Void execute() throws CommandException {
141 LOG.debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
142 WorkflowInstance workflowInstance = wfJob.getWorkflowInstance();
143 workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob);
144 boolean completed = false;
145 boolean skipAction = false;
146 if (wfAction == null) {
147 if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
148 try {
149 completed = workflowInstance.start();
150 }
151 catch (WorkflowException e) {
152 throw new CommandException(e);
153 }
154 wfJob.setStatus(WorkflowJob.Status.RUNNING);
155 wfJob.setStartTime(new Date());
156 wfJob.setWorkflowInstance(workflowInstance);
157 generateEvent = true;
158 // 1. Add SLA status event for WF-JOB with status STARTED
159 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
160 Status.STARTED, SlaAppType.WORKFLOW_JOB);
161 if(slaEvent != null) {
162 insertList.add(slaEvent);
163 }
164 // 2. Add SLA registration events for all WF_ACTIONS
165 createSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(), wfJob
166 .getGroup(), wfJob.getConf());
167 queue(new NotificationXCommand(wfJob));
168 }
169 else {
170 throw new CommandException(ErrorCode.E0801, wfJob.getId());
171 }
172 }
173 else {
174 WorkflowInstance.Status initialStatus = workflowInstance.getStatus();
175 String skipVar = workflowInstance.getVar(wfAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
176 + ReRunXCommand.TO_SKIP);
177 if (skipVar != null) {
178 skipAction = skipVar.equals("true");
179 }
180 try {
181 completed = workflowInstance.signal(wfAction.getExecutionPath(), wfAction.getSignalValue());
182 }
183 catch (WorkflowException e) {
184 throw new CommandException(e);
185 }
186 wfJob.setWorkflowInstance(workflowInstance);
187 wfAction.resetPending();
188 if (!skipAction) {
189 wfAction.setTransition(workflowInstance.getTransition(wfAction.getName()));
190 queue(new NotificationXCommand(wfJob, wfAction));
191 }
192 updateList.add(wfAction);
193 WorkflowInstance.Status endStatus = workflowInstance.getStatus();
194 if (endStatus != initialStatus) {
195 generateEvent = true;
196 }
197 }
198
199 if (completed) {
200 try {
201 for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) {
202 WorkflowActionBean actionToKill;
203
204 actionToKill = jpaService.execute(new WorkflowActionGetJPAExecutor(actionToKillId));
205
206 actionToKill.setPending();
207 actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
208 updateList.add(actionToKill);
209 queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType()));
210 }
211
212 for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) {
213 WorkflowActionBean actionToFail = jpaService.execute(new WorkflowActionGetJPAExecutor(
214 actionToFailId));
215 actionToFail.resetPending();
216 actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
217 if (wfJobErrorCode != null) {
218 wfJobErrorCode = actionToFail.getErrorCode();
219 wfJobErrorMsg = actionToFail.getErrorMessage();
220 }
221 queue(new NotificationXCommand(wfJob, actionToFail));
222 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(),
223 Status.FAILED, SlaAppType.WORKFLOW_ACTION);
224 if(slaEvent != null) {
225 insertList.add(slaEvent);
226 }
227 updateList.add(actionToFail);
228 }
229 }
230 catch (JPAExecutorException je) {
231 throw new CommandException(je);
232 }
233
234 wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
235 wfJob.setEndTime(new Date());
236 wfJob.setWorkflowInstance(workflowInstance);
237 Status slaStatus = Status.SUCCEEDED;
238 switch (wfJob.getStatus()) {
239 case SUCCEEDED:
240 slaStatus = Status.SUCCEEDED;
241 break;
242 case KILLED:
243 slaStatus = Status.KILLED;
244 break;
245 case FAILED:
246 slaStatus = Status.FAILED;
247 break;
248 default: // TODO SUSPENDED
249 break;
250 }
251 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
252 slaStatus, SlaAppType.WORKFLOW_JOB);
253 if(slaEvent != null) {
254 insertList.add(slaEvent);
255 }
256 queue(new NotificationXCommand(wfJob));
257 if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
258 InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation());
259 }
260
261 // output message for Kill node
262 if (wfAction != null) { // wfAction could be a no-op job
263 NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath());
264 if (nodeDef != null && nodeDef instanceof KillNodeDef) {
265 boolean isRetry = false;
266 boolean isUserRetry = false;
267 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry,
268 isUserRetry);
269 try {
270 String tmpNodeConf = nodeDef.getConf();
271 String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
272 LOG.debug("Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], after resolve [{3}]",
273 jobId, actionId, tmpNodeConf, actionConf);
274 if (wfAction.getErrorCode() != null) {
275 wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf);
276 }
277 else {
278 wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf);
279 }
280 updateList.add(wfAction);
281 }
282 catch (Exception ex) {
283 LOG.warn("Exception in SignalXCommand ", ex.getMessage(), ex);
284 throw new CommandException(ErrorCode.E0729, wfAction.getName(), ex);
285 }
286 }
287 }
288
289 }
290 else {
291 for (WorkflowActionBean newAction : WorkflowStoreService.getStartedActions(workflowInstance)) {
292 String skipVar = workflowInstance.getVar(newAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
293 + ReRunXCommand.TO_SKIP);
294 boolean skipNewAction = false;
295 if (skipVar != null) {
296 skipNewAction = skipVar.equals("true");
297 }
298 try {
299 if (skipNewAction) {
300 WorkflowActionBean oldAction;
301
302 oldAction = jpaService.execute(new WorkflowActionGetJPAExecutor(newAction.getId()));
303
304 oldAction.setPending();
305 updateList.add(oldAction);
306
307 queue(new SignalXCommand(jobId, oldAction.getId()));
308 }
309 else {
310 checkForSuspendNode(newAction);
311 newAction.setPending();
312 String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp()
313 .getDefinition(), wfJob.getConf());
314 newAction.setSlaXml(actionSlaXml);
315 insertList.add(newAction);
316 LOG.debug("SignalXCommand: Name: "+ newAction.getName() + ", Id: " +newAction.getId() + ", Authcode:" + newAction.getCred());
317 queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
318 }
319 }
320 catch (JPAExecutorException je) {
321 throw new CommandException(je);
322 }
323 }
324 }
325
326 try {
327 wfJob.setLastModifiedTime(new Date());
328 updateList.add(wfJob);
329 // call JPAExecutor to do the bulk writes
330 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
331 if (generateEvent && EventHandlerService.isEnabled()) {
332 generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg);
333 }
334 }
335 catch (JPAExecutorException je) {
336 throw new CommandException(je);
337 }
338 LOG.debug(
339 "Updated the workflow status to " + wfJob.getId() + " status =" + wfJob.getStatusStr());
340 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
341 // update coordinator action
342 new CoordActionUpdateXCommand(wfJob).call(); //Note: Called even if wf is not necessarily instantiated by coordinator
343 new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
344 }
345 LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
346 return null;
347 }
348
349 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
350 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
351 for (Map.Entry<String, String> entry : conf) {
352 eval.setVariable(entry.getKey(), entry.getValue());
353 }
354 return eval;
355 }
356
357 @SuppressWarnings("unchecked")
358 private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException {
359 String slaXml = null;
360 try {
361 Element eWfJob = XmlUtils.parseXml(wfXml);
362 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
363 if (action.getAttributeValue("name").equals(actionName) == false) {
364 continue;
365 }
366 Element eSla = XmlUtils.getSLAElement(action);
367 if (eSla != null) {
368 slaXml = XmlUtils.prettyPrint(eSla).toString();
369 break;
370 }
371 }
372 }
373 catch (Exception e) {
374 throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
375 }
376 return slaXml;
377 }
378
379 private String resolveSla(Element eSla, Configuration conf) throws CommandException {
380 String slaXml = null;
381 try {
382 ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
383 slaXml = SubmitXCommand.resolveSla(eSla, evalSla);
384 }
385 catch (Exception e) {
386 throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
387 }
388 return slaXml;
389 }
390
391 @SuppressWarnings("unchecked")
392 private void createSLARegistrationForAllActions(String wfXml, String user, String group, String strConf)
393 throws CommandException {
394 try {
395 Element eWfJob = XmlUtils.parseXml(wfXml);
396 Configuration conf = new XConfiguration(new StringReader(strConf));
397 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
398 Element eSla = XmlUtils.getSLAElement(action);
399 if (eSla != null) {
400 String slaXml = resolveSla(eSla, conf);
401 eSla = XmlUtils.parseXml(slaXml);
402 String actionId = Services.get().get(UUIDService.class).generateChildId(jobId,
403 action.getAttributeValue("name") + "");
404 SLAEventBean slaEvent = SLADbXOperations.createSlaRegistrationEvent(eSla, actionId,
405 SlaAppType.WORKFLOW_ACTION, user, group);
406 if(slaEvent != null) {
407 insertList.add(slaEvent);
408 }
409 }
410 }
411 }
412 catch (Exception e) {
413 throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e.getMessage(), e);
414 }
415
416 }
417
418 private void checkForSuspendNode(WorkflowActionBean newAction) {
419 try {
420 XConfiguration wfjobConf = new XConfiguration(new StringReader(wfJob.getConf()));
421 String[] values = wfjobConf.getTrimmedStrings(OozieClient.OOZIE_SUSPEND_ON_NODES);
422 if (values != null) {
423 if (values.length == 1 && values[0].equals("*")) {
424 LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(), wfJob.getId());
425 queue(new SuspendXCommand(jobId));
426 }
427 else {
428 for (String suspendPoint : values) {
429 if (suspendPoint.equals(newAction.getName())) {
430 LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(),
431 wfJob.getId());
432 queue(new SuspendXCommand(jobId));
433 break;
434 }
435 }
436 }
437 }
438 }
439 catch (IOException ex) {
440 LOG.warn("Error reading " + OozieClient.OOZIE_SUSPEND_ON_NODES + ", ignoring [{0}]", ex.getMessage());
441 }
442 }
443
444 }