This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import org.apache.oozie.client.CoordinatorAction;
021 import org.apache.oozie.client.CoordinatorJob;
022 import org.apache.oozie.client.Job;
023 import org.apache.oozie.CoordinatorActionBean;
024 import org.apache.oozie.CoordinatorJobBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.XException;
027 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
028 import org.apache.oozie.command.wf.KillXCommand;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.command.KillTransitionXCommand;
031 import org.apache.oozie.command.PreconditionException;
032 import org.apache.oozie.dependency.DependencyChecker;
033 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
034 import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor;
035 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
036 import org.apache.oozie.executor.jpa.JPAExecutorException;
037 import org.apache.oozie.service.EventHandlerService;
038 import org.apache.oozie.service.JPAService;
039 import org.apache.oozie.service.Services;
040 import org.apache.oozie.util.LogUtils;
041 import org.apache.oozie.util.ParamChecker;
042 import org.apache.oozie.util.StatusUtils;
043
044 import java.util.Arrays;
045 import java.util.Date;
046 import java.util.List;
047
048 public class CoordKillXCommand extends KillTransitionXCommand {
049
050 private final String jobId;
051 private CoordinatorJobBean coordJob;
052 private List<CoordinatorActionBean> actionList;
053 private JPAService jpaService = null;
054 private CoordinatorJob.Status prevStatus = null;
055
056 public CoordKillXCommand(String id) {
057 super("coord_kill", "coord_kill", 2);
058 this.jobId = ParamChecker.notEmpty(id, "id");
059 }
060
061 @Override
062 protected boolean isLockRequired() {
063 return true;
064 }
065
066 @Override
067 public String getEntityKey() {
068 return this.jobId;
069 }
070
071 @Override
072 public String getKey() {
073 return getName() + "_" + this.jobId;
074 }
075
076 @Override
077 protected void loadState() throws CommandException {
078 try {
079 jpaService = Services.get().get(JPAService.class);
080
081 if (jpaService != null) {
082 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
083 //Get actions which are not succeeded, failed, timed out or killed
084 this.actionList = jpaService.execute(new CoordJobGetActionsNotCompletedJPAExecutor(jobId));
085 prevStatus = coordJob.getStatus();
086 LogUtils.setLogInfo(coordJob, logInfo);
087 }
088 else {
089 throw new CommandException(ErrorCode.E0610);
090 }
091 }
092 catch (XException ex) {
093 throw new CommandException(ex);
094 }
095 }
096
097 @Override
098 protected void verifyPrecondition() throws CommandException, PreconditionException {
099 // if namespace 0.1 is used and backward support is true, SUCCEEDED coord job can be killed
100 if (StatusUtils.isV1CoordjobKillable(coordJob)) {
101 return;
102 }
103 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
104 || coordJob.getStatus() == CoordinatorJob.Status.FAILED
105 || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
106 || coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
107 LOG.info("CoordKillXCommand not killed - job either finished SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = "
108 + jobId + ", status = " + coordJob.getStatus());
109 throw new PreconditionException(ErrorCode.E1020, jobId);
110 }
111 }
112
113 private void updateCoordAction(CoordinatorActionBean action, boolean makePending) {
114 CoordinatorAction.Status prevStatus = action.getStatus();
115 action.setStatus(CoordinatorActionBean.Status.KILLED);
116 if (makePending) {
117 action.incrementAndGetPending();
118 } else {
119 // set pending to false
120 action.setPending(0);
121 }
122 if (EventHandlerService.isEnabled() && prevStatus != CoordinatorAction.Status.RUNNING
123 && prevStatus != CoordinatorAction.Status.SUSPENDED) {
124 CoordinatorXCommand.generateEvent(action, coordJob.getUser(), coordJob.getAppName(), null);
125 }
126 action.setLastModifiedTime(new Date());
127 updateList.add(action);
128 }
129
130 @Override
131 public void killChildren() throws CommandException {
132 if (actionList != null) {
133 for (CoordinatorActionBean action : actionList) {
134 // queue a WorkflowKillXCommand to delete the workflow job and actions
135 if (action.getExternalId() != null) {
136 queue(new KillXCommand(action.getExternalId()));
137 // As the kill command for children is queued, set pending flag for coord action to be true
138 updateCoordAction(action, true);
139 LOG.debug(
140 "Killed coord action = [{0}], new status = [{1}], pending = [{2}] and queue KillXCommand for [{3}]",
141 action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
142 }
143 else {
144 // As killing children is not required, set pending flag for coord action to be false
145 updateCoordAction(action, false);
146 LOG.debug("Killed coord action = [{0}], current status = [{1}], pending = [{2}]",
147 action.getId(), action.getStatus(), action.getPending());
148 }
149 String pushMissingDeps = action.getPushMissingDependencies();
150 if (pushMissingDeps != null) {
151 CoordPushDependencyCheckXCommand.unregisterMissingDependencies(
152 Arrays.asList(DependencyChecker.dependenciesAsArray(pushMissingDeps)), action.getId());
153 }
154 }
155 }
156 coordJob.setDoneMaterialization();
157 updateList.add(coordJob);
158
159 LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId);
160 }
161
162 @Override
163 public void notifyParent() throws CommandException {
164 // update bundle action
165 if (coordJob.getBundleId() != null) {
166 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
167 bundleStatusUpdate.call();
168 }
169 }
170
171 @Override
172 public void updateJob() throws CommandException {
173 updateList.add(coordJob);
174 }
175
176 @Override
177 public void performWrites() throws CommandException {
178 try {
179 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
180 }
181 catch (JPAExecutorException e) {
182 throw new CommandException(e);
183 }
184 }
185
186 @Override
187 public Job getJob() {
188 return coordJob;
189 }
190
191 }