This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.util.Date;
021
022 import org.apache.oozie.BundleActionBean;
023 import org.apache.oozie.CoordinatorJobBean;
024 import org.apache.oozie.ErrorCode;
025 import org.apache.oozie.XException;
026 import org.apache.oozie.client.CoordinatorJob;
027 import org.apache.oozie.client.Job;
028 import org.apache.oozie.command.CommandException;
029 import org.apache.oozie.command.PreconditionException;
030 import org.apache.oozie.command.StatusUpdateXCommand;
031 import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
032 import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
033 import org.apache.oozie.executor.jpa.JPAExecutorException;
034 import org.apache.oozie.service.JPAService;
035 import org.apache.oozie.service.Services;
036
037 /**
038 * The command to update Bundle status
039 */
040 public class BundleStatusUpdateXCommand extends StatusUpdateXCommand {
041 private final CoordinatorJobBean coordjob;
042 private JPAService jpaService = null;
043 private BundleActionBean bundleaction;
044 private final Job.Status prevStatus;
045
046 /**
047 * The constructor for class {@link BundleStatusUpdateXCommand}
048 *
049 * @param coordjob coordinator job bean
050 * @param prevStatus coordinator job old status
051 */
052 public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus) {
053 super("BundleStatusUpdate", "BundleStatusUpdate", 1);
054 this.coordjob = coordjob;
055 this.prevStatus = prevStatus;
056 }
057
058 /* (non-Javadoc)
059 * @see org.apache.oozie.command.XCommand#execute()
060 */
061 @Override
062 protected Void execute() throws CommandException {
063 try {
064 LOG.debug("STARTED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId()
065 + " coord job ID: " + coordjob.getId() + " coord Status " + coordjob.getStatus());
066 Job.Status coordCurrentStatus = coordjob.getStatus();
067 // The status of bundle action should not be updated if the bundle action is in terminal state
068 // and coord Id is null. For e.g if Bundleaction is killed and coord Id is null, then the status of bundle
069 // should not be changed.
070 if (bundleaction.getCoordId() != null || !bundleaction.isTerminalStatus()) {
071 bundleaction.setStatus(coordCurrentStatus);
072 }
073 if (bundleaction.isPending()) {
074 bundleaction.decrementAndGetPending();
075 }
076 // TODO - Uncomment this when bottom up rerun can change terminal state
077 /*BundleJobBean bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleaction.getBundleId()));
078 if (!bundleJob.isPending()) {
079 bundleJob.setPending();
080 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
081 LOG.info("Updated bundle job [{0}] pending to true", bundleaction.getBundleId());
082 }*/
083
084 bundleaction.setLastModifiedTime(new Date());
085 bundleaction.setCoordId(coordjob.getId());
086 jpaService.execute(new BundleActionUpdateJPAExecutor(bundleaction));
087 if (bundleaction.getCoordId() != null) {
088 LOG.info("Updated bundle action [{0}] from prev status [{1}] to current coord status [{2}], and new bundle action pending [{3}]", bundleaction
089 .getBundleActionId(), bundleaction.getStatus(), coordCurrentStatus, bundleaction.getPending());
090 }
091 else {
092 LOG.info("Updated Bundle action [{0}], status = [{1}], pending = [{2}]", bundleaction.getBundleActionId(),
093 bundleaction.getStatus(), bundleaction.getPending());
094 }
095 LOG.debug("ENDED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId() + " coord job ID: "
096 + coordjob.getId() + " coord Status " + coordjob.getStatus());
097 }
098 catch (Exception ex) {
099 throw new CommandException(ErrorCode.E1309, bundleaction.getBundleId(), bundleaction.getCoordName());
100 }
101 return null;
102 }
103
104 /* (non-Javadoc)
105 * @see org.apache.oozie.command.XCommand#getEntityKey()
106 */
107 @Override
108 public String getEntityKey() {
109 return this.bundleaction.getBundleActionId();
110 }
111
112 /* (non-Javadoc)
113 * @see org.apache.oozie.command.XCommand#isLockRequired()
114 */
115 @Override
116 protected boolean isLockRequired() {
117 return true;
118 }
119
120 /* (non-Javadoc)
121 * @see org.apache.oozie.command.XCommand#eagerLoadState()
122 */
123 @Override
124 protected void eagerLoadState() throws CommandException{
125 loadState();
126 }
127
128 /* (non-Javadoc)
129 * @see org.apache.oozie.command.XCommand#loadState()
130 */
131 @Override
132 protected void loadState() throws CommandException {
133 try {
134 if (jpaService == null) {
135 jpaService = Services.get().get(JPAService.class);
136 }
137
138 if (jpaService != null) {
139 this.bundleaction = jpaService.execute(new BundleActionGetJPAExecutor(coordjob.getBundleId(), coordjob
140 .getAppName()));
141 }
142 else {
143 throw new CommandException(ErrorCode.E0610);
144 }
145 }
146 catch (XException ex) {
147 throw new CommandException(ex);
148 }
149 }
150
151 /* (non-Javadoc)
152 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
153 */
154 @Override
155 protected void verifyPrecondition() throws CommandException, PreconditionException {
156 if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0 && bundleaction.getCoordId()!=null) {
157 // pending should be decremented only if status of coord job and bundle action is same
158 // e.g if bundle is killed and coord job is running, then pending should not be false
159 // to allow recovery service to pick and kill the coord job
160 if (bundleaction.isPending() && coordjob.getStatus().equals(bundleaction.getStatus())) {
161 bundleaction.decrementAndGetPending();
162 }
163 bundleaction.setLastModifiedTime(new Date());
164 try {
165 jpaService.execute(new BundleActionUpdateJPAExecutor(bundleaction));
166 }
167 catch (JPAExecutorException je) {
168 throw new CommandException(je);
169 }
170 LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], decrement pending so new pending = [{3}]",
171 bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(),
172 bundleaction.getPending());
173 throw new PreconditionException(ErrorCode.E1308, bundleaction.getStatusStr(), prevStatus.toString());
174 }
175 else if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0) {
176 LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], pending = [{3}] and bundle not yet updated with coord-id",
177 bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(),
178 bundleaction.getPending());
179 }
180 }
181
182 }