001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.bundle;
019    
020    import java.util.Date;
021    import java.util.HashMap;
022    import java.util.List;
023    import java.util.Map;
024    
025    import org.apache.oozie.BundleActionBean;
026    import org.apache.oozie.BundleJobBean;
027    import org.apache.oozie.CoordinatorJobBean;
028    import org.apache.oozie.ErrorCode;
029    import org.apache.oozie.XException;
030    import org.apache.oozie.client.Job;
031    import org.apache.oozie.client.rest.RestConstants;
032    import org.apache.oozie.command.CommandException;
033    import org.apache.oozie.command.RerunTransitionXCommand;
034    import org.apache.oozie.command.coord.CoordRerunXCommand;
035    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
036    import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
037    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
038    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
039    import org.apache.oozie.executor.jpa.JPAExecutorException;
040    import org.apache.oozie.service.JPAService;
041    import org.apache.oozie.service.Services;
042    import org.apache.oozie.util.DateUtils;
043    import org.apache.oozie.util.LogUtils;
044    import org.apache.oozie.util.ParamChecker;
045    import org.apache.oozie.util.XLog;
046    
047    /**
048     * Rerun bundle coordinator jobs by a list of coordinator names or dates. User can specify if refresh or noCleanup.
049     * <p/>
050     * The "refresh" is used to indicate if user wants to refresh an action's input/outpur dataset urls
051     * <p/>
052     * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions
053     */
054    public class BundleRerunXCommand extends RerunTransitionXCommand<Void> {
055    
056        private final String coordScope;
057        private final String dateScope;
058        private final boolean refresh;
059        private final boolean noCleanup;
060        private BundleJobBean bundleJob;
061        private List<BundleActionBean> bundleActions;
062        protected boolean prevPending;
063    
064        private JPAService jpaService = null;
065    
066        /**
067         * The constructor for class {@link BundleRerunXCommand}
068         *
069         * @param jobId the bundle job id
070         * @param coordScope the rerun scope for coordinator job names separated by ","
071         * @param dateScope the rerun scope for coordinator nominal times separated by ","
072         * @param refresh true if user wants to refresh input/outpur dataset urls
073         * @param noCleanup false if user wants to cleanup output events for given rerun actions
074         */
075        public BundleRerunXCommand(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) {
076            super("bundle_rerun", "bundle_rerun", 1);
077            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
078            this.coordScope = coordScope;
079            this.dateScope = dateScope;
080            this.refresh = refresh;
081            this.noCleanup = noCleanup;
082        }
083    
084        /* (non-Javadoc)
085         * @see org.apache.oozie.command.XCommand#loadState()
086         */
087        @Override
088        protected void loadState() throws CommandException {
089            try {
090                jpaService = Services.get().get(JPAService.class);
091    
092                if (jpaService != null) {
093                    this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
094                    this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
095                    LogUtils.setLogInfo(bundleJob, logInfo);
096                    super.setJob(bundleJob);
097                    prevPending = bundleJob.isPending();
098                }
099                else {
100                    throw new CommandException(ErrorCode.E0610);
101                }
102            }
103            catch (XException ex) {
104                throw new CommandException(ex);
105            }
106    
107        }
108    
109        /* (non-Javadoc)
110         * @see org.apache.oozie.command.RerunTransitionXCommand#rerunChildren()
111         */
112        @Override
113        public void rerunChildren() throws CommandException {
114            boolean isUpdateActionDone = false;
115            Map<String, BundleActionBean> coordNameToBAMapping = new HashMap<String, BundleActionBean>();
116            if (bundleActions != null) {
117                for (BundleActionBean action : bundleActions) {
118                    if (action.getCoordName() != null) {
119                        coordNameToBAMapping.put(action.getCoordName(), action);
120                    }
121                }
122            }
123    
124            if (coordScope != null && !coordScope.isEmpty()) {
125                String[] list = coordScope.split(",");
126                for (String coordName : list) {
127                    coordName = coordName.trim();
128                    if (coordNameToBAMapping.keySet().contains(coordName)) {
129                        String coordId = coordNameToBAMapping.get(coordName).getCoordId();
130                        if (coordId == null) {
131                            LOG.info("No coord id found. Therefore, nothing to queue for coord rerun for coordname: " + coordName);
132                            continue;
133                        }
134                        CoordinatorJobBean coordJob = getCoordJob(coordId);
135    
136                        String rerunDateScope;
137                        if (dateScope != null && !dateScope.isEmpty()) {
138                            rerunDateScope = dateScope;
139                        }
140                        else {
141                            String coordStart = DateUtils.formatDateOozieTZ(coordJob.getStartTime());
142                            String coordEnd = DateUtils.formatDateOozieTZ(coordJob.getEndTime());
143                            rerunDateScope = coordStart + "::" + coordEnd;
144                        }
145                        LOG.debug("Queuing rerun range [" + rerunDateScope + "] for coord id " + coordId + " of bundle "
146                                + bundleJob.getId());
147                        queue(new CoordRerunXCommand(coordId, RestConstants.JOB_COORD_RERUN_DATE, rerunDateScope, refresh,
148                                noCleanup));
149                        updateBundleAction(coordNameToBAMapping.get(coordName));
150                        isUpdateActionDone = true;
151                    }
152                    else {
153                        LOG.info("Rerun for coord " + coordName + " NOT performed because it is not in bundle ", bundleJob.getId());
154                    }
155                }
156            }
157            else if (dateScope != null && !dateScope.isEmpty()) {
158                if (bundleActions != null) {
159                    for (BundleActionBean action : bundleActions) {
160                        if (action.getCoordId() == null) {
161                            LOG.info("No coord id found. Therefore nothing to queue for coord rerun with coord name "
162                                    + action.getCoordName());
163                            continue;
164                        }
165                        LOG.debug("Queuing rerun range [" + dateScope + "] for coord id " + action.getCoordId() + " of bundle "
166                                + bundleJob.getId());
167                        queue(new CoordRerunXCommand(action.getCoordId(), RestConstants.JOB_COORD_RERUN_DATE, dateScope,
168                                refresh, noCleanup));
169                        updateBundleAction(action);
170                        isUpdateActionDone = true;
171                    }
172                }
173            }
174            if (!isUpdateActionDone) {
175                transitToPrevious();
176            }
177            LOG.info("Rerun coord jobs for the bundle=[{0}]", jobId);
178        }
179    
180        private final void transitToPrevious() throws CommandException {
181            bundleJob.setStatus(getPrevStatus());
182            if (!prevPending) {
183                bundleJob.resetPending();
184            }
185            else {
186                bundleJob.setPending();
187            }
188            updateJob();
189        }
190    
191        /**
192         * Update bundle action
193         *
194         * @param action the bundle action
195         * @throws CommandException thrown if failed to update bundle action
196         */
197        private void updateBundleAction(BundleActionBean action) {
198            action.incrementAndGetPending();
199            action.setLastModifiedTime(new Date());
200            updateList.add(action);
201        }
202    
203        /* (non-Javadoc)
204         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
205         */
206        @Override
207        public void updateJob() {
208            // rerun a paused bundle job will keep job status at paused and pending at previous pending
209            if (getPrevStatus() != null) {
210                Job.Status bundleJobStatus = getPrevStatus();
211                if (bundleJobStatus.equals(Job.Status.PAUSED) || bundleJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
212                    bundleJob.setStatus(bundleJobStatus);
213                    if (prevPending) {
214                        bundleJob.setPending();
215                    }
216                    else {
217                        bundleJob.resetPending();
218                    }
219                }
220            }
221            updateList.add(bundleJob);
222        }
223    
224        /* (non-Javadoc)
225         * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
226         */
227        @Override
228        public void performWrites() throws CommandException {
229            try {
230                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
231            }
232            catch (JPAExecutorException e) {
233                throw new CommandException(e);
234            }
235        }
236    
237        /* (non-Javadoc)
238         * @see org.apache.oozie.command.XCommand#getEntityKey()
239         */
240        @Override
241        public String getEntityKey() {
242            return jobId;
243        }
244    
245        /* (non-Javadoc)
246         * @see org.apache.oozie.command.XCommand#isLockRequired()
247         */
248        @Override
249        protected boolean isLockRequired() {
250            return true;
251        }
252    
253        /*
254         * (non-Javadoc)
255         * @see org.apache.oozie.command.TransitionXCommand#getJob()
256         */
257        @Override
258        public Job getJob() {
259            return bundleJob;
260        }
261    
262        /* (non-Javadoc)
263         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
264         */
265        @Override
266        public void notifyParent() throws CommandException {
267    
268        }
269    
270        /* (non-Javadoc)
271         * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
272         */
273        @Override
274        public XLog getLog() {
275            return LOG;
276        }
277    
278        private final CoordinatorJobBean getCoordJob(String coordId) throws CommandException {
279            try {
280                CoordinatorJobBean job = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
281                return job;
282            }
283            catch (JPAExecutorException je) {
284                throw new CommandException(je);
285            }
286    
287        }
288    
289    }