001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import org.apache.oozie.client.CoordinatorJob;
021    import org.apache.oozie.CoordinatorActionBean;
022    import org.apache.oozie.CoordinatorJobBean;
023    import org.apache.oozie.XException;
024    import org.apache.oozie.command.wf.KillCommand;
025    import org.apache.oozie.command.CommandException;
026    import org.apache.oozie.store.CoordinatorStore;
027    import org.apache.oozie.store.StoreException;
028    import org.apache.oozie.util.ParamChecker;
029    import org.apache.oozie.util.XLog;
030    
031    import java.util.Date;
032    import java.util.List;
033    
034    public class CoordKillCommand extends CoordinatorCommand<Void> {
035    
036        private String jobId;
037        private final XLog log = XLog.getLog(getClass());
038    
039        public CoordKillCommand(String id) {
040            super("coord_kill", "coord_kill", 1, XLog.STD);
041            this.jobId = ParamChecker.notEmpty(id, "id");
042        }
043    
044        protected Void call(CoordinatorStore store) throws StoreException, CommandException {
045            try {
046                // CoordinatorJobBean coordJob = store.getCoordinatorJob(jobId,
047                // false);
048                CoordinatorJobBean coordJob = store.getEntityManager().find(CoordinatorJobBean.class, jobId);
049                setLogInfo(coordJob);
050                if (coordJob.getStatus() != CoordinatorJob.Status.SUCCEEDED
051                        || coordJob.getStatus() != CoordinatorJob.Status.FAILED) {
052                    coordJob.setEndTime(new Date());
053                    incrJobCounter(1);
054                    coordJob.setStatus(CoordinatorJob.Status.KILLED);
055                    List<CoordinatorActionBean> actionList = store.getActionsForCoordinatorJob(jobId, false);
056                    for (CoordinatorActionBean action : actionList) {
057                        if (action.getStatus() != CoordinatorActionBean.Status.FAILED
058                                && action.getStatus() != CoordinatorActionBean.Status.TIMEDOUT
059                                && action.getStatus() != CoordinatorActionBean.Status.SUCCEEDED
060                                && action.getStatus() != CoordinatorActionBean.Status.KILLED) {
061                            // queue a KillCommand to delete the workflow job
062                            if (action.getExternalId() != null) {
063                                queueCallable(new KillCommand(action.getExternalId()));
064                            }
065                            action.setStatus(CoordinatorActionBean.Status.KILLED);
066                            store.updateCoordinatorAction(action);
067                        }
068                    }
069                    store.updateCoordinatorJob(coordJob);
070                    // TODO queueCallable(new NotificationCommand(coordJob));
071                }
072                else {
073                    log.info("CoordKillCommand not killed - job either " + "finished successfully or does not exist "
074                            + jobId);
075                }
076                return null;
077            }
078            catch (XException ex) {
079                throw new CommandException(ex);
080            }
081        }
082    
083        @Override
084        protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
085            log.info("STARTED CoordKillCommand for jobId=" + jobId);
086            try {
087                if (lock(jobId)) {
088                    call(store);
089                }
090                else {
091                    queueCallable(new CoordKillCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL);
092                    log.warn("CoordKillCommand lock was not acquired - " + " failed " + jobId + ". Requeing the same.");
093                }
094            }
095            catch (InterruptedException e) {
096                queueCallable(new CoordKillCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL);
097                log.warn("CoordKillCommand lock acquiring failed " + " with exception " + e.getMessage() + " for job id "
098                        + jobId + ". Requeing the same.");
099            }
100            finally {
101                log.info("ENDED CoordKillCommand for jobId=" + jobId);
102            }
103            return null;
104        }
105    
106    }