001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.bundle;
019    
020    import java.util.Date;
021    
022    import org.apache.oozie.BundleActionBean;
023    import org.apache.oozie.CoordinatorJobBean;
024    import org.apache.oozie.ErrorCode;
025    import org.apache.oozie.XException;
026    import org.apache.oozie.client.CoordinatorJob;
027    import org.apache.oozie.client.Job;
028    import org.apache.oozie.command.CommandException;
029    import org.apache.oozie.command.PreconditionException;
030    import org.apache.oozie.command.StatusUpdateXCommand;
031    import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
032    import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
033    import org.apache.oozie.executor.jpa.JPAExecutorException;
034    import org.apache.oozie.service.JPAService;
035    import org.apache.oozie.service.Services;
036    
037    /**
038     * The command to update Bundle status
039     */
040    public class BundleStatusUpdateXCommand extends StatusUpdateXCommand {
041        private final CoordinatorJobBean coordjob;
042        private JPAService jpaService = null;
043        private BundleActionBean bundleaction;
044        private final Job.Status prevStatus;
045    
046        /**
047         * The constructor for class {@link BundleStatusUpdateXCommand}
048         *
049         * @param coordjob coordinator job bean
050         * @param prevStatus coordinator job old status
051         */
052        public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus) {
053            super("BundleStatusUpdate", "BundleStatusUpdate", 1);
054            this.coordjob = coordjob;
055            this.prevStatus = prevStatus;
056        }
057    
058        /* (non-Javadoc)
059         * @see org.apache.oozie.command.XCommand#execute()
060         */
061        @Override
062        protected Void execute() throws CommandException {
063            try {
064                LOG.debug("STARTED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId()
065                        + " coord job ID: " + coordjob.getId() + " coord Status " + coordjob.getStatus());
066                Job.Status coordCurrentStatus = coordjob.getStatus();
067                // The status of bundle action should not be updated if the bundle action is in terminal state
068                // and coord Id is null. For e.g if Bundleaction is killed and coord Id is null, then the status of bundle
069                // should not be changed.
070                if (bundleaction.getCoordId() != null || !bundleaction.isTerminalStatus()) {
071                    bundleaction.setStatus(coordCurrentStatus);
072                }
073                if (bundleaction.isPending()) {
074                    bundleaction.decrementAndGetPending();
075                }
076                // TODO - Uncomment this when bottom up rerun can change terminal state
077                /*BundleJobBean bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleaction.getBundleId()));
078                if (!bundleJob.isPending()) {
079                    bundleJob.setPending();
080                    jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
081                    LOG.info("Updated bundle job [{0}] pending to true", bundleaction.getBundleId());
082                }*/
083    
084                bundleaction.setLastModifiedTime(new Date());
085                bundleaction.setCoordId(coordjob.getId());
086                jpaService.execute(new BundleActionUpdateJPAExecutor(bundleaction));
087                if (bundleaction.getCoordId() != null) {
088                    LOG.info("Updated bundle action [{0}] from prev status [{1}] to current coord status [{2}], and new bundle action pending [{3}]", bundleaction
089                        .getBundleActionId(), bundleaction.getStatus(), coordCurrentStatus, bundleaction.getPending());
090                }
091                else {
092                    LOG.info("Updated Bundle action [{0}], status = [{1}], pending = [{2}]", bundleaction.getBundleActionId(),
093                            bundleaction.getStatus(), bundleaction.getPending());
094                }
095                LOG.debug("ENDED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId() + " coord job ID: "
096                        + coordjob.getId() + " coord Status " + coordjob.getStatus());
097            }
098            catch (Exception ex) {
099                throw new CommandException(ErrorCode.E1309, bundleaction.getBundleId(), bundleaction.getCoordName());
100            }
101            return null;
102        }
103    
104        /* (non-Javadoc)
105         * @see org.apache.oozie.command.XCommand#getEntityKey()
106         */
107        @Override
108        public String getEntityKey() {
109            return this.bundleaction.getBundleActionId();
110        }
111    
112        /* (non-Javadoc)
113         * @see org.apache.oozie.command.XCommand#isLockRequired()
114         */
115        @Override
116        protected boolean isLockRequired() {
117            return true;
118        }
119    
120        /* (non-Javadoc)
121         * @see org.apache.oozie.command.XCommand#eagerLoadState()
122         */
123        @Override
124        protected void eagerLoadState() throws CommandException{
125            loadState();
126        }
127    
128        /* (non-Javadoc)
129         * @see org.apache.oozie.command.XCommand#loadState()
130         */
131        @Override
132        protected void loadState() throws CommandException {
133            try {
134                if (jpaService == null) {
135                    jpaService = Services.get().get(JPAService.class);
136                }
137    
138                if (jpaService != null) {
139                    this.bundleaction = jpaService.execute(new BundleActionGetJPAExecutor(coordjob.getBundleId(), coordjob
140                            .getAppName()));
141                }
142                else {
143                    throw new CommandException(ErrorCode.E0610);
144                }
145            }
146            catch (XException ex) {
147                throw new CommandException(ex);
148            }
149        }
150    
151        /* (non-Javadoc)
152         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
153         */
154        @Override
155        protected void verifyPrecondition() throws CommandException, PreconditionException {
156            if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0 && bundleaction.getCoordId()!=null) {
157                // pending should be decremented only if status of coord job and bundle action is same
158                // e.g if bundle is killed and coord job is running, then pending should not be false
159                // to allow recovery service to pick and kill the coord job
160                if (bundleaction.isPending() && coordjob.getStatus().equals(bundleaction.getStatus())) {
161                    bundleaction.decrementAndGetPending();
162                }
163                bundleaction.setLastModifiedTime(new Date());
164                try {
165                    jpaService.execute(new BundleActionUpdateJPAExecutor(bundleaction));
166                }
167                catch (JPAExecutorException je) {
168                    throw new CommandException(je);
169                }
170                LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], decrement pending so new pending = [{3}]",
171                                bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(),
172                                bundleaction.getPending());
173                throw new PreconditionException(ErrorCode.E1308, bundleaction.getStatusStr(), prevStatus.toString());
174            }
175            else if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0) {
176                LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], pending = [{3}] and bundle not yet updated with coord-id",
177                        bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(),
178                        bundleaction.getPending());
179            }
180        }
181    
182    }