001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.util.Date;
021    import java.util.List;
022    
023    import org.apache.oozie.CoordinatorActionBean;
024    import org.apache.oozie.CoordinatorJobBean;
025    import org.apache.oozie.ErrorCode;
026    import org.apache.oozie.XException;
027    import org.apache.oozie.client.CoordinatorJob;
028    import org.apache.oozie.client.Job;
029    import org.apache.oozie.command.CommandException;
030    import org.apache.oozie.command.PreconditionException;
031    import org.apache.oozie.command.ResumeTransitionXCommand;
032    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
033    import org.apache.oozie.command.wf.ResumeXCommand;
034    import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
035    import org.apache.oozie.executor.jpa.CoordJobGetActionsSuspendedJPAExecutor;
036    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
037    import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
038    import org.apache.oozie.executor.jpa.JPAExecutorException;
039    import org.apache.oozie.service.JPAService;
040    import org.apache.oozie.service.Services;
041    import org.apache.oozie.util.InstrumentUtils;
042    import org.apache.oozie.util.LogUtils;
043    import org.apache.oozie.util.ParamChecker;
044    
045    /**
046     * Resume coordinator job and actions.
047     *
048     */
049    public class CoordResumeXCommand extends ResumeTransitionXCommand {
050        private final String jobId;
051        private CoordinatorJobBean coordJob = null;
052        private JPAService jpaService = null;
053        private boolean exceptionOccured = false;
054        CoordinatorJob.Status prevStatus;
055    
056        public CoordResumeXCommand(String id) {
057            super("coord_resume", "coord_resume", 1);
058            this.jobId = ParamChecker.notEmpty(id, "id");
059        }
060    
061        /* (non-Javadoc)
062         * @see org.apache.oozie.command.XCommand#getEntityKey()
063         */
064        @Override
065        public String getEntityKey() {
066            return jobId;
067        }
068    
069        /* (non-Javadoc)
070         * @see org.apache.oozie.command.XCommand#isLockRequired()
071         */
072        @Override
073        protected boolean isLockRequired() {
074            return true;
075        }
076    
077        /* (non-Javadoc)
078         * @see org.apache.oozie.command.XCommand#loadState()
079         */
080        @Override
081        protected void loadState() throws CommandException {
082            jpaService = Services.get().get(JPAService.class);
083            if (jpaService == null) {
084                throw new CommandException(ErrorCode.E0610);
085            }
086            try {
087                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
088            }
089            catch (JPAExecutorException e) {
090                throw new CommandException(e);
091            }
092            setJob(coordJob);
093            prevStatus = coordJob.getStatus();
094            LogUtils.setLogInfo(coordJob, logInfo);
095        }
096    
097        /* (non-Javadoc)
098         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
099         */
100        @Override
101        protected void verifyPrecondition() throws CommandException, PreconditionException {
102            if (coordJob.getStatus() != CoordinatorJob.Status.SUSPENDED && coordJob.getStatus() != Job.Status.PREPSUSPENDED) {
103                throw new PreconditionException(ErrorCode.E1100, "CoordResumeXCommand not Resumed - "
104                        + "job not in SUSPENDED/PREPSUSPENDED state, job = " + jobId);
105            }
106        }
107    
108        /* (non-Javadoc)
109         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
110         */
111        @Override
112        public void updateJob() throws CommandException {
113            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
114            coordJob.setSuspendedTime(null);
115            coordJob.setLastModifiedTime(new Date());
116            LOG.debug("Resume coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = "
117                    + coordJob.isPending());
118            try {
119                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
120            }
121            catch (JPAExecutorException e) {
122                throw new CommandException(e);
123            }
124        }
125    
126        /* (non-Javadoc)
127         * @see org.apache.oozie.command.ResumeTransitionXCommand#resumeChildren()
128         */
129        @Override
130        public void resumeChildren() throws CommandException {
131            try {
132                // Get all suspended actions to resume them
133                List<CoordinatorActionBean> actionList = jpaService.execute(new CoordJobGetActionsSuspendedJPAExecutor(
134                        jobId));
135    
136                for (CoordinatorActionBean action : actionList) {
137                    if (action.getExternalId() != null) {
138                        // queue a ResumeXCommand
139                        queue(new ResumeXCommand(action.getExternalId()));
140                        updateCoordAction(action);
141                        LOG.debug(
142                                "Resume coord action = [{0}], new status = [{1}], pending = [{2}] and queue ResumeXCommand for [{3}]",
143                                action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
144                    }
145                    else {
146                        updateCoordAction(action);
147                        LOG.debug(
148                                "Resume coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null",
149                                action.getId(), action.getStatus(), action.getPending());
150                    }
151                }
152    
153            }
154            catch (XException ex) {
155                exceptionOccured = true;
156                throw new CommandException(ex);
157            }
158            finally {
159                if (exceptionOccured) {
160                    coordJob.setStatus(CoordinatorJob.Status.FAILED);
161                    coordJob.resetPending();
162                    LOG.warn("Resume children failed so fail coordinator, coordinator job id = " + jobId + ", status = "
163                            + coordJob.getStatus());
164                    try {
165                        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
166                    }
167                    catch (JPAExecutorException je) {
168                        LOG.error("Failed to update coordinator job : " + jobId, je);
169                    }
170                }
171            }
172        }
173    
174        /* (non-Javadoc)
175         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
176         */
177        @Override
178        public void notifyParent() throws CommandException {
179            // update bundle action
180            if (this.coordJob.getBundleId() != null) {
181                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
182                bundleStatusUpdate.call();
183            }
184        }
185    
186        private void updateCoordAction(CoordinatorActionBean action) throws CommandException {
187            action.setStatus(CoordinatorActionBean.Status.RUNNING);
188            action.incrementAndGetPending();
189            action.setLastModifiedTime(new Date());
190            try {
191                jpaService.execute(new CoordActionUpdateStatusJPAExecutor(action));
192            }
193            catch (JPAExecutorException e) {
194                throw new CommandException(e);
195            }
196        }
197    
198        /* (non-Javadoc)
199         * @see org.apache.oozie.command.TransitionXCommand#getJob()
200         */
201        @Override
202        public Job getJob() {
203            return coordJob;
204        }
205    }