This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.util.Date;
021 import java.util.List;
022
023 import org.apache.oozie.BundleActionBean;
024 import org.apache.oozie.BundleJobBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.XException;
027 import org.apache.oozie.client.CoordinatorJob;
028 import org.apache.oozie.client.Job;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.command.KillTransitionXCommand;
031 import org.apache.oozie.command.PreconditionException;
032 import org.apache.oozie.command.coord.CoordKillXCommand;
033 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
034 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
035 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
036 import org.apache.oozie.executor.jpa.JPAExecutorException;
037 import org.apache.oozie.service.JPAService;
038 import org.apache.oozie.service.Services;
039 import org.apache.oozie.util.LogUtils;
040 import org.apache.oozie.util.ParamChecker;
041
042 public class BundleKillXCommand extends KillTransitionXCommand {
043 private final String jobId;
044 private BundleJobBean bundleJob;
045 private List<BundleActionBean> bundleActions;
046 private JPAService jpaService = null;
047
048 public BundleKillXCommand(String jobId) {
049 super("bundle_kill", "bundle_kill", 1);
050 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
051 }
052
053 /* (non-Javadoc)
054 * @see org.apache.oozie.command.XCommand#getEntityKey()
055 */
056 @Override
057 public String getEntityKey() {
058 return jobId;
059 }
060
061 /* (non-Javadoc)
062 * @see org.apache.oozie.command.XCommand#isLockRequired()
063 */
064 @Override
065 protected boolean isLockRequired() {
066 return true;
067 }
068
069 /* (non-Javadoc)
070 * @see org.apache.oozie.command.XCommand#loadState()
071 */
072 @Override
073 public void loadState() throws CommandException {
074 try {
075 jpaService = Services.get().get(JPAService.class);
076
077 if (jpaService != null) {
078 this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
079 this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
080 LogUtils.setLogInfo(bundleJob, logInfo);
081 super.setJob(bundleJob);
082
083 }
084 else {
085 throw new CommandException(ErrorCode.E0610);
086 }
087 }
088 catch (XException ex) {
089 throw new CommandException(ex);
090 }
091 }
092
093 /* (non-Javadoc)
094 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
095 */
096 @Override
097 protected void verifyPrecondition() throws CommandException, PreconditionException {
098 if (bundleJob.getStatus() == Job.Status.SUCCEEDED
099 || bundleJob.getStatus() == Job.Status.FAILED
100 || bundleJob.getStatus() == Job.Status.DONEWITHERROR
101 || bundleJob.getStatus() == Job.Status.KILLED) {
102 LOG.info("Bundle job cannot be killed - job already SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = "
103 + jobId + ", status = " + bundleJob.getStatus());
104 throw new PreconditionException(ErrorCode.E1020, jobId);
105 }
106 }
107
108 /* (non-Javadoc)
109 * @see org.apache.oozie.command.KillTransitionXCommand#killChildren()
110 */
111 @Override
112 public void killChildren() throws CommandException {
113 if (bundleActions != null) {
114 for (BundleActionBean action : bundleActions) {
115 if (action.getCoordId() != null) {
116 queue(new CoordKillXCommand(action.getCoordId()));
117 updateBundleAction(action);
118 LOG.debug("Killed bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordKillXCommand for [{3}]",
119 action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId());
120 } else {
121 updateBundleAction(action);
122 LOG.debug("Killed bundle action = [{0}], current status = [{1}], pending = [{2}]", action.getBundleActionId(), action
123 .getStatus(), action.getPending());
124 }
125
126 }
127 }
128 LOG.debug("Killed coord jobs for the bundle=[{0}]", jobId);
129 }
130
131 /**
132 * Update bundle action
133 *
134 * @param action
135 * @throws CommandException
136 */
137 private void updateBundleAction(BundleActionBean action) {
138 action.setLastModifiedTime(new Date());
139 if (!action.isTerminalStatus()) {
140 action.incrementAndGetPending();
141 action.setStatus(Job.Status.KILLED);
142 }
143 updateList.add(action);
144 }
145
146 /* (non-Javadoc)
147 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
148 */
149 @Override
150 public void notifyParent() {
151 }
152
153 /* (non-Javadoc)
154 * @see org.apache.oozie.command.TransitionXCommand#getJob()
155 */
156 @Override
157 public Job getJob() {
158 return bundleJob;
159 }
160
161 /* (non-Javadoc)
162 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
163 */
164 @Override
165 public void updateJob() {
166 updateList.add(bundleJob);
167 }
168
169 /* (non-Javadoc)
170 * @see org.apache.oozie.command.KillTransitionXCommand#performWrites()
171 */
172 @Override
173 public void performWrites() throws CommandException {
174 try {
175 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
176 }
177 catch (JPAExecutorException e) {
178 throw new CommandException(e);
179 }
180 }
181
182 }