001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.bundle;
019    
020    import java.util.Date;
021    import java.util.List;
022    
023    import org.apache.oozie.BundleActionBean;
024    import org.apache.oozie.BundleJobBean;
025    import org.apache.oozie.ErrorCode;
026    import org.apache.oozie.XException;
027    import org.apache.oozie.client.Job;
028    import org.apache.oozie.command.CommandException;
029    import org.apache.oozie.command.KillTransitionXCommand;
030    import org.apache.oozie.command.PreconditionException;
031    import org.apache.oozie.command.coord.CoordKillXCommand;
032    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
033    import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
034    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
035    import org.apache.oozie.executor.jpa.JPAExecutorException;
036    import org.apache.oozie.service.JPAService;
037    import org.apache.oozie.service.Services;
038    import org.apache.oozie.util.LogUtils;
039    import org.apache.oozie.util.ParamChecker;
040    
041    public class BundleKillXCommand extends KillTransitionXCommand {
042        private final String jobId;
043        private BundleJobBean bundleJob;
044        private List<BundleActionBean> bundleActions;
045        private JPAService jpaService = null;
046    
047        public BundleKillXCommand(String jobId) {
048            super("bundle_kill", "bundle_kill", 1);
049            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
050        }
051    
052        /* (non-Javadoc)
053         * @see org.apache.oozie.command.XCommand#getEntityKey()
054         */
055        @Override
056        public String getEntityKey() {
057            return jobId;
058        }
059    
060        @Override
061        public String getKey() {
062            return getName() + "_" + jobId;
063        }
064    
065        /* (non-Javadoc)
066         * @see org.apache.oozie.command.XCommand#isLockRequired()
067         */
068        @Override
069        protected boolean isLockRequired() {
070            return true;
071        }
072    
073        /* (non-Javadoc)
074         * @see org.apache.oozie.command.XCommand#loadState()
075         */
076        @Override
077        public void loadState() throws CommandException {
078            try {
079                jpaService = Services.get().get(JPAService.class);
080    
081                if (jpaService != null) {
082                    this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
083                    this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
084                    LogUtils.setLogInfo(bundleJob, logInfo);
085                    super.setJob(bundleJob);
086    
087                }
088                else {
089                    throw new CommandException(ErrorCode.E0610);
090                }
091            }
092            catch (XException ex) {
093                throw new CommandException(ex);
094            }
095        }
096    
097        /* (non-Javadoc)
098         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
099         */
100        @Override
101        protected void verifyPrecondition() throws CommandException, PreconditionException {
102            if (bundleJob.getStatus() == Job.Status.SUCCEEDED
103                    || bundleJob.getStatus() == Job.Status.FAILED
104                    || bundleJob.getStatus() == Job.Status.DONEWITHERROR
105                    || bundleJob.getStatus() == Job.Status.KILLED) {
106                LOG.info("Bundle job cannot be killed - job already SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = "
107                        + jobId + ", status = " + bundleJob.getStatus());
108                throw new PreconditionException(ErrorCode.E1020, jobId);
109            }
110        }
111    
112        /* (non-Javadoc)
113         * @see org.apache.oozie.command.KillTransitionXCommand#killChildren()
114         */
115        @Override
116        public void killChildren() throws CommandException {
117            if (bundleActions != null) {
118                for (BundleActionBean action : bundleActions) {
119                    if (action.getCoordId() != null) {
120                        queue(new CoordKillXCommand(action.getCoordId()));
121                        updateBundleAction(action);
122                        LOG.debug("Killed bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordKillXCommand for [{3}]",
123                                action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId());
124                    } else {
125                        updateBundleAction(action);
126                        LOG.debug("Killed bundle action = [{0}], current status = [{1}], pending = [{2}]", action.getBundleActionId(), action
127                                .getStatus(), action.getPending());
128                    }
129    
130                }
131            }
132            LOG.debug("Killed coord jobs for the bundle=[{0}]", jobId);
133        }
134    
135        /**
136         * Update bundle action
137         *
138         * @param action
139         * @throws CommandException
140         */
141        private void updateBundleAction(BundleActionBean action) {
142            action.setLastModifiedTime(new Date());
143            if (!action.isTerminalStatus()) {
144                action.incrementAndGetPending();
145                action.setStatus(Job.Status.KILLED);
146            }
147            updateList.add(action);
148        }
149    
150        /* (non-Javadoc)
151         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
152         */
153        @Override
154        public void notifyParent() {
155        }
156    
157        /* (non-Javadoc)
158         * @see org.apache.oozie.command.TransitionXCommand#getJob()
159         */
160        @Override
161        public Job getJob() {
162            return bundleJob;
163        }
164    
165        /* (non-Javadoc)
166         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
167         */
168        @Override
169        public void updateJob() {
170            updateList.add(bundleJob);
171        }
172    
173        /* (non-Javadoc)
174         * @see org.apache.oozie.command.KillTransitionXCommand#performWrites()
175         */
176        @Override
177        public void performWrites() throws CommandException {
178            try {
179                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
180            }
181            catch (JPAExecutorException e) {
182                throw new CommandException(e);
183            }
184        }
185    
186    }