This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.util.Date;
021 import java.util.List;
022
023 import org.apache.oozie.BundleActionBean;
024 import org.apache.oozie.BundleJobBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.XException;
027 import org.apache.oozie.client.CoordinatorJob;
028 import org.apache.oozie.client.Job;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.command.KillTransitionXCommand;
031 import org.apache.oozie.command.PreconditionException;
032 import org.apache.oozie.command.coord.CoordKillXCommand;
033 import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
034 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
035 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
036 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
037 import org.apache.oozie.executor.jpa.JPAExecutorException;
038 import org.apache.oozie.service.JPAService;
039 import org.apache.oozie.service.Services;
040 import org.apache.oozie.util.LogUtils;
041 import org.apache.oozie.util.ParamChecker;
042
043 public class BundleKillXCommand extends KillTransitionXCommand {
044 private final String jobId;
045 private BundleJobBean bundleJob;
046 private List<BundleActionBean> bundleActions;
047 private JPAService jpaService = null;
048
049 public BundleKillXCommand(String jobId) {
050 super("bundle_kill", "bundle_kill", 1);
051 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
052 }
053
054 /* (non-Javadoc)
055 * @see org.apache.oozie.command.XCommand#getEntityKey()
056 */
057 @Override
058 protected String getEntityKey() {
059 return jobId;
060 }
061
062 /* (non-Javadoc)
063 * @see org.apache.oozie.command.XCommand#isLockRequired()
064 */
065 @Override
066 protected boolean isLockRequired() {
067 return true;
068 }
069
070 /* (non-Javadoc)
071 * @see org.apache.oozie.command.XCommand#loadState()
072 */
073 @Override
074 public void loadState() throws CommandException {
075 try {
076 jpaService = Services.get().get(JPAService.class);
077
078 if (jpaService != null) {
079 this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
080 this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
081 LogUtils.setLogInfo(bundleJob, logInfo);
082 super.setJob(bundleJob);
083
084 }
085 else {
086 throw new CommandException(ErrorCode.E0610);
087 }
088 }
089 catch (XException ex) {
090 throw new CommandException(ex);
091 }
092 }
093
094 /* (non-Javadoc)
095 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
096 */
097 @Override
098 protected void verifyPrecondition() throws CommandException, PreconditionException {
099 if (bundleJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
100 || bundleJob.getStatus() == CoordinatorJob.Status.FAILED
101 || bundleJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR) {
102 LOG.info("BundleKillXCommand not killed - job either finished SUCCEEDED, FAILED or DONEWITHERROR, job id = "
103 + jobId + ", status = " + bundleJob.getStatus());
104 throw new PreconditionException(ErrorCode.E1020, jobId);
105 }
106 }
107
108 /* (non-Javadoc)
109 * @see org.apache.oozie.command.KillTransitionXCommand#killChildren()
110 */
111 @Override
112 public void killChildren() throws CommandException {
113 if (bundleActions != null) {
114 for (BundleActionBean action : bundleActions) {
115 if (action.getCoordId() != null) {
116 queue(new CoordKillXCommand(action.getCoordId()));
117 updateBundleAction(action);
118 LOG.debug("Killed bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordKillXCommand for [{3}]",
119 action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId());
120 } else {
121 updateBundleAction(action);
122 LOG.debug("Killed bundle action = [{0}], current status = [{1}], pending = [{2}]", action.getBundleActionId(), action
123 .getStatus(), action.getPending());
124 }
125
126 }
127 }
128 LOG.debug("Killed coord jobs for the bundle=[{0}]", jobId);
129 }
130
131 /**
132 * Update bundle action
133 *
134 * @param action
135 * @throws CommandException
136 */
137 private void updateBundleAction(BundleActionBean action) throws CommandException {
138 action.incrementAndGetPending();
139 action.setLastModifiedTime(new Date());
140 action.setStatus(Job.Status.KILLED);
141 try {
142 jpaService.execute(new BundleActionUpdateJPAExecutor(action));
143 }
144 catch (JPAExecutorException e) {
145 throw new CommandException(e);
146 }
147 }
148
149 /* (non-Javadoc)
150 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
151 */
152 @Override
153 public void notifyParent() {
154 }
155
156 /* (non-Javadoc)
157 * @see org.apache.oozie.command.TransitionXCommand#getJob()
158 */
159 @Override
160 public Job getJob() {
161 return bundleJob;
162 }
163
164 /* (non-Javadoc)
165 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
166 */
167 @Override
168 public void updateJob() throws CommandException {
169 try {
170 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
171 }
172 catch (JPAExecutorException e) {
173 throw new CommandException(e);
174 }
175 }
176
177 }