This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.util.Date;
021 import java.util.List;
022
023 import org.apache.oozie.CoordinatorActionBean;
024 import org.apache.oozie.CoordinatorJobBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.XException;
027 import org.apache.oozie.client.CoordinatorJob;
028 import org.apache.oozie.client.Job;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.command.PreconditionException;
031 import org.apache.oozie.command.ResumeTransitionXCommand;
032 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
033 import org.apache.oozie.command.wf.ResumeXCommand;
034 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
035 import org.apache.oozie.executor.jpa.CoordJobGetActionsSuspendedJPAExecutor;
036 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
037 import org.apache.oozie.executor.jpa.JPAExecutorException;
038 import org.apache.oozie.service.JPAService;
039 import org.apache.oozie.service.Services;
040 import org.apache.oozie.util.InstrumentUtils;
041 import org.apache.oozie.util.LogUtils;
042 import org.apache.oozie.util.ParamChecker;
043
044 /**
045 * Resume coordinator job and actions.
046 *
047 */
048 public class CoordResumeXCommand extends ResumeTransitionXCommand {
049 private final String jobId;
050 private CoordinatorJobBean coordJob = null;
051 private JPAService jpaService = null;
052 private boolean exceptionOccured = false;
053 CoordinatorJob.Status prevStatus;
054
055 public CoordResumeXCommand(String id) {
056 super("coord_resume", "coord_resume", 1);
057 this.jobId = ParamChecker.notEmpty(id, "id");
058 }
059
060 /* (non-Javadoc)
061 * @see org.apache.oozie.command.XCommand#getEntityKey()
062 */
063 @Override
064 public String getEntityKey() {
065 return jobId;
066 }
067
068 /* (non-Javadoc)
069 * @see org.apache.oozie.command.XCommand#isLockRequired()
070 */
071 @Override
072 protected boolean isLockRequired() {
073 return true;
074 }
075
076 /* (non-Javadoc)
077 * @see org.apache.oozie.command.XCommand#loadState()
078 */
079 @Override
080 protected void loadState() throws CommandException {
081 jpaService = Services.get().get(JPAService.class);
082 if (jpaService == null) {
083 throw new CommandException(ErrorCode.E0610);
084 }
085 try {
086 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
087 }
088 catch (JPAExecutorException e) {
089 throw new CommandException(e);
090 }
091 setJob(coordJob);
092 prevStatus = coordJob.getStatus();
093 LogUtils.setLogInfo(coordJob, logInfo);
094 }
095
096 /* (non-Javadoc)
097 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
098 */
099 @Override
100 protected void verifyPrecondition() throws CommandException, PreconditionException {
101 if (coordJob.getStatus() != CoordinatorJob.Status.SUSPENDED && coordJob.getStatus() != CoordinatorJob.Status.SUSPENDEDWITHERROR && coordJob.getStatus() != Job.Status.PREPSUSPENDED) {
102 throw new PreconditionException(ErrorCode.E1100, "CoordResumeXCommand not Resumed - "
103 + "job not in SUSPENDED/SUSPENDEDWITHERROR/PREPSUSPENDED state, job = " + jobId);
104 }
105 }
106
107 /* (non-Javadoc)
108 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
109 */
110 @Override
111 public void updateJob() {
112 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
113 coordJob.setSuspendedTime(null);
114 coordJob.setLastModifiedTime(new Date());
115 LOG.debug("Resume coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = "
116 + coordJob.isPending());
117 updateList.add(coordJob);
118 }
119
120 /* (non-Javadoc)
121 * @see org.apache.oozie.command.ResumeTransitionXCommand#resumeChildren()
122 */
123 @Override
124 public void resumeChildren() throws CommandException {
125 try {
126 // Get all suspended actions to resume them
127 List<CoordinatorActionBean> actionList = jpaService.execute(new CoordJobGetActionsSuspendedJPAExecutor(
128 jobId));
129
130 for (CoordinatorActionBean action : actionList) {
131 if (action.getExternalId() != null) {
132 // queue a ResumeXCommand
133 queue(new ResumeXCommand(action.getExternalId()));
134 updateCoordAction(action);
135 LOG.debug(
136 "Resume coord action = [{0}], new status = [{1}], pending = [{2}] and queue ResumeXCommand for [{3}]",
137 action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
138 }
139 else {
140 updateCoordAction(action);
141 LOG.debug(
142 "Resume coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null",
143 action.getId(), action.getStatus(), action.getPending());
144 }
145 }
146
147 }
148 catch (XException ex) {
149 exceptionOccured = true;
150 throw new CommandException(ex);
151 }
152 finally {
153 if (exceptionOccured) {
154 coordJob.setStatus(CoordinatorJob.Status.FAILED);
155 coordJob.resetPending();
156 LOG.warn("Resume children failed so fail coordinator, coordinator job id = " + jobId + ", status = "
157 + coordJob.getStatus());
158 updateList.add(coordJob);
159 }
160 }
161 }
162
163 /* (non-Javadoc)
164 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
165 */
166 @Override
167 public void notifyParent() throws CommandException {
168 // update bundle action
169 if (this.coordJob.getBundleId() != null) {
170 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
171 bundleStatusUpdate.call();
172 }
173 }
174
175 /* (non-Javadoc)
176 * @see org.apache.oozie.command.ResumeTransitionXCommand#performWrites()
177 */
178 @Override
179 public void performWrites() throws CommandException {
180 try {
181 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
182 }
183 catch (JPAExecutorException e) {
184 throw new CommandException(e);
185 }
186 }
187
188 private void updateCoordAction(CoordinatorActionBean action) {
189 action.setStatus(CoordinatorActionBean.Status.RUNNING);
190 action.incrementAndGetPending();
191 action.setLastModifiedTime(new Date());
192 updateList.add(action);
193 }
194
195 /* (non-Javadoc)
196 * @see org.apache.oozie.command.TransitionXCommand#getJob()
197 */
198 @Override
199 public Job getJob() {
200 return coordJob;
201 }
202 }