001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.util.Date;
021    import java.util.List;
022    
023    import org.apache.oozie.CoordinatorActionBean;
024    import org.apache.oozie.CoordinatorJobBean;
025    import org.apache.oozie.ErrorCode;
026    import org.apache.oozie.XException;
027    import org.apache.oozie.client.CoordinatorJob;
028    import org.apache.oozie.client.Job;
029    import org.apache.oozie.command.CommandException;
030    import org.apache.oozie.command.PreconditionException;
031    import org.apache.oozie.command.ResumeTransitionXCommand;
032    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
033    import org.apache.oozie.command.wf.ResumeXCommand;
034    import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
035    import org.apache.oozie.executor.jpa.CoordJobGetActionsSuspendedJPAExecutor;
036    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
037    import org.apache.oozie.executor.jpa.JPAExecutorException;
038    import org.apache.oozie.service.JPAService;
039    import org.apache.oozie.service.Services;
040    import org.apache.oozie.util.InstrumentUtils;
041    import org.apache.oozie.util.LogUtils;
042    import org.apache.oozie.util.ParamChecker;
043    
044    /**
045     * Resume coordinator job and actions.
046     *
047     */
048    public class CoordResumeXCommand extends ResumeTransitionXCommand {
049        private final String jobId;
050        private CoordinatorJobBean coordJob = null;
051        private JPAService jpaService = null;
052        private boolean exceptionOccured = false;
053        CoordinatorJob.Status prevStatus;
054    
055        public CoordResumeXCommand(String id) {
056            super("coord_resume", "coord_resume", 1);
057            this.jobId = ParamChecker.notEmpty(id, "id");
058        }
059    
060        /* (non-Javadoc)
061         * @see org.apache.oozie.command.XCommand#getEntityKey()
062         */
063        @Override
064        public String getEntityKey() {
065            return jobId;
066        }
067    
068        /* (non-Javadoc)
069         * @see org.apache.oozie.command.XCommand#isLockRequired()
070         */
071        @Override
072        protected boolean isLockRequired() {
073            return true;
074        }
075    
076        /* (non-Javadoc)
077         * @see org.apache.oozie.command.XCommand#loadState()
078         */
079        @Override
080        protected void loadState() throws CommandException {
081            jpaService = Services.get().get(JPAService.class);
082            if (jpaService == null) {
083                throw new CommandException(ErrorCode.E0610);
084            }
085            try {
086                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
087            }
088            catch (JPAExecutorException e) {
089                throw new CommandException(e);
090            }
091            setJob(coordJob);
092            prevStatus = coordJob.getStatus();
093            LogUtils.setLogInfo(coordJob, logInfo);
094        }
095    
096        /* (non-Javadoc)
097         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
098         */
099        @Override
100        protected void verifyPrecondition() throws CommandException, PreconditionException {
101            if (coordJob.getStatus() != CoordinatorJob.Status.SUSPENDED && coordJob.getStatus() != CoordinatorJob.Status.SUSPENDEDWITHERROR && coordJob.getStatus() != Job.Status.PREPSUSPENDED) {
102                throw new PreconditionException(ErrorCode.E1100, "CoordResumeXCommand not Resumed - "
103                        + "job not in SUSPENDED/SUSPENDEDWITHERROR/PREPSUSPENDED state, job = " + jobId);
104            }
105        }
106    
107        /* (non-Javadoc)
108         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
109         */
110        @Override
111        public void updateJob() {
112            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
113            coordJob.setSuspendedTime(null);
114            coordJob.setLastModifiedTime(new Date());
115            LOG.debug("Resume coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = "
116                    + coordJob.isPending());
117            updateList.add(coordJob);
118        }
119    
120        /* (non-Javadoc)
121         * @see org.apache.oozie.command.ResumeTransitionXCommand#resumeChildren()
122         */
123        @Override
124        public void resumeChildren() throws CommandException {
125            try {
126                // Get all suspended actions to resume them
127                List<CoordinatorActionBean> actionList = jpaService.execute(new CoordJobGetActionsSuspendedJPAExecutor(
128                        jobId));
129    
130                for (CoordinatorActionBean action : actionList) {
131                    if (action.getExternalId() != null) {
132                        // queue a ResumeXCommand
133                        queue(new ResumeXCommand(action.getExternalId()));
134                        updateCoordAction(action);
135                        LOG.debug(
136                                "Resume coord action = [{0}], new status = [{1}], pending = [{2}] and queue ResumeXCommand for [{3}]",
137                                action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
138                    }
139                    else {
140                        updateCoordAction(action);
141                        LOG.debug(
142                                "Resume coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null",
143                                action.getId(), action.getStatus(), action.getPending());
144                    }
145                }
146    
147            }
148            catch (XException ex) {
149                exceptionOccured = true;
150                throw new CommandException(ex);
151            }
152            finally {
153                if (exceptionOccured) {
154                    coordJob.setStatus(CoordinatorJob.Status.FAILED);
155                    coordJob.resetPending();
156                    LOG.warn("Resume children failed so fail coordinator, coordinator job id = " + jobId + ", status = "
157                            + coordJob.getStatus());
158                    updateList.add(coordJob);
159                }
160            }
161        }
162    
163        /* (non-Javadoc)
164         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
165         */
166        @Override
167        public void notifyParent() throws CommandException {
168            // update bundle action
169            if (this.coordJob.getBundleId() != null) {
170                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
171                bundleStatusUpdate.call();
172            }
173        }
174    
175        /* (non-Javadoc)
176         * @see org.apache.oozie.command.ResumeTransitionXCommand#performWrites()
177         */
178        @Override
179        public void performWrites() throws CommandException {
180            try {
181                jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
182            }
183            catch (JPAExecutorException e) {
184                throw new CommandException(e);
185            }
186        }
187    
188        private void updateCoordAction(CoordinatorActionBean action) {
189            action.setStatus(CoordinatorActionBean.Status.RUNNING);
190            action.incrementAndGetPending();
191            action.setLastModifiedTime(new Date());
192            updateList.add(action);
193        }
194    
195        /* (non-Javadoc)
196         * @see org.apache.oozie.command.TransitionXCommand#getJob()
197         */
198        @Override
199        public Job getJob() {
200            return coordJob;
201        }
202    }