001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.bundle;
019
020import java.util.Date;
021import java.util.List;
022
023import org.apache.oozie.BundleActionBean;
024import org.apache.oozie.BundleJobBean;
025import org.apache.oozie.CoordinatorJobBean;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.XException;
028import org.apache.oozie.client.Job;
029import org.apache.oozie.command.CommandException;
030import org.apache.oozie.command.KillTransitionXCommand;
031import org.apache.oozie.command.PreconditionException;
032import org.apache.oozie.command.coord.CoordKillXCommand;
033import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
034import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
035import org.apache.oozie.executor.jpa.BatchQueryExecutor;
036import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
037import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
038import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
039import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
040import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
041import org.apache.oozie.executor.jpa.JPAExecutorException;
042import org.apache.oozie.util.LogUtils;
043import org.apache.oozie.util.ParamChecker;
044
045public class BundleKillXCommand extends KillTransitionXCommand {
046    private final String jobId;
047    private BundleJobBean bundleJob;
048    private List<BundleActionBean> bundleActions;
049
050    public BundleKillXCommand(String jobId) {
051        super("bundle_kill", "bundle_kill", 1);
052        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
053    }
054
055    @Override
056    public String getEntityKey() {
057        return jobId;
058    }
059
060    @Override
061    public String getKey() {
062        return getName() + "_" + jobId;
063    }
064
065    @Override
066    protected boolean isLockRequired() {
067        return true;
068    }
069
070    @Override
071    public void loadState() throws CommandException {
072        try {
073            this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, jobId);
074            this.bundleActions = BundleActionQueryExecutor.getInstance().getList(
075                    BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, jobId);
076            LogUtils.setLogInfo(bundleJob, logInfo);
077            super.setJob(bundleJob);
078
079        }
080        catch (XException ex) {
081            throw new CommandException(ex);
082        }
083    }
084
085    @Override
086    protected void verifyPrecondition() throws CommandException, PreconditionException {
087        if (bundleJob.getStatus() == Job.Status.SUCCEEDED
088                || bundleJob.getStatus() == Job.Status.FAILED
089                || bundleJob.getStatus() == Job.Status.DONEWITHERROR
090                || bundleJob.getStatus() == Job.Status.KILLED) {
091            LOG.info("Bundle job cannot be killed - job already SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = "
092                    + jobId + ", status = " + bundleJob.getStatus());
093            throw new PreconditionException(ErrorCode.E1020, jobId);
094        }
095    }
096
097    @Override
098    public void killChildren() throws CommandException {
099        if (bundleActions != null) {
100            for (BundleActionBean action : bundleActions) {
101                if (action.getCoordId() != null) {
102                    queue(new CoordKillXCommand(action.getCoordId()));
103                    updateBundleAction(action);
104                    LOG.debug("Killed bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordKillXCommand for [{3}]",
105                            action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId());
106                } else {
107                    updateBundleAction(action);
108                    LOG.debug("Killed bundle action = [{0}], current status = [{1}], pending = [{2}]", action.getBundleActionId(), action
109                            .getStatus(), action.getPending());
110                }
111
112            }
113        }
114        LOG.debug("Killed coord jobs for the bundle=[{0}]", jobId);
115    }
116
117    /**
118     * Update bundle action
119     *
120     * @param action
121     * @throws CommandException
122     */
123    private void updateBundleAction(BundleActionBean action) {
124        action.setLastModifiedTime(new Date());
125        if (!action.isTerminalStatus()) {
126            action.incrementAndGetPending();
127            action.setStatus(Job.Status.KILLED);
128        }
129        else {
130            // Due to race condition bundle action pending might be true
131            // while coordinator is killed.
132            if (action.isPending()) {
133                if (action.getCoordId() == null) {
134                    action.setPending(0);
135                }
136                else {
137                    try {
138                        CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get(
139                                CoordJobQuery.GET_COORD_JOB, action.getCoordId());
140                        if (!coordJob.isPending() && coordJob.isTerminalStatus()) {
141                            action.setPending(0);
142                            action.setStatus(coordJob.getStatus());
143                        }
144                    }
145                    catch (JPAExecutorException e) {
146                        LOG.warn("Error in checking coord job status:" + action.getCoordId(), e);
147                    }
148                }
149            }
150        }
151        updateList.add(new UpdateEntry<BundleActionQuery>(BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME, action));
152    }
153
154    @Override
155    public void notifyParent() {
156    }
157
158    @Override
159    public Job getJob() {
160        return bundleJob;
161    }
162
163    @Override
164    public void updateJob() {
165        updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME, bundleJob));
166    }
167
168    @Override
169    public void performWrites() throws CommandException {
170        try {
171            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
172        }
173        catch (JPAExecutorException e) {
174            throw new CommandException(e);
175        }
176    }
177
178}