001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command;
020
021import org.apache.oozie.ErrorCode;
022import org.apache.oozie.WorkflowJobBean;
023import org.apache.oozie.XException;
024import org.apache.oozie.executor.jpa.BundleJobsDeleteJPAExecutor;
025import org.apache.oozie.executor.jpa.BundleJobsGetForPurgeJPAExecutor;
026import org.apache.oozie.executor.jpa.CoordActionsDeleteJPAExecutor;
027import org.apache.oozie.executor.jpa.CoordActionsGetFromCoordJobIdJPAExecutor;
028import org.apache.oozie.executor.jpa.CoordJobsCountNotForPurgeFromParentIdJPAExecutor;
029import org.apache.oozie.executor.jpa.CoordJobsDeleteJPAExecutor;
030import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor;
031import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor;
032import org.apache.oozie.executor.jpa.JPAExecutorException;
033import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
034import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
035import org.apache.oozie.executor.jpa.WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor;
036import org.apache.oozie.executor.jpa.WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor;
037import org.apache.oozie.executor.jpa.WorkflowJobsDeleteJPAExecutor;
038import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
039import org.apache.oozie.service.JPAService;
040import org.apache.oozie.service.Services;
041import org.eclipse.jgit.util.StringUtils;
042
043import java.util.ArrayList;
044import java.util.Collection;
045import java.util.Collections;
046import java.util.Iterator;
047import java.util.List;
048/**
049 * This class is used to purge workflows, coordinators, and bundles.  It takes into account the relationships between workflows and
050 * coordinators, and coordinators and bundles.  It also only acts on 'limit' number of items at a time to not overtax the DB and in
051 * case something gets rolled back.  Also, children are always deleted before their parents in case of a rollback.
052 */
053public class PurgeXCommand extends XCommand<Void> {
054    private JPAService jpaService = null;
055    private int wfOlderThan;
056    private int coordOlderThan;
057    private int bundleOlderThan;
058    private boolean purgeOldCoordAction = false;
059    private final int limit;
060    private List<String> wfList;
061    private List<String> coordActionList;
062    private List<String> coordList;
063    private List<String> bundleList;
064    private int wfDel;
065    private int coordDel;
066    private int coordActionDel;
067    private int bundleDel;
068    private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
069
070    public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit) {
071        this(wfOlderThan, coordOlderThan, bundleOlderThan, limit, false);
072    }
073
074    public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit, boolean purgeOldCoordAction) {
075        super("purge", "purge", 0);
076        this.wfOlderThan = wfOlderThan;
077        this.coordOlderThan = coordOlderThan;
078        this.bundleOlderThan = bundleOlderThan;
079        this.purgeOldCoordAction = purgeOldCoordAction;
080        this.limit = limit;
081        wfList = new ArrayList<String>();
082        coordActionList = new ArrayList<String>();
083        coordList = new ArrayList<String>();
084        bundleList = new ArrayList<String>();
085        wfDel = 0;
086        coordDel = 0;
087        bundleDel = 0;
088    }
089
090    /* (non-Javadoc)
091     * @see org.apache.oozie.command.XCommand#loadState()
092     */
093    @Override
094    protected void loadState() throws CommandException {
095        try {
096            jpaService = Services.get().get(JPAService.class);
097
098            if (jpaService != null) {
099                // Get the lists of workflows, coordinators, and bundles that can be purged (and have no parents)
100                int size;
101                do {
102                    size = wfList.size();
103                    wfList.addAll(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(wfOlderThan, wfList.size(), limit)));
104                } while(size != wfList.size());
105                if (purgeOldCoordAction) {
106                    LOG.debug("Purging workflows of long running coordinators is turned on");
107                    do {
108                        size = coordActionList.size();
109                        long olderThan = wfOlderThan;
110                        List<WorkflowJobBean> jobBeans = WorkflowJobQueryExecutor.getInstance().getList(
111                                WorkflowJobQuery.GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN, olderThan,
112                                coordActionList.size(), limit);
113                        for (WorkflowJobBean bean : jobBeans) {
114                            coordActionList.add(bean.getParentId());
115                            wfList.add(bean.getId());
116                        }
117                    } while(size != coordActionList.size());
118                }
119                do {
120                    size = coordList.size();
121                    coordList.addAll(jpaService.execute(
122                            new CoordJobsGetForPurgeJPAExecutor(coordOlderThan, coordList.size(), limit)));
123                } while(size != coordList.size());
124                do {
125                    size = bundleList.size();
126                    bundleList.addAll(jpaService.execute(
127                            new BundleJobsGetForPurgeJPAExecutor(bundleOlderThan, bundleList.size(), limit)));
128                } while(size != bundleList.size());
129            }
130            else {
131                throw new CommandException(ErrorCode.E0610);
132            }
133        }
134        catch (XException ex) {
135            throw new CommandException(ex);
136        }
137    }
138
139    /* (non-Javadoc)
140     * @see org.apache.oozie.command.XCommand#execute()
141     */
142    @Override
143    protected Void execute() throws CommandException {
144        LOG.info("STARTED Purge to purge Workflow Jobs older than [{0}] days, Coordinator Jobs older than [{1}] days, and Bundle"
145                + "jobs older than [{2}] days.", wfOlderThan, coordOlderThan, bundleOlderThan);
146
147        // Process parentless workflows to purge them and their children
148        if (!wfList.isEmpty()) {
149            try {
150                processWorkflows(wfList);
151            }
152            catch (JPAExecutorException je) {
153                throw new CommandException(je);
154            }
155        }
156
157        // Process coordinator actions of long running coordinators and purge them
158        if (!coordActionList.isEmpty()) {
159            try {
160                purgeCoordActions(coordActionList);
161            }
162            catch (JPAExecutorException je) {
163                throw new CommandException(je);
164            }
165        }
166        // Processs parentless coordinators to purge them and their children
167        if (!coordList.isEmpty()) {
168            try {
169                processCoordinators(coordList);
170            }
171            catch (JPAExecutorException je) {
172                throw new CommandException(je);
173            }
174        }
175
176        // Process bundles to purge them and their children
177        if (!bundleList.isEmpty()) {
178            try {
179                processBundles(bundleList);
180            }
181            catch (JPAExecutorException je) {
182                throw new CommandException(je);
183            }
184        }
185
186        LOG.info("ENDED Purge deleted [{0}] workflows, [{1}] coordinatorActions, [{2}] coordinators, [{3}] bundles",
187                wfDel, coordActionDel, coordDel, bundleDel);
188        return null;
189    }
190
191    /**
192     * Process workflows to purge them and their children.  Uses the processWorkflowsHelper method to help via recursion to make
193     * sure that the workflow children are deleted before their parents.
194     *
195     * @param wfs List of workflows to process
196     * @throws JPAExecutorException If a JPA executor has a problem
197     */
198    private void processWorkflows(List<String> wfs) throws JPAExecutorException {
199        List<String> wfsToPurge = processWorkflowsHelper(wfs);
200        purgeWorkflows(wfsToPurge);
201    }
202
203    /**
204     * Used by the processWorkflows method and via recursion.
205     *
206     * @param wfs List of workflows to process
207     * @return List of workflows to purge
208     * @throws JPAExecutorException If a JPA executor has a problem
209     */
210    private List<String> processWorkflowsHelper(List<String> wfs) throws JPAExecutorException {
211        // If the list is empty, then we've finished recursing
212        if (wfs.isEmpty()) {
213            return wfs;
214        }
215        List<String> subwfs = new ArrayList<String>();
216        List<String> wfsToPurge = new ArrayList<String>();
217        for (String wfId : wfs) {
218            int size;
219            List<WorkflowJobBean> swfBeanList = new ArrayList<WorkflowJobBean>();
220            do {
221                size = swfBeanList.size();
222                swfBeanList.addAll(jpaService.execute(
223                        new WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfId, swfBeanList.size(), limit)));
224            } while (size != swfBeanList.size());
225
226            // Checking if sub workflow is ready to purge
227            List<String> children = fetchTerminatedWorkflow(swfBeanList);
228
229            // if all sub workflow ready to purge add them all and add current workflow
230            if(children.size() == swfBeanList.size()) {
231                subwfs.addAll(children);
232                wfsToPurge.add(wfId);
233            }
234        }
235        // Recurse on the children we just found to process their children
236        wfsToPurge.addAll(processWorkflowsHelper(subwfs));
237        return wfsToPurge;
238    }
239
240    /**
241     * This method will return all terminate workflow ids from wfBeanlist for purge.
242     * @param wfBeanList
243     * @return workflows to purge
244     */
245    private List<String> fetchTerminatedWorkflow(List<WorkflowJobBean> wfBeanList) {
246        List<String> children = new ArrayList<String>();
247        long wfOlderThanMS = System.currentTimeMillis() - (wfOlderThan * DAY_IN_MS);
248        for (WorkflowJobBean wfjBean : wfBeanList) {
249            if (wfjBean.inTerminalState() && wfjBean.getEndTime().getTime() < wfOlderThanMS) {
250                children.add(wfjBean.getId());
251            }
252        }
253        return children;
254    }
255
256    /**
257     * Process coordinators to purge them and their children.
258     *
259     * @param coords List of coordinators to process
260     * @throws JPAExecutorException If a JPA executor has a problem
261     */
262    private void processCoordinators(List<String> coords) throws JPAExecutorException {
263        List<String> wfsToPurge = new ArrayList<String>();
264        List<String> actionsToPurge = new ArrayList<String>();
265        List<String> coordsToPurge = new ArrayList<String>();
266        for (String coordId : coords) {
267            // Get all of the direct workflowChildren for this coord
268            List<WorkflowJobBean> wfjBeanList = new ArrayList<WorkflowJobBean>();
269            int size;
270            do {
271                size = wfjBeanList.size();
272                wfjBeanList.addAll(jpaService.execute(
273                        new WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coordId, wfjBeanList.size(), limit)));
274            } while (size != wfjBeanList.size());
275
276            // Checking if workflow is ready to purge
277            List<String> workflowChildren = fetchTerminatedWorkflow(wfjBeanList);
278
279            // if all workflow are ready to purge add them and add the coordinator and their actions
280            if(workflowChildren.size() == wfjBeanList.size()) {
281                LOG.debug("Purging coordinator " + coordId);
282                wfsToPurge.addAll(workflowChildren);
283                coordsToPurge.add(coordId);
284                // Get all of the direct actionChildren for this coord
285                List<String> actionChildren = new ArrayList<String>();
286                do {
287                    size = actionChildren.size();
288                    actionChildren.addAll(jpaService.execute(
289                            new CoordActionsGetFromCoordJobIdJPAExecutor(coordId, actionChildren.size(), limit)));
290                } while (size != actionChildren.size());
291                actionsToPurge.addAll(actionChildren);
292            }
293        }
294        // Process the children workflow
295        processWorkflows(wfsToPurge);
296        // Process the children action
297        purgeCoordActions(actionsToPurge);
298        // Now that all children have been purged, we can purge the coordinators
299        purgeCoordinators(coordsToPurge);
300    }
301
302    /**
303     * Process bundles to purge them and their children
304     *
305     * @param bundles List of bundles to process
306     * @throws JPAExecutorException If a JPA executor has a problem
307     */
308    private void processBundles(List<String> bundles) throws JPAExecutorException {
309        List<String> coordsToPurge = new ArrayList<String>();
310        List<String> bundlesToPurge = new ArrayList<String>();
311        for (Iterator<String> it = bundles.iterator(); it.hasNext(); ) {
312            String bundleId = it.next();
313            // We only purge the bundle and its children if they are all ready to be purged
314            long numChildrenNotReady = jpaService.execute(
315                    new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(coordOlderThan, bundleId));
316            if (numChildrenNotReady == 0) {
317                bundlesToPurge.add(bundleId);
318                LOG.debug("Purging bundle " + bundleId);
319                // Get all of the direct children for this bundle
320                List<String> children = new ArrayList<String>();
321                int size;
322                do {
323                    size = children.size();
324                    children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleId, children.size(), limit)));
325                } while (size != children.size());
326                coordsToPurge.addAll(children);
327            }
328        }
329        // Process the children
330        processCoordinators(coordsToPurge);
331        // Now that all children have been purged, we can purge the bundles
332        purgeBundles(bundlesToPurge);
333    }
334
335    /**
336     * Purge the workflows in REVERSE order in batches of size 'limit' (this must be done in reverse order so that children are
337     * purged before their parents)
338     *
339     * @param wfs List of workflows to purge
340     * @throws JPAExecutorException If a JPA executor has a problem
341     */
342    private void purgeWorkflows(List<String> wfs) throws JPAExecutorException {
343        wfDel += wfs.size();
344        //To delete sub-workflows before deleting parent workflows
345        Collections.reverse(wfs);
346        for (int startIndex = 0; startIndex < wfs.size(); ) {
347            int endIndex = (startIndex + limit < wfs.size()) ? (startIndex + limit) : wfs.size();
348            List<String> wfsForDelete = wfs.subList(startIndex, endIndex);
349            LOG.debug("Deleting workflows: " + StringUtils.join(wfsForDelete, ","));
350            jpaService.execute(new WorkflowJobsDeleteJPAExecutor(wfsForDelete));
351            startIndex = endIndex;
352        }
353    }
354
355    /**
356     * Purge coordActions of long running coordinators and purge them
357     *
358     * @param coordActions List of coordActions to purge
359     * @throws JPAExecutorException If a JPA executor has a problem
360     */
361    private void purgeCoordActions(List<String> coordActions) throws JPAExecutorException {
362        coordActionDel = coordActions.size();
363        for (int startIndex = 0; startIndex < coordActions.size(); ) {
364            int endIndex = (startIndex + limit < coordActions.size()) ? (startIndex + limit) : coordActions.size();
365            List<String> coordActionsForDelete = coordActions.subList(startIndex, endIndex);
366            LOG.debug("Deleting coordinator actions: " + StringUtils.join(coordActionsForDelete, ","));
367            jpaService.execute(new CoordActionsDeleteJPAExecutor(coordActionsForDelete));
368            startIndex = endIndex;
369        }
370    }
371    /**
372     * Purge the coordinators in SOME order in batches of size 'limit' (its in reverse order only for convenience)
373     *
374     * @param coords List of coordinators to purge
375     * @throws JPAExecutorException If a JPA executor has a problem
376     */
377    private void purgeCoordinators(List<String> coords) throws JPAExecutorException {
378        coordDel += coords.size();
379        for (int startIndex = 0; startIndex < coords.size(); ) {
380            int endIndex = (startIndex + limit < coords.size()) ? (startIndex + limit) : coords.size();
381            List<String> coordsForDelete = coords.subList(startIndex, endIndex);
382            LOG.debug("Deleting coordinators: " + StringUtils.join(coordsForDelete, ","));
383            jpaService.execute(new CoordJobsDeleteJPAExecutor(coordsForDelete));
384            startIndex = endIndex;
385        }
386    }
387
388    /**
389     * Purge the bundles in SOME order in batches of size 'limit' (its in reverse order only for convenience)
390     *
391     * @param bundles List of bundles to purge
392     * @throws JPAExecutorException If a JPA executor has a problem
393     */
394    private void purgeBundles(List<String> bundles) throws JPAExecutorException {
395        bundleDel += bundles.size();
396        for (int startIndex = 0; startIndex < bundles.size(); ) {
397            int endIndex = (startIndex + limit < bundles.size()) ? (startIndex + limit) : bundles.size();
398            Collection<String> bundlesForDelete = bundles.subList(startIndex, endIndex);
399            LOG.debug("Deleting bundles: " + StringUtils.join(bundlesForDelete, ","));
400            jpaService.execute(new BundleJobsDeleteJPAExecutor(bundlesForDelete));
401            startIndex = endIndex;
402        }
403    }
404
405    /* (non-Javadoc)
406     * @see org.apache.oozie.command.XCommand#getEntityKey()
407     */
408    @Override
409    public String getEntityKey() {
410        return null;
411    }
412
413    /* (non-Javadoc)
414     * @see org.apache.oozie.command.XCommand#isLockRequired()
415     */
416    @Override
417    protected boolean isLockRequired() {
418        return false;
419    }
420
421    /* (non-Javadoc)
422     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
423     */
424    @Override
425    protected void verifyPrecondition() throws CommandException, PreconditionException {
426    }
427}