001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.util.Date;
021    import java.util.List;
022    
023    import org.apache.oozie.CoordinatorActionBean;
024    import org.apache.oozie.CoordinatorJobBean;
025    import org.apache.oozie.ErrorCode;
026    import org.apache.oozie.XException;
027    import org.apache.oozie.client.CoordinatorJob;
028    import org.apache.oozie.client.Job;
029    import org.apache.oozie.command.CommandException;
030    import org.apache.oozie.command.PreconditionException;
031    import org.apache.oozie.command.SuspendTransitionXCommand;
032    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
033    import org.apache.oozie.command.wf.SuspendXCommand;
034    import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
035    import org.apache.oozie.executor.jpa.CoordJobGetActionsRunningJPAExecutor;
036    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
037    import org.apache.oozie.executor.jpa.JPAExecutorException;
038    import org.apache.oozie.service.JPAService;
039    import org.apache.oozie.service.Services;
040    import org.apache.oozie.util.InstrumentUtils;
041    import org.apache.oozie.util.LogUtils;
042    import org.apache.oozie.util.ParamChecker;
043    import org.apache.oozie.util.StatusUtils;
044    
045    /**
046     * Suspend coordinator job and actions.
047     *
048     */
049    public class CoordSuspendXCommand extends SuspendTransitionXCommand {
050        private final String jobId;
051        private CoordinatorJobBean coordJob;
052        private JPAService jpaService;
053        private boolean exceptionOccured = false;
054        private CoordinatorJob.Status prevStatus = null;
055    
056        public CoordSuspendXCommand(String id) {
057            super("coord_suspend", "coord_suspend", 1);
058            this.jobId = ParamChecker.notEmpty(id, "id");
059        }
060    
061        /* (non-Javadoc)
062         * @see org.apache.oozie.command.XCommand#getEntityKey()
063         */
064        @Override
065        public String getEntityKey() {
066            return jobId;
067        }
068    
069        /* (non-Javadoc)
070         * @see org.apache.oozie.command.XCommand#isLockRequired()
071         */
072        @Override
073        protected boolean isLockRequired() {
074            return true;
075        }
076    
077        /* (non-Javadoc)
078         * @see org.apache.oozie.command.XCommand#loadState()
079         */
080        @Override
081        protected void loadState() throws CommandException {
082            super.eagerLoadState();
083            try {
084                jpaService = Services.get().get(JPAService.class);
085                if (jpaService != null) {
086                    this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(this.jobId));
087                    prevStatus = coordJob.getStatus();
088                }
089                else {
090                    throw new CommandException(ErrorCode.E0610);
091                }
092            }
093            catch (Exception ex) {
094                throw new CommandException(ErrorCode.E0603, ex);
095            }
096            LogUtils.setLogInfo(this.coordJob, logInfo);
097        }
098    
099        /* (non-Javadoc)
100         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
101         */
102        @Override
103        protected void verifyPrecondition() throws CommandException, PreconditionException {
104            super.eagerVerifyPrecondition();
105            if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
106                    || coordJob.getStatus() == CoordinatorJob.Status.FAILED
107                    || coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
108                LOG.info("CoordSuspendXCommand is not going to execute because "
109                        + "job finished or failed or killed, id = " + jobId + ", status = " + coordJob.getStatus());
110                throw new PreconditionException(ErrorCode.E0728, jobId, coordJob.getStatus().toString());
111            }
112        }
113    
114        /* (non-Javadoc)
115         * @see org.apache.oozie.command.SuspendTransitionXCommand#suspendChildren()
116         */
117        @Override
118        public void suspendChildren() throws CommandException {
119            try {
120                //Get all running actions of a job to suspend them
121                List<CoordinatorActionBean> actionList = jpaService
122                        .execute(new CoordJobGetActionsRunningJPAExecutor(jobId));
123                for (CoordinatorActionBean action : actionList) {
124                    // queue a SuspendXCommand
125                    if (action.getExternalId() != null) {
126                        queue(new SuspendXCommand(action.getExternalId()));
127                        updateCoordAction(action);
128                        LOG.debug(
129                                "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and queue SuspendXCommand for [{3}]",
130                                action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
131                    }
132                    else {
133                        updateCoordAction(action);
134                        LOG.debug(
135                                "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null",
136                                action.getId(), action.getStatus(), action.getPending());
137                    }
138    
139                }
140                LOG.debug("Suspended coordinator actions for the coordinator=[{0}]", jobId);
141            }
142            catch (XException ex) {
143                exceptionOccured = true;
144                throw new CommandException(ex);
145            }
146            finally {
147                if (exceptionOccured) {
148                    coordJob.setStatus(CoordinatorJob.Status.FAILED);
149                    coordJob.resetPending();
150                    LOG.debug("Exception happened, fail coordinator job id = " + jobId + ", status = "
151                            + coordJob.getStatus());
152                    updateList.add(coordJob);
153                }
154            }
155        }
156    
157        /* (non-Javadoc)
158         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
159         */
160        @Override
161        public void notifyParent() throws CommandException {
162            // update bundle action
163            if (this.coordJob.getBundleId() != null) {
164                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
165                bundleStatusUpdate.call();
166            }
167        }
168    
169        /* (non-Javadoc)
170         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
171         */
172        @Override
173        public void updateJob() {
174            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
175            coordJob.setLastModifiedTime(new Date());
176            coordJob.setSuspendedTime(new Date());
177            LOG.debug("Suspend coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " + coordJob.isPending());
178            updateList.add(coordJob);
179        }
180    
181        /* (non-Javadoc)
182         * @see org.apache.oozie.command.SuspendTransitionXCommand#performWrites()
183         */
184        @Override
185        public void performWrites() throws CommandException {
186            try {
187                jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
188            }
189            catch (JPAExecutorException jex) {
190                throw new CommandException(jex);
191            }
192        }
193    
194        private void updateCoordAction(CoordinatorActionBean action) {
195            action.setStatus(CoordinatorActionBean.Status.SUSPENDED);
196            action.incrementAndGetPending();
197            action.setLastModifiedTime(new Date());
198            updateList.add(action);
199        }
200    
201        /* (non-Javadoc)
202         * @see org.apache.oozie.command.TransitionXCommand#getJob()
203         */
204        @Override
205        public Job getJob() {
206            return coordJob;
207        }
208    
209        /**
210         * Transit job to suspended from running or to prepsuspended from prep.
211         *
212         * @see org.apache.oozie.command.TransitionXCommand#transitToNext()
213         */
214        @Override
215        public void transitToNext() {
216            if (coordJob == null) {
217                coordJob = (CoordinatorJobBean) this.getJob();
218            }
219            if (coordJob.getStatus() == Job.Status.PREP) {
220                coordJob.setStatus(Job.Status.PREPSUSPENDED);
221                coordJob.setStatus(StatusUtils.getStatus(coordJob));
222            }
223            else if (coordJob.getStatus() == Job.Status.RUNNING) {
224                coordJob.setStatus(Job.Status.SUSPENDED);
225            }
226            else if (coordJob.getStatus() == Job.Status.RUNNINGWITHERROR) {
227                coordJob.setStatus(Job.Status.SUSPENDEDWITHERROR);
228            }
229            else if (coordJob.getStatus() == Job.Status.PAUSED) {
230                coordJob.setStatus(Job.Status.SUSPENDED);
231            }
232            else if (coordJob.getStatus() == Job.Status.PREPPAUSED) {
233                coordJob.setStatus(Job.Status.PREPSUSPENDED);
234            }
235            coordJob.setPending();
236        }
237    
238    }