/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018    package org.apache.oozie.command.wf;
019    
020    import java.io.IOException;
021    import org.apache.hadoop.conf.Configuration;
022    import org.apache.oozie.client.WorkflowJob;
023    import org.apache.oozie.client.SLAEvent.SlaAppType;
024    import org.apache.oozie.client.SLAEvent.Status;
025    import org.apache.oozie.client.rest.JsonBean;
026    import org.apache.oozie.SLAEventBean;
027    import org.apache.oozie.WorkflowActionBean;
028    import org.apache.oozie.WorkflowJobBean;
029    import org.apache.oozie.ErrorCode;
030    import org.apache.oozie.XException;
031    import org.apache.oozie.command.CommandException;
032    import org.apache.oozie.command.PreconditionException;
033    import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
034    import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
035    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
036    import org.apache.oozie.executor.jpa.JPAExecutorException;
037    import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
038    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
039    import org.apache.oozie.service.ELService;
040    import org.apache.oozie.service.EventHandlerService;
041    import org.apache.oozie.service.JPAService;
042    import org.apache.oozie.service.Services;
043    import org.apache.oozie.service.UUIDService;
044    import org.apache.oozie.service.WorkflowStoreService;
045    import org.apache.oozie.workflow.WorkflowException;
046    import org.apache.oozie.workflow.WorkflowInstance;
047    import org.apache.oozie.workflow.lite.KillNodeDef;
048    import org.apache.oozie.workflow.lite.NodeDef;
049    import org.apache.oozie.util.ELEvaluator;
050    import org.apache.oozie.util.InstrumentUtils;
051    import org.apache.oozie.util.LogUtils;
052    import org.apache.oozie.util.XConfiguration;
053    import org.apache.oozie.util.ParamChecker;
054    import org.apache.oozie.util.XmlUtils;
055    import org.apache.oozie.util.db.SLADbXOperations;
056    import org.jdom.Element;
057    import java.io.StringReader;
058    import java.util.ArrayList;
059    import java.util.Date;
060    import java.util.List;
061    import java.util.Map;
062    import org.apache.oozie.client.OozieClient;
063    
064    @SuppressWarnings("deprecation")
065    public class SignalXCommand extends WorkflowXCommand<Void> {
066    
067        protected static final String INSTR_SUCCEEDED_JOBS_COUNTER_NAME = "succeeded";
068    
069        private JPAService jpaService = null;
070        private String jobId;
071        private String actionId;
072        private WorkflowJobBean wfJob;
073        private WorkflowActionBean wfAction;
074        private List<JsonBean> updateList = new ArrayList<JsonBean>();
075        private List<JsonBean> insertList = new ArrayList<JsonBean>();
076        private boolean generateEvent = false;
077        private String wfJobErrorCode;
078        private String wfJobErrorMsg;
079    
080    
081        public SignalXCommand(String name, int priority, String jobId) {
082            super(name, name, priority);
083            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
084        }
085    
086        public SignalXCommand(String jobId, String actionId) {
087            this("signal", 1, jobId);
088            this.actionId = ParamChecker.notEmpty(actionId, "actionId");
089        }
090    
091        @Override
092        protected boolean isLockRequired() {
093            return true;
094        }
095    
096        @Override
097        public String getEntityKey() {
098            return this.jobId;
099        }
100    
101        @Override
102        public String getKey() {
103            return getName() + "_" + jobId + "_" + actionId;
104        }
105    
106        @Override
107        protected void loadState() throws CommandException {
108            try {
109                jpaService = Services.get().get(JPAService.class);
110                if (jpaService != null) {
111                    this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
112                    LogUtils.setLogInfo(wfJob, logInfo);
113                    if (actionId != null) {
114                        this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
115                        LogUtils.setLogInfo(wfAction, logInfo);
116                    }
117                }
118                else {
119                    throw new CommandException(ErrorCode.E0610);
120                }
121            }
122            catch (XException ex) {
123                throw new CommandException(ex);
124            }
125        }
126    
127        @Override
128        protected void verifyPrecondition() throws CommandException, PreconditionException {
129            if ((wfAction == null) || (wfAction.isComplete() && wfAction.isPending())) {
130                if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.PREP) {
131                    throw new PreconditionException(ErrorCode.E0813, wfJob.getStatusStr());
132                }
133            }
134            else {
135                throw new PreconditionException(ErrorCode.E0814, actionId, wfAction.getStatusStr(), wfAction.isPending());
136            }
137        }
138    
139        @Override
140        protected Void execute() throws CommandException {
141            LOG.debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
142            WorkflowInstance workflowInstance = wfJob.getWorkflowInstance();
143            workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob);
144            boolean completed = false;
145            boolean skipAction = false;
146            if (wfAction == null) {
147                if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
148                    try {
149                        completed = workflowInstance.start();
150                    }
151                    catch (WorkflowException e) {
152                        throw new CommandException(e);
153                    }
154                    wfJob.setStatus(WorkflowJob.Status.RUNNING);
155                    wfJob.setStartTime(new Date());
156                    wfJob.setWorkflowInstance(workflowInstance);
157                    generateEvent = true;
158                    // 1. Add SLA status event for WF-JOB with status STARTED
159                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
160                            Status.STARTED, SlaAppType.WORKFLOW_JOB);
161                    if(slaEvent != null) {
162                        insertList.add(slaEvent);
163                    }
164                    // 2. Add SLA registration events for all WF_ACTIONS
165                    createSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(), wfJob
166                            .getGroup(), wfJob.getConf());
167                    queue(new NotificationXCommand(wfJob));
168                }
169                else {
170                    throw new CommandException(ErrorCode.E0801, wfJob.getId());
171                }
172            }
173            else {
174                WorkflowInstance.Status initialStatus = workflowInstance.getStatus();
175                String skipVar = workflowInstance.getVar(wfAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
176                        + ReRunXCommand.TO_SKIP);
177                if (skipVar != null) {
178                    skipAction = skipVar.equals("true");
179                }
180                try {
181                    completed = workflowInstance.signal(wfAction.getExecutionPath(), wfAction.getSignalValue());
182                }
183                catch (WorkflowException e) {
184                    throw new CommandException(e);
185                }
186                wfJob.setWorkflowInstance(workflowInstance);
187                wfAction.resetPending();
188                if (!skipAction) {
189                    wfAction.setTransition(workflowInstance.getTransition(wfAction.getName()));
190                    queue(new NotificationXCommand(wfJob, wfAction));
191                }
192                updateList.add(wfAction);
193                WorkflowInstance.Status endStatus = workflowInstance.getStatus();
194                if (endStatus != initialStatus) {
195                    generateEvent = true;
196                }
197            }
198    
199            if (completed) {
200                try {
201                    for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) {
202                        WorkflowActionBean actionToKill;
203    
204                        actionToKill = jpaService.execute(new WorkflowActionGetJPAExecutor(actionToKillId));
205    
206                        actionToKill.setPending();
207                        actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
208                        updateList.add(actionToKill);
209                        queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType()));
210                    }
211    
212                    for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) {
213                        WorkflowActionBean actionToFail = jpaService.execute(new WorkflowActionGetJPAExecutor(
214                                actionToFailId));
215                        actionToFail.resetPending();
216                        actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
217                        if (wfJobErrorCode != null) {
218                            wfJobErrorCode = actionToFail.getErrorCode();
219                            wfJobErrorMsg = actionToFail.getErrorMessage();
220                        }
221                        queue(new NotificationXCommand(wfJob, actionToFail));
222                        SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(),
223                                Status.FAILED, SlaAppType.WORKFLOW_ACTION);
224                        if(slaEvent != null) {
225                            insertList.add(slaEvent);
226                        }
227                        updateList.add(actionToFail);
228                    }
229                }
230                catch (JPAExecutorException je) {
231                    throw new CommandException(je);
232                }
233    
234                wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
235                wfJob.setEndTime(new Date());
236                wfJob.setWorkflowInstance(workflowInstance);
237                Status slaStatus = Status.SUCCEEDED;
238                switch (wfJob.getStatus()) {
239                    case SUCCEEDED:
240                        slaStatus = Status.SUCCEEDED;
241                        break;
242                    case KILLED:
243                        slaStatus = Status.KILLED;
244                        break;
245                    case FAILED:
246                        slaStatus = Status.FAILED;
247                        break;
248                    default: // TODO SUSPENDED
249                        break;
250                }
251                SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
252                        slaStatus, SlaAppType.WORKFLOW_JOB);
253                if(slaEvent != null) {
254                    insertList.add(slaEvent);
255                }
256                queue(new NotificationXCommand(wfJob));
257                if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
258                    InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation());
259                }
260    
261                // output message for Kill node
262                if (wfAction != null) { // wfAction could be a no-op job
263                    NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath());
264                    if (nodeDef != null && nodeDef instanceof KillNodeDef) {
265                        boolean isRetry = false;
266                        boolean isUserRetry = false;
267                        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry,
268                                isUserRetry);
269                        try {
270                            String tmpNodeConf = nodeDef.getConf();
271                            String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
272                            LOG.debug("Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], after resolve [{3}]",
273                                            jobId, actionId, tmpNodeConf, actionConf);
274                            if (wfAction.getErrorCode() != null) {
275                                wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf);
276                            }
277                            else {
278                                wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf);
279                            }
280                            updateList.add(wfAction);
281                        }
282                        catch (Exception ex) {
283                            LOG.warn("Exception in SignalXCommand ", ex.getMessage(), ex);
284                            throw new CommandException(ErrorCode.E0729, wfAction.getName(), ex);
285                        }
286                    }
287                }
288    
289            }
290            else {
291                for (WorkflowActionBean newAction : WorkflowStoreService.getStartedActions(workflowInstance)) {
292                    String skipVar = workflowInstance.getVar(newAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
293                            + ReRunXCommand.TO_SKIP);
294                    boolean skipNewAction = false;
295                    if (skipVar != null) {
296                        skipNewAction = skipVar.equals("true");
297                    }
298                    try {
299                        if (skipNewAction) {
300                            WorkflowActionBean oldAction;
301    
302                            oldAction = jpaService.execute(new WorkflowActionGetJPAExecutor(newAction.getId()));
303    
304                            oldAction.setPending();
305                            updateList.add(oldAction);
306    
307                            queue(new SignalXCommand(jobId, oldAction.getId()));
308                        }
309                        else {
310                            checkForSuspendNode(newAction);
311                            newAction.setPending();
312                            String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp()
313                                    .getDefinition(), wfJob.getConf());
314                            newAction.setSlaXml(actionSlaXml);
315                            insertList.add(newAction);
316                            LOG.debug("SignalXCommand: Name: "+ newAction.getName() + ", Id: " +newAction.getId() + ", Authcode:" + newAction.getCred());
317                            queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
318                        }
319                    }
320                    catch (JPAExecutorException je) {
321                        throw new CommandException(je);
322                    }
323                }
324            }
325    
326            try {
327                wfJob.setLastModifiedTime(new Date());
328                updateList.add(wfJob);
329                // call JPAExecutor to do the bulk writes
330                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
331                if (generateEvent && EventHandlerService.isEnabled()) {
332                    generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg);
333                }
334            }
335            catch (JPAExecutorException je) {
336                throw new CommandException(je);
337            }
338            LOG.debug(
339                    "Updated the workflow status to " + wfJob.getId() + "  status =" + wfJob.getStatusStr());
340            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
341                // update coordinator action
342                new CoordActionUpdateXCommand(wfJob).call();    //Note: Called even if wf is not necessarily instantiated by coordinator
343                new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
344            }
345            LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
346            return null;
347        }
348    
349        public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
350            ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
351            for (Map.Entry<String, String> entry : conf) {
352                eval.setVariable(entry.getKey(), entry.getValue());
353            }
354            return eval;
355        }
356    
357        @SuppressWarnings("unchecked")
358        private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException {
359            String slaXml = null;
360            try {
361                Element eWfJob = XmlUtils.parseXml(wfXml);
362                for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
363                    if (action.getAttributeValue("name").equals(actionName) == false) {
364                        continue;
365                    }
366                    Element eSla = XmlUtils.getSLAElement(action);
367                    if (eSla != null) {
368                        slaXml = XmlUtils.prettyPrint(eSla).toString();
369                        break;
370                    }
371                }
372            }
373            catch (Exception e) {
374                throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
375            }
376            return slaXml;
377        }
378    
379        private String resolveSla(Element eSla, Configuration conf) throws CommandException {
380            String slaXml = null;
381            try {
382                ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
383                slaXml = SubmitXCommand.resolveSla(eSla, evalSla);
384            }
385            catch (Exception e) {
386                throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
387            }
388            return slaXml;
389        }
390    
391        @SuppressWarnings("unchecked")
392        private void createSLARegistrationForAllActions(String wfXml, String user, String group, String strConf)
393                throws CommandException {
394            try {
395                Element eWfJob = XmlUtils.parseXml(wfXml);
396                Configuration conf = new XConfiguration(new StringReader(strConf));
397                for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
398                    Element eSla = XmlUtils.getSLAElement(action);
399                    if (eSla != null) {
400                        String slaXml = resolveSla(eSla, conf);
401                        eSla = XmlUtils.parseXml(slaXml);
402                        String actionId = Services.get().get(UUIDService.class).generateChildId(jobId,
403                                action.getAttributeValue("name") + "");
404                        SLAEventBean slaEvent = SLADbXOperations.createSlaRegistrationEvent(eSla, actionId,
405                                SlaAppType.WORKFLOW_ACTION, user, group);
406                        if(slaEvent != null) {
407                            insertList.add(slaEvent);
408                        }
409                    }
410                }
411            }
412            catch (Exception e) {
413                throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e.getMessage(), e);
414            }
415    
416        }
417    
418        private void checkForSuspendNode(WorkflowActionBean newAction) {
419            try {
420                XConfiguration wfjobConf = new XConfiguration(new StringReader(wfJob.getConf()));
421                String[] values = wfjobConf.getTrimmedStrings(OozieClient.OOZIE_SUSPEND_ON_NODES);
422                if (values != null) {
423                    if (values.length == 1 && values[0].equals("*")) {
424                        LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(), wfJob.getId());
425                        queue(new SuspendXCommand(jobId));
426                    }
427                    else {
428                        for (String suspendPoint : values) {
429                            if (suspendPoint.equals(newAction.getName())) {
430                                LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(),
431                                        wfJob.getId());
432                                queue(new SuspendXCommand(jobId));
433                                break;
434                            }
435                        }
436                    }
437                }
438            }
439            catch (IOException ex) {
440                LOG.warn("Error reading " + OozieClient.OOZIE_SUSPEND_ON_NODES + ", ignoring [{0}]", ex.getMessage());
441            }
442        }
443    
444    }