001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command;
019    
020    import java.util.ArrayList;
021    import java.util.Collections;
022    import java.util.Iterator;
023    import java.util.List;
024    
025    import org.apache.oozie.ErrorCode;
026    import org.apache.oozie.XException;
027    import org.apache.oozie.executor.jpa.BundleJobsDeleteJPAExecutor;
028    import org.apache.oozie.executor.jpa.BundleJobsGetForPurgeJPAExecutor;
029    import org.apache.oozie.executor.jpa.CoordJobsCountNotForPurgeFromParentIdJPAExecutor;
030    import org.apache.oozie.executor.jpa.CoordJobsDeleteJPAExecutor;
031    import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor;
032    import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor;
033    import org.apache.oozie.executor.jpa.JPAExecutorException;
034    import org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor;
035    import org.apache.oozie.executor.jpa.WorkflowJobsDeleteJPAExecutor;
036    import org.apache.oozie.executor.jpa.WorkflowJobsGetFromParentIdJPAExecutor;
037    import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
038    import org.apache.oozie.service.JPAService;
039    import org.apache.oozie.service.Services;
040    
041    /**
042     * This class is used to purge workflows, coordinators, and bundles.  It takes into account the relationships between workflows and
043     * coordinators, and coordinators and bundles.  It also only acts on 'limit' number of items at a time to not overtax the DB and in
044     * case something gets rolled back.  Also, children are always deleted before their parents in case of a rollback.
045     */
046    public class PurgeXCommand extends XCommand<Void> {
047        private JPAService jpaService = null;
048        private int wfOlderThan;
049        private int coordOlderThan;
050        private int bundleOlderThan;
051        private final int limit;
052        private List<String> wfList;
053        private List<String> coordList;
054        private List<String> bundleList;
055        private int wfDel;
056        private int coordDel;
057        private int bundleDel;
058    
059        public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit) {
060            super("purge", "purge", 0);
061            this.wfOlderThan = wfOlderThan;
062            this.coordOlderThan = coordOlderThan;
063            this.bundleOlderThan = bundleOlderThan;
064            this.limit = limit;
065            wfList = new ArrayList<String>();
066            coordList = new ArrayList<String>();
067            bundleList = new ArrayList<String>();
068            wfDel = 0;
069            coordDel = 0;
070            bundleDel = 0;
071        }
072    
073        /* (non-Javadoc)
074         * @see org.apache.oozie.command.XCommand#loadState()
075         */
076        @Override
077        protected void loadState() throws CommandException {
078            try {
079                jpaService = Services.get().get(JPAService.class);
080    
081                if (jpaService != null) {
082                    // Get the lists of workflows, coordinators, and bundles that can be purged (and have no parents)
083                    int size;
084                    do {
085                        size = wfList.size();
086                        wfList.addAll(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(wfOlderThan, wfList.size(), limit)));
087                    } while(size != wfList.size());
088                    do {
089                        size = coordList.size();
090                        coordList.addAll(jpaService.execute(
091                                new CoordJobsGetForPurgeJPAExecutor(coordOlderThan, coordList.size(), limit)));
092                    } while(size != coordList.size());
093                    do {
094                        size = bundleList.size();
095                        bundleList.addAll(jpaService.execute(
096                                new BundleJobsGetForPurgeJPAExecutor(bundleOlderThan, bundleList.size(), limit)));
097                    } while(size != bundleList.size());
098                }
099                else {
100                    throw new CommandException(ErrorCode.E0610);
101                }
102            }
103            catch (XException ex) {
104                throw new CommandException(ex);
105            }
106        }
107    
108        /* (non-Javadoc)
109         * @see org.apache.oozie.command.XCommand#execute()
110         */
111        @Override
112        protected Void execute() throws CommandException {
113            LOG.debug("STARTED Purge to purge Workflow Jobs older than [{0}] days, Coordinator Jobs older than [{1}] days, and Bundle"
114                    + "jobs older than [{2}] days.", wfOlderThan, coordOlderThan, bundleOlderThan);
115    
116            // Process parentless workflows to purge them and their children
117            if (!wfList.isEmpty()) {
118                try {
119                    processWorkflows(wfList);
120                }
121                catch (JPAExecutorException je) {
122                    throw new CommandException(je);
123                }
124            }
125    
126            // Processs parentless coordinators to purge them and their children
127            if (!coordList.isEmpty()) {
128                try {
129                    processCoordinators(coordList);
130                }
131                catch (JPAExecutorException je) {
132                    throw new CommandException(je);
133                }
134            }
135    
136            // Process bundles to purge them and their children
137            if (!bundleList.isEmpty()) {
138                try {
139                    processBundles(bundleList);
140                }
141                catch (JPAExecutorException je) {
142                    throw new CommandException(je);
143                }
144            }
145    
146            LOG.debug("ENDED Purge deleted [{0}] workflows, [{1}] coordinators, [{2}] bundles", wfDel, coordDel, bundleDel);
147            return null;
148        }
149    
150        /**
151         * Process workflows to purge them and their children.  Uses the processWorkflowsHelper method to help via recursion to make
152         * sure that the workflow children are deleted before their parents.
153         *
154         * @param wfs List of workflows to process
155         * @throws JPAExecutorException If a JPA executor has a problem
156         */
157        private void processWorkflows(List<String> wfs) throws JPAExecutorException {
158            List<String> wfsToPurge = processWorkflowsHelper(wfs);
159            purgeWorkflows(wfsToPurge);
160        }
161    
162        /**
163         * Used by the processWorkflows method and via recursion.
164         *
165         * @param wfs List of workflows to process
166         * @return List of workflows to purge
167         * @throws JPAExecutorException If a JPA executor has a problem
168         */
169        private List<String> processWorkflowsHelper(List<String> wfs) throws JPAExecutorException {
170            // If the list is empty, then we've finished recursing
171            if (wfs.isEmpty()) {
172                return wfs;
173            }
174            List<String> subwfs = new ArrayList<String>();
175            List<String> wfsToPurge = new ArrayList<String>();
176            for (String wfId : wfs) {
177                // We only purge the workflow and its children if they are all ready to be purged
178                long numChildrenNotReady = jpaService.execute(
179                        new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(wfOlderThan, wfId));
180                if (numChildrenNotReady == 0) {
181                    wfsToPurge.add(wfId);
182                    // Get all of the direct children for this workflow
183                    List<String> children = new ArrayList<String>();
184                    int size;
185                    do {
186                        size = children.size();
187                        children.addAll(jpaService.execute(new WorkflowJobsGetFromParentIdJPAExecutor(wfId, children.size(), limit)));
188                    } while (size != children.size());
189                    subwfs.addAll(children);
190                }
191            }
192            // Recurse on the children we just found to process their children
193            wfsToPurge.addAll(processWorkflowsHelper(subwfs));
194            return wfsToPurge;
195        }
196    
197        /**
198         * Process coordinators to purge them and their children.
199         *
200         * @param coords List of coordinators to process
201         * @throws JPAExecutorException If a JPA executor has a problem
202         */
203        private void processCoordinators(List<String> coords) throws JPAExecutorException {
204            List<String> wfsToPurge = new ArrayList<String>();
205            List<String> coordsToPurge = new ArrayList<String>();
206            for (String coordId : coords) {
207                // We only purge the coord and its children if they are all ready to be purged
208                long numChildrenNotReady = jpaService.execute(
209                        new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(wfOlderThan, coordId));
210                if (numChildrenNotReady == 0) {
211                    coordsToPurge.add(coordId);
212                    // Get all of the direct children for this coord
213                    List<String> children = new ArrayList<String>();
214                    int size;
215                    do {
216                        size = children.size();
217                        children.addAll(jpaService.execute(
218                                new WorkflowJobsGetFromParentIdJPAExecutor(coordId, children.size(), limit)));
219                    } while (size != children.size());
220                    wfsToPurge.addAll(children);
221                }
222            }
223            // Process the children
224            processWorkflows(wfsToPurge);
225            // Now that all children have been purged, we can purge the coordinators
226            purgeCoordinators(coordsToPurge);
227        }
228    
229        /**
230         * Process bundles to purge them and their children
231         *
232         * @param bundles List of bundles to process
233         * @throws JPAExecutorException If a JPA executor has a problem
234         */
235        private void processBundles(List<String> bundles) throws JPAExecutorException {
236            List<String> coordsToPurge = new ArrayList<String>();
237            List<String> bundlesToPurge = new ArrayList<String>();
238            for (Iterator<String> it = bundles.iterator(); it.hasNext(); ) {
239                String bundleId = it.next();
240                // We only purge the bundle and its children if they are all ready to be purged
241                long numChildrenNotReady = jpaService.execute(
242                        new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(coordOlderThan, bundleId));
243                if (numChildrenNotReady == 0) {
244                    bundlesToPurge.add(bundleId);
245                    // Get all of the direct children for this bundle
246                    List<String> children = new ArrayList<String>();
247                    int size;
248                    do {
249                        size = children.size();
250                        children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleId, children.size(), limit)));
251                    } while (size != children.size());
252                    coordsToPurge.addAll(children);
253                }
254            }
255            // Process the children
256            processCoordinators(coordsToPurge);
257            // Now that all children have been purged, we can purge the bundles
258            purgeBundles(bundlesToPurge);
259        }
260    
261        /**
262         * Purge the workflows in REVERSE order in batches of size 'limit' (this must be done in reverse order so that children are
263         * purged before their parents)
264         *
265         * @param wfs List of workflows to purge
266         * @throws JPAExecutorException If a JPA executor has a problem
267         */
268        private void purgeWorkflows(List<String> wfs) throws JPAExecutorException {
269            wfDel += wfs.size();
270            Collections.reverse(wfs);
271            for (int startIndex = 0; startIndex < wfs.size(); ) {
272                int endIndex = (startIndex + limit < wfs.size()) ? (startIndex + limit) : wfs.size();
273                jpaService.execute(new WorkflowJobsDeleteJPAExecutor(wfs.subList(startIndex, endIndex)));
274                startIndex = endIndex;
275            }
276        }
277    
278        /**
279         * Purge the coordinators in SOME order in batches of size 'limit' (its in reverse order only for convenience)
280         *
281         * @param coords List of coordinators to purge
282         * @throws JPAExecutorException If a JPA executor has a problem
283         */
284        private void purgeCoordinators(List<String> coords) throws JPAExecutorException {
285            coordDel += coords.size();
286            for (int startIndex = 0; startIndex < coords.size(); ) {
287                int endIndex = (startIndex + limit < coords.size()) ? (startIndex + limit) : coords.size();
288                jpaService.execute(new CoordJobsDeleteJPAExecutor(coords.subList(startIndex, endIndex)));
289                startIndex = endIndex;
290            }
291        }
292    
293        /**
294         * Purge the bundles in SOME order in batches of size 'limit' (its in reverse order only for convenience)
295         *
296         * @param bundles List of bundles to purge
297         * @throws JPAExecutorException If a JPA executor has a problem
298         */
299        private void purgeBundles(List<String> bundles) throws JPAExecutorException {
300            bundleDel += bundles.size();
301            for (int startIndex = 0; startIndex < bundles.size(); ) {
302                int endIndex = (startIndex + limit < bundles.size()) ? (startIndex + limit) : bundles.size();
303                jpaService.execute(new BundleJobsDeleteJPAExecutor(bundles.subList(startIndex, endIndex)));
304                startIndex = endIndex;
305            }
306        }
307    
308        /* (non-Javadoc)
309         * @see org.apache.oozie.command.XCommand#getEntityKey()
310         */
311        @Override
312        public String getEntityKey() {
313            return null;
314        }
315    
316        /* (non-Javadoc)
317         * @see org.apache.oozie.command.XCommand#isLockRequired()
318         */
319        @Override
320        protected boolean isLockRequired() {
321            return false;
322        }
323    
324        /* (non-Javadoc)
325         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
326         */
327        @Override
328        protected void verifyPrecondition() throws CommandException, PreconditionException {
329        }
330    }