001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.oozie.client.CoordinatorAction;
022    import org.apache.oozie.client.WorkflowJob;
023    import org.apache.oozie.client.SLAEvent.SlaAppType;
024    import org.apache.oozie.client.SLAEvent.Status;
025    import org.apache.oozie.CoordinatorActionBean;
026    import org.apache.oozie.WorkflowActionBean;
027    import org.apache.oozie.WorkflowJobBean;
028    import org.apache.oozie.ErrorCode;
029    import org.apache.oozie.XException;
030    import org.apache.oozie.command.CommandException;
031    import org.apache.oozie.command.coord.CoordActionReadyCommand;
032    import org.apache.oozie.command.coord.CoordActionUpdateCommand;
033    import org.apache.oozie.coord.CoordELFunctions;
034    import org.apache.oozie.coord.CoordinatorJobException;
035    import org.apache.oozie.service.ELService;
036    import org.apache.oozie.service.SchemaService;
037    import org.apache.oozie.service.Services;
038    import org.apache.oozie.service.StoreService;
039    import org.apache.oozie.service.UUIDService;
040    import org.apache.oozie.service.WorkflowStoreService;
041    import org.apache.oozie.store.CoordinatorStore;
042    import org.apache.oozie.store.StoreException;
043    import org.apache.oozie.store.WorkflowStore;
044    import org.apache.oozie.workflow.WorkflowException;
045    import org.apache.oozie.workflow.WorkflowInstance;
046    import org.apache.oozie.util.ELEvaluator;
047    import org.apache.oozie.util.XConfiguration;
048    import org.apache.oozie.util.XLog;
049    import org.apache.oozie.util.ParamChecker;
050    import org.apache.oozie.util.XmlUtils;
051    import org.apache.oozie.util.db.SLADbOperations;
052    import org.apache.openjpa.lib.log.Log;
053    import org.jdom.Element;
054    import org.jdom.JDOMException;
055    import org.jdom.Namespace;
056    
057    import java.io.StringReader;
058    import java.util.Date;
059    import java.util.List;
060    import java.util.Map;
061    
062    public class SignalCommand extends WorkflowCommand<Void> {
063    
064        protected static final String INSTR_SUCCEEDED_JOBS_COUNTER_NAME = "succeeded";
065    
066        private String jobId;
067        private String actionId;
068    
069        protected SignalCommand(String name, int priority, String jobId) {
070            super(name, name, priority, XLog.STD);
071            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
072        }
073    
074        public SignalCommand(String jobId, String actionId) {
075            super("signal", "signal", 1, XLog.STD);
076            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
077            this.actionId = ParamChecker.notEmpty(actionId, "actionId");
078        }
079    
080        @Override
081        protected Void call(WorkflowStore store) throws CommandException, StoreException {
082    
083            WorkflowJobBean workflow = store.getWorkflow(jobId, false);
084            setLogInfo(workflow);
085            WorkflowActionBean action = null;
086            boolean skipAction = false;
087            if (actionId != null) {
088                action = store.getAction(actionId, false);
089                setLogInfo(action);
090            }
091            if ((action == null) || (action.isComplete() && action.isPending())) {
092                try {
093                    if (workflow.getStatus() == WorkflowJob.Status.RUNNING
094                            || workflow.getStatus() == WorkflowJob.Status.PREP) {
095                        WorkflowInstance workflowInstance = workflow.getWorkflowInstance();
096                        workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, workflow);
097                        boolean completed;
098                        if (action == null) {
099                            if (workflow.getStatus() == WorkflowJob.Status.PREP) {
100                                completed = workflowInstance.start();
101                                workflow.setStatus(WorkflowJob.Status.RUNNING);
102                                workflow.setStartTime(new Date());
103                                workflow.setWorkflowInstance(workflowInstance);
104                                // 1. Add SLA status event for WF-JOB with status
105                                // STARTED
106                                // 2. Add SLA registration events for all WF_ACTIONS
107                                SLADbOperations.writeStausEvent(workflow.getSlaXml(), jobId, store, Status.STARTED,
108                                                                SlaAppType.WORKFLOW_JOB);
109                                writeSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), workflow
110                                        .getUser(), workflow.getGroup(), workflow.getConf(), store);
111                                queueCallable(new NotificationCommand(workflow));
112                            }
113                            else {
114                                throw new CommandException(ErrorCode.E0801, workflow.getId());
115                            }
116                        }
117                        else {
118                            String skipVar = workflowInstance.getVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
119                                    + ReRunCommand.TO_SKIP);
120                            if (skipVar != null) {
121                                skipAction = skipVar.equals("true");
122                            }
123                            completed = workflowInstance.signal(action.getExecutionPath(), action.getSignalValue());
124                            workflow.setWorkflowInstance(workflowInstance);
125                            action.resetPending();
126                            if (!skipAction) {
127                                action.setTransition(workflowInstance.getTransition(action.getName()));
128                            }
129                            store.updateAction(action);
130                        }
131    
132                        if (completed) {
133                            for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) {
134                                WorkflowActionBean actionToKill = store.getAction(actionToKillId, false);
135                                actionToKill.setPending();
136                                actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
137                                store.updateAction(actionToKill);
138                                queueCallable(new ActionKillCommand(actionToKill.getId(), actionToKill.getType()));
139                            }
140    
141                            for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) {
142                                WorkflowActionBean actionToFail = store.getAction(actionToFailId, false);
143                                actionToFail.resetPending();
144                                actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
145                                SLADbOperations.writeStausEvent(action.getSlaXml(), action.getId(), store, Status.FAILED,
146                                                                SlaAppType.WORKFLOW_ACTION);
147                                store.updateAction(actionToFail);
148                            }
149    
150                            workflow.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
151                            workflow.setEndTime(new Date());
152                            workflow.setWorkflowInstance(workflowInstance);
153                            Status slaStatus = Status.SUCCEEDED;
154                            switch (workflow.getStatus()) {
155                                case SUCCEEDED:
156                                    slaStatus = Status.SUCCEEDED;
157                                    break;
158                                case KILLED:
159                                    slaStatus = Status.KILLED;
160                                    break;
161                                case FAILED:
162                                    slaStatus = Status.FAILED;
163                                    break;
164                                default: // TODO about SUSPENDED
165    
166                            }
167                            SLADbOperations.writeStausEvent(workflow.getSlaXml(), jobId, store, slaStatus,
168                                                            SlaAppType.WORKFLOW_JOB);
169                            queueCallable(new NotificationCommand(workflow));
170                            if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) {
171                                incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1);
172                            }
173                        }
174                        else {
175                            for (WorkflowActionBean newAction : WorkflowStoreService.getStartedActions(workflowInstance)) {
176                                String skipVar = workflowInstance.getVar(newAction.getName()
177                                        + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunCommand.TO_SKIP);
178                                boolean skipNewAction = false;
179                                if (skipVar != null) {
180                                    skipNewAction = skipVar.equals("true");
181                                }
182                                if (skipNewAction) {
183                                    WorkflowActionBean oldAction = store.getAction(newAction.getId(), false);
184                                    oldAction.setPending();
185                                    store.updateAction(oldAction);
186                                    queueCallable(new SignalCommand(jobId, oldAction.getId()));
187                                }
188                                else {
189                                    newAction.setPending();
190                                    String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp()
191                                            .getDefinition(), workflow.getConf());
192                                    // System.out.println("111111 actionXml " +
193                                    // actionSlaXml);
194                                    // newAction.setSlaXml(workflow.getSlaXml());
195                                    newAction.setSlaXml(actionSlaXml);
196                                    XLog.getLog(getClass()).debug("SignalCOmmand: Name: "+ newAction.getName() +"Id: " +newAction.getId()+ " Authcode:" + newAction.getCred());
197                                    store.insertAction(newAction);
198                                    queueCallable(new ActionStartCommand(newAction.getId(), newAction.getType()));
199                                }
200                            }
201                        }
202    
203                        store.updateWorkflow(workflow);
204                        XLog.getLog(getClass()).debug(
205                                "Updated the workflow status to " + workflow.getId() + "  status ="
206                                        + workflow.getStatusStr());
207                        if (workflow.getStatus() != WorkflowJob.Status.RUNNING
208                                && workflow.getStatus() != WorkflowJob.Status.SUSPENDED) {
209                            queueCallable(new CoordActionUpdateCommand(workflow));
210                        }
211                    }
212                    else {
213                        XLog.getLog(getClass()).warn("Workflow not RUNNING, current status [{0}]", workflow.getStatus());
214                    }
215                }
216                catch (WorkflowException ex) {
217                    throw new CommandException(ex);
218                }
219            }
220            else {
221                XLog.getLog(getClass()).warn(
222                        "SignalCommand for action id :" + actionId + " is already processed. status=" + action.getStatus()
223                                + ", Pending=" + action.isPending());
224            }
225            return null;
226        }
227    
228        public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
229            ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
230            for (Map.Entry<String, String> entry : conf) {
231                eval.setVariable(entry.getKey(), entry.getValue());
232            }
233            return eval;
234        }
235    
236        private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException {
237            String slaXml = null;
238            // TODO need to fill-out the code
239            // Get the appropriate action:slaXml and resolve that.
240            try {
241                // Configuration conf = new XConfiguration(new
242                // StringReader(wfConf));
243                Element eWfJob = XmlUtils.parseXml(wfXml);
244                // String prefix = XmlUtils.getNamespacePrefix(eWfJob,
245                // SchemaService.SLA_NAME_SPACE_URI);
246                for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
247                    if (action.getAttributeValue("name").equals(actionName) == false) {
248                        continue;
249                    }
250                    Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
251                    if (eSla != null) {
252                        // resolveSla(eSla, conf);
253                        slaXml = XmlUtils.prettyPrint(eSla).toString();// Could use
254                        // any
255                        // non-null
256                        // string
257                        break;
258                    }
259                }
260            }
261            catch (Exception e) {
262                throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
263            }
264            return slaXml;
265        }
266    
267        private String resolveSla(Element eSla, Configuration conf) throws CommandException {
268            String slaXml = null;
269            try {
270                ELEvaluator evalSla = SubmitCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
271                slaXml = SubmitCommand.resolveSla(eSla, evalSla);
272            }
273            catch (Exception e) {
274                throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
275            }
276            return slaXml;
277        }
278    
279        private void writeSLARegistrationForAllActions(String wfXml, String user, String group, String strConf,
280                                                       WorkflowStore store) throws CommandException {
281            try {
282                Element eWfJob = XmlUtils.parseXml(wfXml);
283                // String prefix = XmlUtils.getNamespacePrefix(eWfJob,
284                // SchemaService.SLA_NAME_SPACE_URI);
285                Configuration conf = new XConfiguration(new StringReader(strConf));
286                for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
287                    Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
288                    if (eSla != null) {
289                        String slaXml = resolveSla(eSla, conf);
290                        eSla = XmlUtils.parseXml(slaXml);
291                        String actionId = Services.get().get(UUIDService.class).generateChildId(jobId,
292                                                                                                action.getAttributeValue("name") + "");
293                        SLADbOperations.writeSlaRegistrationEvent(eSla, store, actionId, SlaAppType.WORKFLOW_ACTION, user,
294                                                                  group);
295                    }
296                }
297            }
298            catch (Exception e) {
299                throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e);
300            }
301    
302        }
303    
304        @Override
305        protected Void execute(WorkflowStore store) throws CommandException, StoreException {
306            XLog.getLog(getClass()).debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
307            try {
308                if (lock(jobId)) {
309                    call(store);
310                }
311                else {
312                    queueCallable(new SignalCommand(jobId, actionId), LOCK_FAILURE_REQUEUE_INTERVAL);
313                    XLog.getLog(getClass()).warn("SignalCommand lock was not acquired - failed {0}", jobId);
314                }
315            }
316            catch (InterruptedException e) {
317                queueCallable(new SignalCommand(jobId, actionId), LOCK_FAILURE_REQUEUE_INTERVAL);
318                XLog.getLog(getClass()).warn("SignalCommand lock not acquired - interrupted exception failed {0}", jobId);
319            }
320            XLog.getLog(getClass()).debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
321            return null;
322        }
323    }