This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.util.Date;
021 import java.util.List;
022
023 import org.apache.oozie.CoordinatorActionBean;
024 import org.apache.oozie.CoordinatorJobBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.XException;
027 import org.apache.oozie.client.CoordinatorJob;
028 import org.apache.oozie.client.Job;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.command.PreconditionException;
031 import org.apache.oozie.command.ResumeTransitionXCommand;
032 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
033 import org.apache.oozie.command.wf.ResumeXCommand;
034 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
035 import org.apache.oozie.executor.jpa.CoordJobGetActionsSuspendedJPAExecutor;
036 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
037 import org.apache.oozie.executor.jpa.JPAExecutorException;
038 import org.apache.oozie.service.JPAService;
039 import org.apache.oozie.service.Services;
040 import org.apache.oozie.util.InstrumentUtils;
041 import org.apache.oozie.util.LogUtils;
042 import org.apache.oozie.util.ParamChecker;
043
044 /**
045 * Resume coordinator job and actions.
046 *
047 */
048 public class CoordResumeXCommand extends ResumeTransitionXCommand {
049 private final String jobId;
050 private CoordinatorJobBean coordJob = null;
051 private JPAService jpaService = null;
052 private boolean exceptionOccured = false;
053 CoordinatorJob.Status prevStatus;
054
055 public CoordResumeXCommand(String id) {
056 super("coord_resume", "coord_resume", 1);
057 this.jobId = ParamChecker.notEmpty(id, "id");
058 }
059
060 @Override
061 public String getEntityKey() {
062 return jobId;
063 }
064
065 @Override
066 public String getKey() {
067 return getName() + "_" + this.jobId;
068 }
069
070 @Override
071 protected boolean isLockRequired() {
072 return true;
073 }
074
075 @Override
076 protected void loadState() throws CommandException {
077 jpaService = Services.get().get(JPAService.class);
078 if (jpaService == null) {
079 throw new CommandException(ErrorCode.E0610);
080 }
081 try {
082 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
083 }
084 catch (JPAExecutorException e) {
085 throw new CommandException(e);
086 }
087 setJob(coordJob);
088 prevStatus = coordJob.getStatus();
089 LogUtils.setLogInfo(coordJob, logInfo);
090 }
091
092 @Override
093 protected void verifyPrecondition() throws CommandException, PreconditionException {
094 if (coordJob.getStatus() != CoordinatorJob.Status.SUSPENDED && coordJob.getStatus() != CoordinatorJob.Status.SUSPENDEDWITHERROR && coordJob.getStatus() != Job.Status.PREPSUSPENDED) {
095 throw new PreconditionException(ErrorCode.E1100, "CoordResumeXCommand not Resumed - "
096 + "job not in SUSPENDED/SUSPENDEDWITHERROR/PREPSUSPENDED state, job = " + jobId);
097 }
098 }
099
100 @Override
101 public void updateJob() {
102 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
103 coordJob.setSuspendedTime(null);
104 coordJob.setLastModifiedTime(new Date());
105 LOG.debug("Resume coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = "
106 + coordJob.isPending());
107 updateList.add(coordJob);
108 }
109
110 @Override
111 public void resumeChildren() throws CommandException {
112 try {
113 // Get all suspended actions to resume them
114 List<CoordinatorActionBean> actionList = jpaService.execute(new CoordJobGetActionsSuspendedJPAExecutor(
115 jobId));
116
117 for (CoordinatorActionBean action : actionList) {
118 if (action.getExternalId() != null) {
119 // queue a ResumeXCommand
120 queue(new ResumeXCommand(action.getExternalId()));
121 updateCoordAction(action);
122 LOG.debug(
123 "Resume coord action = [{0}], new status = [{1}], pending = [{2}] and queue ResumeXCommand for [{3}]",
124 action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
125 }
126 else {
127 updateCoordAction(action);
128 LOG.debug(
129 "Resume coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null",
130 action.getId(), action.getStatus(), action.getPending());
131 }
132 }
133
134 }
135 catch (XException ex) {
136 exceptionOccured = true;
137 throw new CommandException(ex);
138 }
139 finally {
140 if (exceptionOccured) {
141 coordJob.setStatus(CoordinatorJob.Status.FAILED);
142 coordJob.resetPending();
143 LOG.warn("Resume children failed so fail coordinator, coordinator job id = " + jobId + ", status = "
144 + coordJob.getStatus());
145 updateList.add(coordJob);
146 }
147 }
148 }
149
150 @Override
151 public void notifyParent() throws CommandException {
152 // update bundle action
153 if (this.coordJob.getBundleId() != null) {
154 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
155 bundleStatusUpdate.call();
156 }
157 }
158
159 @Override
160 public void performWrites() throws CommandException {
161 try {
162 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
163 }
164 catch (JPAExecutorException e) {
165 throw new CommandException(e);
166 }
167 }
168
169 private void updateCoordAction(CoordinatorActionBean action) {
170 action.setStatus(CoordinatorActionBean.Status.RUNNING);
171 action.incrementAndGetPending();
172 action.setLastModifiedTime(new Date());
173 updateList.add(action);
174 }
175
176 @Override
177 public Job getJob() {
178 return coordJob;
179 }
180 }