001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.bundle;
020
021import java.util.Date;
022
023import org.apache.oozie.BundleActionBean;
024import org.apache.oozie.CoordinatorJobBean;
025import org.apache.oozie.ErrorCode;
026import org.apache.oozie.XException;
027import org.apache.oozie.client.CoordinatorJob;
028import org.apache.oozie.client.Job;
029import org.apache.oozie.command.CommandException;
030import org.apache.oozie.command.PreconditionException;
031import org.apache.oozie.command.StatusUpdateXCommand;
032import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
033import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
034import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
035import org.apache.oozie.executor.jpa.JPAExecutorException;
036import org.apache.oozie.service.JPAService;
037import org.apache.oozie.service.Services;
038
039/**
040 * The command to update Bundle status
041 */
042public class BundleStatusUpdateXCommand extends StatusUpdateXCommand {
043    private final CoordinatorJobBean coordjob;
044    private JPAService jpaService = null;
045    private BundleActionBean bundleaction;
046    private final Job.Status prevStatus;
047    private final boolean ignorePending;
048
049    /**
050     * The constructor for class {@link BundleStatusUpdateXCommand}
051     *
052     * @param coordjob coordinator job bean
053     * @param prevStatus coordinator job old status
054     */
055    public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus) {
056        this(coordjob, prevStatus, false);
057    }
058
059    public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus, boolean ignorePending) {
060        super("BundleStatusUpdate", "BundleStatusUpdate", 1);
061        this.coordjob = coordjob;
062        this.prevStatus = prevStatus;
063        this.ignorePending = ignorePending;
064    }
065
066    @Override
067    protected Void execute() throws CommandException {
068        try {
069            LOG.debug("STARTED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId()
070                    + " coord job ID: " + coordjob.getId() + " coord Status " + coordjob.getStatus());
071            Job.Status coordCurrentStatus = coordjob.getStatus();
072            // The status of bundle action should not be updated if the bundle action is in terminal state
073            // and coord Id is null. For e.g if Bundleaction is killed and coord Id is null, then the status of bundle
074            // should not be changed.
075            if (bundleaction.getCoordId() != null
076                    || !bundleaction.isTerminalStatus()
077                    || (bundleaction.getCoordId() != null && bundleaction.isTerminalStatus() && coordjob
078                            .isTerminalStatus())) {
079                bundleaction.setStatus(coordCurrentStatus);
080            }
081            if (bundleaction.isPending() && !ignorePending) {
082                bundleaction.decrementAndGetPending();
083            }
084            // TODO - Uncomment this when bottom up rerun can change terminal state
085            /*BundleJobBean bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleaction.getBundleId()));
086            if (!bundleJob.isPending()) {
087                bundleJob.setPending();
088                jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
089                LOG.info("Updated bundle job [{0}] pending to true", bundleaction.getBundleId());
090            }*/
091
092            bundleaction.setLastModifiedTime(new Date());
093            bundleaction.setCoordId(coordjob.getId());
094            BundleActionQueryExecutor.getInstance().executeUpdate(
095                    BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID, bundleaction);
096            if (bundleaction.getCoordId() != null) {
097                LOG.info(
098                        "Updated bundle action [{0}] from prev status [{1}] to current coord status [{2}], " +
099                        "and new bundle action pending [{3}]",
100                        bundleaction.getBundleActionId(), bundleaction.getStatus(), coordCurrentStatus,
101                        bundleaction.getPending());
102            }
103            else {
104                LOG.info("Updated Bundle action [{0}], status = [{1}], pending = [{2}]", bundleaction.getBundleActionId(),
105                        bundleaction.getStatus(), bundleaction.getPending());
106            }
107            LOG.debug("ENDED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId() + " coord job ID: "
108                    + coordjob.getId() + " coord Status " + coordjob.getStatus());
109        }
110        catch (Exception ex) {
111            throw new CommandException(ErrorCode.E1309, bundleaction.getBundleId(), bundleaction.getCoordName());
112        }
113        return null;
114    }
115
116    @Override
117    public String getEntityKey() {
118        return coordjob.getBundleId();
119    }
120
121    @Override
122    protected boolean isLockRequired() {
123        return true;
124    }
125
126    @Override
127    protected void loadState() throws CommandException {
128        try {
129            if (jpaService == null) {
130                jpaService = Services.get().get(JPAService.class);
131            }
132
133            if (jpaService != null) {
134                this.bundleaction = jpaService.execute(new BundleActionGetJPAExecutor(coordjob.getBundleId(), coordjob
135                        .getAppName()));
136            }
137            else {
138                throw new CommandException(ErrorCode.E0610);
139            }
140        }
141        catch (XException ex) {
142            throw new CommandException(ex);
143        }
144    }
145
146    @Override
147    protected void verifyPrecondition() throws CommandException, PreconditionException {
148        if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0 && bundleaction.getCoordId()!=null) {
149            // pending should be decremented only if status of coord job and bundle action is same
150            // e.g if bundle is killed and coord job is running, then pending should not be false
151            // to allow recovery service to pick and kill the coord job
152            if (bundleaction.isTerminalStatus() && coordjob.isTerminalStatus()) {
153                LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], "
154                        + "but coord job is currently in terminal state = [{3}]",
155                        bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(),
156                        coordjob.getStatus());
157                return;
158            }
159            if (bundleaction.isPending() && coordjob.getStatus().equals(bundleaction.getStatus()) && !ignorePending) {
160                bundleaction.decrementAndGetPending();
161            }
162            bundleaction.setLastModifiedTime(new Date());
163            try {
164                BundleActionQueryExecutor.getInstance().executeUpdate(
165                        BundleActionQuery.UPDATE_BUNDLE_ACTION_PENDING_MODTIME, bundleaction);
166            }
167            catch (JPAExecutorException je) {
168                throw new CommandException(je);
169            }
170            LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}] and current coord"
171                    + " status [{3}], decrement pending so new pending = [{4}]", bundleaction.getBundleActionId(),
172                    bundleaction.getStatusStr(), prevStatus.toString(), coordjob.getStatusStr(),
173                    bundleaction.getPending());
174            throw new PreconditionException(ErrorCode.E1308, bundleaction.getStatusStr(), prevStatus.toString());
175        }
176        else if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0) {
177            LOG.info(
178                    "Bundle action [{0}] status [{1}] is different from prev coord status [{2}], " +
179                    "pending = [{3}] and bundle not yet updated with coord-id",
180                    bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(),
181                    bundleaction.getPending());
182        }
183    }
184
185}