001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.util.Date;
021    import java.util.List;
022    
023    import org.apache.oozie.CoordinatorActionBean;
024    import org.apache.oozie.CoordinatorJobBean;
025    import org.apache.oozie.ErrorCode;
026    import org.apache.oozie.XException;
027    import org.apache.oozie.client.CoordinatorJob;
028    import org.apache.oozie.client.Job;
029    import org.apache.oozie.command.CommandException;
030    import org.apache.oozie.command.PreconditionException;
031    import org.apache.oozie.command.ResumeTransitionXCommand;
032    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
033    import org.apache.oozie.command.wf.ResumeXCommand;
034    import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
035    import org.apache.oozie.executor.jpa.CoordJobGetActionsSuspendedJPAExecutor;
036    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
037    import org.apache.oozie.executor.jpa.JPAExecutorException;
038    import org.apache.oozie.service.JPAService;
039    import org.apache.oozie.service.Services;
040    import org.apache.oozie.util.InstrumentUtils;
041    import org.apache.oozie.util.LogUtils;
042    import org.apache.oozie.util.ParamChecker;
043    
044    /**
045     * Resume coordinator job and actions.
046     *
047     */
048    public class CoordResumeXCommand extends ResumeTransitionXCommand {
049        private final String jobId;
050        private CoordinatorJobBean coordJob = null;
051        private JPAService jpaService = null;
052        private boolean exceptionOccured = false;
053        CoordinatorJob.Status prevStatus;
054    
055        public CoordResumeXCommand(String id) {
056            super("coord_resume", "coord_resume", 1);
057            this.jobId = ParamChecker.notEmpty(id, "id");
058        }
059    
060        @Override
061        public String getEntityKey() {
062            return jobId;
063        }
064    
065        @Override
066        public String getKey() {
067            return getName() + "_" + this.jobId;
068        }
069    
070        @Override
071        protected boolean isLockRequired() {
072            return true;
073        }
074    
075        @Override
076        protected void loadState() throws CommandException {
077            jpaService = Services.get().get(JPAService.class);
078            if (jpaService == null) {
079                throw new CommandException(ErrorCode.E0610);
080            }
081            try {
082                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
083            }
084            catch (JPAExecutorException e) {
085                throw new CommandException(e);
086            }
087            setJob(coordJob);
088            prevStatus = coordJob.getStatus();
089            LogUtils.setLogInfo(coordJob, logInfo);
090        }
091    
092        @Override
093        protected void verifyPrecondition() throws CommandException, PreconditionException {
094            if (coordJob.getStatus() != CoordinatorJob.Status.SUSPENDED && coordJob.getStatus() != CoordinatorJob.Status.SUSPENDEDWITHERROR && coordJob.getStatus() != Job.Status.PREPSUSPENDED) {
095                throw new PreconditionException(ErrorCode.E1100, "CoordResumeXCommand not Resumed - "
096                        + "job not in SUSPENDED/SUSPENDEDWITHERROR/PREPSUSPENDED state, job = " + jobId);
097            }
098        }
099    
100        @Override
101        public void updateJob() {
102            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
103            coordJob.setSuspendedTime(null);
104            coordJob.setLastModifiedTime(new Date());
105            LOG.debug("Resume coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = "
106                    + coordJob.isPending());
107            updateList.add(coordJob);
108        }
109    
110        @Override
111        public void resumeChildren() throws CommandException {
112            try {
113                // Get all suspended actions to resume them
114                List<CoordinatorActionBean> actionList = jpaService.execute(new CoordJobGetActionsSuspendedJPAExecutor(
115                        jobId));
116    
117                for (CoordinatorActionBean action : actionList) {
118                    if (action.getExternalId() != null) {
119                        // queue a ResumeXCommand
120                        queue(new ResumeXCommand(action.getExternalId()));
121                        updateCoordAction(action);
122                        LOG.debug(
123                                "Resume coord action = [{0}], new status = [{1}], pending = [{2}] and queue ResumeXCommand for [{3}]",
124                                action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
125                    }
126                    else {
127                        updateCoordAction(action);
128                        LOG.debug(
129                                "Resume coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null",
130                                action.getId(), action.getStatus(), action.getPending());
131                    }
132                }
133    
134            }
135            catch (XException ex) {
136                exceptionOccured = true;
137                throw new CommandException(ex);
138            }
139            finally {
140                if (exceptionOccured) {
141                    coordJob.setStatus(CoordinatorJob.Status.FAILED);
142                    coordJob.resetPending();
143                    LOG.warn("Resume children failed so fail coordinator, coordinator job id = " + jobId + ", status = "
144                            + coordJob.getStatus());
145                    updateList.add(coordJob);
146                }
147            }
148        }
149    
150        @Override
151        public void notifyParent() throws CommandException {
152            // update bundle action
153            if (this.coordJob.getBundleId() != null) {
154                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
155                bundleStatusUpdate.call();
156            }
157        }
158    
159        @Override
160        public void performWrites() throws CommandException {
161            try {
162                jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
163            }
164            catch (JPAExecutorException e) {
165                throw new CommandException(e);
166            }
167        }
168    
169        private void updateCoordAction(CoordinatorActionBean action) {
170            action.setStatus(CoordinatorActionBean.Status.RUNNING);
171            action.incrementAndGetPending();
172            action.setLastModifiedTime(new Date());
173            updateList.add(action);
174        }
175    
176        @Override
177        public Job getJob() {
178            return coordJob;
179        }
180    }