001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.oozie.client.WorkflowJob;
021    import org.apache.oozie.client.SLAEvent.SlaAppType;
022    import org.apache.oozie.client.SLAEvent.Status;
023    import org.apache.oozie.WorkflowActionBean;
024    import org.apache.oozie.WorkflowJobBean;
025    import org.apache.oozie.command.Command;
026    import org.apache.oozie.command.CommandException;
027    import org.apache.oozie.store.StoreException;
028    import org.apache.oozie.store.WorkflowStore;
029    import org.apache.oozie.store.Store;
030    import org.apache.oozie.workflow.WorkflowException;
031    import org.apache.oozie.workflow.WorkflowInstance;
032    import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
033    import org.apache.oozie.util.ParamChecker;
034    import org.apache.oozie.util.XLog;
035    import org.apache.oozie.util.db.SLADbOperations;
036    
037    import java.util.Date;
038    
039    public class KillCommand extends WorkflowCommand<Void> {
040    
041        private String id;
042        private final XLog log = XLog.getLog(getClass());
043    
044        public KillCommand(String id) {
045            super("kill", "kill", 1, XLog.STD);
046            this.id = ParamChecker.notEmpty(id, "id");
047        }
048    
049        @Override
050        protected Void call(WorkflowStore store) throws StoreException, CommandException {
051            try {
052                log.info("In Workflow KillCommand.call() for jobId=" + id);
053                WorkflowJobBean workflow = store.getWorkflow(id, false);
054                setLogInfo(workflow);
055    
056                if (workflow.getStatus() == WorkflowJob.Status.PREP || workflow.getStatus() == WorkflowJob.Status.RUNNING
057                        || workflow.getStatus() == WorkflowJob.Status.SUSPENDED
058                        || workflow.getStatus() == WorkflowJob.Status.FAILED) {
059                    workflow.setEndTime(new Date());
060    
061                    if (workflow.getStatus() != WorkflowJob.Status.FAILED) {
062                        incrJobCounter(1);
063                        workflow.setStatus(WorkflowJob.Status.KILLED);
064                        SLADbOperations.writeStausEvent(workflow.getSlaXml(), workflow.getId(), store, Status.KILLED,
065                                                        SlaAppType.WORKFLOW_JOB);
066                        workflow.getWorkflowInstance().kill();
067                        WorkflowInstance wfInstance = workflow.getWorkflowInstance();
068                        ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.KILLED);
069                        workflow.setWorkflowInstance(wfInstance);
070                    }
071                    for (WorkflowActionBean action : store.getActionsForWorkflow(id, true)) {
072                        if (action.getStatus() == WorkflowActionBean.Status.RUNNING
073                                || action.getStatus() == WorkflowActionBean.Status.DONE) {
074                            action.setPending();
075                            action.setStatus(WorkflowActionBean.Status.KILLED);
076                            store.updateAction(action);
077                            queueCallable(new ActionKillCommand(action.getId(), action.getType()));
078                        }
079                        if (action.getStatus() == WorkflowActionBean.Status.PREP
080                                || action.getStatus() == WorkflowActionBean.Status.START_RETRY
081                                || action.getStatus() == WorkflowActionBean.Status.START_MANUAL
082                                || action.getStatus() == WorkflowActionBean.Status.END_RETRY
083                                || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
084                            action.setStatus(WorkflowActionBean.Status.KILLED);
085                            action.resetPending();
086                            SLADbOperations.writeStausEvent(action.getSlaXml(), action.getId(), store, Status.KILLED,
087                                                            SlaAppType.WORKFLOW_ACTION);
088                            store.updateAction(action);
089                        }
090                    }
091                    store.updateWorkflow(workflow);
092                    queueCallable(new NotificationCommand(workflow));
093                }
094                return null;
095            }
096            catch (WorkflowException ex) {
097                throw new CommandException(ex);
098            }
099        }
100    
101        @Override
102        protected Void execute(WorkflowStore store) throws CommandException, StoreException {
103            try {
104                XLog.getLog(getClass()).debug("STARTED KillCommand for job " + id);
105                if (lock(id)) {
106                    call(store);
107                }
108                else {
109                    queueCallable(new KillCommand(id), LOCK_FAILURE_REQUEUE_INTERVAL);
110                    XLog.getLog(getClass()).warn("KillCommand lock was not acquired - failed {0}", id);
111                }
112            }
113            catch (InterruptedException e) {
114                queueCallable(new KillCommand(id), LOCK_FAILURE_REQUEUE_INTERVAL);
115                XLog.getLog(getClass()).warn("KillCommand lock was not acquired - interrupted exception failed {0}", id);
116            }
117            XLog.getLog(getClass()).debug("ENDED KillCommand for job " + id);
118            return null;
119        }
120    }