001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.bundle;
019
020import java.util.Date;
021import java.util.List;
022
023import org.apache.oozie.BundleActionBean;
024import org.apache.oozie.BundleJobBean;
025import org.apache.oozie.ErrorCode;
026import org.apache.oozie.client.Job;
027import org.apache.oozie.command.CommandException;
028import org.apache.oozie.command.PreconditionException;
029import org.apache.oozie.command.SuspendTransitionXCommand;
030import org.apache.oozie.command.coord.CoordSuspendXCommand;
031import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
032import org.apache.oozie.executor.jpa.BatchQueryExecutor;
033import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
034import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
035import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
036import org.apache.oozie.executor.jpa.JPAExecutorException;
037import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
038import org.apache.oozie.util.InstrumentUtils;
039import org.apache.oozie.util.ParamChecker;
040import org.apache.oozie.util.LogUtils;
041
042public class BundleJobSuspendXCommand extends SuspendTransitionXCommand {
043    private final String jobId;
044    private List<BundleActionBean> bundleActions;
045    private BundleJobBean bundleJob;
046
047    public BundleJobSuspendXCommand(String id) {
048        super("bundle_suspend", "bundle_suspend", 1);
049        this.jobId = ParamChecker.notEmpty(id, "id");
050    }
051
052    /* (non-Javadoc)
053     * @see org.apache.oozie.command.TransitionXCommand#getJob()
054     */
055    @Override
056    public Job getJob() {
057        return bundleJob;
058    }
059
060    /* (non-Javadoc)
061     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
062     */
063    @Override
064    public void notifyParent() throws CommandException {
065    }
066
067    /* (non-Javadoc)
068     * @see org.apache.oozie.command.TransitionXCommand#setJob(org.apache.oozie.client.Job)
069     */
070    @Override
071    public void setJob(Job job) {
072    }
073
074    /* (non-Javadoc)
075     * @see org.apache.oozie.command.SuspendTransitionXCommand#performWrites()
076     */
077    @Override
078    public void performWrites() throws CommandException {
079        try {
080            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
081        }
082        catch (JPAExecutorException e) {
083            throw new CommandException(e);
084        }
085    }
086
087    /* (non-Javadoc)
088     * @see org.apache.oozie.command.XCommand#getEntityKey()
089     */
090    @Override
091    public String getEntityKey() {
092        return this.jobId;
093    }
094
095    /* (non-Javadoc)
096     * @see org.apache.oozie.command.XCommand#isLockRequired()
097     */
098    @Override
099    protected boolean isLockRequired() {
100        return true;
101    }
102
103    /* (non-Javadoc)
104     * @see org.apache.oozie.command.XCommand#loadState()
105     */
106    @Override
107    protected void loadState() throws CommandException {
108        try {
109            bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, jobId);
110        }
111        catch (Exception Ex) {
112            throw new CommandException(ErrorCode.E0604, jobId);
113        }
114
115        try {
116            bundleActions = BundleActionQueryExecutor.getInstance().getList(
117                    BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, bundleJob.getId());
118        }
119        catch (Exception Ex) {
120            throw new CommandException(ErrorCode.E1311, jobId);
121        }
122
123        LogUtils.setLogInfo(bundleJob, logInfo);
124    }
125
126    /* (non-Javadoc)
127     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
128     */
129    @Override
130    protected void verifyPrecondition() throws CommandException, PreconditionException {
131        if (bundleJob.getStatus() == Job.Status.SUCCEEDED || bundleJob.getStatus() == Job.Status.FAILED
132                || bundleJob.getStatus() == Job.Status.KILLED || bundleJob.getStatus() == Job.Status.DONEWITHERROR) {
133            LOG.info("BundleJobSuspendXCommand is not going to execute because job either succeeded, failed, killed, or donewitherror; id = "
134                            + jobId + ", status = " + bundleJob.getStatus());
135            throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString());
136        }
137    }
138
139    /* (non-Javadoc)
140     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
141     */
142    @Override
143    public void updateJob() {
144        InstrumentUtils.incrJobCounter("bundle_suspend", 1, null);
145        bundleJob.setSuspendedTime(new Date());
146        bundleJob.setLastModifiedTime(new Date());
147
148        LOG.debug("Suspend bundle job id = " + jobId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending());
149        updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_SUSP_MOD_TIME, bundleJob));
150    }
151
152    @Override
153    public void suspendChildren() throws CommandException {
154        for (BundleActionBean action : this.bundleActions) {
155            if (action.getStatus() == Job.Status.RUNNING || action.getStatus() == Job.Status.RUNNINGWITHERROR
156                    || action.getStatus() == Job.Status.PREP || action.getStatus() == Job.Status.PAUSED
157                    || action.getStatus() == Job.Status.PAUSEDWITHERROR) {
158                // queue a CoordSuspendXCommand
159                if (action.getCoordId() != null) {
160                    queue(new CoordSuspendXCommand(action.getCoordId()));
161                    updateBundleAction(action);
162                    LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordSuspendXCommand for [{3}]",
163                            action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId());
164                } else {
165                    updateBundleAction(action);
166                    LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
167                            action.getBundleActionId(), action.getStatus(), action.getPending());
168                }
169
170            }
171        }
172        LOG.debug("Suspended bundle actions for the bundle=[{0}]", jobId);
173    }
174
175    private void updateBundleAction(BundleActionBean action) {
176        if (action.getStatus() == Job.Status.PREP) {
177            action.setStatus(Job.Status.PREPSUSPENDED);
178        }
179        else if (action.getStatus() == Job.Status.RUNNING) {
180            action.setStatus(Job.Status.SUSPENDED);
181        }
182        else if (action.getStatus() == Job.Status.RUNNINGWITHERROR) {
183            action.setStatus(Job.Status.SUSPENDEDWITHERROR);
184        }
185        else if (action.getStatus() == Job.Status.PAUSED) {
186            action.setStatus(Job.Status.SUSPENDED);
187        }
188        else if (action.getStatus() == Job.Status.PAUSEDWITHERROR) {
189            action.setStatus(Job.Status.SUSPENDEDWITHERROR);
190        }
191
192        action.incrementAndGetPending();
193        action.setLastModifiedTime(new Date());
194        updateList.add(new UpdateEntry<BundleActionQuery>(
195                BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME, action));
196    }
197}