This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import org.apache.oozie.client.CoordinatorJob;
021 import org.apache.oozie.CoordinatorActionBean;
022 import org.apache.oozie.CoordinatorJobBean;
023 import org.apache.oozie.XException;
024 import org.apache.oozie.command.wf.KillCommand;
025 import org.apache.oozie.command.CommandException;
026 import org.apache.oozie.store.CoordinatorStore;
027 import org.apache.oozie.store.StoreException;
028 import org.apache.oozie.util.ParamChecker;
029 import org.apache.oozie.util.XLog;
030
031 import java.util.Date;
032 import java.util.List;
033
034 public class CoordKillCommand extends CoordinatorCommand<Void> {
035
036 private String jobId;
037 private final XLog log = XLog.getLog(getClass());
038
039 public CoordKillCommand(String id) {
040 super("coord_kill", "coord_kill", 1, XLog.STD);
041 this.jobId = ParamChecker.notEmpty(id, "id");
042 }
043
044 protected Void call(CoordinatorStore store) throws StoreException, CommandException {
045 try {
046 // CoordinatorJobBean coordJob = store.getCoordinatorJob(jobId,
047 // false);
048 CoordinatorJobBean coordJob = store.getEntityManager().find(CoordinatorJobBean.class, jobId);
049 setLogInfo(coordJob);
050 if (coordJob.getStatus() != CoordinatorJob.Status.SUCCEEDED
051 || coordJob.getStatus() != CoordinatorJob.Status.FAILED) {
052 coordJob.setEndTime(new Date());
053 incrJobCounter(1);
054 coordJob.setStatus(CoordinatorJob.Status.KILLED);
055 List<CoordinatorActionBean> actionList = store.getActionsForCoordinatorJob(jobId, false);
056 for (CoordinatorActionBean action : actionList) {
057 if (action.getStatus() != CoordinatorActionBean.Status.FAILED
058 && action.getStatus() != CoordinatorActionBean.Status.TIMEDOUT
059 && action.getStatus() != CoordinatorActionBean.Status.SUCCEEDED
060 && action.getStatus() != CoordinatorActionBean.Status.KILLED) {
061 // queue a KillCommand to delete the workflow job
062 if (action.getExternalId() != null) {
063 queueCallable(new KillCommand(action.getExternalId()));
064 }
065 action.setStatus(CoordinatorActionBean.Status.KILLED);
066 store.updateCoordinatorAction(action);
067 }
068 }
069 store.updateCoordinatorJob(coordJob);
070 // TODO queueCallable(new NotificationCommand(coordJob));
071 }
072 else {
073 log.info("CoordKillCommand not killed - job either " + "finished successfully or does not exist "
074 + jobId);
075 }
076 return null;
077 }
078 catch (XException ex) {
079 throw new CommandException(ex);
080 }
081 }
082
083 @Override
084 protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
085 log.info("STARTED CoordKillCommand for jobId=" + jobId);
086 try {
087 if (lock(jobId)) {
088 call(store);
089 }
090 else {
091 queueCallable(new CoordKillCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL);
092 log.warn("CoordKillCommand lock was not acquired - " + " failed " + jobId + ". Requeing the same.");
093 }
094 }
095 catch (InterruptedException e) {
096 queueCallable(new CoordKillCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL);
097 log.warn("CoordKillCommand lock acquiring failed " + " with exception " + e.getMessage() + " for job id "
098 + jobId + ". Requeing the same.");
099 }
100 finally {
101 log.info("ENDED CoordKillCommand for jobId=" + jobId);
102 }
103 return null;
104 }
105
106 }