This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.util.Date;
021 import java.util.List;
022
023 import org.apache.oozie.BundleActionBean;
024 import org.apache.oozie.BundleJobBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.client.Job;
027 import org.apache.oozie.command.CommandException;
028 import org.apache.oozie.command.PreconditionException;
029 import org.apache.oozie.command.SuspendTransitionXCommand;
030 import org.apache.oozie.command.coord.CoordSuspendXCommand;
031 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
032 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
033 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
034 import org.apache.oozie.executor.jpa.JPAExecutorException;
035 import org.apache.oozie.service.JPAService;
036 import org.apache.oozie.service.Services;
037 import org.apache.oozie.util.InstrumentUtils;
038 import org.apache.oozie.util.ParamChecker;
039 import org.apache.oozie.util.LogUtils;
040
041 public class BundleJobSuspendXCommand extends SuspendTransitionXCommand {
042 private final String jobId;
043 private JPAService jpaService;
044 private List<BundleActionBean> bundleActions;
045 private BundleJobBean bundleJob;
046
047 public BundleJobSuspendXCommand(String id) {
048 super("bundle_suspend", "bundle_suspend", 1);
049 this.jobId = ParamChecker.notEmpty(id, "id");
050 }
051
052 /* (non-Javadoc)
053 * @see org.apache.oozie.command.TransitionXCommand#getJob()
054 */
055 @Override
056 public Job getJob() {
057 return bundleJob;
058 }
059
060 /* (non-Javadoc)
061 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
062 */
063 @Override
064 public void notifyParent() throws CommandException {
065 }
066
067 /* (non-Javadoc)
068 * @see org.apache.oozie.command.TransitionXCommand#setJob(org.apache.oozie.client.Job)
069 */
070 @Override
071 public void setJob(Job job) {
072 }
073
074 /* (non-Javadoc)
075 * @see org.apache.oozie.command.SuspendTransitionXCommand#performWrites()
076 */
077 @Override
078 public void performWrites() throws CommandException {
079 try {
080 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
081 }
082 catch (JPAExecutorException e) {
083 throw new CommandException(e);
084 }
085 }
086
087 /* (non-Javadoc)
088 * @see org.apache.oozie.command.XCommand#getEntityKey()
089 */
090 @Override
091 public String getEntityKey() {
092 return this.jobId;
093 }
094
095 /* (non-Javadoc)
096 * @see org.apache.oozie.command.XCommand#isLockRequired()
097 */
098 @Override
099 protected boolean isLockRequired() {
100 return true;
101 }
102
103 /* (non-Javadoc)
104 * @see org.apache.oozie.command.XCommand#loadState()
105 */
106 @Override
107 protected void loadState() throws CommandException {
108 jpaService = Services.get().get(JPAService.class);
109 if (jpaService == null) {
110 throw new CommandException(ErrorCode.E0610);
111 }
112
113 try {
114 bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
115 }
116 catch (Exception Ex) {
117 throw new CommandException(ErrorCode.E0604, jobId);
118 }
119
120 try {
121 bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
122 }
123 catch (Exception Ex) {
124 throw new CommandException(ErrorCode.E1311, jobId);
125 }
126
127 LogUtils.setLogInfo(bundleJob, logInfo);
128 }
129
130 /* (non-Javadoc)
131 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
132 */
133 @Override
134 protected void verifyPrecondition() throws CommandException, PreconditionException {
135 if (bundleJob.getStatus() == Job.Status.SUCCEEDED || bundleJob.getStatus() == Job.Status.FAILED
136 || bundleJob.getStatus() == Job.Status.KILLED || bundleJob.getStatus() == Job.Status.DONEWITHERROR) {
137 LOG.info("BundleJobSuspendXCommand is not going to execute because job either succeeded, failed, killed, or donewitherror; id = "
138 + jobId + ", status = " + bundleJob.getStatus());
139 throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString());
140 }
141 }
142
143 /* (non-Javadoc)
144 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
145 */
146 @Override
147 public void updateJob() {
148 InstrumentUtils.incrJobCounter("bundle_suspend", 1, null);
149 bundleJob.setSuspendedTime(new Date());
150 bundleJob.setLastModifiedTime(new Date());
151
152 LOG.debug("Suspend bundle job id = " + jobId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending());
153 updateList.add(bundleJob);
154 }
155
156 @Override
157 public void suspendChildren() throws CommandException {
158 for (BundleActionBean action : this.bundleActions) {
159 if (action.getStatus() == Job.Status.RUNNING || action.getStatus() == Job.Status.RUNNINGWITHERROR
160 || action.getStatus() == Job.Status.PREP || action.getStatus() == Job.Status.PAUSED
161 || action.getStatus() == Job.Status.PAUSEDWITHERROR) {
162 // queue a CoordSuspendXCommand
163 if (action.getCoordId() != null) {
164 queue(new CoordSuspendXCommand(action.getCoordId()));
165 updateBundleAction(action);
166 LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordSuspendXCommand for [{3}]",
167 action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId());
168 } else {
169 updateBundleAction(action);
170 LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
171 action.getBundleActionId(), action.getStatus(), action.getPending());
172 }
173
174 }
175 }
176 LOG.debug("Suspended bundle actions for the bundle=[{0}]", jobId);
177 }
178
179 private void updateBundleAction(BundleActionBean action) {
180 if (action.getStatus() == Job.Status.PREP) {
181 action.setStatus(Job.Status.PREPSUSPENDED);
182 }
183 else if (action.getStatus() == Job.Status.RUNNING) {
184 action.setStatus(Job.Status.SUSPENDED);
185 }
186 else if (action.getStatus() == Job.Status.RUNNINGWITHERROR) {
187 action.setStatus(Job.Status.SUSPENDEDWITHERROR);
188 }
189 else if (action.getStatus() == Job.Status.PAUSED) {
190 action.setStatus(Job.Status.SUSPENDED);
191 }
192 else if (action.getStatus() == Job.Status.PAUSEDWITHERROR) {
193 action.setStatus(Job.Status.SUSPENDEDWITHERROR);
194 }
195
196 action.incrementAndGetPending();
197 action.setLastModifiedTime(new Date());
198 updateList.add(action);
199 }
200 }