001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.bundle;
019
020import java.util.Date;
021
022import org.apache.oozie.BundleActionBean;
023import org.apache.oozie.CoordinatorJobBean;
024import org.apache.oozie.ErrorCode;
025import org.apache.oozie.XException;
026import org.apache.oozie.client.CoordinatorJob;
027import org.apache.oozie.client.Job;
028import org.apache.oozie.command.CommandException;
029import org.apache.oozie.command.PreconditionException;
030import org.apache.oozie.command.StatusUpdateXCommand;
031import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
032import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
033import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
034import org.apache.oozie.executor.jpa.JPAExecutorException;
035import org.apache.oozie.service.JPAService;
036import org.apache.oozie.service.Services;
037
038/**
039 * The command to update Bundle status
040 */
041public class BundleStatusUpdateXCommand extends StatusUpdateXCommand {
042    private final CoordinatorJobBean coordjob;
043    private JPAService jpaService = null;
044    private BundleActionBean bundleaction;
045    private final Job.Status prevStatus;
046    private final boolean ignorePending;
047
048    /**
049     * The constructor for class {@link BundleStatusUpdateXCommand}
050     *
051     * @param coordjob coordinator job bean
052     * @param prevStatus coordinator job old status
053     */
054    public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus) {
055        this(coordjob, prevStatus, false);
056    }
057
058    public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus, boolean ignorePending) {
059        super("BundleStatusUpdate", "BundleStatusUpdate", 1);
060        this.coordjob = coordjob;
061        this.prevStatus = prevStatus;
062        this.ignorePending = ignorePending;
063    }
064
065    @Override
066    protected Void execute() throws CommandException {
067        try {
068            LOG.debug("STARTED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId()
069                    + " coord job ID: " + coordjob.getId() + " coord Status " + coordjob.getStatus());
070            Job.Status coordCurrentStatus = coordjob.getStatus();
071            // The status of bundle action should not be updated if the bundle action is in terminal state
072            // and coord Id is null. For e.g if Bundleaction is killed and coord Id is null, then the status of bundle
073            // should not be changed.
074            if (bundleaction.getCoordId() != null
075                    || !bundleaction.isTerminalStatus()
076                    || (bundleaction.getCoordId() != null && bundleaction.isTerminalStatus() && coordjob
077                            .isTerminalStatus())) {
078                bundleaction.setStatus(coordCurrentStatus);
079            }
080            if (bundleaction.isPending() && !ignorePending) {
081                bundleaction.decrementAndGetPending();
082            }
083            // TODO - Uncomment this when bottom up rerun can change terminal state
084            /*BundleJobBean bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleaction.getBundleId()));
085            if (!bundleJob.isPending()) {
086                bundleJob.setPending();
087                jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
088                LOG.info("Updated bundle job [{0}] pending to true", bundleaction.getBundleId());
089            }*/
090
091            bundleaction.setLastModifiedTime(new Date());
092            bundleaction.setCoordId(coordjob.getId());
093            BundleActionQueryExecutor.getInstance().executeUpdate(
094                    BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID, bundleaction);
095            if (bundleaction.getCoordId() != null) {
096                LOG.info(
097                        "Updated bundle action [{0}] from prev status [{1}] to current coord status [{2}], " +
098                        "and new bundle action pending [{3}]",
099                        bundleaction.getBundleActionId(), bundleaction.getStatus(), coordCurrentStatus,
100                        bundleaction.getPending());
101            }
102            else {
103                LOG.info("Updated Bundle action [{0}], status = [{1}], pending = [{2}]", bundleaction.getBundleActionId(),
104                        bundleaction.getStatus(), bundleaction.getPending());
105            }
106            LOG.debug("ENDED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId() + " coord job ID: "
107                    + coordjob.getId() + " coord Status " + coordjob.getStatus());
108        }
109        catch (Exception ex) {
110            throw new CommandException(ErrorCode.E1309, bundleaction.getBundleId(), bundleaction.getCoordName());
111        }
112        return null;
113    }
114
115    @Override
116    public String getEntityKey() {
117        return coordjob.getBundleId();
118    }
119
120    @Override
121    protected boolean isLockRequired() {
122        return true;
123    }
124
125    @Override
126    protected void loadState() throws CommandException {
127        try {
128            if (jpaService == null) {
129                jpaService = Services.get().get(JPAService.class);
130            }
131
132            if (jpaService != null) {
133                this.bundleaction = jpaService.execute(new BundleActionGetJPAExecutor(coordjob.getBundleId(), coordjob
134                        .getAppName()));
135            }
136            else {
137                throw new CommandException(ErrorCode.E0610);
138            }
139        }
140        catch (XException ex) {
141            throw new CommandException(ex);
142        }
143    }
144
145    @Override
146    protected void verifyPrecondition() throws CommandException, PreconditionException {
147        if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0 && bundleaction.getCoordId()!=null) {
148            // pending should be decremented only if status of coord job and bundle action is same
149            // e.g if bundle is killed and coord job is running, then pending should not be false
150            // to allow recovery service to pick and kill the coord job
151            if (bundleaction.isTerminalStatus() && coordjob.isTerminalStatus()) {
152                LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], "
153                        + "but coord job is currently in terminal state = [{3}]",
154                        bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(),
155                        coordjob.getStatus());
156                return;
157            }
158            if (bundleaction.isPending() && coordjob.getStatus().equals(bundleaction.getStatus()) && !ignorePending) {
159                bundleaction.decrementAndGetPending();
160            }
161            bundleaction.setLastModifiedTime(new Date());
162            try {
163                BundleActionQueryExecutor.getInstance().executeUpdate(
164                        BundleActionQuery.UPDATE_BUNDLE_ACTION_PENDING_MODTIME, bundleaction);
165            }
166            catch (JPAExecutorException je) {
167                throw new CommandException(je);
168            }
169            LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}] and current coord"
170                    + " status [{3}], decrement pending so new pending = [{4}]", bundleaction.getBundleActionId(),
171                    bundleaction.getStatusStr(), prevStatus.toString(), coordjob.getStatusStr(),
172                    bundleaction.getPending());
173            throw new PreconditionException(ErrorCode.E1308, bundleaction.getStatusStr(), prevStatus.toString());
174        }
175        else if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0) {
176            LOG.info(
177                    "Bundle action [{0}] status [{1}] is different from prev coord status [{2}], " +
178                    "pending = [{3}] and bundle not yet updated with coord-id",
179                    bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(),
180                    bundleaction.getPending());
181        }
182    }
183
184}