This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.util.Date;
021
022 import org.apache.oozie.BundleActionBean;
023 import org.apache.oozie.CoordinatorJobBean;
024 import org.apache.oozie.ErrorCode;
025 import org.apache.oozie.XException;
026 import org.apache.oozie.client.CoordinatorJob;
027 import org.apache.oozie.client.Job;
028 import org.apache.oozie.command.CommandException;
029 import org.apache.oozie.command.PreconditionException;
030 import org.apache.oozie.command.StatusUpdateXCommand;
031 import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
032 import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
033 import org.apache.oozie.executor.jpa.JPAExecutorException;
034 import org.apache.oozie.service.JPAService;
035 import org.apache.oozie.service.Services;
036
037 /**
038 * The command to update Bundle status
039 */
040 public class BundleStatusUpdateXCommand extends StatusUpdateXCommand {
041 private final CoordinatorJobBean coordjob;
042 private JPAService jpaService = null;
043 private BundleActionBean bundleaction;
044 private final Job.Status prevStatus;
045
046 /**
047 * The constructor for class {@link BundleStatusUpdateXCommand}
048 *
049 * @param coordjob coordinator job bean
050 * @param prevStatus coordinator job old status
051 */
052 public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus) {
053 super("BundleStatusUpdate", "BundleStatusUpdate", 1);
054 this.coordjob = coordjob;
055 this.prevStatus = prevStatus;
056 }
057
058 /* (non-Javadoc)
059 * @see org.apache.oozie.command.XCommand#execute()
060 */
061 @Override
062 protected Void execute() throws CommandException {
063 try {
064 LOG.debug("STARTED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId()
065 + " coord job ID: " + coordjob.getId() + " coord Status " + coordjob.getStatus());
066 Job.Status coordCurrentStatus = coordjob.getStatus();
067 // The status of bundle action should not be updated if the bundle action is in terminal state
068 // TODO - change this once bottom up rerun is allowed to change the bundle action state
069 if (!bundleaction.isTerminalStatus()) {
070 bundleaction.setStatus(coordCurrentStatus);
071 }
072 if (bundleaction.isPending()) {
073 bundleaction.decrementAndGetPending();
074 }
075 // TODO - Uncomment this when bottom up rerun can change terminal state
076 /*BundleJobBean bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleaction.getBundleId()));
077 if (!bundleJob.isPending()) {
078 bundleJob.setPending();
079 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
080 LOG.info("Updated bundle job [{0}] pending to true", bundleaction.getBundleId());
081 }*/
082
083 bundleaction.setLastModifiedTime(new Date());
084 bundleaction.setCoordId(coordjob.getId());
085 jpaService.execute(new BundleActionUpdateJPAExecutor(bundleaction));
086 if (bundleaction.getCoordId() != null) {
087 LOG.info("Updated bundle action [{0}] from prev status [{1}] to current coord status [{2}], and new bundle action pending [{3}]", bundleaction
088 .getBundleActionId(), bundleaction.getStatus(), coordCurrentStatus, bundleaction.getPending());
089 }
090 else {
091 LOG.info("Updated Bundle action [{0}], status = [{1}], pending = [{2}]", bundleaction.getBundleActionId(),
092 bundleaction.getStatus(), bundleaction.getPending());
093 }
094 LOG.debug("ENDED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId() + " coord job ID: "
095 + coordjob.getId() + " coord Status " + coordjob.getStatus());
096 }
097 catch (Exception ex) {
098 throw new CommandException(ErrorCode.E1309, bundleaction.getBundleId(), bundleaction.getCoordName());
099 }
100 return null;
101 }
102
103 /* (non-Javadoc)
104 * @see org.apache.oozie.command.XCommand#getEntityKey()
105 */
106 @Override
107 public String getEntityKey() {
108 return this.bundleaction.getBundleActionId();
109 }
110
111 /* (non-Javadoc)
112 * @see org.apache.oozie.command.XCommand#isLockRequired()
113 */
114 @Override
115 protected boolean isLockRequired() {
116 return true;
117 }
118
119 /* (non-Javadoc)
120 * @see org.apache.oozie.command.XCommand#eagerLoadState()
121 */
122 @Override
123 protected void eagerLoadState() throws CommandException{
124 loadState();
125 }
126
127 /* (non-Javadoc)
128 * @see org.apache.oozie.command.XCommand#loadState()
129 */
130 @Override
131 protected void loadState() throws CommandException {
132 try {
133 if (jpaService == null) {
134 jpaService = Services.get().get(JPAService.class);
135 }
136
137 if (jpaService != null) {
138 this.bundleaction = jpaService.execute(new BundleActionGetJPAExecutor(coordjob.getBundleId(), coordjob
139 .getAppName()));
140 }
141 else {
142 throw new CommandException(ErrorCode.E0610);
143 }
144 }
145 catch (XException ex) {
146 throw new CommandException(ex);
147 }
148 }
149
150 /* (non-Javadoc)
151 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
152 */
153 @Override
154 protected void verifyPrecondition() throws CommandException, PreconditionException {
155 if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0 && bundleaction.getCoordId()!=null) {
156 // pending should be decremented only if status of coord job and bundle action is same
157 // e.g if bundle is killed and coord job is running, then pending should not be false
158 // to allow recovery service to pick and kill the coord job
159 if (bundleaction.isPending() && coordjob.getStatus().equals(bundleaction.getStatus())) {
160 bundleaction.decrementAndGetPending();
161 }
162 bundleaction.setLastModifiedTime(new Date());
163 try {
164 jpaService.execute(new BundleActionUpdateJPAExecutor(bundleaction));
165 }
166 catch (JPAExecutorException je) {
167 throw new CommandException(je);
168 }
169 LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], decrement pending so new pending = [{3}]",
170 bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(),
171 bundleaction.getPending());
172 throw new PreconditionException(ErrorCode.E1308, bundleaction.getStatusStr(), prevStatus.toString());
173 }
174 else if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0) {
175 LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], pending = [{3}] and bundle not yet updated with coord-id",
176 bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(),
177 bundleaction.getPending());
178 }
179 }
180
181 }