This project has been retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import org.apache.oozie.client.CoordinatorJob;
021 import org.apache.oozie.client.Job;
022 import org.apache.oozie.CoordinatorActionBean;
023 import org.apache.oozie.CoordinatorJobBean;
024 import org.apache.oozie.ErrorCode;
025 import org.apache.oozie.XException;
026 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
027 import org.apache.oozie.command.wf.KillXCommand;
028 import org.apache.oozie.command.CommandException;
029 import org.apache.oozie.command.KillTransitionXCommand;
030 import org.apache.oozie.command.PreconditionException;
031 import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
032 import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor;
033 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
034 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
035 import org.apache.oozie.executor.jpa.JPAExecutorException;
036 import org.apache.oozie.service.JPAService;
037 import org.apache.oozie.service.Services;
038 import org.apache.oozie.util.LogUtils;
039 import org.apache.oozie.util.ParamChecker;
040 import org.apache.oozie.util.StatusUtils;
041
042 import java.util.Date;
043 import java.util.List;
044
045 public class CoordKillXCommand extends KillTransitionXCommand {
046
047 private final String jobId;
048 private CoordinatorJobBean coordJob;
049 private List<CoordinatorActionBean> actionList;
050 private JPAService jpaService = null;
051 private CoordinatorJob.Status prevStatus = null;
052
053 public CoordKillXCommand(String id) {
054 super("coord_kill", "coord_kill", 2);
055 this.jobId = ParamChecker.notEmpty(id, "id");
056 }
057
058 @Override
059 protected boolean isLockRequired() {
060 return true;
061 }
062
063 @Override
064 public String getEntityKey() {
065 return this.jobId;
066 }
067
068 @Override
069 protected void loadState() throws CommandException {
070 try {
071 jpaService = Services.get().get(JPAService.class);
072
073 if (jpaService != null) {
074 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
075 //Get actions which are not succeeded, failed, timed out or killed
076 this.actionList = jpaService.execute(new CoordJobGetActionsNotCompletedJPAExecutor(jobId));
077 prevStatus = coordJob.getStatus();
078 LogUtils.setLogInfo(coordJob, logInfo);
079 }
080 else {
081 throw new CommandException(ErrorCode.E0610);
082 }
083 }
084 catch (XException ex) {
085 throw new CommandException(ex);
086 }
087 }
088
089 @Override
090 protected void verifyPrecondition() throws CommandException, PreconditionException {
091 // if namespace 0.1 is used and backward support is true, SUCCEEDED coord job can be killed
092 if (StatusUtils.isV1CoordjobKillable(coordJob)) {
093 return;
094 }
095 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
096 || coordJob.getStatus() == CoordinatorJob.Status.FAILED
097 || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
098 || coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
099 LOG.info("CoordKillXCommand not killed - job either finished SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = "
100 + jobId + ", status = " + coordJob.getStatus());
101 throw new PreconditionException(ErrorCode.E1020, jobId);
102 }
103 }
104
105 private void updateCoordAction(CoordinatorActionBean action, boolean makePending) throws CommandException {
106 action.setStatus(CoordinatorActionBean.Status.KILLED);
107 if (makePending) {
108 action.incrementAndGetPending();
109 } else {
110 // set pending to false
111 action.setPending(0);
112 }
113 action.setLastModifiedTime(new Date());
114 try {
115 jpaService.execute(new CoordActionUpdateStatusJPAExecutor(action));
116 } catch (JPAExecutorException e) {
117 throw new CommandException(e);
118 }
119 }
120
121 @Override
122 public void killChildren() throws CommandException {
123 try {
124 if (actionList != null) {
125 for (CoordinatorActionBean action : actionList) {
126 // queue a WorkflowKillXCommand to delete the workflow job and actions
127 if (action.getExternalId() != null) {
128 queue(new KillXCommand(action.getExternalId()));
129 // As the kill command for children is queued, set pending flag for coord action to be true
130 updateCoordAction(action, true);
131 LOG.debug(
132 "Killed coord action = [{0}], new status = [{1}], pending = [{2}] and queue KillXCommand for [{3}]",
133 action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
134 }
135 else {
136 // As killing children is not required, set pending flag for coord action to be false
137 updateCoordAction(action, false);
138 LOG.debug("Killed coord action = [{0}], current status = [{1}], pending = [{2}]",
139 action.getId(), action.getStatus(), action.getPending());
140 }
141 }
142 }
143
144 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
145
146 LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId);
147 }
148 catch (JPAExecutorException ex) {
149 throw new CommandException(ex);
150 }
151 }
152
153 @Override
154 public void notifyParent() throws CommandException {
155 // update bundle action
156 if (coordJob.getBundleId() != null) {
157 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
158 bundleStatusUpdate.call();
159 }
160 }
161
162 @Override
163 public void updateJob() throws CommandException {
164 try {
165 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
166 }
167 catch (JPAExecutorException ex) {
168 throw new CommandException(ex);
169 }
170 }
171
172 /* (non-Javadoc)
173 * @see org.apache.oozie.command.TransitionXCommand#getJob()
174 */
175 @Override
176 public Job getJob() {
177 return coordJob;
178 }
179
180 /* (non-Javadoc)
181 * @see org.apache.oozie.command.XCommand#getKey()
182 */
183 @Override
184 public String getKey(){
185 return getName() + "_" + jobId;
186 }
187
188 }