This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.util.Date;
021 import java.util.List;
022
023 import org.apache.oozie.BundleActionBean;
024 import org.apache.oozie.BundleJobBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.client.Job;
027 import org.apache.oozie.command.CommandException;
028 import org.apache.oozie.command.PreconditionException;
029 import org.apache.oozie.command.ResumeTransitionXCommand;
030 import org.apache.oozie.command.coord.CoordResumeXCommand;
031 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
032 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
033 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
034 import org.apache.oozie.executor.jpa.JPAExecutorException;
035 import org.apache.oozie.service.JPAService;
036 import org.apache.oozie.service.Services;
037 import org.apache.oozie.util.InstrumentUtils;
038 import org.apache.oozie.util.LogUtils;
039 import org.apache.oozie.util.ParamChecker;
040
041 public class BundleJobResumeXCommand extends ResumeTransitionXCommand {
042
043 private final String bundleId;
044 private JPAService jpaService = null;
045 private BundleJobBean bundleJob;
046 private List<BundleActionBean> bundleActions;
047
048 /**
049 * Constructor to create the Bundle Resume Command.
050 *
051 * @param jobId : Bundle Id
052 */
053 public BundleJobResumeXCommand(String jobId) {
054 super("bundle_resume", "bundle_resume", 1);
055 this.bundleId = ParamChecker.notNull(jobId, "BundleId");
056 }
057
058 /* (non-Javadoc)
059 * @see org.apache.oozie.command.ResumeTransitionXCommand#resumeChildren()
060 */
061 @Override
062 public void resumeChildren() {
063 for (BundleActionBean action : bundleActions) {
064 if (action.getStatus() == Job.Status.SUSPENDED || action.getStatus() == Job.Status.SUSPENDEDWITHERROR || action.getStatus() == Job.Status.PREPSUSPENDED) {
065 // queue a CoordResumeXCommand
066 if (action.getCoordId() != null) {
067 queue(new CoordResumeXCommand(action.getCoordId()));
068 updateBundleAction(action);
069 LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordResumeXCommand for [{3}]",
070 action.getBundleActionId(), action.getStatus(), action.getPending(), action
071 .getCoordId());
072 }
073 else {
074 updateBundleAction(action);
075 LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
076 action.getBundleActionId(), action.getStatus(), action.getPending());
077 }
078 }
079 }
080 LOG.debug("Resume bundle actions for the bundle=[{0}]", bundleId);
081 }
082
083 private void updateBundleAction(BundleActionBean action) {
084 if (action.getStatus() == Job.Status.PREPSUSPENDED) {
085 action.setStatus(Job.Status.PREP);
086 }
087 else if (action.getStatus() == Job.Status.SUSPENDED) {
088 action.setStatus(Job.Status.RUNNING);
089 }
090 else if (action.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
091 action.setStatus(Job.Status.RUNNINGWITHERROR);
092 }
093 action.incrementAndGetPending();
094 action.setLastModifiedTime(new Date());
095 updateList.add(action);
096 }
097
098 /* (non-Javadoc)
099 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
100 */
101 @Override
102 public void notifyParent() throws CommandException {
103
104 }
105
106 /* (non-Javadoc)
107 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
108 */
109 @Override
110 public void updateJob() {
111 InstrumentUtils.incrJobCounter("bundle_resume", 1, null);
112 bundleJob.setSuspendedTime(null);
113 bundleJob.setLastModifiedTime(new Date());
114 LOG.debug("Resume bundle job id = " + bundleId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending());
115 updateList.add(bundleJob);
116 }
117
118 /* (non-Javadoc)
119 * @see org.apache.oozie.command.ResumeTransitionXCommand#performWrites()
120 */
121 @Override
122 public void performWrites() throws CommandException {
123 try {
124 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
125 }
126 catch (JPAExecutorException e) {
127 throw new CommandException(e);
128 }
129 }
130
131 /* (non-Javadoc)
132 * @see org.apache.oozie.command.XCommand#getEntityKey()
133 */
134 @Override
135 public String getEntityKey() {
136 return bundleId;
137 }
138
139 /* (non-Javadoc)
140 * @see org.apache.oozie.command.XCommand#isLockRequired()
141 */
142 @Override
143 protected boolean isLockRequired() {
144 return true;
145 }
146
147 /* (non-Javadoc)
148 * @see org.apache.oozie.command.XCommand#loadState()
149 */
150 @Override
151 protected void loadState() throws CommandException {
152 jpaService = Services.get().get(JPAService.class);
153 if (jpaService == null) {
154 throw new CommandException(ErrorCode.E0610);
155 }
156
157 try {
158 bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
159 bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(bundleId));
160 }
161 catch (Exception Ex) {
162 throw new CommandException(ErrorCode.E0604, bundleId);
163 }
164
165 LogUtils.setLogInfo(bundleJob, logInfo);
166 }
167
168 /* (non-Javadoc)
169 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
170 */
171 @Override
172 protected void verifyPrecondition() throws CommandException, PreconditionException {
173 if (bundleJob.getStatus() != Job.Status.SUSPENDED && bundleJob.getStatus() != Job.Status.SUSPENDEDWITHERROR && bundleJob.getStatus() != Job.Status.PREPSUSPENDED) {
174 throw new PreconditionException(ErrorCode.E1100, "BundleResumeCommand not Resumed - "
175 + "job not in SUSPENDED/SUSPENDEDWITHERROR/PREPSUSPENDED state " + bundleId);
176 }
177 }
178
179 /* (non-Javadoc)
180 * @see org.apache.oozie.command.TransitionXCommand#getJob()
181 */
182 @Override
183 public Job getJob() {
184 return bundleJob;
185 }
186 }