001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.coord;
020
021import java.util.Date;
022import java.util.List;
023import org.apache.oozie.CoordinatorActionBean;
024import org.apache.oozie.CoordinatorActionInfo;
025import org.apache.oozie.CoordinatorJobBean;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.XException;
028import org.apache.oozie.client.CoordinatorAction;
029import org.apache.oozie.client.CoordinatorJob;
030import org.apache.oozie.client.rest.RestConstants;
031import org.apache.oozie.command.CommandException;
032import org.apache.oozie.command.KillTransitionXCommand;
033import org.apache.oozie.command.PreconditionException;
034import org.apache.oozie.command.wf.KillXCommand;
035import org.apache.oozie.coord.CoordUtils;
036import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
037import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
038import org.apache.oozie.executor.jpa.BatchQueryExecutor;
039import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
040import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
041import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
042import org.apache.oozie.executor.jpa.JPAExecutorException;
043import org.apache.oozie.service.EventHandlerService;
044import org.apache.oozie.service.JPAService;
045import org.apache.oozie.service.Services;
046import org.apache.oozie.util.InstrumentUtils;
047import org.apache.oozie.util.LogUtils;
048import org.apache.oozie.util.ParamChecker;
049
050/**
051 * Kill coordinator actions by a range of dates (nominal time) or action number.
052 * <p>
053 * The "range" can be set with {@link RestConstants#JOB_COORD_SCOPE_DATE} or
054 * {@link RestConstants#JOB_COORD_SCOPE_ACTION}.
055 * <p>
056 */
057public class CoordActionsKillXCommand extends KillTransitionXCommand<CoordinatorActionInfo> {
058
059    private String jobId;
060    private CoordinatorJobBean coordJob;
061    List<CoordinatorActionBean> coordActions;
062    private String rangeType;
063    private String scope;
064    private JPAService jpaService = null;
065
066    public CoordActionsKillXCommand(String id, String rangeType, String scope) {
067        super("coord_action_kill", "coord_action_kill", 2);
068        this.jobId = id;
069        this.rangeType = ParamChecker.notEmpty(rangeType, "rangeType");
070        this.scope = ParamChecker.notEmpty(scope, "scope");
071    }
072
073    @Override
074    protected boolean isLockRequired() {
075        return true;
076    }
077
078    @Override
079    public String getEntityKey() {
080        return this.jobId;
081    }
082
083    @Override
084    protected void loadState() throws CommandException {
085        try {
086            jpaService = Services.get().get(JPAService.class);
087            if (jpaService != null) {
088                coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_ACTION_KILL, jobId);
089                LogUtils.setLogInfo(coordJob);
090                coordActions = CoordUtils.getCoordActions(rangeType, coordJob.getId(), scope, true);
091            }
092            else {
093                throw new CommandException(ErrorCode.E0610);
094            }
095        }
096        catch (XException ex) {
097            throw new CommandException(ex);
098        }
099    }
100
101    @Override
102    protected void verifyPrecondition() throws CommandException, PreconditionException {
103        if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
104                || coordJob.getStatus() == CoordinatorJob.Status.FAILED
105                || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
106                || coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
107            LOG.info("Coord actions not killed - job either finished SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = "
108                    + jobId + ", status = " + coordJob.getStatus());
109            throw new PreconditionException(ErrorCode.E1020, jobId);
110        }
111    }
112
113    @Override
114    public void transitToNext() {
115    }
116
117    @Override
118    public void killChildren() throws CommandException {
119        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
120        for (CoordinatorActionBean coordAction : coordActions) {
121            coordAction.setStatus(CoordinatorAction.Status.KILLED);
122            coordAction.setLastModifiedTime(new Date());
123            // kill Workflow job associated with this Coord action
124            if (coordAction.getExternalId() != null) {
125                queue(new KillXCommand(coordAction.getExternalId()));
126                coordAction.incrementAndGetPending();
127            }
128            else {
129                coordAction.setPending(0);
130            }
131            updateList.add(new UpdateEntry(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, coordAction));
132            if (EventHandlerService.isEnabled()) {
133                CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(),
134                        coordAction.getCreatedTime());
135            }
136            queue(new CoordActionNotificationXCommand(coordAction), 100);
137        }
138        CoordinatorActionInfo coordInfo = new CoordinatorActionInfo(coordActions);
139        ret = coordInfo;
140    }
141
142    /*
143     * (non-Javadoc)
144     *
145     * @see org.apache.oozie.command.KillTransitionXCommand#performWrites()
146     */
147    @Override
148    public void performWrites() throws CommandException {
149        try {
150            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
151        }
152        catch (JPAExecutorException e) {
153            throw new CommandException(e);
154        }
155    }
156
157    @Override
158    public void updateJob() throws CommandException {
159        coordJob.setPending();
160        updateList.add(new UpdateEntry(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING, coordJob));
161    }
162
163    @Override
164    public void notifyParent() throws CommandException {
165    }
166
167}