This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.util.Date;
021 import java.util.List;
022
023 import org.apache.oozie.CoordinatorActionBean;
024 import org.apache.oozie.CoordinatorJobBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.XException;
027 import org.apache.oozie.client.CoordinatorJob;
028 import org.apache.oozie.client.Job;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.command.PreconditionException;
031 import org.apache.oozie.command.ResumeTransitionXCommand;
032 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
033 import org.apache.oozie.command.wf.ResumeXCommand;
034 import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
035 import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
036 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
037 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
038 import org.apache.oozie.executor.jpa.JPAExecutorException;
039 import org.apache.oozie.service.JPAService;
040 import org.apache.oozie.service.Services;
041 import org.apache.oozie.util.InstrumentUtils;
042 import org.apache.oozie.util.LogUtils;
043 import org.apache.oozie.util.ParamChecker;
044
045 /**
046 * Resume coordinator job and actions.
047 *
048 */
049 public class CoordResumeXCommand extends ResumeTransitionXCommand {
050 private final String jobId;
051 private CoordinatorJobBean coordJob = null;
052 private JPAService jpaService = null;
053 private boolean exceptionOccured = false;
054 CoordinatorJob.Status prevStatus;
055
056 public CoordResumeXCommand(String id) {
057 super("coord_resume", "coord_resume", 1);
058 this.jobId = ParamChecker.notEmpty(id, "id");
059 }
060
061 /* (non-Javadoc)
062 * @see org.apache.oozie.command.XCommand#getEntityKey()
063 */
064 @Override
065 protected String getEntityKey() {
066 return jobId;
067 }
068
069 /* (non-Javadoc)
070 * @see org.apache.oozie.command.XCommand#isLockRequired()
071 */
072 @Override
073 protected boolean isLockRequired() {
074 return true;
075 }
076
077 /* (non-Javadoc)
078 * @see org.apache.oozie.command.XCommand#loadState()
079 */
080 @Override
081 protected void loadState() throws CommandException {
082 jpaService = Services.get().get(JPAService.class);
083 if (jpaService == null) {
084 throw new CommandException(ErrorCode.E0610);
085 }
086 try {
087 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
088 }
089 catch (JPAExecutorException e) {
090 throw new CommandException(e);
091 }
092 setJob(coordJob);
093 prevStatus = coordJob.getStatus();
094 LogUtils.setLogInfo(coordJob, logInfo);
095 }
096
097 /* (non-Javadoc)
098 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
099 */
100 @Override
101 protected void verifyPrecondition() throws CommandException, PreconditionException {
102 if (coordJob.getStatus() != CoordinatorJob.Status.SUSPENDED && coordJob.getStatus() != Job.Status.PREPSUSPENDED) {
103 throw new PreconditionException(ErrorCode.E1100, "CoordResumeXCommand not Resumed - "
104 + "job not in SUSPENDED/PREPSUSPENDED state, job = " + jobId);
105 }
106 }
107
108 /* (non-Javadoc)
109 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
110 */
111 @Override
112 public void updateJob() throws CommandException {
113 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
114 coordJob.setSuspendedTime(null);
115 coordJob.setLastModifiedTime(new Date());
116 LOG.debug("Resume coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " + coordJob.isPending());
117 try {
118 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
119 }
120 catch (JPAExecutorException e) {
121 throw new CommandException(e);
122 }
123 }
124
125 /* (non-Javadoc)
126 * @see org.apache.oozie.command.ResumeTransitionXCommand#resumeChildren()
127 */
128 @Override
129 public void resumeChildren() throws CommandException {
130 try {
131 List<CoordinatorActionBean> actionList = jpaService.execute(new CoordJobGetActionsJPAExecutor(jobId));
132
133 for (CoordinatorActionBean action : actionList) {
134 if(action.getStatus() == CoordinatorActionBean.Status.SUSPENDED){
135 // queue a ResumeXCommand
136 if (action.getExternalId() != null) {
137 queue(new ResumeXCommand(action.getExternalId()));
138 updateCoordAction(action);
139 LOG.debug("Resume coord action = [{0}], new status = [{1}], pending = [{2}] and queue ResumeXCommand for [{3}]",
140 action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
141 }else {
142 updateCoordAction(action);
143 LOG.debug("Resume coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null",
144 action.getId(), action.getStatus(), action.getPending());
145 }
146 }
147 }
148 }
149 catch (XException ex) {
150 exceptionOccured = true;
151 throw new CommandException(ex);
152 }
153 finally {
154 if (exceptionOccured) {
155 coordJob.setStatus(CoordinatorJob.Status.FAILED);
156 coordJob.resetPending();
157 LOG.warn("Resume children failed so fail coordinator, coordinator job id = " + jobId
158 + ", status = " + coordJob.getStatus());
159 try {
160 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
161 }
162 catch (JPAExecutorException je) {
163 LOG.error("Failed to update coordinator job : " + jobId, je);
164 }
165 }
166 }
167 }
168
169 /* (non-Javadoc)
170 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
171 */
172 @Override
173 public void notifyParent() throws CommandException {
174 // update bundle action
175 if (this.coordJob.getBundleId() != null) {
176 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
177 bundleStatusUpdate.call();
178 }
179 }
180
181 private void updateCoordAction(CoordinatorActionBean action) throws CommandException {
182 action.setStatus(CoordinatorActionBean.Status.RUNNING);
183 action.incrementAndGetPending();
184 action.setLastModifiedTime(new Date());
185 try {
186 jpaService.execute(new CoordActionUpdateJPAExecutor(action));
187 }
188 catch (JPAExecutorException e) {
189 throw new CommandException(e);
190 }
191 }
192
193 /* (non-Javadoc)
194 * @see org.apache.oozie.command.TransitionXCommand#getJob()
195 */
196 @Override
197 public Job getJob() {
198 return coordJob;
199 }
200 }