001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.bundle;
019    
020    import java.util.Date;
021    import java.util.HashMap;
022    import java.util.List;
023    import java.util.Map;
024    
025    import org.apache.oozie.BundleActionBean;
026    import org.apache.oozie.BundleJobBean;
027    import org.apache.oozie.CoordinatorJobBean;
028    import org.apache.oozie.ErrorCode;
029    import org.apache.oozie.XException;
030    import org.apache.oozie.client.Job;
031    import org.apache.oozie.client.rest.RestConstants;
032    import org.apache.oozie.command.CommandException;
033    import org.apache.oozie.command.RerunTransitionXCommand;
034    import org.apache.oozie.command.coord.CoordRerunXCommand;
035    import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
036    import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
037    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
038    import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
039    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
040    import org.apache.oozie.executor.jpa.JPAExecutorException;
041    import org.apache.oozie.service.JPAService;
042    import org.apache.oozie.service.Services;
043    import org.apache.oozie.util.DateUtils;
044    import org.apache.oozie.util.LogUtils;
045    import org.apache.oozie.util.ParamChecker;
046    import org.apache.oozie.util.XLog;
047    
048    /**
049     * Rerun bundle coordinator jobs by a list of coordinator names or dates. User can specify if refresh or noCleanup.
050     * <p/>
051     * The "refresh" is used to indicate if user wants to refresh an action's input/outpur dataset urls
052     * <p/>
053     * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions
054     */
055    public class BundleRerunXCommand extends RerunTransitionXCommand<Void> {
056    
057        private final String coordScope;
058        private final String dateScope;
059        private final boolean refresh;
060        private final boolean noCleanup;
061        private BundleJobBean bundleJob;
062        private List<BundleActionBean> bundleActions;
063        protected boolean prevPending;
064    
065        private JPAService jpaService = null;
066    
067        /**
068         * The constructor for class {@link BundleRerunXCommand}
069         *
070         * @param jobId the bundle job id
071         * @param coordScope the rerun scope for coordinator job names separated by ","
072         * @param dateScope the rerun scope for coordinator nominal times separated by ","
073         * @param refresh true if user wants to refresh input/outpur dataset urls
074         * @param noCleanup false if user wants to cleanup output events for given rerun actions
075         */
076        public BundleRerunXCommand(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) {
077            super("bundle_rerun", "bundle_rerun", 1);
078            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
079            this.coordScope = coordScope;
080            this.dateScope = dateScope;
081            this.refresh = refresh;
082            this.noCleanup = noCleanup;
083        }
084    
085        /* (non-Javadoc)
086         * @see org.apache.oozie.command.XCommand#loadState()
087         */
088        @Override
089        protected void loadState() throws CommandException {
090            try {
091                jpaService = Services.get().get(JPAService.class);
092    
093                if (jpaService != null) {
094                    this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
095                    this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
096                    LogUtils.setLogInfo(bundleJob, logInfo);
097                    super.setJob(bundleJob);
098                    prevPending = bundleJob.isPending();
099                }
100                else {
101                    throw new CommandException(ErrorCode.E0610);
102                }
103            }
104            catch (XException ex) {
105                throw new CommandException(ex);
106            }
107    
108        }
109    
110        /* (non-Javadoc)
111         * @see org.apache.oozie.command.RerunTransitionXCommand#rerunChildren()
112         */
113        @Override
114        public void rerunChildren() throws CommandException {
115            boolean isUpdateActionDone = false;
116            Map<String, BundleActionBean> coordNameToBAMapping = new HashMap<String, BundleActionBean>();
117            if (bundleActions != null) {
118                for (BundleActionBean action : bundleActions) {
119                    if (action.getCoordName() != null) {
120                        coordNameToBAMapping.put(action.getCoordName(), action);
121                    }
122                }
123            }
124    
125            if (coordScope != null && !coordScope.isEmpty()) {
126                String[] list = coordScope.split(",");
127                for (String coordName : list) {
128                    coordName = coordName.trim();
129                    if (coordNameToBAMapping.keySet().contains(coordName)) {
130                        String coordId = coordNameToBAMapping.get(coordName).getCoordId();
131                        if (coordId == null) {
132                            LOG.info("No coord id found. Therefore, nothing to queue for coord rerun for coordname: " + coordName);
133                            continue;
134                        }
135                        CoordinatorJobBean coordJob = getCoordJob(coordId);
136    
137                        String rerunDateScope;
138                        if (dateScope != null && !dateScope.isEmpty()) {
139                            rerunDateScope = dateScope;
140                        }
141                        else {
142                            String coordStart = DateUtils.convertDateToString(coordJob.getStartTime());
143                            String coordEnd = DateUtils.convertDateToString(coordJob.getEndTime());
144                            rerunDateScope = coordStart + "::" + coordEnd;
145                        }
146                        LOG.debug("Queuing rerun range [" + rerunDateScope + "] for coord id " + coordId + " of bundle "
147                                + bundleJob.getId());
148                        queue(new CoordRerunXCommand(coordId, RestConstants.JOB_COORD_RERUN_DATE, rerunDateScope, refresh,
149                                noCleanup));
150                        updateBundleAction(coordNameToBAMapping.get(coordName));
151                        isUpdateActionDone = true;
152                    }
153                    else {
154                        LOG.info("Rerun for coord " + coordName + " NOT performed because it is not in bundle ", bundleJob.getId());
155                    }
156                }
157            }
158            else if (dateScope != null && !dateScope.isEmpty()) {
159                if (bundleActions != null) {
160                    for (BundleActionBean action : bundleActions) {
161                        if (action.getCoordId() == null) {
162                            LOG.info("No coord id found. Therefore nothing to queue for coord rerun with coord name "
163                                    + action.getCoordName());
164                            continue;
165                        }
166                        LOG.debug("Queuing rerun range [" + dateScope + "] for coord id " + action.getCoordId() + " of bundle "
167                                + bundleJob.getId());
168                        queue(new CoordRerunXCommand(action.getCoordId(), RestConstants.JOB_COORD_RERUN_DATE, dateScope,
169                                refresh, noCleanup));
170                        updateBundleAction(action);
171                        isUpdateActionDone = true;
172                    }
173                }
174            }
175            if (!isUpdateActionDone) {
176                transitToPrevious();
177            }
178            LOG.info("Rerun coord jobs for the bundle=[{0}]", jobId);
179        }
180    
181        private final void transitToPrevious() throws CommandException {
182            bundleJob.setStatus(getPrevStatus());
183            if (!prevPending) {
184                bundleJob.resetPending();
185            }
186            else {
187                bundleJob.setPending();
188            }
189            updateJob();
190        }
191    
192        /**
193         * Update bundle action
194         *
195         * @param action the bundle action
196         * @throws CommandException thrown if failed to update bundle action
197         */
198        private void updateBundleAction(BundleActionBean action) throws CommandException {
199            action.incrementAndGetPending();
200            action.setLastModifiedTime(new Date());
201            try {
202                jpaService.execute(new BundleActionUpdateJPAExecutor(action));
203            }
204            catch (JPAExecutorException je) {
205                throw new CommandException(je);
206            }
207        }
208    
209        /* (non-Javadoc)
210         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
211         */
212        @Override
213        public void updateJob() throws CommandException {
214            try {
215                // rerun a paused bundle job will keep job status at paused and pending at previous pending
216                if (getPrevStatus()!= null && getPrevStatus().equals(Job.Status.PAUSED)) {
217                    bundleJob.setStatus(Job.Status.PAUSED);
218                    if (prevPending) {
219                        bundleJob.setPending();
220                    } else {
221                        bundleJob.resetPending();
222                    }
223                }
224                jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
225            }
226            catch (JPAExecutorException je) {
227                throw new CommandException(je);
228            }
229    
230        }
231    
232        /* (non-Javadoc)
233         * @see org.apache.oozie.command.XCommand#getEntityKey()
234         */
235        @Override
236        protected String getEntityKey() {
237            return jobId;
238        }
239    
240        /* (non-Javadoc)
241         * @see org.apache.oozie.command.XCommand#isLockRequired()
242         */
243        @Override
244        protected boolean isLockRequired() {
245            return true;
246        }
247    
248        /*
249         * (non-Javadoc)
250         * @see org.apache.oozie.command.TransitionXCommand#getJob()
251         */
252        @Override
253        public Job getJob() {
254            return bundleJob;
255        }
256    
257        /* (non-Javadoc)
258         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
259         */
260        @Override
261        public void notifyParent() throws CommandException {
262    
263        }
264    
265        /* (non-Javadoc)
266         * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
267         */
268        @Override
269        public XLog getLog() {
270            return LOG;
271        }
272    
273        private final CoordinatorJobBean getCoordJob(String coordId) throws CommandException {
274            try {
275                CoordinatorJobBean job = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
276                return job;
277            }
278            catch (JPAExecutorException je) {
279                throw new CommandException(je);
280            }
281    
282        }
283    
284    }