This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.util.Date;
021 import java.util.List;
022
023 import org.apache.oozie.CoordinatorActionBean;
024 import org.apache.oozie.CoordinatorJobBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.XException;
027 import org.apache.oozie.client.CoordinatorJob;
028 import org.apache.oozie.client.Job;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.command.PreconditionException;
031 import org.apache.oozie.command.SuspendTransitionXCommand;
032 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
033 import org.apache.oozie.command.wf.SuspendXCommand;
034 import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
035 import org.apache.oozie.executor.jpa.CoordJobGetActionsRunningJPAExecutor;
036 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
037 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
038 import org.apache.oozie.executor.jpa.JPAExecutorException;
039 import org.apache.oozie.service.JPAService;
040 import org.apache.oozie.service.Services;
041 import org.apache.oozie.util.InstrumentUtils;
042 import org.apache.oozie.util.LogUtils;
043 import org.apache.oozie.util.ParamChecker;
044 import org.apache.oozie.util.StatusUtils;
045
046 /**
047 * Suspend coordinator job and actions.
048 *
049 */
050 public class CoordSuspendXCommand extends SuspendTransitionXCommand {
051 private final String jobId;
052 private CoordinatorJobBean coordJob;
053 private JPAService jpaService;
054 private boolean exceptionOccured = false;
055 private CoordinatorJob.Status prevStatus = null;
056
057 public CoordSuspendXCommand(String id) {
058 super("coord_suspend", "coord_suspend", 1);
059 this.jobId = ParamChecker.notEmpty(id, "id");
060 }
061
062 /* (non-Javadoc)
063 * @see org.apache.oozie.command.XCommand#getEntityKey()
064 */
065 @Override
066 public String getEntityKey() {
067 return jobId;
068 }
069
070 /* (non-Javadoc)
071 * @see org.apache.oozie.command.XCommand#isLockRequired()
072 */
073 @Override
074 protected boolean isLockRequired() {
075 return true;
076 }
077
078 /* (non-Javadoc)
079 * @see org.apache.oozie.command.XCommand#loadState()
080 */
081 @Override
082 protected void loadState() throws CommandException {
083 super.eagerLoadState();
084 try {
085 jpaService = Services.get().get(JPAService.class);
086 if (jpaService != null) {
087 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(this.jobId));
088 prevStatus = coordJob.getStatus();
089 }
090 else {
091 throw new CommandException(ErrorCode.E0610);
092 }
093 }
094 catch (Exception ex) {
095 throw new CommandException(ErrorCode.E0603, ex);
096 }
097 LogUtils.setLogInfo(this.coordJob, logInfo);
098 }
099
100 /* (non-Javadoc)
101 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
102 */
103 @Override
104 protected void verifyPrecondition() throws CommandException, PreconditionException {
105 super.eagerVerifyPrecondition();
106 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
107 || coordJob.getStatus() == CoordinatorJob.Status.FAILED
108 || coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
109 LOG.info("CoordSuspendXCommand is not going to execute because "
110 + "job finished or failed or killed, id = " + jobId + ", status = " + coordJob.getStatus());
111 throw new PreconditionException(ErrorCode.E0728, jobId, coordJob.getStatus().toString());
112 }
113 }
114
115 /* (non-Javadoc)
116 * @see org.apache.oozie.command.SuspendTransitionXCommand#suspendChildren()
117 */
118 @Override
119 public void suspendChildren() throws CommandException {
120 try {
121 //Get all running actions of a job to suspend them
122 List<CoordinatorActionBean> actionList = jpaService
123 .execute(new CoordJobGetActionsRunningJPAExecutor(jobId));
124 for (CoordinatorActionBean action : actionList) {
125 // queue a SuspendXCommand
126 if (action.getExternalId() != null) {
127 queue(new SuspendXCommand(action.getExternalId()));
128 updateCoordAction(action);
129 LOG.debug(
130 "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and queue SuspendXCommand for [{3}]",
131 action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
132 }
133 else {
134 updateCoordAction(action);
135 LOG.debug(
136 "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null",
137 action.getId(), action.getStatus(), action.getPending());
138 }
139
140 }
141 LOG.debug("Suspended coordinator actions for the coordinator=[{0}]", jobId);
142 }
143 catch (XException ex) {
144 exceptionOccured = true;
145 throw new CommandException(ex);
146 }
147 finally {
148 if (exceptionOccured) {
149 coordJob.setStatus(CoordinatorJob.Status.FAILED);
150 coordJob.resetPending();
151 LOG.debug("Exception happened, fail coordinator job id = " + jobId + ", status = "
152 + coordJob.getStatus());
153 try {
154 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
155 }
156 catch (JPAExecutorException je) {
157 LOG.error("Failed to update coordinator job : " + jobId, je);
158 }
159 }
160 }
161 }
162
163 /* (non-Javadoc)
164 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
165 */
166 @Override
167 public void notifyParent() throws CommandException {
168 // update bundle action
169 if (this.coordJob.getBundleId() != null) {
170 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
171 bundleStatusUpdate.call();
172 }
173 }
174
175 /* (non-Javadoc)
176 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
177 */
178 @Override
179 public void updateJob() throws CommandException {
180 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
181 coordJob.setLastModifiedTime(new Date());
182 coordJob.setSuspendedTime(new Date());
183 LOG.debug("Suspend coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " + coordJob.isPending());
184 try {
185 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
186 }
187 catch (JPAExecutorException e) {
188 throw new CommandException(e);
189 }
190 }
191
192 private void updateCoordAction(CoordinatorActionBean action) throws CommandException {
193 action.setStatus(CoordinatorActionBean.Status.SUSPENDED);
194 action.incrementAndGetPending();
195 action.setLastModifiedTime(new Date());
196 try {
197 jpaService.execute(new CoordActionUpdateStatusJPAExecutor(action));
198 }
199 catch (JPAExecutorException e) {
200 throw new CommandException(e);
201 }
202 }
203
204 /* (non-Javadoc)
205 * @see org.apache.oozie.command.TransitionXCommand#getJob()
206 */
207 @Override
208 public Job getJob() {
209 return coordJob;
210 }
211
212 /**
213 * Transit job to suspended from running or to prepsuspended from prep.
214 *
215 * @see org.apache.oozie.command.TransitionXCommand#transitToNext()
216 */
217 @Override
218 public void transitToNext() {
219 if (coordJob == null) {
220 coordJob = (CoordinatorJobBean) this.getJob();
221 }
222 if (coordJob.getStatus() == Job.Status.PREP) {
223 coordJob.setStatus(Job.Status.PREPSUSPENDED);
224 coordJob.setStatus(StatusUtils.getStatus(coordJob));
225 }
226 else if (coordJob.getStatus() == Job.Status.RUNNING) {
227 coordJob.setStatus(Job.Status.SUSPENDED);
228 }
229 coordJob.setPending();
230 }
231
232 }