This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.util.Date;
021
022 import org.apache.oozie.BundleActionBean;
023 import org.apache.oozie.CoordinatorJobBean;
024 import org.apache.oozie.ErrorCode;
025 import org.apache.oozie.XException;
026 import org.apache.oozie.client.CoordinatorJob;
027 import org.apache.oozie.client.Job;
028 import org.apache.oozie.command.CommandException;
029 import org.apache.oozie.command.PreconditionException;
030 import org.apache.oozie.command.StatusUpdateXCommand;
031 import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
032 import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
033 import org.apache.oozie.executor.jpa.JPAExecutorException;
034 import org.apache.oozie.service.JPAService;
035 import org.apache.oozie.service.Services;
036
037 /**
038 * The command to update Bundle status
039 */
040 public class BundleStatusUpdateXCommand extends StatusUpdateXCommand {
041 private final CoordinatorJobBean coordjob;
042 private JPAService jpaService = null;
043 private BundleActionBean bundleaction;
044 private final Job.Status prevStatus;
045
046 /**
047 * The constructor for class {@link BundleStatusUpdateXCommand}
048 *
049 * @param coordjob coordinator job bean
050 * @param prevStatus coordinator job old status
051 */
052 public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus) {
053 super("BundleStatusUpdate", "BundleStatusUpdate", 1);
054 this.coordjob = coordjob;
055 this.prevStatus = prevStatus;
056 }
057
058 /* (non-Javadoc)
059 * @see org.apache.oozie.command.XCommand#execute()
060 */
061 @Override
062 protected Void execute() throws CommandException {
063 try {
064 LOG.debug("STARTED BundleStatusUpdateXCommand with bubdle id : " + coordjob.getBundleId()
065 + " coord job ID: " + coordjob.getId() + " coord Status " + coordjob.getStatus());
066 Job.Status coordCurrentStatus = coordjob.getStatus();
067 Job.Status bundleActionStatus = bundleaction.getStatus();
068 bundleaction.setStatus(coordCurrentStatus);
069
070 if (bundleaction.isPending()) {
071 bundleaction.decrementAndGetPending();
072 }
073 bundleaction.setLastModifiedTime(new Date());
074 bundleaction.setCoordId(coordjob.getId());
075 jpaService.execute(new BundleActionUpdateJPAExecutor(bundleaction));
076 LOG.info("Updated bundle action [{0}] from prev status [{1}] to current coord status [{2}], and new bundle action pending [{3}]", bundleaction
077 .getBundleActionId(), bundleActionStatus, coordCurrentStatus, bundleaction.getPending());
078
079 LOG.debug("ENDED BundleStatusUpdateXCommand with bubdle id : " + coordjob.getBundleId() + " coord job ID: "
080 + coordjob.getId() + " coord Status " + coordjob.getStatus());
081 }
082 catch (Exception ex) {
083 throw new CommandException(ErrorCode.E1309, bundleaction.getBundleId(), bundleaction.getCoordName());
084 }
085 return null;
086 }
087
088 /* (non-Javadoc)
089 * @see org.apache.oozie.command.XCommand#getEntityKey()
090 */
091 @Override
092 protected String getEntityKey() {
093 return this.bundleaction.getBundleActionId();
094 }
095
096 /* (non-Javadoc)
097 * @see org.apache.oozie.command.XCommand#isLockRequired()
098 */
099 @Override
100 protected boolean isLockRequired() {
101 return true;
102 }
103
104 /* (non-Javadoc)
105 * @see org.apache.oozie.command.XCommand#eagerLoadState()
106 */
107 @Override
108 protected void eagerLoadState() throws CommandException{
109 loadState();
110 }
111
112 /* (non-Javadoc)
113 * @see org.apache.oozie.command.XCommand#loadState()
114 */
115 @Override
116 protected void loadState() throws CommandException {
117 try {
118 if (jpaService == null) {
119 jpaService = Services.get().get(JPAService.class);
120 }
121
122 if (jpaService != null) {
123 this.bundleaction = jpaService.execute(new BundleActionGetJPAExecutor(coordjob.getBundleId(), coordjob
124 .getAppName()));
125 }
126 else {
127 throw new CommandException(ErrorCode.E0610);
128 }
129 }
130 catch (XException ex) {
131 throw new CommandException(ex);
132 }
133 }
134
135 /* (non-Javadoc)
136 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
137 */
138 @Override
139 protected void verifyPrecondition() throws CommandException, PreconditionException {
140 if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0) {
141 // Previous status are not matched with bundle action status
142 // So that's the error and we should not be updating the Bundle Action status
143 // however we need to decrement the pending flag.
144 if (bundleaction.isPending()) {
145 bundleaction.decrementAndGetPending();
146 }
147 bundleaction.setLastModifiedTime(new Date());
148 try {
149 jpaService.execute(new BundleActionUpdateJPAExecutor(bundleaction));
150 }
151 catch (JPAExecutorException je) {
152 throw new CommandException(je);
153 }
154 LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], decrement pending so new pending = [{3}]",
155 bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(),
156 bundleaction.getPending());
157 throw new PreconditionException(ErrorCode.E1308, bundleaction.getStatusStr(), prevStatus.toString());
158 }
159 }
160
161 }