001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.util.Date;
021import java.util.List;
022
023import org.apache.oozie.CoordinatorActionBean;
024import org.apache.oozie.CoordinatorJobBean;
025import org.apache.oozie.ErrorCode;
026import org.apache.oozie.XException;
027import org.apache.oozie.client.CoordinatorJob;
028import org.apache.oozie.client.Job;
029import org.apache.oozie.command.CommandException;
030import org.apache.oozie.command.PreconditionException;
031import org.apache.oozie.command.SuspendTransitionXCommand;
032import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
033import org.apache.oozie.command.wf.SuspendXCommand;
034import org.apache.oozie.executor.jpa.BatchQueryExecutor;
035import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
036import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
037import org.apache.oozie.executor.jpa.CoordJobGetActionsRunningJPAExecutor;
038import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
039import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
040import org.apache.oozie.executor.jpa.JPAExecutorException;
041import org.apache.oozie.service.JPAService;
042import org.apache.oozie.service.Services;
043import org.apache.oozie.util.InstrumentUtils;
044import org.apache.oozie.util.LogUtils;
045import org.apache.oozie.util.ParamChecker;
046import org.apache.oozie.util.StatusUtils;
047
048/**
049 * Suspend coordinator job and actions.
050 *
051 */
052public class CoordSuspendXCommand extends SuspendTransitionXCommand {
053    private final String jobId;
054    private CoordinatorJobBean coordJob;
055    private JPAService jpaService;
056    private boolean exceptionOccured = false;
057    private CoordinatorJob.Status prevStatus = null;
058
059    public CoordSuspendXCommand(String id) {
060        super("coord_suspend", "coord_suspend", 1);
061        this.jobId = ParamChecker.notEmpty(id, "id");
062    }
063
064    @Override
065    public String getEntityKey() {
066        return jobId;
067    }
068
069    @Override
070    public String getKey() {
071        return getName() + "_" + jobId;
072    }
073
074    @Override
075    protected boolean isLockRequired() {
076        return true;
077    }
078
079    @Override
080    protected void loadState() throws CommandException {
081        super.eagerLoadState();
082        try {
083            jpaService = Services.get().get(JPAService.class);
084            if (jpaService != null) {
085                this.coordJob = CoordJobQueryExecutor.getInstance()
086                        .get(CoordJobQuery.GET_COORD_JOB_SUSPEND_KILL, this.jobId);
087                prevStatus = coordJob.getStatus();
088            }
089            else {
090                throw new CommandException(ErrorCode.E0610);
091            }
092        }
093        catch (Exception ex) {
094            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
095        }
096        LogUtils.setLogInfo(this.coordJob, logInfo);
097    }
098
099    @Override
100    protected void verifyPrecondition() throws CommandException, PreconditionException {
101        super.eagerVerifyPrecondition();
102        if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
103                || coordJob.getStatus() == CoordinatorJob.Status.FAILED
104                || coordJob.getStatus() == CoordinatorJob.Status.KILLED
105                || coordJob.getStatus() == CoordinatorJob.Status.IGNORED) {
106            LOG.info("CoordSuspendXCommand is not going to execute because "
107                    + "job finished or failed or killed, id = " + jobId + ", status = " + coordJob.getStatus());
108            throw new PreconditionException(ErrorCode.E0728, jobId, coordJob.getStatus().toString());
109        }
110    }
111
112    @Override
113    public void suspendChildren() throws CommandException {
114        try {
115            //Get all running actions of a job to suspend them
116            List<CoordinatorActionBean> actionList = jpaService
117                    .execute(new CoordJobGetActionsRunningJPAExecutor(jobId));
118            for (CoordinatorActionBean action : actionList) {
119                // queue a SuspendXCommand
120                if (action.getExternalId() != null) {
121                    queue(new SuspendXCommand(action.getExternalId()));
122                    updateCoordAction(action);
123                    LOG.debug(
124                            "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and queue SuspendXCommand for [{3}]",
125                            action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
126                }
127                else {
128                    updateCoordAction(action);
129                    LOG.debug(
130                            "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null",
131                            action.getId(), action.getStatus(), action.getPending());
132                }
133
134            }
135            LOG.debug("Suspended coordinator actions for the coordinator=[{0}]", jobId);
136        }
137        catch (XException ex) {
138            exceptionOccured = true;
139            throw new CommandException(ex);
140        }
141        finally {
142            if (exceptionOccured) {
143                coordJob.setStatus(CoordinatorJob.Status.FAILED);
144                coordJob.resetPending();
145                LOG.debug("Exception happened, fail coordinator job id = " + jobId + ", status = "
146                        + coordJob.getStatus());
147                updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, coordJob));
148           }
149        }
150    }
151
152    @Override
153    public void notifyParent() throws CommandException {
154        // update bundle action
155        if (this.coordJob.getBundleId() != null) {
156            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
157            bundleStatusUpdate.call();
158        }
159    }
160
161    @Override
162    public void updateJob() {
163        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
164        coordJob.setLastModifiedTime(new Date());
165        coordJob.setSuspendedTime(new Date());
166        LOG.debug("Suspend coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " + coordJob.isPending());
167        updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, coordJob));
168    }
169
170    @Override
171    public void performWrites() throws CommandException {
172        try {
173            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
174        }
175        catch (JPAExecutorException jex) {
176            throw new CommandException(jex);
177        }
178    }
179
180    private void updateCoordAction(CoordinatorActionBean action) {
181        action.setStatus(CoordinatorActionBean.Status.SUSPENDED);
182        action.incrementAndGetPending();
183        action.setLastModifiedTime(new Date());
184        updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action));
185    }
186
187    @Override
188    public Job getJob() {
189        return coordJob;
190    }
191
192    /**
193     * Transit job to suspended from running or to prepsuspended from prep.
194     *
195     * @see org.apache.oozie.command.TransitionXCommand#transitToNext()
196     */
197    @Override
198    public void transitToNext() {
199        if (coordJob == null) {
200            coordJob = (CoordinatorJobBean) this.getJob();
201        }
202        if (coordJob.getStatus() == Job.Status.PREP) {
203            coordJob.setStatus(Job.Status.PREPSUSPENDED);
204            coordJob.setStatus(StatusUtils.getStatus(coordJob));
205        }
206        else if (coordJob.getStatus() == Job.Status.RUNNING) {
207            coordJob.setStatus(Job.Status.SUSPENDED);
208        }
209        else if (coordJob.getStatus() == Job.Status.RUNNINGWITHERROR || coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
210            coordJob.setStatus(Job.Status.SUSPENDEDWITHERROR);
211        }
212        else if (coordJob.getStatus() == Job.Status.PAUSED) {
213            coordJob.setStatus(Job.Status.SUSPENDED);
214        }
215        else if (coordJob.getStatus() == Job.Status.PREPPAUSED) {
216            coordJob.setStatus(Job.Status.PREPSUSPENDED);
217        }
218        coordJob.setPending();
219    }
220
221}