This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.util.Date;
021 import java.util.List;
022
023 import org.apache.oozie.CoordinatorActionBean;
024 import org.apache.oozie.CoordinatorJobBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.XException;
027 import org.apache.oozie.client.CoordinatorJob;
028 import org.apache.oozie.client.Job;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.command.PreconditionException;
031 import org.apache.oozie.command.SuspendTransitionXCommand;
032 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
033 import org.apache.oozie.command.wf.SuspendXCommand;
034 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
035 import org.apache.oozie.executor.jpa.CoordJobGetActionsRunningJPAExecutor;
036 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
037 import org.apache.oozie.executor.jpa.JPAExecutorException;
038 import org.apache.oozie.service.JPAService;
039 import org.apache.oozie.service.Services;
040 import org.apache.oozie.util.InstrumentUtils;
041 import org.apache.oozie.util.LogUtils;
042 import org.apache.oozie.util.ParamChecker;
043 import org.apache.oozie.util.StatusUtils;
044
045 /**
046 * Suspend coordinator job and actions.
047 *
048 */
049 public class CoordSuspendXCommand extends SuspendTransitionXCommand {
050 private final String jobId;
051 private CoordinatorJobBean coordJob;
052 private JPAService jpaService;
053 private boolean exceptionOccured = false;
054 private CoordinatorJob.Status prevStatus = null;
055
056 public CoordSuspendXCommand(String id) {
057 super("coord_suspend", "coord_suspend", 1);
058 this.jobId = ParamChecker.notEmpty(id, "id");
059 }
060
061 @Override
062 public String getEntityKey() {
063 return jobId;
064 }
065
066 @Override
067 public String getKey() {
068 return getName() + "_" + jobId;
069 }
070
071 @Override
072 protected boolean isLockRequired() {
073 return true;
074 }
075
076 @Override
077 protected void loadState() throws CommandException {
078 super.eagerLoadState();
079 try {
080 jpaService = Services.get().get(JPAService.class);
081 if (jpaService != null) {
082 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(this.jobId));
083 prevStatus = coordJob.getStatus();
084 }
085 else {
086 throw new CommandException(ErrorCode.E0610);
087 }
088 }
089 catch (Exception ex) {
090 throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
091 }
092 LogUtils.setLogInfo(this.coordJob, logInfo);
093 }
094
095 @Override
096 protected void verifyPrecondition() throws CommandException, PreconditionException {
097 super.eagerVerifyPrecondition();
098 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
099 || coordJob.getStatus() == CoordinatorJob.Status.FAILED
100 || coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
101 LOG.info("CoordSuspendXCommand is not going to execute because "
102 + "job finished or failed or killed, id = " + jobId + ", status = " + coordJob.getStatus());
103 throw new PreconditionException(ErrorCode.E0728, jobId, coordJob.getStatus().toString());
104 }
105 }
106
107 @Override
108 public void suspendChildren() throws CommandException {
109 try {
110 //Get all running actions of a job to suspend them
111 List<CoordinatorActionBean> actionList = jpaService
112 .execute(new CoordJobGetActionsRunningJPAExecutor(jobId));
113 for (CoordinatorActionBean action : actionList) {
114 // queue a SuspendXCommand
115 if (action.getExternalId() != null) {
116 queue(new SuspendXCommand(action.getExternalId()));
117 updateCoordAction(action);
118 LOG.debug(
119 "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and queue SuspendXCommand for [{3}]",
120 action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
121 }
122 else {
123 updateCoordAction(action);
124 LOG.debug(
125 "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null",
126 action.getId(), action.getStatus(), action.getPending());
127 }
128
129 }
130 LOG.debug("Suspended coordinator actions for the coordinator=[{0}]", jobId);
131 }
132 catch (XException ex) {
133 exceptionOccured = true;
134 throw new CommandException(ex);
135 }
136 finally {
137 if (exceptionOccured) {
138 coordJob.setStatus(CoordinatorJob.Status.FAILED);
139 coordJob.resetPending();
140 LOG.debug("Exception happened, fail coordinator job id = " + jobId + ", status = "
141 + coordJob.getStatus());
142 updateList.add(coordJob);
143 }
144 }
145 }
146
147 @Override
148 public void notifyParent() throws CommandException {
149 // update bundle action
150 if (this.coordJob.getBundleId() != null) {
151 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
152 bundleStatusUpdate.call();
153 }
154 }
155
156 @Override
157 public void updateJob() {
158 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
159 coordJob.setLastModifiedTime(new Date());
160 coordJob.setSuspendedTime(new Date());
161 LOG.debug("Suspend coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " + coordJob.isPending());
162 updateList.add(coordJob);
163 }
164
165 @Override
166 public void performWrites() throws CommandException {
167 try {
168 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
169 }
170 catch (JPAExecutorException jex) {
171 throw new CommandException(jex);
172 }
173 }
174
175 private void updateCoordAction(CoordinatorActionBean action) {
176 action.setStatus(CoordinatorActionBean.Status.SUSPENDED);
177 action.incrementAndGetPending();
178 action.setLastModifiedTime(new Date());
179 updateList.add(action);
180 }
181
182 @Override
183 public Job getJob() {
184 return coordJob;
185 }
186
187 /**
188 * Transit job to suspended from running or to prepsuspended from prep.
189 *
190 * @see org.apache.oozie.command.TransitionXCommand#transitToNext()
191 */
192 @Override
193 public void transitToNext() {
194 if (coordJob == null) {
195 coordJob = (CoordinatorJobBean) this.getJob();
196 }
197 if (coordJob.getStatus() == Job.Status.PREP) {
198 coordJob.setStatus(Job.Status.PREPSUSPENDED);
199 coordJob.setStatus(StatusUtils.getStatus(coordJob));
200 }
201 else if (coordJob.getStatus() == Job.Status.RUNNING) {
202 coordJob.setStatus(Job.Status.SUSPENDED);
203 }
204 else if (coordJob.getStatus() == Job.Status.RUNNINGWITHERROR || coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
205 coordJob.setStatus(Job.Status.SUSPENDEDWITHERROR);
206 }
207 else if (coordJob.getStatus() == Job.Status.PAUSED) {
208 coordJob.setStatus(Job.Status.SUSPENDED);
209 }
210 else if (coordJob.getStatus() == Job.Status.PREPPAUSED) {
211 coordJob.setStatus(Job.Status.PREPSUSPENDED);
212 }
213 coordJob.setPending();
214 }
215
216 }