001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.oozie.client.WorkflowJob;
022    import org.apache.oozie.client.SLAEvent.SlaAppType;
023    import org.apache.oozie.client.SLAEvent.Status;
024    import org.apache.oozie.WorkflowActionBean;
025    import org.apache.oozie.WorkflowJobBean;
026    import org.apache.oozie.ErrorCode;
027    import org.apache.oozie.XException;
028    import org.apache.oozie.command.CommandException;
029    import org.apache.oozie.command.PreconditionException;
030    import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
031    import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
032    import org.apache.oozie.executor.jpa.JPAExecutorException;
033    import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
034    import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
035    import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
036    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
037    import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
038    import org.apache.oozie.service.ELService;
039    import org.apache.oozie.service.JPAService;
040    import org.apache.oozie.service.SchemaService;
041    import org.apache.oozie.service.Services;
042    import org.apache.oozie.service.UUIDService;
043    import org.apache.oozie.service.WorkflowStoreService;
044    import org.apache.oozie.workflow.WorkflowException;
045    import org.apache.oozie.workflow.WorkflowInstance;
046    import org.apache.oozie.workflow.lite.KillNodeDef;
047    import org.apache.oozie.workflow.lite.NodeDef;
048    import org.apache.oozie.util.ELEvaluator;
049    import org.apache.oozie.util.InstrumentUtils;
050    import org.apache.oozie.util.LogUtils;
051    import org.apache.oozie.util.XConfiguration;
052    import org.apache.oozie.util.ParamChecker;
053    import org.apache.oozie.util.XmlUtils;
054    import org.apache.oozie.util.db.SLADbXOperations;
055    import org.jdom.Element;
056    import org.jdom.Namespace;
057    
058    import java.io.StringReader;
059    import java.util.Date;
060    import java.util.List;
061    import java.util.Map;
062    
063    public class SignalXCommand extends WorkflowXCommand<Void> {
064    
065        protected static final String INSTR_SUCCEEDED_JOBS_COUNTER_NAME = "succeeded";
066    
067        private JPAService jpaService = null;
068        private String jobId;
069        private String actionId;
070        private WorkflowJobBean wfJob;
071        private WorkflowActionBean wfAction;
072    
073        public SignalXCommand(String name, int priority, String jobId) {
074            super(name, name, priority);
075            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
076        }
077    
078        public SignalXCommand(String jobId, String actionId) {
079            this("signal", 1, jobId);
080            this.actionId = ParamChecker.notEmpty(actionId, "actionId");
081        }
082    
083        @Override
084        protected boolean isLockRequired() {
085            return true;
086        }
087    
088        @Override
089        public String getEntityKey() {
090            return this.jobId;
091        }
092    
093        @Override
094        protected void loadState() throws CommandException {
095            try {
096                jpaService = Services.get().get(JPAService.class);
097                if (jpaService != null) {
098                    this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
099                    LogUtils.setLogInfo(wfJob, logInfo);
100                    if (actionId != null) {
101                        this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
102                        LogUtils.setLogInfo(wfAction, logInfo);
103                    }
104                }
105                else {
106                    throw new CommandException(ErrorCode.E0610);
107                }
108            }
109            catch (XException ex) {
110                throw new CommandException(ex);
111            }
112        }
113    
114        @Override
115        protected void verifyPrecondition() throws CommandException, PreconditionException {
116            if ((wfAction == null) || (wfAction.isComplete() && wfAction.isPending())) {
117                if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.PREP) {
118                    throw new PreconditionException(ErrorCode.E0813, wfJob.getStatusStr());
119                }
120            }
121            else {
122                throw new PreconditionException(ErrorCode.E0814, actionId, wfAction.getStatusStr(), wfAction.isPending());
123            }
124        }
125    
126        @Override
127        protected Void execute() throws CommandException {
128            LOG.debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
129            WorkflowInstance workflowInstance = wfJob.getWorkflowInstance();
130            workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob);
131            boolean completed = false;
132            boolean skipAction = false;
133            if (wfAction == null) {
134                if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
135                    try {
136                        completed = workflowInstance.start();
137                    }
138                    catch (WorkflowException e) {
139                        throw new CommandException(e);
140                    }
141                    wfJob.setStatus(WorkflowJob.Status.RUNNING);
142                    wfJob.setStartTime(new Date());
143                    wfJob.setWorkflowInstance(workflowInstance);
144                    // 1. Add SLA status event for WF-JOB with status STARTED
145                    // 2. Add SLA registration events for all WF_ACTIONS
146                    SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), jobId, Status.STARTED, SlaAppType.WORKFLOW_JOB);
147                    writeSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(), wfJob
148                            .getGroup(), wfJob.getConf());
149                    queue(new NotificationXCommand(wfJob));
150                }
151                else {
152                    throw new CommandException(ErrorCode.E0801, wfJob.getId());
153                }
154            }
155            else {
156                String skipVar = workflowInstance.getVar(wfAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
157                        + ReRunXCommand.TO_SKIP);
158                if (skipVar != null) {
159                    skipAction = skipVar.equals("true");
160                }
161                try {
162                    completed = workflowInstance.signal(wfAction.getExecutionPath(), wfAction.getSignalValue());
163                }
164                catch (WorkflowException e) {
165                    throw new CommandException(e);
166                }
167                wfJob.setWorkflowInstance(workflowInstance);
168                wfAction.resetPending();
169                if (!skipAction) {
170                    wfAction.setTransition(workflowInstance.getTransition(wfAction.getName()));
171                    queue(new NotificationXCommand(wfJob, wfAction));
172                }
173                try {
174                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
175                }
176                catch (JPAExecutorException je) {
177                    throw new CommandException(je);
178                }
179            }
180    
181            if (completed) {
182                try {
183                    for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) {
184                        WorkflowActionBean actionToKill;
185    
186                        actionToKill = jpaService.execute(new WorkflowActionGetJPAExecutor(actionToKillId));
187    
188                        actionToKill.setPending();
189                        actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
190                        jpaService.execute(new WorkflowActionUpdateJPAExecutor(actionToKill));
191                        queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType()));
192                    }
193    
194                    for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) {
195                        WorkflowActionBean actionToFail = jpaService.execute(new WorkflowActionGetJPAExecutor(
196                                actionToFailId));
197                        actionToFail.resetPending();
198                        actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
199                        queue(new NotificationXCommand(wfJob, actionToFail));
200                        SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
201                                SlaAppType.WORKFLOW_ACTION);
202                        jpaService.execute(new WorkflowActionUpdateJPAExecutor(actionToFail));
203                    }
204                }
205                catch (JPAExecutorException je) {
206                    throw new CommandException(je);
207                }
208    
209                wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
210                wfJob.setEndTime(new Date());
211                wfJob.setWorkflowInstance(workflowInstance);
212                Status slaStatus = Status.SUCCEEDED;
213                switch (wfJob.getStatus()) {
214                    case SUCCEEDED:
215                        slaStatus = Status.SUCCEEDED;
216                        break;
217                    case KILLED:
218                        slaStatus = Status.KILLED;
219                        break;
220                    case FAILED:
221                        slaStatus = Status.FAILED;
222                        break;
223                    default: // TODO SUSPENDED
224                        break;
225                }
226                SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), jobId, slaStatus, SlaAppType.WORKFLOW_JOB);
227                queue(new NotificationXCommand(wfJob));
228                if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
229                    InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation());
230                }
231    
232                // output message for Kill node
233                if (wfAction != null) { // wfAction could be a no-op job
234                    NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath());
235                    if (nodeDef != null && nodeDef instanceof KillNodeDef) {
236                        boolean isRetry = false;
237                        boolean isUserRetry = false;
238                        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry,
239                                isUserRetry);
240                        try {
241                            String tmpNodeConf = nodeDef.getConf();
242                            String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
243                            LOG.debug("Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], after resolve [{3}]",
244                                            jobId, actionId, tmpNodeConf, actionConf);
245                            if (wfAction.getErrorCode() != null) {
246                                wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf);
247                            }
248                            else {
249                                wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf);
250                            }
251                            jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
252                        }
253                        catch (JPAExecutorException je) {
254                            throw new CommandException(je);
255                        }
256                        catch (Exception ex) {
257                            LOG.warn("Exception in SignalXCommand ", ex.getMessage(), ex);
258                            throw new CommandException(ErrorCode.E0729, wfAction.getName(), ex);
259                        }
260                    }
261                }
262    
263            }
264            else {
265                for (WorkflowActionBean newAction : WorkflowStoreService.getStartedActions(workflowInstance)) {
266                    String skipVar = workflowInstance.getVar(newAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
267                            + ReRunXCommand.TO_SKIP);
268                    boolean skipNewAction = false;
269                    if (skipVar != null) {
270                        skipNewAction = skipVar.equals("true");
271                    }
272                    try {
273                        if (skipNewAction) {
274                            WorkflowActionBean oldAction;
275    
276                            oldAction = jpaService.execute(new WorkflowActionGetJPAExecutor(newAction.getId()));
277    
278                            oldAction.setPending();
279                            jpaService.execute(new WorkflowActionUpdateJPAExecutor(oldAction));
280    
281                            queue(new SignalXCommand(jobId, oldAction.getId()));
282                        }
283                        else {
284                            newAction.setPending();
285                            String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp()
286                                    .getDefinition(), wfJob.getConf());
287                            newAction.setSlaXml(actionSlaXml);
288                            jpaService.execute(new WorkflowActionInsertJPAExecutor(newAction));
289                            LOG.debug("SignalXCommand: Name: "+ newAction.getName() + ", Id: " +newAction.getId() + ", Authcode:" + newAction.getCred());
290                            queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
291                        }
292                    }
293                    catch (JPAExecutorException je) {
294                        throw new CommandException(je);
295                    }
296                }
297            }
298    
299            try {
300                jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
301            }
302            catch (JPAExecutorException je) {
303                throw new CommandException(je);
304            }
305            LOG.debug(
306                    "Updated the workflow status to " + wfJob.getId() + "  status =" + wfJob.getStatusStr());
307            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
308                // update coordinator action
309                new CoordActionUpdateXCommand(wfJob).call();    //Note: Called even if wf is not necessarily instantiated by coordinator
310                new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
311            }
312            LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
313            return null;
314        }
315    
316        public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
317            ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
318            for (Map.Entry<String, String> entry : conf) {
319                eval.setVariable(entry.getKey(), entry.getValue());
320            }
321            return eval;
322        }
323    
324        @SuppressWarnings("unchecked")
325        private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException {
326            String slaXml = null;
327            try {
328                Element eWfJob = XmlUtils.parseXml(wfXml);
329                for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
330                    if (action.getAttributeValue("name").equals(actionName) == false) {
331                        continue;
332                    }
333                    Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
334                    if (eSla != null) {
335                        slaXml = XmlUtils.prettyPrint(eSla).toString();
336                        break;
337                    }
338                }
339            }
340            catch (Exception e) {
341                throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
342            }
343            return slaXml;
344        }
345    
346        private String resolveSla(Element eSla, Configuration conf) throws CommandException {
347            String slaXml = null;
348            try {
349                ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
350                slaXml = SubmitXCommand.resolveSla(eSla, evalSla);
351            }
352            catch (Exception e) {
353                throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
354            }
355            return slaXml;
356        }
357    
358        @SuppressWarnings("unchecked")
359        private void writeSLARegistrationForAllActions(String wfXml, String user, String group, String strConf)
360                throws CommandException {
361            try {
362                Element eWfJob = XmlUtils.parseXml(wfXml);
363                Configuration conf = new XConfiguration(new StringReader(strConf));
364                for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
365                    Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
366                    if (eSla != null) {
367                        String slaXml = resolveSla(eSla, conf);
368                        eSla = XmlUtils.parseXml(slaXml);
369                        String actionId = Services.get().get(UUIDService.class).generateChildId(jobId,
370                                action.getAttributeValue("name") + "");
371                        SLADbXOperations.writeSlaRegistrationEvent(eSla, actionId, SlaAppType.WORKFLOW_ACTION, user, group);
372                    }
373                }
374            }
375            catch (Exception e) {
376                throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e);
377            }
378    
379        }
380    
381    }