001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command;
020
021import org.apache.oozie.ErrorCode;
022import org.apache.oozie.WorkflowJobBean;
023import org.apache.oozie.XException;
024import org.apache.oozie.executor.jpa.BundleJobsDeleteJPAExecutor;
025import org.apache.oozie.executor.jpa.BundleJobsGetForPurgeJPAExecutor;
026import org.apache.oozie.executor.jpa.CoordActionsDeleteJPAExecutor;
027import org.apache.oozie.executor.jpa.CoordActionsGetFromCoordJobIdJPAExecutor;
028import org.apache.oozie.executor.jpa.CoordJobsCountNotForPurgeFromParentIdJPAExecutor;
029import org.apache.oozie.executor.jpa.CoordJobsDeleteJPAExecutor;
030import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor;
031import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor;
032import org.apache.oozie.executor.jpa.JPAExecutorException;
033import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
034import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
035import org.apache.oozie.executor.jpa.WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor;
036import org.apache.oozie.executor.jpa.WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor;
037import org.apache.oozie.executor.jpa.WorkflowJobsDeleteJPAExecutor;
038import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
039import org.apache.oozie.service.JPAService;
040import org.apache.oozie.service.Services;
041import org.eclipse.jgit.util.StringUtils;
042
043import java.util.ArrayList;
044import java.util.Collection;
045import java.util.Collections;
046import java.util.Date;
047import java.util.Iterator;
048import java.util.List;
049
050/**
051 * This class is used to purge workflows, coordinators, and bundles.  It takes into account the relationships between workflows and
052 * coordinators, and coordinators and bundles.  It also only acts on 'limit' number of items at a time to not overtax the DB and in
053 * case something gets rolled back.  Also, children are always deleted before their parents in case of a rollback.
054 */
055public class PurgeXCommand extends XCommand<Void> {
056    private JPAService jpaService = null;
057    private int wfOlderThan;
058    private int coordOlderThan;
059    private int bundleOlderThan;
060    private boolean purgeOldCoordAction = false;
061    private final int limit;
062    private List<String> wfList;
063    private List<String> coordActionList;
064    private List<String> coordList;
065    private List<String> bundleList;
066    private int wfDel;
067    private int coordDel;
068    private int coordActionDel;
069    private int bundleDel;
070    private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
071
072    public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit) {
073        this(wfOlderThan, coordOlderThan, bundleOlderThan, limit, false);
074    }
075
076    public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit, boolean purgeOldCoordAction) {
077        super("purge", "purge", 0);
078        this.wfOlderThan = wfOlderThan;
079        this.coordOlderThan = coordOlderThan;
080        this.bundleOlderThan = bundleOlderThan;
081        this.purgeOldCoordAction = purgeOldCoordAction;
082        this.limit = limit;
083        wfList = new ArrayList<String>();
084        coordActionList = new ArrayList<String>();
085        coordList = new ArrayList<String>();
086        bundleList = new ArrayList<String>();
087        wfDel = 0;
088        coordDel = 0;
089        bundleDel = 0;
090    }
091
092    /* (non-Javadoc)
093     * @see org.apache.oozie.command.XCommand#loadState()
094     */
095    @Override
096    protected void loadState() throws CommandException {
097        try {
098            jpaService = Services.get().get(JPAService.class);
099
100            if (jpaService != null) {
101                // Get the lists of workflows, coordinators, and bundles that can be purged (and have no parents)
102                int size;
103                do {
104                    size = wfList.size();
105                    wfList.addAll(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(wfOlderThan, wfList.size(), limit)));
106                } while(size != wfList.size());
107                if (purgeOldCoordAction) {
108                    LOG.debug("Purging workflows of long running coordinators is turned on");
109                    do {
110                        size = coordActionList.size();
111                        long olderThan = wfOlderThan;
112                        List<WorkflowJobBean> jobBeans = WorkflowJobQueryExecutor.getInstance().getList(
113                                WorkflowJobQuery.GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN, olderThan,
114                                coordActionList.size(), limit);
115                        for (WorkflowJobBean bean : jobBeans) {
116                            coordActionList.add(bean.getParentId());
117                            wfList.add(bean.getId());
118                        }
119                    } while(size != coordActionList.size());
120                }
121                do {
122                    size = coordList.size();
123                    coordList.addAll(jpaService.execute(
124                            new CoordJobsGetForPurgeJPAExecutor(coordOlderThan, coordList.size(), limit)));
125                } while(size != coordList.size());
126                do {
127                    size = bundleList.size();
128                    bundleList.addAll(jpaService.execute(
129                            new BundleJobsGetForPurgeJPAExecutor(bundleOlderThan, bundleList.size(), limit)));
130                } while(size != bundleList.size());
131            }
132            else {
133                throw new CommandException(ErrorCode.E0610);
134            }
135        }
136        catch (XException ex) {
137            throw new CommandException(ex);
138        }
139    }
140
141    /* (non-Javadoc)
142     * @see org.apache.oozie.command.XCommand#execute()
143     */
144    @Override
145    protected Void execute() throws CommandException {
146        LOG.info("STARTED Purge to purge Workflow Jobs older than [{0}] days, Coordinator Jobs older than [{1}] days, and Bundle"
147                + " jobs older than [{2}] days.", wfOlderThan, coordOlderThan, bundleOlderThan);
148
149        // Process parentless workflows to purge them and their children
150        if (!wfList.isEmpty()) {
151            try {
152                processWorkflows(wfList);
153            }
154            catch (JPAExecutorException je) {
155                throw new CommandException(je);
156            }
157        }
158
159        // Process coordinator actions of long running coordinators and purge them
160        if (!coordActionList.isEmpty()) {
161            try {
162                purgeCoordActions(coordActionList);
163            }
164            catch (JPAExecutorException je) {
165                throw new CommandException(je);
166            }
167        }
168        // Processs parentless coordinators to purge them and their children
169        if (!coordList.isEmpty()) {
170            try {
171                processCoordinators(coordList);
172            }
173            catch (JPAExecutorException je) {
174                throw new CommandException(je);
175            }
176        }
177
178        // Process bundles to purge them and their children
179        if (!bundleList.isEmpty()) {
180            try {
181                processBundles(bundleList);
182            }
183            catch (JPAExecutorException je) {
184                throw new CommandException(je);
185            }
186        }
187
188        LOG.info("ENDED Purge deleted [{0}] workflows, [{1}] coordinatorActions, [{2}] coordinators, [{3}] bundles",
189                wfDel, coordActionDel, coordDel, bundleDel);
190        return null;
191    }
192
193    /**
194     * Process workflows to purge them and their children.  Uses the processWorkflowsHelper method to help via recursion to make
195     * sure that the workflow children are deleted before their parents.
196     *
197     * @param wfs List of workflows to process
198     * @throws JPAExecutorException If a JPA executor has a problem
199     */
200    private void processWorkflows(List<String> wfs) throws JPAExecutorException {
201        List<String> wfsToPurge = processWorkflowsHelper(wfs);
202        purgeWorkflows(wfsToPurge);
203    }
204
205    /**
206     * Used by the processWorkflows method and via recursion.
207     *
208     * @param wfs List of workflows to process
209     * @return List of workflows to purge
210     * @throws JPAExecutorException If a JPA executor has a problem
211     */
212    private List<String> processWorkflowsHelper(List<String> wfs) throws JPAExecutorException {
213        // If the list is empty, then we've finished recursing
214        if (wfs.isEmpty()) {
215            return wfs;
216        }
217        List<String> subwfs = new ArrayList<String>();
218        List<String> wfsToPurge = new ArrayList<String>();
219        for (String wfId : wfs) {
220            int size;
221            List<WorkflowJobBean> swfBeanList = new ArrayList<WorkflowJobBean>();
222            do {
223                size = swfBeanList.size();
224                swfBeanList.addAll(jpaService.execute(
225                        new WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfId, swfBeanList.size(), limit)));
226            } while (size != swfBeanList.size());
227
228            // Checking if sub workflow is ready to purge
229            List<String> children = fetchTerminatedWorkflow(swfBeanList);
230
231            // if all sub workflow ready to purge add them all and add current workflow
232            if(children.size() == swfBeanList.size()) {
233                subwfs.addAll(children);
234                wfsToPurge.add(wfId);
235            }
236        }
237        // Recurse on the children we just found to process their children
238        wfsToPurge.addAll(processWorkflowsHelper(subwfs));
239        return wfsToPurge;
240    }
241
242    /**
243     * This method will return all terminate workflow ids from wfBeanlist for purge.
244     * @param wfBeanList
245     * @return workflows to purge
246     */
247    private List<String> fetchTerminatedWorkflow(List<WorkflowJobBean> wfBeanList) {
248        List<String> children = new ArrayList<String>();
249        long wfOlderThanMS = System.currentTimeMillis() - (wfOlderThan * DAY_IN_MS);
250        for (WorkflowJobBean wfjBean : wfBeanList) {
251            final Date wfEndTime = wfjBean.getEndTime();
252            final boolean isFinished = wfjBean.inTerminalState();
253            if (isFinished && wfEndTime != null && wfEndTime.getTime() < wfOlderThanMS) {
254                    children.add(wfjBean.getId());
255            }
256            else {
257                final Date lastModificationTime = wfjBean.getLastModifiedTime();
258                if (isFinished && lastModificationTime != null && lastModificationTime.getTime() < wfOlderThanMS) {
259                    children.add(wfjBean.getId());
260                }
261            }
262        }
263        return children;
264    }
265
266    /**
267     * Process coordinators to purge them and their children.
268     *
269     * @param coords List of coordinators to process
270     * @throws JPAExecutorException If a JPA executor has a problem
271     */
272    private void processCoordinators(List<String> coords) throws JPAExecutorException {
273        List<String> wfsToPurge = new ArrayList<String>();
274        List<String> actionsToPurge = new ArrayList<String>();
275        List<String> coordsToPurge = new ArrayList<String>();
276        for (String coordId : coords) {
277            // Get all of the direct workflowChildren for this coord
278            List<WorkflowJobBean> wfjBeanList = new ArrayList<WorkflowJobBean>();
279            int size;
280            do {
281                size = wfjBeanList.size();
282                wfjBeanList.addAll(jpaService.execute(
283                        new WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coordId, wfjBeanList.size(), limit)));
284            } while (size != wfjBeanList.size());
285
286            // Checking if workflow is ready to purge
287            List<String> workflowChildren = fetchTerminatedWorkflow(wfjBeanList);
288
289            // if all workflow are ready to purge add them and add the coordinator and their actions
290            if(workflowChildren.size() == wfjBeanList.size()) {
291                LOG.debug("Purging coordinator " + coordId);
292                wfsToPurge.addAll(workflowChildren);
293                coordsToPurge.add(coordId);
294                // Get all of the direct actionChildren for this coord
295                List<String> actionChildren = new ArrayList<String>();
296                do {
297                    size = actionChildren.size();
298                    actionChildren.addAll(jpaService.execute(
299                            new CoordActionsGetFromCoordJobIdJPAExecutor(coordId, actionChildren.size(), limit)));
300                } while (size != actionChildren.size());
301                actionsToPurge.addAll(actionChildren);
302            }
303        }
304        // Process the children workflow
305        processWorkflows(wfsToPurge);
306        // Process the children action
307        purgeCoordActions(actionsToPurge);
308        // Now that all children have been purged, we can purge the coordinators
309        purgeCoordinators(coordsToPurge);
310    }
311
312    /**
313     * Process bundles to purge them and their children
314     *
315     * @param bundles List of bundles to process
316     * @throws JPAExecutorException If a JPA executor has a problem
317     */
318    private void processBundles(List<String> bundles) throws JPAExecutorException {
319        List<String> coordsToPurge = new ArrayList<String>();
320        List<String> bundlesToPurge = new ArrayList<String>();
321        for (Iterator<String> it = bundles.iterator(); it.hasNext(); ) {
322            String bundleId = it.next();
323            // We only purge the bundle and its children if they are all ready to be purged
324            long numChildrenNotReady = jpaService.execute(
325                    new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(coordOlderThan, bundleId));
326            if (numChildrenNotReady == 0) {
327                bundlesToPurge.add(bundleId);
328                LOG.debug("Purging bundle " + bundleId);
329                // Get all of the direct children for this bundle
330                List<String> children = new ArrayList<String>();
331                int size;
332                do {
333                    size = children.size();
334                    children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleId, children.size(), limit)));
335                } while (size != children.size());
336                coordsToPurge.addAll(children);
337            }
338        }
339        // Process the children
340        processCoordinators(coordsToPurge);
341        // Now that all children have been purged, we can purge the bundles
342        purgeBundles(bundlesToPurge);
343    }
344
345    /**
346     * Purge the workflows in REVERSE order in batches of size 'limit' (this must be done in reverse order so that children are
347     * purged before their parents)
348     *
349     * @param wfs List of workflows to purge
350     * @throws JPAExecutorException If a JPA executor has a problem
351     */
352    private void purgeWorkflows(List<String> wfs) throws JPAExecutorException {
353        wfDel += wfs.size();
354        //To delete sub-workflows before deleting parent workflows
355        Collections.reverse(wfs);
356        for (int startIndex = 0; startIndex < wfs.size(); ) {
357            int endIndex = (startIndex + limit < wfs.size()) ? (startIndex + limit) : wfs.size();
358            List<String> wfsForDelete = wfs.subList(startIndex, endIndex);
359            LOG.debug("Deleting workflows: " + StringUtils.join(wfsForDelete, ","));
360            jpaService.execute(new WorkflowJobsDeleteJPAExecutor(wfsForDelete));
361            startIndex = endIndex;
362        }
363    }
364
365    /**
366     * Purge coordActions of long running coordinators and purge them
367     *
368     * @param coordActions List of coordActions to purge
369     * @throws JPAExecutorException If a JPA executor has a problem
370     */
371    private void purgeCoordActions(List<String> coordActions) throws JPAExecutorException {
372        coordActionDel = coordActions.size();
373        for (int startIndex = 0; startIndex < coordActions.size(); ) {
374            int endIndex = (startIndex + limit < coordActions.size()) ? (startIndex + limit) : coordActions.size();
375            List<String> coordActionsForDelete = coordActions.subList(startIndex, endIndex);
376            LOG.debug("Deleting coordinator actions: " + StringUtils.join(coordActionsForDelete, ","));
377            jpaService.execute(new CoordActionsDeleteJPAExecutor(coordActionsForDelete));
378            startIndex = endIndex;
379        }
380    }
381    /**
382     * Purge the coordinators in SOME order in batches of size 'limit' (its in reverse order only for convenience)
383     *
384     * @param coords List of coordinators to purge
385     * @throws JPAExecutorException If a JPA executor has a problem
386     */
387    private void purgeCoordinators(List<String> coords) throws JPAExecutorException {
388        coordDel += coords.size();
389        for (int startIndex = 0; startIndex < coords.size(); ) {
390            int endIndex = (startIndex + limit < coords.size()) ? (startIndex + limit) : coords.size();
391            List<String> coordsForDelete = coords.subList(startIndex, endIndex);
392            LOG.debug("Deleting coordinators: " + StringUtils.join(coordsForDelete, ","));
393            jpaService.execute(new CoordJobsDeleteJPAExecutor(coordsForDelete));
394            startIndex = endIndex;
395        }
396    }
397
398    /**
399     * Purge the bundles in SOME order in batches of size 'limit' (its in reverse order only for convenience)
400     *
401     * @param bundles List of bundles to purge
402     * @throws JPAExecutorException If a JPA executor has a problem
403     */
404    private void purgeBundles(List<String> bundles) throws JPAExecutorException {
405        bundleDel += bundles.size();
406        for (int startIndex = 0; startIndex < bundles.size(); ) {
407            int endIndex = (startIndex + limit < bundles.size()) ? (startIndex + limit) : bundles.size();
408            Collection<String> bundlesForDelete = bundles.subList(startIndex, endIndex);
409            LOG.debug("Deleting bundles: " + StringUtils.join(bundlesForDelete, ","));
410            jpaService.execute(new BundleJobsDeleteJPAExecutor(bundlesForDelete));
411            startIndex = endIndex;
412        }
413    }
414
415    /* (non-Javadoc)
416     * @see org.apache.oozie.command.XCommand#getEntityKey()
417     */
418    @Override
419    public String getEntityKey() {
420        return "purge_command";
421    }
422
423    /* (non-Javadoc)
424     * @see org.apache.oozie.command.XCommand#isLockRequired()
425     */
426    @Override
427    protected boolean isLockRequired() {
428        return true;
429    }
430
431    /* (non-Javadoc)
432     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
433     */
434    @Override
435    protected void verifyPrecondition() throws CommandException, PreconditionException {
436    }
437}