001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.wf;
020
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.List;
024
025import org.apache.hadoop.fs.FileStatus;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.oozie.ErrorCode;
029import org.apache.oozie.SLAEventBean;
030import org.apache.oozie.WorkflowActionBean;
031import org.apache.oozie.WorkflowJobBean;
032import org.apache.oozie.XException;
033import org.apache.oozie.action.ActionExecutor;
034import org.apache.oozie.action.ActionExecutor.Context;
035import org.apache.oozie.action.ActionExecutorException;
036import org.apache.oozie.action.control.ControlNodeActionExecutor;
037import org.apache.oozie.client.SLAEvent.SlaAppType;
038import org.apache.oozie.client.SLAEvent.Status;
039import org.apache.oozie.client.rest.JsonBean;
040import org.apache.oozie.command.CommandException;
041import org.apache.oozie.command.PreconditionException;
042import org.apache.oozie.executor.jpa.BatchQueryExecutor;
043import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
044import org.apache.oozie.executor.jpa.JPAExecutorException;
045import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
046import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
047import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
048import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
049import org.apache.oozie.service.ActionService;
050import org.apache.oozie.service.EventHandlerService;
051import org.apache.oozie.service.JPAService;
052import org.apache.oozie.service.Services;
053import org.apache.oozie.service.UUIDService;
054import org.apache.oozie.util.Instrumentation;
055import org.apache.oozie.util.LogUtils;
056import org.apache.oozie.util.db.SLADbXOperations;
057
058/**
059 * Kill workflow action and invoke action executor to kill the underlying context.
060 *
061 */
062@SuppressWarnings("deprecation")
063public class ActionKillXCommand extends ActionXCommand<Void> {
064    private String actionId;
065    private String jobId;
066    private WorkflowJobBean wfJob;
067    private WorkflowActionBean wfAction;
068    private JPAService jpaService = null;
069    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
070    private List<JsonBean> insertList = new ArrayList<JsonBean>();
071
072    public ActionKillXCommand(String actionId, String type) {
073        super("action.kill", type, 0);
074        this.actionId = actionId;
075        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
076    }
077
078    public ActionKillXCommand(String actionId) {
079        this(actionId, "action.kill");
080    }
081
082    @Override
083    protected void setLogInfo() {
084        LogUtils.setLogInfo(actionId);
085    }
086
087    @Override
088    protected boolean isLockRequired() {
089        return true;
090    }
091
092    @Override
093    public String getEntityKey() {
094        return this.jobId;
095    }
096
097    @Override
098    public String getKey() {
099        return getName() + "_" + this.actionId;
100    }
101
102    @Override
103    protected void loadState() throws CommandException {
104        try {
105            jpaService = Services.get().get(JPAService.class);
106
107            if (jpaService != null) {
108                this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_ACTION_OP, jobId);
109                this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, actionId);
110                LogUtils.setLogInfo(wfJob);
111                LogUtils.setLogInfo(wfAction);
112            }
113            else {
114                throw new CommandException(ErrorCode.E0610);
115            }
116        }
117        catch (XException ex) {
118            throw new CommandException(ex);
119        }
120    }
121
122    @Override
123    protected void verifyPrecondition() throws CommandException, PreconditionException {
124        if (wfAction.getStatus() != WorkflowActionBean.Status.KILLED) {
125            throw new PreconditionException(ErrorCode.E0726, wfAction.getId());
126        }
127    }
128
129    @Override
130    protected Void execute() throws CommandException {
131        LOG.debug("STARTED WorkflowActionKillXCommand for action " + actionId);
132
133        if (wfAction.isPending()) {
134            ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
135            if (executor != null) {
136                ActionExecutorContext context = null;
137                try {
138                    boolean isRetry = false;
139                    boolean isUserRetry = false;
140                    context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction,
141                            isRetry, isUserRetry);
142                    incrActionCounter(wfAction.getType(), 1);
143
144                    Instrumentation.Cron cron = new Instrumentation.Cron();
145                    cron.start();
146                    executor.kill(context, wfAction);
147                    cron.stop();
148                    addActionCron(wfAction.getType(), cron);
149
150                    wfAction.resetPending();
151                    wfAction.setStatus(WorkflowActionBean.Status.KILLED);
152                    wfAction.setEndTime(new Date());
153
154                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END, wfAction));
155                    wfJob.setLastModifiedTime(new Date());
156                    updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_MODTIME, wfJob));
157                    // Add SLA status event (KILLED) for WF_ACTION
158                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.KILLED,
159                            SlaAppType.WORKFLOW_ACTION);
160                    if(slaEvent != null) {
161                        insertList.add(slaEvent);
162                    }
163                    queue(new WorkflowNotificationXCommand(wfJob, wfAction));
164                }
165                catch (ActionExecutorException ex) {
166                    wfAction.resetPending();
167                    wfAction.setStatus(WorkflowActionBean.Status.FAILED);
168                    wfAction.setErrorInfo(ex.getErrorCode().toString(),
169                            "KILL COMMAND FAILED - exception while executing job kill");
170                    wfAction.setEndTime(new Date());
171
172                    wfJob.setStatus(WorkflowJobBean.Status.KILLED);
173                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END, wfAction));
174                    wfJob.setLastModifiedTime(new Date());
175                    updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob));
176                    // What will happen to WF and COORD_ACTION, NOTIFICATION?
177                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
178                            SlaAppType.WORKFLOW_ACTION);
179                    if(slaEvent != null) {
180                        insertList.add(slaEvent);
181                    }
182                    LOG.warn("Exception while executing kill(). Error Code [{0}], Message[{1}]",
183                            ex.getErrorCode(), ex.getMessage(), ex);
184                }
185                finally {
186                    try {
187                        cleanupActionDir(context);
188                        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
189                        if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
190                            generateEvent(wfAction, wfJob.getUser());
191                        }
192                    }
193                    catch (JPAExecutorException e) {
194                        throw new CommandException(e);
195                    }
196                }
197            }
198        }
199        LOG.debug("ENDED WorkflowActionKillXCommand for action " + actionId);
200        return null;
201    }
202
203    /*
204     * Cleans up the action directory
205     */
206    private void cleanupActionDir(Context context) {
207        try {
208            FileSystem actionFs = context.getAppFileSystem();
209            Path actionDir = context.getActionDir();
210            Path jobDir = actionDir.getParent();
211            if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
212                    && actionFs.exists(actionDir)) {
213                actionFs.delete(actionDir, true);
214            }
215            if (actionFs.exists(jobDir) && actionFs.getFileStatus(jobDir).isDir()) {
216                FileStatus[] statuses = actionFs.listStatus(jobDir);
217                if (statuses == null || statuses.length == 0) {
218                    actionFs.delete(jobDir, true);
219                }
220            }
221        }
222        catch (Exception e) {
223            LOG.warn("Exception while cleaning up action dir. Message[{1}]", e.getMessage(), e);
224        }
225    }
226
227}