001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command;
019
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.Iterator;
023import java.util.List;
024
025import org.apache.oozie.WorkflowJobBean;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.XException;
028import org.apache.oozie.executor.jpa.BundleJobsDeleteJPAExecutor;
029import org.apache.oozie.executor.jpa.BundleJobsGetForPurgeJPAExecutor;
030import org.apache.oozie.executor.jpa.CoordActionsDeleteJPAExecutor;
031import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
032import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
033import org.apache.oozie.executor.jpa.CoordJobsCountNotForPurgeFromParentIdJPAExecutor;
034import org.apache.oozie.executor.jpa.CoordJobsDeleteJPAExecutor;
035import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor;
036import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor;
037import org.apache.oozie.executor.jpa.JPAExecutorException;
038import org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor;
039import org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor;
040import org.apache.oozie.executor.jpa.WorkflowJobsDeleteJPAExecutor;
041import org.apache.oozie.executor.jpa.WorkflowJobsGetFromWorkflowParentIdJPAExecutor;
042import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
043import org.apache.oozie.executor.jpa.WorkflowJobsGetFromCoordParentIdJPAExecutor;
044import org.apache.oozie.service.JPAService;
045import org.apache.oozie.service.Services;
046
047/**
048 * This class is used to purge workflows, coordinators, and bundles.  It takes into account the relationships between workflows and
049 * coordinators, and coordinators and bundles.  It also only acts on 'limit' number of items at a time to not overtax the DB and in
050 * case something gets rolled back.  Also, children are always deleted before their parents in case of a rollback.
051 */
052public class PurgeXCommand extends XCommand<Void> {
053    private JPAService jpaService = null;
054    private int wfOlderThan;
055    private int coordOlderThan;
056    private int bundleOlderThan;
057    private boolean purgeOldCoordAction = false;
058    private final int limit;
059    private List<String> wfList;
060    private List<String> coordActionList;
061    private List<String> coordList;
062    private List<String> bundleList;
063    private int wfDel;
064    private int coordDel;
065    private int coordActionDel;
066    private int bundleDel;
067
068    public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit) {
069        this(wfOlderThan, coordOlderThan, bundleOlderThan, limit, false);
070    }
071
072    public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit, boolean purgeOldCoordAction) {
073        super("purge", "purge", 0);
074        this.wfOlderThan = wfOlderThan;
075        this.coordOlderThan = coordOlderThan;
076        this.bundleOlderThan = bundleOlderThan;
077        this.purgeOldCoordAction = purgeOldCoordAction;
078        this.limit = limit;
079        wfList = new ArrayList<String>();
080        coordActionList = new ArrayList<String>();
081        coordList = new ArrayList<String>();
082        bundleList = new ArrayList<String>();
083        wfDel = 0;
084        coordDel = 0;
085        bundleDel = 0;
086    }
087
088    /* (non-Javadoc)
089     * @see org.apache.oozie.command.XCommand#loadState()
090     */
091    @Override
092    protected void loadState() throws CommandException {
093        try {
094            jpaService = Services.get().get(JPAService.class);
095
096            if (jpaService != null) {
097                // Get the lists of workflows, coordinators, and bundles that can be purged (and have no parents)
098                int size;
099                do {
100                    size = wfList.size();
101                    wfList.addAll(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(wfOlderThan, wfList.size(), limit)));
102                } while(size != wfList.size());
103                if (purgeOldCoordAction) {
104                    LOG.debug("Purging workflows of long running coordinators is turned on");
105                    do {
106                        size = coordActionList.size();
107                        long olderThan = wfOlderThan;
108                        List<WorkflowJobBean> jobBeans = WorkflowJobQueryExecutor.getInstance().getList(
109                                WorkflowJobQuery.GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN, olderThan,
110                                coordActionList.size(), limit);
111                        for (WorkflowJobBean bean : jobBeans) {
112                            coordActionList.add(bean.getParentId());
113                            wfList.add(bean.getId());
114                        }
115                    } while(size != coordActionList.size());
116                }
117                do {
118                    size = coordList.size();
119                    coordList.addAll(jpaService.execute(
120                            new CoordJobsGetForPurgeJPAExecutor(coordOlderThan, coordList.size(), limit)));
121                } while(size != coordList.size());
122                do {
123                    size = bundleList.size();
124                    bundleList.addAll(jpaService.execute(
125                            new BundleJobsGetForPurgeJPAExecutor(bundleOlderThan, bundleList.size(), limit)));
126                } while(size != bundleList.size());
127            }
128            else {
129                throw new CommandException(ErrorCode.E0610);
130            }
131        }
132        catch (XException ex) {
133            throw new CommandException(ex);
134        }
135    }
136
137    /* (non-Javadoc)
138     * @see org.apache.oozie.command.XCommand#execute()
139     */
140    @Override
141    protected Void execute() throws CommandException {
142        LOG.info("STARTED Purge to purge Workflow Jobs older than [{0}] days, Coordinator Jobs older than [{1}] days, and Bundle"
143                + "jobs older than [{2}] days.", wfOlderThan, coordOlderThan, bundleOlderThan);
144
145        // Process parentless workflows to purge them and their children
146        if (!wfList.isEmpty()) {
147            try {
148                processWorkflows(wfList);
149            }
150            catch (JPAExecutorException je) {
151                throw new CommandException(je);
152            }
153        }
154
155        // Process coordinator actions of long running coordinators and purge them
156        if (!coordActionList.isEmpty()) {
157            try {
158                purgeCoordActions(coordActionList);
159            }
160            catch (JPAExecutorException je) {
161                throw new CommandException(je);
162            }
163        }
164        // Processs parentless coordinators to purge them and their children
165        if (!coordList.isEmpty()) {
166            try {
167                processCoordinators(coordList);
168            }
169            catch (JPAExecutorException je) {
170                throw new CommandException(je);
171            }
172        }
173
174        // Process bundles to purge them and their children
175        if (!bundleList.isEmpty()) {
176            try {
177                processBundles(bundleList);
178            }
179            catch (JPAExecutorException je) {
180                throw new CommandException(je);
181            }
182        }
183
184        LOG.info("ENDED Purge deleted [{0}] workflows, [{1}] coordinatorActions, [{2}] coordinators, [{3}] bundles",
185                wfDel, coordActionDel, coordDel, bundleDel);
186        return null;
187    }
188
189    /**
190     * Process workflows to purge them and their children.  Uses the processWorkflowsHelper method to help via recursion to make
191     * sure that the workflow children are deleted before their parents.
192     *
193     * @param wfs List of workflows to process
194     * @throws JPAExecutorException If a JPA executor has a problem
195     */
196    private void processWorkflows(List<String> wfs) throws JPAExecutorException {
197        List<String> wfsToPurge = processWorkflowsHelper(wfs);
198        for (String id: wfsToPurge) {
199            LOG.debug("Purging workflow " + id);
200        }
201        purgeWorkflows(wfsToPurge);
202    }
203
204    /**
205     * Used by the processWorkflows method and via recursion.
206     *
207     * @param wfs List of workflows to process
208     * @return List of workflows to purge
209     * @throws JPAExecutorException If a JPA executor has a problem
210     */
211    private List<String> processWorkflowsHelper(List<String> wfs) throws JPAExecutorException {
212        // If the list is empty, then we've finished recursing
213        if (wfs.isEmpty()) {
214            return wfs;
215        }
216        List<String> subwfs = new ArrayList<String>();
217        List<String> wfsToPurge = new ArrayList<String>();
218        for (String wfId : wfs) {
219            // We only purge the workflow and its children if they are all ready to be purged
220            long numChildrenNotReady = jpaService.execute(
221                    new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(wfOlderThan, wfId));
222            if (numChildrenNotReady == 0) {
223                wfsToPurge.add(wfId);
224                // Get all of the direct children for this workflow
225                List<String> children = new ArrayList<String>();
226                int size;
227                do {
228                    size = children.size();
229                    children.addAll(jpaService.execute(
230                            new WorkflowJobsGetFromWorkflowParentIdJPAExecutor(wfId, children.size(), limit)));
231                } while (size != children.size());
232                subwfs.addAll(children);
233            }
234        }
235        // Recurse on the children we just found to process their children
236        wfsToPurge.addAll(processWorkflowsHelper(subwfs));
237        return wfsToPurge;
238    }
239
240    /**
241     * Process coordinators to purge them and their children.
242     *
243     * @param coords List of coordinators to process
244     * @throws JPAExecutorException If a JPA executor has a problem
245     */
246    private void processCoordinators(List<String> coords) throws JPAExecutorException {
247        List<String> wfsToPurge = new ArrayList<String>();
248        List<String> coordsToPurge = new ArrayList<String>();
249        for (String coordId : coords) {
250            // We only purge the coord and its children if they are all ready to be purged
251            long numChildrenNotReady = jpaService.execute(
252                    new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(wfOlderThan, coordId));
253            if (numChildrenNotReady == 0) {
254                coordsToPurge.add(coordId);
255                LOG.debug("Purging coordinator " + coordId);
256                // Get all of the direct children for this coord
257                List<String> children = new ArrayList<String>();
258                int size;
259                do {
260                    size = children.size();
261                    children.addAll(jpaService.execute(
262                            new WorkflowJobsGetFromCoordParentIdJPAExecutor(coordId, children.size(), limit)));
263                } while (size != children.size());
264                wfsToPurge.addAll(children);
265            }
266        }
267        // Process the children
268        processWorkflows(wfsToPurge);
269        // Now that all children have been purged, we can purge the coordinators
270        purgeCoordinators(coordsToPurge);
271    }
272
273    /**
274     * Process bundles to purge them and their children
275     *
276     * @param bundles List of bundles to process
277     * @throws JPAExecutorException If a JPA executor has a problem
278     */
279    private void processBundles(List<String> bundles) throws JPAExecutorException {
280        List<String> coordsToPurge = new ArrayList<String>();
281        List<String> bundlesToPurge = new ArrayList<String>();
282        for (Iterator<String> it = bundles.iterator(); it.hasNext(); ) {
283            String bundleId = it.next();
284            // We only purge the bundle and its children if they are all ready to be purged
285            long numChildrenNotReady = jpaService.execute(
286                    new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(coordOlderThan, bundleId));
287            if (numChildrenNotReady == 0) {
288                bundlesToPurge.add(bundleId);
289                LOG.debug("Purging bundle " + bundleId);
290                // Get all of the direct children for this bundle
291                List<String> children = new ArrayList<String>();
292                int size;
293                do {
294                    size = children.size();
295                    children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleId, children.size(), limit)));
296                } while (size != children.size());
297                coordsToPurge.addAll(children);
298            }
299        }
300        // Process the children
301        processCoordinators(coordsToPurge);
302        // Now that all children have been purged, we can purge the bundles
303        purgeBundles(bundlesToPurge);
304    }
305
306    /**
307     * Purge the workflows in REVERSE order in batches of size 'limit' (this must be done in reverse order so that children are
308     * purged before their parents)
309     *
310     * @param wfs List of workflows to purge
311     * @throws JPAExecutorException If a JPA executor has a problem
312     */
313    private void purgeWorkflows(List<String> wfs) throws JPAExecutorException {
314        wfDel += wfs.size();
315        Collections.reverse(wfs);
316        for (int startIndex = 0; startIndex < wfs.size(); ) {
317            int endIndex = (startIndex + limit < wfs.size()) ? (startIndex + limit) : wfs.size();
318            jpaService.execute(new WorkflowJobsDeleteJPAExecutor(wfs.subList(startIndex, endIndex)));
319            startIndex = endIndex;
320        }
321    }
322
323    /**
324     * Purge coordActions of long running coordinators and purge them
325     *
326     * @param coordActions List of coordActions to purge
327     * @throws JPAExecutorException If a JPA executor has a problem
328     */
329    private void purgeCoordActions(List<String> coordActions) throws JPAExecutorException {
330        coordActionDel = coordActions.size();
331        for (int startIndex = 0; startIndex < coordActions.size(); ) {
332            int endIndex = (startIndex + limit < coordActions.size()) ? (startIndex + limit) : coordActions.size();
333            jpaService.execute(new CoordActionsDeleteJPAExecutor(coordActions.subList(startIndex, endIndex)));
334            startIndex = endIndex;
335        }
336    }
337    /**
338     * Purge the coordinators in SOME order in batches of size 'limit' (its in reverse order only for convenience)
339     *
340     * @param coords List of coordinators to purge
341     * @throws JPAExecutorException If a JPA executor has a problem
342     */
343    private void purgeCoordinators(List<String> coords) throws JPAExecutorException {
344        coordDel += coords.size();
345        for (int startIndex = 0; startIndex < coords.size(); ) {
346            int endIndex = (startIndex + limit < coords.size()) ? (startIndex + limit) : coords.size();
347            jpaService.execute(new CoordJobsDeleteJPAExecutor(coords.subList(startIndex, endIndex)));
348            startIndex = endIndex;
349        }
350    }
351
352    /**
353     * Purge the bundles in SOME order in batches of size 'limit' (its in reverse order only for convenience)
354     *
355     * @param bundles List of bundles to purge
356     * @throws JPAExecutorException If a JPA executor has a problem
357     */
358    private void purgeBundles(List<String> bundles) throws JPAExecutorException {
359        bundleDel += bundles.size();
360        for (int startIndex = 0; startIndex < bundles.size(); ) {
361            int endIndex = (startIndex + limit < bundles.size()) ? (startIndex + limit) : bundles.size();
362            jpaService.execute(new BundleJobsDeleteJPAExecutor(bundles.subList(startIndex, endIndex)));
363            startIndex = endIndex;
364        }
365    }
366
367    /* (non-Javadoc)
368     * @see org.apache.oozie.command.XCommand#getEntityKey()
369     */
370    @Override
371    public String getEntityKey() {
372        return null;
373    }
374
375    /* (non-Javadoc)
376     * @see org.apache.oozie.command.XCommand#isLockRequired()
377     */
378    @Override
379    protected boolean isLockRequired() {
380        return false;
381    }
382
383    /* (non-Javadoc)
384     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
385     */
386    @Override
387    protected void verifyPrecondition() throws CommandException, PreconditionException {
388    }
389}