This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.util.Date;
021 import java.util.List;
022
023 import org.apache.oozie.CoordinatorActionBean;
024 import org.apache.oozie.CoordinatorJobBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.XException;
027 import org.apache.oozie.client.CoordinatorJob;
028 import org.apache.oozie.client.Job;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.command.PreconditionException;
031 import org.apache.oozie.command.ResumeTransitionXCommand;
032 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
033 import org.apache.oozie.command.wf.ResumeXCommand;
034 import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
035 import org.apache.oozie.executor.jpa.CoordJobGetActionsSuspendedJPAExecutor;
036 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
037 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
038 import org.apache.oozie.executor.jpa.JPAExecutorException;
039 import org.apache.oozie.service.JPAService;
040 import org.apache.oozie.service.Services;
041 import org.apache.oozie.util.InstrumentUtils;
042 import org.apache.oozie.util.LogUtils;
043 import org.apache.oozie.util.ParamChecker;
044
045 /**
046 * Resume coordinator job and actions.
047 *
048 */
049 public class CoordResumeXCommand extends ResumeTransitionXCommand {
050 private final String jobId;
051 private CoordinatorJobBean coordJob = null;
052 private JPAService jpaService = null;
053 private boolean exceptionOccured = false;
054 CoordinatorJob.Status prevStatus;
055
056 public CoordResumeXCommand(String id) {
057 super("coord_resume", "coord_resume", 1);
058 this.jobId = ParamChecker.notEmpty(id, "id");
059 }
060
061 /* (non-Javadoc)
062 * @see org.apache.oozie.command.XCommand#getEntityKey()
063 */
064 @Override
065 public String getEntityKey() {
066 return jobId;
067 }
068
069 /* (non-Javadoc)
070 * @see org.apache.oozie.command.XCommand#isLockRequired()
071 */
072 @Override
073 protected boolean isLockRequired() {
074 return true;
075 }
076
077 /* (non-Javadoc)
078 * @see org.apache.oozie.command.XCommand#loadState()
079 */
080 @Override
081 protected void loadState() throws CommandException {
082 jpaService = Services.get().get(JPAService.class);
083 if (jpaService == null) {
084 throw new CommandException(ErrorCode.E0610);
085 }
086 try {
087 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
088 }
089 catch (JPAExecutorException e) {
090 throw new CommandException(e);
091 }
092 setJob(coordJob);
093 prevStatus = coordJob.getStatus();
094 LogUtils.setLogInfo(coordJob, logInfo);
095 }
096
097 /* (non-Javadoc)
098 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
099 */
100 @Override
101 protected void verifyPrecondition() throws CommandException, PreconditionException {
102 if (coordJob.getStatus() != CoordinatorJob.Status.SUSPENDED && coordJob.getStatus() != Job.Status.PREPSUSPENDED) {
103 throw new PreconditionException(ErrorCode.E1100, "CoordResumeXCommand not Resumed - "
104 + "job not in SUSPENDED/PREPSUSPENDED state, job = " + jobId);
105 }
106 }
107
108 /* (non-Javadoc)
109 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
110 */
111 @Override
112 public void updateJob() throws CommandException {
113 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
114 coordJob.setSuspendedTime(null);
115 coordJob.setLastModifiedTime(new Date());
116 LOG.debug("Resume coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = "
117 + coordJob.isPending());
118 try {
119 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
120 }
121 catch (JPAExecutorException e) {
122 throw new CommandException(e);
123 }
124 }
125
126 /* (non-Javadoc)
127 * @see org.apache.oozie.command.ResumeTransitionXCommand#resumeChildren()
128 */
129 @Override
130 public void resumeChildren() throws CommandException {
131 try {
132 // Get all suspended actions to resume them
133 List<CoordinatorActionBean> actionList = jpaService.execute(new CoordJobGetActionsSuspendedJPAExecutor(
134 jobId));
135
136 for (CoordinatorActionBean action : actionList) {
137 if (action.getExternalId() != null) {
138 // queue a ResumeXCommand
139 queue(new ResumeXCommand(action.getExternalId()));
140 updateCoordAction(action);
141 LOG.debug(
142 "Resume coord action = [{0}], new status = [{1}], pending = [{2}] and queue ResumeXCommand for [{3}]",
143 action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
144 }
145 else {
146 updateCoordAction(action);
147 LOG.debug(
148 "Resume coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null",
149 action.getId(), action.getStatus(), action.getPending());
150 }
151 }
152
153 }
154 catch (XException ex) {
155 exceptionOccured = true;
156 throw new CommandException(ex);
157 }
158 finally {
159 if (exceptionOccured) {
160 coordJob.setStatus(CoordinatorJob.Status.FAILED);
161 coordJob.resetPending();
162 LOG.warn("Resume children failed so fail coordinator, coordinator job id = " + jobId + ", status = "
163 + coordJob.getStatus());
164 try {
165 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
166 }
167 catch (JPAExecutorException je) {
168 LOG.error("Failed to update coordinator job : " + jobId, je);
169 }
170 }
171 }
172 }
173
174 /* (non-Javadoc)
175 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
176 */
177 @Override
178 public void notifyParent() throws CommandException {
179 // update bundle action
180 if (this.coordJob.getBundleId() != null) {
181 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
182 bundleStatusUpdate.call();
183 }
184 }
185
186 private void updateCoordAction(CoordinatorActionBean action) throws CommandException {
187 action.setStatus(CoordinatorActionBean.Status.RUNNING);
188 action.incrementAndGetPending();
189 action.setLastModifiedTime(new Date());
190 try {
191 jpaService.execute(new CoordActionUpdateStatusJPAExecutor(action));
192 }
193 catch (JPAExecutorException e) {
194 throw new CommandException(e);
195 }
196 }
197
198 /* (non-Javadoc)
199 * @see org.apache.oozie.command.TransitionXCommand#getJob()
200 */
201 @Override
202 public Job getJob() {
203 return coordJob;
204 }
205 }