001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.coord;
020
021import org.apache.oozie.client.CoordinatorAction;
022import org.apache.oozie.client.CoordinatorJob;
023import org.apache.oozie.client.Job;
024import org.apache.oozie.CoordinatorActionBean;
025import org.apache.oozie.CoordinatorJobBean;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.XException;
028import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
029import org.apache.oozie.command.wf.KillXCommand;
030import org.apache.oozie.command.CommandException;
031import org.apache.oozie.command.KillTransitionXCommand;
032import org.apache.oozie.command.PreconditionException;
033import org.apache.oozie.dependency.DependencyChecker;
034import org.apache.oozie.executor.jpa.BatchQueryExecutor;
035import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
036import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
037import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor;
038import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
039import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
040import org.apache.oozie.executor.jpa.JPAExecutorException;
041import org.apache.oozie.service.EventHandlerService;
042import org.apache.oozie.service.JPAService;
043import org.apache.oozie.service.Services;
044import org.apache.oozie.util.LogUtils;
045import org.apache.oozie.util.ParamChecker;
046import org.apache.oozie.util.StatusUtils;
047
048import java.util.Arrays;
049import java.util.Date;
050import java.util.List;
051
052public class CoordKillXCommand extends KillTransitionXCommand {
053
054    private final String jobId;
055    private CoordinatorJobBean coordJob;
056    private List<CoordinatorActionBean> actionList;
057    private JPAService jpaService = null;
058    private CoordinatorJob.Status prevStatus = null;
059
060    public CoordKillXCommand(String id) {
061        super("coord_kill", "coord_kill", 2);
062        this.jobId = ParamChecker.notEmpty(id, "id");
063    }
064
065    @Override
066    protected boolean isLockRequired() {
067        return true;
068    }
069
070    @Override
071    public String getEntityKey() {
072        return this.jobId;
073    }
074
075    @Override
076    public String getKey() {
077        return getName() + "_" + this.jobId;
078    }
079
080    @Override
081    protected void loadState() throws CommandException {
082        try {
083            jpaService = Services.get().get(JPAService.class);
084
085            if (jpaService != null) {
086                this.coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_SUSPEND_KILL, jobId);
087                //Get actions which are not succeeded, failed, timed out or killed
088                this.actionList = jpaService.execute(new CoordJobGetActionsNotCompletedJPAExecutor(jobId));
089                prevStatus = coordJob.getStatus();
090                LogUtils.setLogInfo(coordJob);
091            }
092            else {
093                throw new CommandException(ErrorCode.E0610);
094            }
095        }
096        catch (XException ex) {
097            throw new CommandException(ex);
098        }
099    }
100
101    @Override
102    protected void verifyPrecondition() throws CommandException, PreconditionException {
103        // if namespace 0.1 is used and backward support is true, SUCCEEDED coord job can be killed
104        if (StatusUtils.isV1CoordjobKillable(coordJob)) {
105            return;
106        }
107        if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
108                || coordJob.getStatus() == CoordinatorJob.Status.FAILED
109                || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
110                || coordJob.getStatus() == CoordinatorJob.Status.KILLED
111                || coordJob.getStatus() == CoordinatorJob.Status.IGNORED) {
112            LOG.info("CoordKillXCommand not killed - job either finished SUCCEEDED, FAILED, KILLED, DONEWITHERROR"
113                    + " or IGNORED,job id = " + jobId + ", status = " + coordJob.getStatus());
114            throw new PreconditionException(ErrorCode.E1020, jobId);
115        }
116    }
117
118    private void updateCoordAction(CoordinatorActionBean action, boolean makePending) {
119        CoordinatorAction.Status prevStatus = action.getStatus();
120        action.setStatus(CoordinatorActionBean.Status.KILLED);
121        if (makePending) {
122            action.incrementAndGetPending();
123        } else {
124            // set pending to false
125            action.setPending(0);
126        }
127        if (EventHandlerService.isEnabled() && prevStatus != CoordinatorAction.Status.RUNNING
128                && prevStatus != CoordinatorAction.Status.SUSPENDED) {
129            CoordinatorXCommand.generateEvent(action, coordJob.getUser(), coordJob.getAppName(), null);
130        }
131        action.setLastModifiedTime(new Date());
132        updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,action));
133    }
134
135    @Override
136    public void killChildren() throws CommandException {
137        if (actionList != null) {
138            for (CoordinatorActionBean action : actionList) {
139                // queue a WorkflowKillXCommand to delete the workflow job and actions
140                if (action.getExternalId() != null) {
141                    queue(new KillXCommand(action.getExternalId()));
142                    // As the kill command for children is queued, set pending flag for coord action to be true
143                    updateCoordAction(action, true);
144                    LOG.debug(
145                            "Killed coord action = [{0}], new status = [{1}], pending = [{2}] and queue KillXCommand for [{3}]",
146                            action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
147                }
148                else {
149                    // As killing children is not required, set pending flag for coord action to be false
150                    updateCoordAction(action, false);
151                    LOG.debug("Killed coord action = [{0}], current status = [{1}], pending = [{2}]",
152                            action.getId(), action.getStatus(), action.getPending());
153                }
154                String pushMissingDeps = action.getPushMissingDependencies();
155                if (pushMissingDeps != null) {
156                    CoordPushDependencyCheckXCommand.unregisterMissingDependencies(
157                            Arrays.asList(DependencyChecker.dependenciesAsArray(pushMissingDeps)), action.getId());
158                }
159            }
160        }
161        coordJob.setDoneMaterialization();
162        coordJob.setLastModifiedTime(new Date());
163        LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId);
164    }
165
166    @Override
167    public void notifyParent() throws CommandException {
168        // update bundle action
169        if (coordJob.getBundleId() != null) {
170            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
171            bundleStatusUpdate.call();
172        }
173    }
174
175    @Override
176    public void updateJob() throws CommandException {
177        updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, coordJob));
178    }
179
180    @Override
181    public void performWrites() throws CommandException {
182        try {
183            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
184        }
185        catch (JPAExecutorException e) {
186            throw new CommandException(e);
187        }
188    }
189
190    @Override
191    public Job getJob() {
192        return coordJob;
193    }
194
195}