001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.bundle;
020
021import java.util.Date;
022import java.util.List;
023
024import org.apache.oozie.BundleActionBean;
025import org.apache.oozie.BundleJobBean;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.client.Job;
028import org.apache.oozie.command.CommandException;
029import org.apache.oozie.command.PreconditionException;
030import org.apache.oozie.command.ResumeTransitionXCommand;
031import org.apache.oozie.command.coord.CoordResumeXCommand;
032import org.apache.oozie.executor.jpa.BatchQueryExecutor;
033import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
034import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
035import org.apache.oozie.executor.jpa.JPAExecutorException;
036import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
037import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
038import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
039import org.apache.oozie.service.JPAService;
040import org.apache.oozie.service.Services;
041import org.apache.oozie.util.InstrumentUtils;
042import org.apache.oozie.util.LogUtils;
043import org.apache.oozie.util.ParamChecker;
044
045public class BundleJobResumeXCommand extends ResumeTransitionXCommand {
046
047    private final String bundleId;
048    private JPAService jpaService = null;
049    private BundleJobBean bundleJob;
050    private List<BundleActionBean> bundleActions;
051
052    /**
053     * Constructor to create the Bundle Resume Command.
054     *
055     * @param jobId : Bundle Id
056     */
057    public BundleJobResumeXCommand(String jobId) {
058        super("bundle_resume", "bundle_resume", 1);
059        this.bundleId = ParamChecker.notNull(jobId, "BundleId");
060    }
061
062    /* (non-Javadoc)
063     * @see org.apache.oozie.command.ResumeTransitionXCommand#resumeChildren()
064     */
065    @Override
066    public void resumeChildren() {
067        for (BundleActionBean action : bundleActions) {
068            if (action.getStatus() == Job.Status.SUSPENDED || action.getStatus() == Job.Status.SUSPENDEDWITHERROR || action.getStatus() == Job.Status.PREPSUSPENDED) {
069                // queue a CoordResumeXCommand
070                if (action.getCoordId() != null) {
071                    queue(new CoordResumeXCommand(action.getCoordId()));
072                    updateBundleAction(action);
073                    LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordResumeXCommand for [{3}]",
074                                    action.getBundleActionId(), action.getStatus(), action.getPending(), action
075                                            .getCoordId());
076                }
077                else {
078                    updateBundleAction(action);
079                    LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
080                                    action.getBundleActionId(), action.getStatus(), action.getPending());
081                }
082            }
083        }
084        LOG.debug("Resume bundle actions for the bundle=[{0}]", bundleId);
085    }
086
087    private void updateBundleAction(BundleActionBean action) {
088        if (action.getStatus() == Job.Status.PREPSUSPENDED) {
089            action.setStatus(Job.Status.PREP);
090        }
091        else if (action.getStatus() == Job.Status.SUSPENDED) {
092            action.setStatus(Job.Status.RUNNING);
093        }
094        else if (action.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
095            action.setStatus(Job.Status.RUNNINGWITHERROR);
096        }
097        action.incrementAndGetPending();
098        action.setLastModifiedTime(new Date());
099        updateList.add(new UpdateEntry<BundleActionQuery>(BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME, action));
100    }
101
102    /* (non-Javadoc)
103     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
104     */
105    @Override
106    public void notifyParent() throws CommandException {
107
108    }
109
110    /* (non-Javadoc)
111     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
112     */
113    @Override
114    public void updateJob() {
115        InstrumentUtils.incrJobCounter("bundle_resume", 1, null);
116        bundleJob.setSuspendedTime(null);
117        bundleJob.setLastModifiedTime(new Date());
118        LOG.debug("Resume bundle job id = " + bundleId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending());
119        updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_SUSP_MOD_TIME, bundleJob));
120    }
121
122    /* (non-Javadoc)
123     * @see org.apache.oozie.command.ResumeTransitionXCommand#performWrites()
124     */
125    @Override
126    public void performWrites() throws CommandException {
127        try {
128            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
129        }
130        catch (JPAExecutorException e) {
131            throw new CommandException(e);
132        }
133    }
134
135    /* (non-Javadoc)
136     * @see org.apache.oozie.command.XCommand#getEntityKey()
137     */
138    @Override
139    public String getEntityKey() {
140        return bundleId;
141    }
142
143    /* (non-Javadoc)
144     * @see org.apache.oozie.command.XCommand#isLockRequired()
145     */
146    @Override
147    protected boolean isLockRequired() {
148        return true;
149    }
150
151    /* (non-Javadoc)
152     * @see org.apache.oozie.command.XCommand#loadState()
153     */
154    @Override
155    protected void loadState() throws CommandException {
156        jpaService = Services.get().get(JPAService.class);
157        if (jpaService == null) {
158            throw new CommandException(ErrorCode.E0610);
159        }
160
161        try {
162            bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, bundleId);
163            bundleActions = BundleActionQueryExecutor.getInstance().getList(
164                    BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, bundleId);
165        }
166        catch (Exception Ex) {
167            throw new CommandException(ErrorCode.E0604, bundleId);
168        }
169
170        LogUtils.setLogInfo(bundleJob);
171    }
172
173    /* (non-Javadoc)
174     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
175     */
176    @Override
177    protected void verifyPrecondition() throws CommandException, PreconditionException {
178        if (bundleJob.getStatus() != Job.Status.SUSPENDED && bundleJob.getStatus() != Job.Status.SUSPENDEDWITHERROR && bundleJob.getStatus() != Job.Status.PREPSUSPENDED) {
179            throw new PreconditionException(ErrorCode.E1100, "BundleResumeCommand not Resumed - "
180                    + "job not in SUSPENDED/SUSPENDEDWITHERROR/PREPSUSPENDED state " + bundleId);
181        }
182    }
183
184    /* (non-Javadoc)
185     * @see org.apache.oozie.command.TransitionXCommand#getJob()
186     */
187    @Override
188    public Job getJob() {
189        return bundleJob;
190    }
191}