001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.bundle;
020
021import java.util.Date;
022import java.util.List;
023
024import org.apache.oozie.BundleActionBean;
025import org.apache.oozie.BundleJobBean;
026import org.apache.oozie.CoordinatorJobBean;
027import org.apache.oozie.ErrorCode;
028import org.apache.oozie.XException;
029import org.apache.oozie.client.Job;
030import org.apache.oozie.command.CommandException;
031import org.apache.oozie.command.KillTransitionXCommand;
032import org.apache.oozie.command.PreconditionException;
033import org.apache.oozie.command.coord.CoordKillXCommand;
034import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
035import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
036import org.apache.oozie.executor.jpa.BatchQueryExecutor;
037import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
038import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
039import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
040import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
041import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
042import org.apache.oozie.executor.jpa.JPAExecutorException;
043import org.apache.oozie.util.LogUtils;
044import org.apache.oozie.util.ParamChecker;
045
046public class BundleKillXCommand extends KillTransitionXCommand {
047    private final String jobId;
048    private BundleJobBean bundleJob;
049    private List<BundleActionBean> bundleActions;
050
051    public BundleKillXCommand(String jobId) {
052        super("bundle_kill", "bundle_kill", 1);
053        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
054    }
055
056    @Override
057    public String getEntityKey() {
058        return jobId;
059    }
060
061    @Override
062    public String getKey() {
063        return getName() + "_" + jobId;
064    }
065
066    @Override
067    protected boolean isLockRequired() {
068        return true;
069    }
070
071    @Override
072    public void loadState() throws CommandException {
073        try {
074            this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, jobId);
075            this.bundleActions = BundleActionQueryExecutor.getInstance().getList(
076                    BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, jobId);
077            LogUtils.setLogInfo(bundleJob);
078            super.setJob(bundleJob);
079
080        }
081        catch (XException ex) {
082            throw new CommandException(ex);
083        }
084    }
085
086    @Override
087    protected void verifyPrecondition() throws CommandException, PreconditionException {
088        if (bundleJob.getStatus() == Job.Status.SUCCEEDED
089                || bundleJob.getStatus() == Job.Status.FAILED
090                || bundleJob.getStatus() == Job.Status.DONEWITHERROR
091                || bundleJob.getStatus() == Job.Status.KILLED) {
092            LOG.info("Bundle job cannot be killed - job already SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = "
093                    + jobId + ", status = " + bundleJob.getStatus());
094            throw new PreconditionException(ErrorCode.E1323, jobId);
095        }
096    }
097
098    @Override
099    public void killChildren() throws CommandException {
100        if (bundleActions != null) {
101            for (BundleActionBean action : bundleActions) {
102                if (action.getCoordId() != null) {
103                    queue(new CoordKillXCommand(action.getCoordId()));
104                    updateBundleAction(action);
105                    LOG.debug("Killed bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordKillXCommand for [{3}]",
106                            action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId());
107                } else {
108                    updateBundleAction(action);
109                    LOG.debug("Killed bundle action = [{0}], current status = [{1}], pending = [{2}]", action.getBundleActionId(), action
110                            .getStatus(), action.getPending());
111                }
112
113            }
114        }
115        LOG.debug("Killed coord jobs for the bundle=[{0}]", jobId);
116    }
117
118    /**
119     * Update bundle action
120     *
121     * @param action
122     * @throws CommandException
123     */
124    private void updateBundleAction(BundleActionBean action) {
125        action.setLastModifiedTime(new Date());
126        if (!action.isTerminalStatus()) {
127            action.incrementAndGetPending();
128            action.setStatus(Job.Status.KILLED);
129        }
130        else {
131            // Due to race condition bundle action pending might be true
132            // while coordinator is killed.
133            if (action.isPending()) {
134                if (action.getCoordId() == null) {
135                    action.setPending(0);
136                }
137                else {
138                    try {
139                        CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get(
140                                CoordJobQuery.GET_COORD_JOB, action.getCoordId());
141                        if (!coordJob.isPending() && coordJob.isTerminalStatus()) {
142                            action.setPending(0);
143                            action.setStatus(coordJob.getStatus());
144                        }
145                    }
146                    catch (JPAExecutorException e) {
147                        LOG.warn("Error in checking coord job status:" + action.getCoordId(), e);
148                    }
149                }
150            }
151        }
152        updateList.add(new UpdateEntry<BundleActionQuery>(BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME, action));
153    }
154
155    @Override
156    public void notifyParent() {
157    }
158
159    @Override
160    public Job getJob() {
161        return bundleJob;
162    }
163
164    @Override
165    public void updateJob() {
166        updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME, bundleJob));
167    }
168
169    @Override
170    public void performWrites() throws CommandException {
171        try {
172            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
173        }
174        catch (JPAExecutorException e) {
175            throw new CommandException(e);
176        }
177    }
178
179}