This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.util.Date;
021 import java.util.List;
022
023 import org.apache.oozie.CoordinatorActionBean;
024 import org.apache.oozie.CoordinatorJobBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.XException;
027 import org.apache.oozie.client.CoordinatorJob;
028 import org.apache.oozie.client.Job;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.command.PreconditionException;
031 import org.apache.oozie.command.SuspendTransitionXCommand;
032 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
033 import org.apache.oozie.command.wf.SuspendXCommand;
034 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
035 import org.apache.oozie.executor.jpa.CoordJobGetActionsRunningJPAExecutor;
036 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
037 import org.apache.oozie.executor.jpa.JPAExecutorException;
038 import org.apache.oozie.service.JPAService;
039 import org.apache.oozie.service.Services;
040 import org.apache.oozie.util.InstrumentUtils;
041 import org.apache.oozie.util.LogUtils;
042 import org.apache.oozie.util.ParamChecker;
043 import org.apache.oozie.util.StatusUtils;
044
045 /**
046 * Suspend coordinator job and actions.
047 *
048 */
049 public class CoordSuspendXCommand extends SuspendTransitionXCommand {
050 private final String jobId;
051 private CoordinatorJobBean coordJob;
052 private JPAService jpaService;
053 private boolean exceptionOccured = false;
054 private CoordinatorJob.Status prevStatus = null;
055
056 public CoordSuspendXCommand(String id) {
057 super("coord_suspend", "coord_suspend", 1);
058 this.jobId = ParamChecker.notEmpty(id, "id");
059 }
060
061 /* (non-Javadoc)
062 * @see org.apache.oozie.command.XCommand#getEntityKey()
063 */
064 @Override
065 public String getEntityKey() {
066 return jobId;
067 }
068
069 /* (non-Javadoc)
070 * @see org.apache.oozie.command.XCommand#isLockRequired()
071 */
072 @Override
073 protected boolean isLockRequired() {
074 return true;
075 }
076
077 /* (non-Javadoc)
078 * @see org.apache.oozie.command.XCommand#loadState()
079 */
080 @Override
081 protected void loadState() throws CommandException {
082 super.eagerLoadState();
083 try {
084 jpaService = Services.get().get(JPAService.class);
085 if (jpaService != null) {
086 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(this.jobId));
087 prevStatus = coordJob.getStatus();
088 }
089 else {
090 throw new CommandException(ErrorCode.E0610);
091 }
092 }
093 catch (Exception ex) {
094 throw new CommandException(ErrorCode.E0603, ex);
095 }
096 LogUtils.setLogInfo(this.coordJob, logInfo);
097 }
098
099 /* (non-Javadoc)
100 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
101 */
102 @Override
103 protected void verifyPrecondition() throws CommandException, PreconditionException {
104 super.eagerVerifyPrecondition();
105 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
106 || coordJob.getStatus() == CoordinatorJob.Status.FAILED
107 || coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
108 LOG.info("CoordSuspendXCommand is not going to execute because "
109 + "job finished or failed or killed, id = " + jobId + ", status = " + coordJob.getStatus());
110 throw new PreconditionException(ErrorCode.E0728, jobId, coordJob.getStatus().toString());
111 }
112 }
113
114 /* (non-Javadoc)
115 * @see org.apache.oozie.command.SuspendTransitionXCommand#suspendChildren()
116 */
117 @Override
118 public void suspendChildren() throws CommandException {
119 try {
120 //Get all running actions of a job to suspend them
121 List<CoordinatorActionBean> actionList = jpaService
122 .execute(new CoordJobGetActionsRunningJPAExecutor(jobId));
123 for (CoordinatorActionBean action : actionList) {
124 // queue a SuspendXCommand
125 if (action.getExternalId() != null) {
126 queue(new SuspendXCommand(action.getExternalId()));
127 updateCoordAction(action);
128 LOG.debug(
129 "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and queue SuspendXCommand for [{3}]",
130 action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
131 }
132 else {
133 updateCoordAction(action);
134 LOG.debug(
135 "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null",
136 action.getId(), action.getStatus(), action.getPending());
137 }
138
139 }
140 LOG.debug("Suspended coordinator actions for the coordinator=[{0}]", jobId);
141 }
142 catch (XException ex) {
143 exceptionOccured = true;
144 throw new CommandException(ex);
145 }
146 finally {
147 if (exceptionOccured) {
148 coordJob.setStatus(CoordinatorJob.Status.FAILED);
149 coordJob.resetPending();
150 LOG.debug("Exception happened, fail coordinator job id = " + jobId + ", status = "
151 + coordJob.getStatus());
152 updateList.add(coordJob);
153 }
154 }
155 }
156
157 /* (non-Javadoc)
158 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
159 */
160 @Override
161 public void notifyParent() throws CommandException {
162 // update bundle action
163 if (this.coordJob.getBundleId() != null) {
164 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
165 bundleStatusUpdate.call();
166 }
167 }
168
169 /* (non-Javadoc)
170 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
171 */
172 @Override
173 public void updateJob() {
174 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
175 coordJob.setLastModifiedTime(new Date());
176 coordJob.setSuspendedTime(new Date());
177 LOG.debug("Suspend coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " + coordJob.isPending());
178 updateList.add(coordJob);
179 }
180
181 /* (non-Javadoc)
182 * @see org.apache.oozie.command.SuspendTransitionXCommand#performWrites()
183 */
184 @Override
185 public void performWrites() throws CommandException {
186 try {
187 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
188 }
189 catch (JPAExecutorException jex) {
190 throw new CommandException(jex);
191 }
192 }
193
194 private void updateCoordAction(CoordinatorActionBean action) {
195 action.setStatus(CoordinatorActionBean.Status.SUSPENDED);
196 action.incrementAndGetPending();
197 action.setLastModifiedTime(new Date());
198 updateList.add(action);
199 }
200
201 /* (non-Javadoc)
202 * @see org.apache.oozie.command.TransitionXCommand#getJob()
203 */
204 @Override
205 public Job getJob() {
206 return coordJob;
207 }
208
209 /**
210 * Transit job to suspended from running or to prepsuspended from prep.
211 *
212 * @see org.apache.oozie.command.TransitionXCommand#transitToNext()
213 */
214 @Override
215 public void transitToNext() {
216 if (coordJob == null) {
217 coordJob = (CoordinatorJobBean) this.getJob();
218 }
219 if (coordJob.getStatus() == Job.Status.PREP) {
220 coordJob.setStatus(Job.Status.PREPSUSPENDED);
221 coordJob.setStatus(StatusUtils.getStatus(coordJob));
222 }
223 else if (coordJob.getStatus() == Job.Status.RUNNING) {
224 coordJob.setStatus(Job.Status.SUSPENDED);
225 }
226 else if (coordJob.getStatus() == Job.Status.RUNNINGWITHERROR) {
227 coordJob.setStatus(Job.Status.SUSPENDEDWITHERROR);
228 }
229 else if (coordJob.getStatus() == Job.Status.PAUSED) {
230 coordJob.setStatus(Job.Status.SUSPENDED);
231 }
232 else if (coordJob.getStatus() == Job.Status.PREPPAUSED) {
233 coordJob.setStatus(Job.Status.PREPSUSPENDED);
234 }
235 coordJob.setPending();
236 }
237
238 }