// This project has been retired. For details, please refer to its
// Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import org.apache.oozie.client.CoordinatorJob;
021 import org.apache.oozie.client.Job;
022 import org.apache.oozie.CoordinatorActionBean;
023 import org.apache.oozie.CoordinatorJobBean;
024 import org.apache.oozie.ErrorCode;
025 import org.apache.oozie.XException;
026 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
027 import org.apache.oozie.command.wf.KillXCommand;
028 import org.apache.oozie.command.CommandException;
029 import org.apache.oozie.command.KillTransitionXCommand;
030 import org.apache.oozie.command.PreconditionException;
031 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
032 import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor;
033 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
034 import org.apache.oozie.executor.jpa.JPAExecutorException;
035 import org.apache.oozie.service.JPAService;
036 import org.apache.oozie.service.Services;
037 import org.apache.oozie.util.LogUtils;
038 import org.apache.oozie.util.ParamChecker;
039 import org.apache.oozie.util.StatusUtils;
040
041 import java.util.Date;
042 import java.util.List;
043
044 public class CoordKillXCommand extends KillTransitionXCommand {
045
046 private final String jobId;
047 private CoordinatorJobBean coordJob;
048 private List<CoordinatorActionBean> actionList;
049 private JPAService jpaService = null;
050 private CoordinatorJob.Status prevStatus = null;
051
052 public CoordKillXCommand(String id) {
053 super("coord_kill", "coord_kill", 2);
054 this.jobId = ParamChecker.notEmpty(id, "id");
055 }
056
057 @Override
058 protected boolean isLockRequired() {
059 return true;
060 }
061
062 @Override
063 public String getEntityKey() {
064 return this.jobId;
065 }
066
067 @Override
068 protected void loadState() throws CommandException {
069 try {
070 jpaService = Services.get().get(JPAService.class);
071
072 if (jpaService != null) {
073 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
074 //Get actions which are not succeeded, failed, timed out or killed
075 this.actionList = jpaService.execute(new CoordJobGetActionsNotCompletedJPAExecutor(jobId));
076 prevStatus = coordJob.getStatus();
077 LogUtils.setLogInfo(coordJob, logInfo);
078 }
079 else {
080 throw new CommandException(ErrorCode.E0610);
081 }
082 }
083 catch (XException ex) {
084 throw new CommandException(ex);
085 }
086 }
087
088 @Override
089 protected void verifyPrecondition() throws CommandException, PreconditionException {
090 // if namespace 0.1 is used and backward support is true, SUCCEEDED coord job can be killed
091 if (StatusUtils.isV1CoordjobKillable(coordJob)) {
092 return;
093 }
094 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
095 || coordJob.getStatus() == CoordinatorJob.Status.FAILED
096 || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
097 || coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
098 LOG.info("CoordKillXCommand not killed - job either finished SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = "
099 + jobId + ", status = " + coordJob.getStatus());
100 throw new PreconditionException(ErrorCode.E1020, jobId);
101 }
102 }
103
104 private void updateCoordAction(CoordinatorActionBean action, boolean makePending) {
105 action.setStatus(CoordinatorActionBean.Status.KILLED);
106 if (makePending) {
107 action.incrementAndGetPending();
108 } else {
109 // set pending to false
110 action.setPending(0);
111 }
112 action.setLastModifiedTime(new Date());
113 updateList.add(action);
114 }
115
116 @Override
117 public void killChildren() throws CommandException {
118 if (actionList != null) {
119 for (CoordinatorActionBean action : actionList) {
120 // queue a WorkflowKillXCommand to delete the workflow job and actions
121 if (action.getExternalId() != null) {
122 queue(new KillXCommand(action.getExternalId()));
123 // As the kill command for children is queued, set pending flag for coord action to be true
124 updateCoordAction(action, true);
125 LOG.debug(
126 "Killed coord action = [{0}], new status = [{1}], pending = [{2}] and queue KillXCommand for [{3}]",
127 action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
128 }
129 else {
130 // As killing children is not required, set pending flag for coord action to be false
131 updateCoordAction(action, false);
132 LOG.debug("Killed coord action = [{0}], current status = [{1}], pending = [{2}]",
133 action.getId(), action.getStatus(), action.getPending());
134 }
135 }
136 }
137
138 updateList.add(coordJob);
139
140 LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId);
141 }
142
143 @Override
144 public void notifyParent() throws CommandException {
145 // update bundle action
146 if (coordJob.getBundleId() != null) {
147 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
148 bundleStatusUpdate.call();
149 }
150 }
151
152 @Override
153 public void updateJob() throws CommandException {
154 updateList.add(coordJob);
155 }
156
157 /* (non-Javadoc)
158 * @see org.apache.oozie.command.KillTransitionXCommand#performWrites()
159 */
160 @Override
161 public void performWrites() throws CommandException {
162 try {
163 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
164 }
165 catch (JPAExecutorException e) {
166 throw new CommandException(e);
167 }
168 }
169
170 /* (non-Javadoc)
171 * @see org.apache.oozie.command.TransitionXCommand#getJob()
172 */
173 @Override
174 public Job getJob() {
175 return coordJob;
176 }
177
178 /* (non-Javadoc)
179 * @see org.apache.oozie.command.XCommand#getKey()
180 */
181 @Override
182 public String getKey(){
183 return getName() + "_" + jobId;
184 }
185
186 }