001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.util.Date;
021import java.util.List;
022import org.apache.oozie.CoordinatorActionBean;
023import org.apache.oozie.CoordinatorActionInfo;
024import org.apache.oozie.CoordinatorJobBean;
025import org.apache.oozie.ErrorCode;
026import org.apache.oozie.XException;
027import org.apache.oozie.client.CoordinatorAction;
028import org.apache.oozie.client.CoordinatorJob;
029import org.apache.oozie.client.rest.RestConstants;
030import org.apache.oozie.command.CommandException;
031import org.apache.oozie.command.KillTransitionXCommand;
032import org.apache.oozie.command.PreconditionException;
033import org.apache.oozie.command.wf.KillXCommand;
034import org.apache.oozie.coord.CoordUtils;
035import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
036import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
037import org.apache.oozie.executor.jpa.BatchQueryExecutor;
038import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
039import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
040import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
041import org.apache.oozie.executor.jpa.JPAExecutorException;
042import org.apache.oozie.service.EventHandlerService;
043import org.apache.oozie.service.JPAService;
044import org.apache.oozie.service.Services;
045import org.apache.oozie.util.InstrumentUtils;
046import org.apache.oozie.util.LogUtils;
047import org.apache.oozie.util.ParamChecker;
048
049/**
050 * Kill coordinator actions by a range of dates (nominal time) or action number.
051 * <p/>
052 * The "range" can be set with {@link RestConstants.JOB_COORD_SCOPE_DATE} or
053 * {@link RestConstants.JOB_COORD_SCOPE_ACTION}.
054 * <p/>
055 */
056public class CoordActionsKillXCommand extends KillTransitionXCommand<CoordinatorActionInfo> {
057
058    private String jobId;
059    private CoordinatorJobBean coordJob;
060    List<CoordinatorActionBean> coordActions;
061    private String rangeType;
062    private String scope;
063    private JPAService jpaService = null;
064
065    public CoordActionsKillXCommand(String id, String rangeType, String scope) {
066        super("coord_action_kill", "coord_action_kill", 2);
067        this.jobId = id;
068        this.rangeType = ParamChecker.notEmpty(rangeType, "rangeType");
069        this.scope = ParamChecker.notEmpty(scope, "scope");
070    }
071
072    @Override
073    protected boolean isLockRequired() {
074        return true;
075    }
076
077    @Override
078    public String getEntityKey() {
079        return this.jobId;
080    }
081
082    @Override
083    protected void loadState() throws CommandException {
084        try {
085            jpaService = Services.get().get(JPAService.class);
086            if (jpaService != null) {
087                coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_ACTION_KILL, jobId);
088                LogUtils.setLogInfo(coordJob, logInfo);
089                coordActions = CoordUtils.getCoordActions(rangeType, coordJob.getId(), scope, true);
090            }
091            else {
092                throw new CommandException(ErrorCode.E0610);
093            }
094        }
095        catch (XException ex) {
096            throw new CommandException(ex);
097        }
098    }
099
100    @Override
101    protected void verifyPrecondition() throws CommandException, PreconditionException {
102        if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
103                || coordJob.getStatus() == CoordinatorJob.Status.FAILED
104                || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
105                || coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
106            LOG.info("Coord actions not killed - job either finished SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = "
107                    + jobId + ", status = " + coordJob.getStatus());
108            throw new PreconditionException(ErrorCode.E1020, jobId);
109        }
110    }
111
112    @Override
113    public void transitToNext() {
114    }
115
116    @Override
117    public void killChildren() throws CommandException {
118        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
119        for (CoordinatorActionBean coordAction : coordActions) {
120            coordAction.setStatus(CoordinatorAction.Status.KILLED);
121            coordAction.setLastModifiedTime(new Date());
122            // kill Workflow job associated with this Coord action
123            if (coordAction.getExternalId() != null) {
124                queue(new KillXCommand(coordAction.getExternalId()));
125                coordAction.incrementAndGetPending();
126            }
127            else {
128                coordAction.setPending(0);
129            }
130            updateList.add(new UpdateEntry(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, coordAction));
131            if (EventHandlerService.isEnabled()) {
132                CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(),
133                        coordAction.getCreatedTime());
134            }
135            queue(new CoordActionNotificationXCommand(coordAction), 100);
136        }
137        CoordinatorActionInfo coordInfo = new CoordinatorActionInfo(coordActions);
138        ret = coordInfo;
139    }
140
141    /*
142     * (non-Javadoc)
143     *
144     * @see org.apache.oozie.command.KillTransitionXCommand#performWrites()
145     */
146    @Override
147    public void performWrites() throws CommandException {
148        try {
149            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
150        }
151        catch (JPAExecutorException e) {
152            throw new CommandException(e);
153        }
154    }
155
156    @Override
157    public void updateJob() throws CommandException {
158        coordJob.setPending();
159        updateList.add(new UpdateEntry(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING, coordJob));
160    }
161
162    @Override
163    public void notifyParent() throws CommandException {
164    }
165
166}