001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import java.io.IOException;
021
022import org.apache.hadoop.conf.Configuration;
023import org.apache.oozie.action.ActionExecutor;
024import org.apache.oozie.action.control.ForkActionExecutor;
025import org.apache.oozie.action.control.StartActionExecutor;
026import org.apache.oozie.client.WorkflowJob;
027import org.apache.oozie.client.SLAEvent.SlaAppType;
028import org.apache.oozie.client.SLAEvent.Status;
029import org.apache.oozie.client.rest.JsonBean;
030import org.apache.oozie.SLAEventBean;
031import org.apache.oozie.WorkflowActionBean;
032import org.apache.oozie.WorkflowJobBean;
033import org.apache.oozie.ErrorCode;
034import org.apache.oozie.XException;
035import org.apache.oozie.command.CommandException;
036import org.apache.oozie.command.PreconditionException;
037import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
038import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
039import org.apache.oozie.executor.jpa.BatchQueryExecutor;
040import org.apache.oozie.executor.jpa.JPAExecutorException;
041import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
042import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
043import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
044import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
045import org.apache.oozie.service.ActionService;
046import org.apache.oozie.service.ELService;
047import org.apache.oozie.service.EventHandlerService;
048import org.apache.oozie.service.JPAService;
049import org.apache.oozie.service.Services;
050import org.apache.oozie.service.UUIDService;
051import org.apache.oozie.service.WorkflowStoreService;
052import org.apache.oozie.workflow.WorkflowException;
053import org.apache.oozie.workflow.WorkflowInstance;
054import org.apache.oozie.workflow.lite.KillNodeDef;
055import org.apache.oozie.workflow.lite.NodeDef;
056import org.apache.oozie.util.ELEvaluator;
057import org.apache.oozie.util.InstrumentUtils;
058import org.apache.oozie.util.LogUtils;
059import org.apache.oozie.util.XConfiguration;
060import org.apache.oozie.util.ParamChecker;
061import org.apache.oozie.util.XLog;
062import org.apache.oozie.util.XmlUtils;
063import org.apache.oozie.util.db.SLADbXOperations;
064import org.jdom.Element;
065
066import java.io.StringReader;
067import java.util.ArrayList;
068import java.util.Date;
069import java.util.List;
070import java.util.Map;
071
072import org.apache.oozie.client.OozieClient;
073
074@SuppressWarnings("deprecation")
075public class SignalXCommand extends WorkflowXCommand<Void> {
076
077    private JPAService jpaService = null;
078    private String jobId;
079    private String actionId;
080    private WorkflowJobBean wfJob;
081    private WorkflowActionBean wfAction;
082    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
083    private List<JsonBean> insertList = new ArrayList<JsonBean>();
084    private boolean generateEvent = false;
085    private String wfJobErrorCode;
086    private String wfJobErrorMsg;
087
088    public SignalXCommand(String name, int priority, String jobId) {
089        super(name, name, priority);
090        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
091    }
092
093    public SignalXCommand(String jobId, String actionId) {
094        this("signal", 1, jobId);
095        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
096    }
097
098    @Override
099    protected boolean isLockRequired() {
100        return true;
101    }
102
103    @Override
104    public String getEntityKey() {
105        return this.jobId;
106    }
107
108    @Override
109    public String getKey() {
110        return getName() + "_" + jobId + "_" + actionId;
111    }
112
113    @Override
114    protected void loadState() throws CommandException {
115        try {
116            jpaService = Services.get().get(JPAService.class);
117            if (jpaService != null) {
118                this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
119                LogUtils.setLogInfo(wfJob, logInfo);
120                if (actionId != null) {
121                    this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_SIGNAL, actionId);
122                    LogUtils.setLogInfo(wfAction, logInfo);
123                }
124            }
125            else {
126                throw new CommandException(ErrorCode.E0610);
127            }
128        }
129        catch (XException ex) {
130            throw new CommandException(ex);
131        }
132    }
133
134    @Override
135    protected void verifyPrecondition() throws CommandException, PreconditionException {
136        if ((wfAction == null) || (wfAction.isComplete() && wfAction.isPending())) {
137            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.PREP) {
138                throw new PreconditionException(ErrorCode.E0813, wfJob.getStatusStr());
139            }
140        }
141        else {
142            throw new PreconditionException(ErrorCode.E0814, actionId, wfAction.getStatusStr(), wfAction.isPending());
143        }
144    }
145
146    @Override
147    protected Void execute() throws CommandException {
148
149        LOG.debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
150        WorkflowInstance workflowInstance = wfJob.getWorkflowInstance();
151        workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob);
152        WorkflowJob.Status prevStatus = wfJob.getStatus();
153        boolean completed = false, skipAction = false;
154        WorkflowActionBean syncAction = null;
155
156        if (wfAction == null) {
157            if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
158                try {
159                    completed = workflowInstance.start();
160                }
161                catch (WorkflowException e) {
162                    throw new CommandException(e);
163                }
164                wfJob.setStatus(WorkflowJob.Status.RUNNING);
165                wfJob.setStartTime(new Date());
166                wfJob.setWorkflowInstance(workflowInstance);
167                generateEvent = true;
168                // 1. Add SLA status event for WF-JOB with status STARTED
169                SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId, Status.STARTED,
170                        SlaAppType.WORKFLOW_JOB);
171                if (slaEvent != null) {
172                    insertList.add(slaEvent);
173                }
174                // 2. Add SLA registration events for all WF_ACTIONS
175                createSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(),
176                        wfJob.getGroup(), wfJob.getConf());
177                queue(new NotificationXCommand(wfJob));
178            }
179            else {
180                throw new CommandException(ErrorCode.E0801, wfJob.getId());
181            }
182        }
183        else {
184            WorkflowInstance.Status initialStatus = workflowInstance.getStatus();
185            String skipVar = workflowInstance.getVar(wfAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
186                    + ReRunXCommand.TO_SKIP);
187            if (skipVar != null) {
188                skipAction = skipVar.equals("true");
189            }
190            try {
191                completed = workflowInstance.signal(wfAction.getExecutionPath(), wfAction.getSignalValue());
192            }
193            catch (WorkflowException e) {
194               LOG.error("Workflow action failed : " + e.getMessage(), e);
195                wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
196                completed = true;
197            }
198            wfJob.setWorkflowInstance(workflowInstance);
199            wfAction.resetPending();
200            if (!skipAction) {
201                wfAction.setTransition(workflowInstance.getTransition(wfAction.getName()));
202                queue(new NotificationXCommand(wfJob, wfAction));
203            }
204            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_PENDING_TRANS,
205                    wfAction));
206            WorkflowInstance.Status endStatus = workflowInstance.getStatus();
207            if (endStatus != initialStatus) {
208                generateEvent = true;
209            }
210        }
211
212        if (completed) {
213            try {
214                for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) {
215                    WorkflowActionBean actionToKill;
216
217                    actionToKill = WorkflowActionQueryExecutor.getInstance().get(
218                            WorkflowActionQuery.GET_ACTION_ID_TYPE_LASTCHECK, actionToKillId);
219
220                    actionToKill.setPending();
221                    actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
222                    updateList.add(new UpdateEntry<WorkflowActionQuery>(
223                            WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, actionToKill));
224                    queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType()));
225                }
226
227                for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) {
228                    WorkflowActionBean actionToFail = WorkflowActionQueryExecutor.getInstance().get(
229                            WorkflowActionQuery.GET_ACTION_FAIL, actionToFailId);
230                    actionToFail.resetPending();
231                    actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
232                    if (wfJobErrorCode != null) {
233                        wfJobErrorCode = actionToFail.getErrorCode();
234                        wfJobErrorMsg = actionToFail.getErrorMessage();
235                    }
236                    queue(new NotificationXCommand(wfJob, actionToFail));
237                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(),
238                            Status.FAILED, SlaAppType.WORKFLOW_ACTION);
239                    if (slaEvent != null) {
240                        insertList.add(slaEvent);
241                    }
242                    updateList.add(new UpdateEntry<WorkflowActionQuery>(
243                            WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, actionToFail));
244                }
245            }
246            catch (JPAExecutorException je) {
247                throw new CommandException(je);
248            }
249
250            wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
251            wfJob.setEndTime(new Date());
252            wfJob.setWorkflowInstance(workflowInstance);
253            Status slaStatus = Status.SUCCEEDED;
254            switch (wfJob.getStatus()) {
255                case SUCCEEDED:
256                    slaStatus = Status.SUCCEEDED;
257                    break;
258                case KILLED:
259                    slaStatus = Status.KILLED;
260                    break;
261                case FAILED:
262                    slaStatus = Status.FAILED;
263                    break;
264                default: // TODO SUSPENDED
265                    break;
266            }
267            SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId, slaStatus,
268                    SlaAppType.WORKFLOW_JOB);
269            if (slaEvent != null) {
270                insertList.add(slaEvent);
271            }
272            queue(new NotificationXCommand(wfJob));
273            if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
274                InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation());
275            }
276
277            // output message for Kill node
278            if (wfAction != null) { // wfAction could be a no-op job
279                NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath());
280                if (nodeDef != null && nodeDef instanceof KillNodeDef) {
281                    boolean isRetry = false;
282                    boolean isUserRetry = false;
283                    ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry,
284                            isUserRetry);
285                    InstrumentUtils.incrJobCounter(INSTR_KILLED_JOBS_COUNTER_NAME, 1, getInstrumentation());
286                    try {
287                        String tmpNodeConf = nodeDef.getConf();
288                        String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
289                        LOG.debug(
290                                "Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], "
291                                        + "after resolve [{3}]", jobId, actionId, tmpNodeConf, actionConf);
292                        if (wfAction.getErrorCode() != null) {
293                            wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf);
294                        }
295                        else {
296                            wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf);
297                        }
298                        updateList.add(new UpdateEntry<WorkflowActionQuery>(
299                                WorkflowActionQuery.UPDATE_ACTION_PENDING_TRANS_ERROR, wfAction));
300                    }
301                    catch (Exception ex) {
302                        LOG.warn("Exception in SignalXCommand ", ex.getMessage(), ex);
303                        throw new CommandException(ErrorCode.E0729, wfAction.getName(), ex);
304                    }
305                }
306            }
307
308        }
309        else {
310            for (WorkflowActionBean newAction : WorkflowStoreService.getActionsToStart(workflowInstance)) {
311                String skipVar = workflowInstance.getVar(newAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
312                        + ReRunXCommand.TO_SKIP);
313                boolean skipNewAction = false, suspendNewAction = false;
314                if (skipVar != null) {
315                    skipNewAction = skipVar.equals("true");
316                }
317
318                if (skipNewAction) {
319                    WorkflowActionBean oldAction = new WorkflowActionBean();
320                    oldAction.setId(newAction.getId());
321                    oldAction.setPending();
322                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_PENDING,
323                            oldAction));
324                    queue(new SignalXCommand(jobId, oldAction.getId()));
325                }
326                else {
327                    try {
328                        // Make sure that transition node for a forked action
329                        // is inserted only once
330                        WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_ID_TYPE_LASTCHECK,
331                                newAction.getId());
332
333                        continue;
334                    }
335                    catch (JPAExecutorException jee) {
336                    }
337                    suspendNewAction = checkForSuspendNode(newAction);
338                    newAction.setPending();
339                    String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp()
340                            .getDefinition(), wfJob.getConf());
341                    newAction.setSlaXml(actionSlaXml);
342                    newAction.setCreatedTime(new Date());
343                    insertList.add(newAction);
344                    LOG.debug("SignalXCommand: Name: " + newAction.getName() + ", Id: " + newAction.getId()
345                            + ", Authcode:" + newAction.getCred());
346                    if (wfAction != null) { // null during wf job submit
347                        ActionService as = Services.get().get(ActionService.class);
348                        ActionExecutor current = as.getExecutor(wfAction.getType());
349                        LOG.trace("Current Action Type:" + current.getClass());
350                        if (!suspendNewAction) {
351                            if (!(current instanceof ForkActionExecutor) && !(current instanceof StartActionExecutor)) {
352                                // Excluding :start: here from executing first action synchronously since it
353                                // blocks the consumer thread till the action is submitted to Hadoop,
354                                // in turn reducing the number of new submissions the threads can accept.
355                                // Would also be susceptible to longer delays in case Hadoop cluster is busy.
356                                syncAction = newAction;
357                            }
358                            else {
359                                queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
360                            }
361                        }
362                    }
363                    else {
364                        syncAction = newAction; // first action after wf submit should always be sync
365                    }
366                }
367            }
368        }
369
370        try {
371            wfJob.setLastModifiedTime(new Date());
372            updateList.add(new UpdateEntry<WorkflowJobQuery>(
373                    WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob));
374            // call JPAExecutor to do the bulk writes
375            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
376            if (prevStatus != wfJob.getStatus()) {
377                LOG.debug("Updated the workflow status to " + wfJob.getId() + "  status =" + wfJob.getStatusStr());
378            }
379            if (generateEvent && EventHandlerService.isEnabled()) {
380                generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg);
381            }
382        }
383        catch (JPAExecutorException je) {
384            throw new CommandException(je);
385        }
386        // Changing to synchronous call from asynchronous queuing to prevent
387        // undue delay from between end of previous and start of next action
388        if (wfJob.getStatus() != WorkflowJob.Status.RUNNING
389                && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
390            // only for asynchronous actions, parent coord action's external id will
391            // persisted and following update will succeed.
392            updateParentIfNecessary(wfJob);
393            new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
394        }
395        else if (syncAction != null) {
396            new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call(getEntityKey());
397        }
398        LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
399        return null;
400    }
401
402    public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
403        ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
404        for (Map.Entry<String, String> entry : conf) {
405            eval.setVariable(entry.getKey(), entry.getValue());
406        }
407        return eval;
408    }
409
410    @SuppressWarnings("unchecked")
411    private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException {
412        String slaXml = null;
413        try {
414            Element eWfJob = XmlUtils.parseXml(wfXml);
415            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
416                if (action.getAttributeValue("name").equals(actionName) == false) {
417                    continue;
418                }
419                Element eSla = XmlUtils.getSLAElement(action);
420                if (eSla != null) {
421                    slaXml = XmlUtils.prettyPrint(eSla).toString();
422                    break;
423                }
424            }
425        }
426        catch (Exception e) {
427            throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
428        }
429        return slaXml;
430    }
431
432    private String resolveSla(Element eSla, Configuration conf) throws CommandException {
433        String slaXml = null;
434        try {
435            ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
436            slaXml = SubmitXCommand.resolveSla(eSla, evalSla);
437        }
438        catch (Exception e) {
439            throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
440        }
441        return slaXml;
442    }
443
444    @SuppressWarnings("unchecked")
445    private void createSLARegistrationForAllActions(String wfXml, String user, String group, String strConf)
446            throws CommandException {
447        try {
448            Element eWfJob = XmlUtils.parseXml(wfXml);
449            Configuration conf = new XConfiguration(new StringReader(strConf));
450            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
451                Element eSla = XmlUtils.getSLAElement(action);
452                if (eSla != null) {
453                    String slaXml = resolveSla(eSla, conf);
454                    eSla = XmlUtils.parseXml(slaXml);
455                    String actionId = Services.get().get(UUIDService.class)
456                            .generateChildId(jobId, action.getAttributeValue("name") + "");
457                    SLAEventBean slaEvent = SLADbXOperations.createSlaRegistrationEvent(eSla, actionId,
458                            SlaAppType.WORKFLOW_ACTION, user, group);
459                    if (slaEvent != null) {
460                        insertList.add(slaEvent);
461                    }
462                }
463            }
464        }
465        catch (Exception e) {
466            throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e.getMessage(), e);
467        }
468
469    }
470
471    private boolean checkForSuspendNode(WorkflowActionBean newAction) {
472        boolean suspendNewAction = false;
473        try {
474            XConfiguration wfjobConf = new XConfiguration(new StringReader(wfJob.getConf()));
475            String[] values = wfjobConf.getTrimmedStrings(OozieClient.OOZIE_SUSPEND_ON_NODES);
476            if (values != null) {
477                if (values.length == 1 && values[0].equals("*")) {
478                    LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(),
479                            wfJob.getId());
480                    queue(new SuspendXCommand(jobId));
481                    suspendNewAction = true;
482                }
483                else {
484                    for (String suspendPoint : values) {
485                        if (suspendPoint.equals(newAction.getName())) {
486                            LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(),
487                                    wfJob.getId());
488                            queue(new SuspendXCommand(jobId));
489                            suspendNewAction = true;
490                            break;
491                        }
492                    }
493                }
494            }
495        }
496        catch (IOException ex) {
497            LOG.warn("Error reading " + OozieClient.OOZIE_SUSPEND_ON_NODES + ", ignoring [{0}]", ex.getMessage());
498        }
499        return suspendNewAction;
500    }
501
502}