001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.bundle;
019    
020    import java.util.Date;
021    import java.util.List;
022    
023    import org.apache.oozie.BundleActionBean;
024    import org.apache.oozie.BundleJobBean;
025    import org.apache.oozie.ErrorCode;
026    import org.apache.oozie.XException;
027    import org.apache.oozie.client.Job;
028    import org.apache.oozie.command.CommandException;
029    import org.apache.oozie.command.PreconditionException;
030    import org.apache.oozie.command.SuspendTransitionXCommand;
031    import org.apache.oozie.command.coord.CoordSuspendXCommand;
032    import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
033    import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
034    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
035    import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
036    import org.apache.oozie.executor.jpa.JPAExecutorException;
037    import org.apache.oozie.service.JPAService;
038    import org.apache.oozie.service.Services;
039    import org.apache.oozie.util.InstrumentUtils;
040    import org.apache.oozie.util.ParamChecker;
041    import org.apache.oozie.util.LogUtils;
042    
043    public class BundleJobSuspendXCommand extends SuspendTransitionXCommand {
044        private final String jobId;
045        private JPAService jpaService;
046        private List<BundleActionBean> bundleActions;
047        private BundleJobBean bundleJob;
048    
049        public BundleJobSuspendXCommand(String id) {
050            super("bundle_suspend", "bundle_suspend", 1);
051            this.jobId = ParamChecker.notEmpty(id, "id");
052        }
053    
054        /* (non-Javadoc)
055         * @see org.apache.oozie.command.TransitionXCommand#getJob()
056         */
057        @Override
058        public Job getJob() {
059            return bundleJob;
060        }
061    
062        /* (non-Javadoc)
063         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
064         */
065        @Override
066        public void notifyParent() throws CommandException {
067        }
068    
069        /* (non-Javadoc)
070         * @see org.apache.oozie.command.TransitionXCommand#setJob(org.apache.oozie.client.Job)
071         */
072        @Override
073        public void setJob(Job job) {
074        }
075    
076        /* (non-Javadoc)
077         * @see org.apache.oozie.command.XCommand#getEntityKey()
078         */
079        @Override
080        protected String getEntityKey() {
081            return this.jobId;
082        }
083    
084        /* (non-Javadoc)
085         * @see org.apache.oozie.command.XCommand#isLockRequired()
086         */
087        @Override
088        protected boolean isLockRequired() {
089            return true;
090        }
091    
092        /* (non-Javadoc)
093         * @see org.apache.oozie.command.XCommand#loadState()
094         */
095        @Override
096        protected void loadState() throws CommandException {
097            jpaService = Services.get().get(JPAService.class);
098            if (jpaService == null) {
099                throw new CommandException(ErrorCode.E0610);
100            }
101    
102            try {
103                bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
104            }
105            catch (Exception Ex) {
106                throw new CommandException(ErrorCode.E0604, jobId);
107            }
108    
109            try {
110                bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
111            }
112            catch (Exception Ex) {
113                throw new CommandException(ErrorCode.E1311, jobId);
114            }
115    
116            LogUtils.setLogInfo(bundleJob, logInfo);
117        }
118    
119        /* (non-Javadoc)
120         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
121         */
122        @Override
123        protected void verifyPrecondition() throws CommandException, PreconditionException {
124            if (bundleJob.getStatus() == Job.Status.SUCCEEDED || bundleJob.getStatus() == Job.Status.FAILED
125                    || bundleJob.getStatus() == Job.Status.KILLED) {
126                LOG.info("BundleJobSuspendXCommand is not going to execute because job finished or failed or killed, id = "
127                                + jobId + ", status = " + bundleJob.getStatus());
128                throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString());
129            }
130        }
131    
132        /* (non-Javadoc)
133         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
134         */
135        @Override
136        public void updateJob() throws CommandException {
137            InstrumentUtils.incrJobCounter("bundle_suspend", 1, null);
138            bundleJob.setSuspendedTime(new Date());
139            bundleJob.setLastModifiedTime(new Date());
140    
141            LOG.debug("Suspend bundle job id = " + jobId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending());
142            try {
143                jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
144            }
145            catch (JPAExecutorException e) {
146                throw new CommandException(e);
147            }
148        }
149    
150        @Override
151        public void suspendChildren() throws CommandException {
152            try {
153                for (BundleActionBean action : this.bundleActions) {
154                    if (action.getStatus() == Job.Status.RUNNING || action.getStatus() == Job.Status.PREP) {
155                        // queue a CoordSuspendXCommand
156                        if (action.getCoordId() != null) {
157                            queue(new CoordSuspendXCommand(action.getCoordId()));
158                            updateBundleAction(action);
159                            LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordSuspendXCommand for [{3}]",
160                                    action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId());
161                        } else {
162                            updateBundleAction(action);
163                            LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
164                                    action.getBundleActionId(), action.getStatus(), action.getPending());
165                        }
166    
167                    }
168                }
169                LOG.debug("Suspended bundle actions for the bundle=[{0}]", jobId);
170            }
171            catch (XException ex) {
172                throw new CommandException(ex);
173            }
174        }
175    
176        private void updateBundleAction(BundleActionBean action) throws CommandException {
177            if (action.getStatus() == Job.Status.PREP) {
178                action.setStatus(Job.Status.PREPSUSPENDED);
179            }
180            else if (action.getStatus() == Job.Status.RUNNING) {
181                action.setStatus(Job.Status.SUSPENDED);
182            }
183            action.incrementAndGetPending();
184            action.setLastModifiedTime(new Date());
185            try {
186                jpaService.execute(new BundleActionUpdateJPAExecutor(action));
187            }
188            catch (JPAExecutorException e) {
189                throw new CommandException(e);
190            }
191        }
192    }