001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.oozie.client.WorkflowJob;
022    import org.apache.oozie.client.SLAEvent.SlaAppType;
023    import org.apache.oozie.client.SLAEvent.Status;
024    import org.apache.oozie.client.rest.JsonBean;
025    import org.apache.oozie.SLAEventBean;
026    import org.apache.oozie.WorkflowActionBean;
027    import org.apache.oozie.WorkflowJobBean;
028    import org.apache.oozie.ErrorCode;
029    import org.apache.oozie.XException;
030    import org.apache.oozie.command.CommandException;
031    import org.apache.oozie.command.PreconditionException;
032    import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
033    import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
034    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
035    import org.apache.oozie.executor.jpa.JPAExecutorException;
036    import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
037    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
038    import org.apache.oozie.service.ELService;
039    import org.apache.oozie.service.JPAService;
040    import org.apache.oozie.service.SchemaService;
041    import org.apache.oozie.service.Services;
042    import org.apache.oozie.service.UUIDService;
043    import org.apache.oozie.service.WorkflowStoreService;
044    import org.apache.oozie.workflow.WorkflowException;
045    import org.apache.oozie.workflow.WorkflowInstance;
046    import org.apache.oozie.workflow.lite.KillNodeDef;
047    import org.apache.oozie.workflow.lite.NodeDef;
048    import org.apache.oozie.util.ELEvaluator;
049    import org.apache.oozie.util.InstrumentUtils;
050    import org.apache.oozie.util.LogUtils;
051    import org.apache.oozie.util.XConfiguration;
052    import org.apache.oozie.util.ParamChecker;
053    import org.apache.oozie.util.XmlUtils;
054    import org.apache.oozie.util.db.SLADbXOperations;
055    import org.jdom.Element;
056    import org.jdom.Namespace;
057    
058    import java.io.StringReader;
059    import java.util.ArrayList;
060    import java.util.Date;
061    import java.util.List;
062    import java.util.Map;
063    
064    public class SignalXCommand extends WorkflowXCommand<Void> {
065    
066        protected static final String INSTR_SUCCEEDED_JOBS_COUNTER_NAME = "succeeded";
067    
068        private JPAService jpaService = null;
069        private String jobId;
070        private String actionId;
071        private WorkflowJobBean wfJob;
072        private WorkflowActionBean wfAction;
073        private List<JsonBean> updateList = new ArrayList<JsonBean>();
074        private List<JsonBean> insertList = new ArrayList<JsonBean>();
075    
076    
077        public SignalXCommand(String name, int priority, String jobId) {
078            super(name, name, priority);
079            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
080        }
081    
082        public SignalXCommand(String jobId, String actionId) {
083            this("signal", 1, jobId);
084            this.actionId = ParamChecker.notEmpty(actionId, "actionId");
085        }
086    
087        @Override
088        protected boolean isLockRequired() {
089            return true;
090        }
091    
092        @Override
093        public String getEntityKey() {
094            return this.jobId;
095        }
096    
097        @Override
098        protected void loadState() throws CommandException {
099            try {
100                jpaService = Services.get().get(JPAService.class);
101                if (jpaService != null) {
102                    this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
103                    LogUtils.setLogInfo(wfJob, logInfo);
104                    if (actionId != null) {
105                        this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
106                        LogUtils.setLogInfo(wfAction, logInfo);
107                    }
108                }
109                else {
110                    throw new CommandException(ErrorCode.E0610);
111                }
112            }
113            catch (XException ex) {
114                throw new CommandException(ex);
115            }
116        }
117    
118        @Override
119        protected void verifyPrecondition() throws CommandException, PreconditionException {
120            if ((wfAction == null) || (wfAction.isComplete() && wfAction.isPending())) {
121                if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.PREP) {
122                    throw new PreconditionException(ErrorCode.E0813, wfJob.getStatusStr());
123                }
124            }
125            else {
126                throw new PreconditionException(ErrorCode.E0814, actionId, wfAction.getStatusStr(), wfAction.isPending());
127            }
128        }
129    
130        @Override
131        protected Void execute() throws CommandException {
132            LOG.debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
133            WorkflowInstance workflowInstance = wfJob.getWorkflowInstance();
134            workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob);
135            boolean completed = false;
136            boolean skipAction = false;
137            if (wfAction == null) {
138                if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
139                    try {
140                        completed = workflowInstance.start();
141                    }
142                    catch (WorkflowException e) {
143                        throw new CommandException(e);
144                    }
145                    wfJob.setStatus(WorkflowJob.Status.RUNNING);
146                    wfJob.setStartTime(new Date());
147                    wfJob.setWorkflowInstance(workflowInstance);
148                    // 1. Add SLA status event for WF-JOB with status STARTED
149                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
150                            Status.STARTED, SlaAppType.WORKFLOW_JOB);
151                    if(slaEvent != null) {
152                        insertList.add(slaEvent);
153                    }
154                    // 2. Add SLA registration events for all WF_ACTIONS
155                    createSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(), wfJob
156                            .getGroup(), wfJob.getConf());
157                    queue(new NotificationXCommand(wfJob));
158                }
159                else {
160                    throw new CommandException(ErrorCode.E0801, wfJob.getId());
161                }
162            }
163            else {
164                String skipVar = workflowInstance.getVar(wfAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
165                        + ReRunXCommand.TO_SKIP);
166                if (skipVar != null) {
167                    skipAction = skipVar.equals("true");
168                }
169                try {
170                    completed = workflowInstance.signal(wfAction.getExecutionPath(), wfAction.getSignalValue());
171                }
172                catch (WorkflowException e) {
173                    throw new CommandException(e);
174                }
175                wfJob.setWorkflowInstance(workflowInstance);
176                wfAction.resetPending();
177                if (!skipAction) {
178                    wfAction.setTransition(workflowInstance.getTransition(wfAction.getName()));
179                    queue(new NotificationXCommand(wfJob, wfAction));
180                }
181                updateList.add(wfAction);
182            }
183    
184            if (completed) {
185                try {
186                    for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) {
187                        WorkflowActionBean actionToKill;
188    
189                        actionToKill = jpaService.execute(new WorkflowActionGetJPAExecutor(actionToKillId));
190    
191                        actionToKill.setPending();
192                        actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
193                        updateList.add(actionToKill);
194                        queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType()));
195                    }
196    
197                    for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) {
198                        WorkflowActionBean actionToFail = jpaService.execute(new WorkflowActionGetJPAExecutor(
199                                actionToFailId));
200                        actionToFail.resetPending();
201                        actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
202                        queue(new NotificationXCommand(wfJob, actionToFail));
203                        SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(),
204                                Status.FAILED, SlaAppType.WORKFLOW_ACTION);
205                        if(slaEvent != null) {
206                            insertList.add(slaEvent);
207                        }
208                        updateList.add(actionToFail);
209                    }
210                }
211                catch (JPAExecutorException je) {
212                    throw new CommandException(je);
213                }
214    
215                wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
216                wfJob.setEndTime(new Date());
217                wfJob.setWorkflowInstance(workflowInstance);
218                Status slaStatus = Status.SUCCEEDED;
219                switch (wfJob.getStatus()) {
220                    case SUCCEEDED:
221                        slaStatus = Status.SUCCEEDED;
222                        break;
223                    case KILLED:
224                        slaStatus = Status.KILLED;
225                        break;
226                    case FAILED:
227                        slaStatus = Status.FAILED;
228                        break;
229                    default: // TODO SUSPENDED
230                        break;
231                }
232                SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
233                        slaStatus, SlaAppType.WORKFLOW_JOB);
234                if(slaEvent != null) {
235                    insertList.add(slaEvent);
236                }
237                queue(new NotificationXCommand(wfJob));
238                if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
239                    InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation());
240                }
241    
242                // output message for Kill node
243                if (wfAction != null) { // wfAction could be a no-op job
244                    NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath());
245                    if (nodeDef != null && nodeDef instanceof KillNodeDef) {
246                        boolean isRetry = false;
247                        boolean isUserRetry = false;
248                        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry,
249                                isUserRetry);
250                        try {
251                            String tmpNodeConf = nodeDef.getConf();
252                            String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
253                            LOG.debug("Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], after resolve [{3}]",
254                                            jobId, actionId, tmpNodeConf, actionConf);
255                            if (wfAction.getErrorCode() != null) {
256                                wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf);
257                            }
258                            else {
259                                wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf);
260                            }
261                            updateList.add(wfAction);
262                        }
263                        catch (Exception ex) {
264                            LOG.warn("Exception in SignalXCommand ", ex.getMessage(), ex);
265                            throw new CommandException(ErrorCode.E0729, wfAction.getName(), ex);
266                        }
267                    }
268                }
269    
270            }
271            else {
272                for (WorkflowActionBean newAction : WorkflowStoreService.getStartedActions(workflowInstance)) {
273                    String skipVar = workflowInstance.getVar(newAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
274                            + ReRunXCommand.TO_SKIP);
275                    boolean skipNewAction = false;
276                    if (skipVar != null) {
277                        skipNewAction = skipVar.equals("true");
278                    }
279                    try {
280                        if (skipNewAction) {
281                            WorkflowActionBean oldAction;
282    
283                            oldAction = jpaService.execute(new WorkflowActionGetJPAExecutor(newAction.getId()));
284    
285                            oldAction.setPending();
286                            updateList.add(oldAction);
287    
288                            queue(new SignalXCommand(jobId, oldAction.getId()));
289                        }
290                        else {
291                            newAction.setPending();
292                            String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp()
293                                    .getDefinition(), wfJob.getConf());
294                            newAction.setSlaXml(actionSlaXml);
295                            insertList.add(newAction);
296                            LOG.debug("SignalXCommand: Name: "+ newAction.getName() + ", Id: " +newAction.getId() + ", Authcode:" + newAction.getCred());
297                            queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
298                        }
299                    }
300                    catch (JPAExecutorException je) {
301                        throw new CommandException(je);
302                    }
303                }
304            }
305    
306            try {
307                wfJob.setLastModifiedTime(new Date());
308                updateList.add(wfJob);
309                // call JPAExecutor to do the bulk writes
310                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
311            }
312            catch (JPAExecutorException je) {
313                throw new CommandException(je);
314            }
315            LOG.debug(
316                    "Updated the workflow status to " + wfJob.getId() + "  status =" + wfJob.getStatusStr());
317            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
318                // update coordinator action
319                new CoordActionUpdateXCommand(wfJob).call();    //Note: Called even if wf is not necessarily instantiated by coordinator
320                new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
321            }
322            LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
323            return null;
324        }
325    
326        public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
327            ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
328            for (Map.Entry<String, String> entry : conf) {
329                eval.setVariable(entry.getKey(), entry.getValue());
330            }
331            return eval;
332        }
333    
334        @SuppressWarnings("unchecked")
335        private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException {
336            String slaXml = null;
337            try {
338                Element eWfJob = XmlUtils.parseXml(wfXml);
339                for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
340                    if (action.getAttributeValue("name").equals(actionName) == false) {
341                        continue;
342                    }
343                    Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
344                    if (eSla != null) {
345                        slaXml = XmlUtils.prettyPrint(eSla).toString();
346                        break;
347                    }
348                }
349            }
350            catch (Exception e) {
351                throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
352            }
353            return slaXml;
354        }
355    
356        private String resolveSla(Element eSla, Configuration conf) throws CommandException {
357            String slaXml = null;
358            try {
359                ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
360                slaXml = SubmitXCommand.resolveSla(eSla, evalSla);
361            }
362            catch (Exception e) {
363                throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
364            }
365            return slaXml;
366        }
367    
368        @SuppressWarnings("unchecked")
369        private void createSLARegistrationForAllActions(String wfXml, String user, String group, String strConf)
370                throws CommandException {
371            try {
372                Element eWfJob = XmlUtils.parseXml(wfXml);
373                Configuration conf = new XConfiguration(new StringReader(strConf));
374                for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
375                    Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
376                    if (eSla != null) {
377                        String slaXml = resolveSla(eSla, conf);
378                        eSla = XmlUtils.parseXml(slaXml);
379                        String actionId = Services.get().get(UUIDService.class).generateChildId(jobId,
380                                action.getAttributeValue("name") + "");
381                        SLAEventBean slaEvent = SLADbXOperations.createSlaRegistrationEvent(eSla, actionId,
382                                SlaAppType.WORKFLOW_ACTION, user, group);
383                        if(slaEvent != null) {
384                            insertList.add(slaEvent);
385                        }
386                    }
387                }
388            }
389            catch (Exception e) {
390                throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e.getMessage(), e);
391            }
392    
393        }
394    
395    }