This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command;
019
020 import java.util.ArrayList;
021 import java.util.Collections;
022 import java.util.Iterator;
023 import java.util.List;
024
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.XException;
027 import org.apache.oozie.executor.jpa.BundleJobsDeleteJPAExecutor;
028 import org.apache.oozie.executor.jpa.BundleJobsGetForPurgeJPAExecutor;
029 import org.apache.oozie.executor.jpa.CoordJobsCountNotForPurgeFromParentIdJPAExecutor;
030 import org.apache.oozie.executor.jpa.CoordJobsDeleteJPAExecutor;
031 import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor;
032 import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor;
033 import org.apache.oozie.executor.jpa.JPAExecutorException;
034 import org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor;
035 import org.apache.oozie.executor.jpa.WorkflowJobsDeleteJPAExecutor;
036 import org.apache.oozie.executor.jpa.WorkflowJobsGetFromParentIdJPAExecutor;
037 import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
038 import org.apache.oozie.service.JPAService;
039 import org.apache.oozie.service.Services;
040
041 /**
042 * This class is used to purge workflows, coordinators, and bundles. It takes into account the relationships between workflows and
043 * coordinators, and coordinators and bundles. It also only acts on 'limit' number of items at a time to not overtax the DB and in
044 * case something gets rolled back. Also, children are always deleted before their parents in case of a rollback.
045 */
046 public class PurgeXCommand extends XCommand<Void> {
047 private JPAService jpaService = null;
048 private int wfOlderThan;
049 private int coordOlderThan;
050 private int bundleOlderThan;
051 private final int limit;
052 private List<String> wfList;
053 private List<String> coordList;
054 private List<String> bundleList;
055 private int wfDel;
056 private int coordDel;
057 private int bundleDel;
058
059 public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit) {
060 super("purge", "purge", 0);
061 this.wfOlderThan = wfOlderThan;
062 this.coordOlderThan = coordOlderThan;
063 this.bundleOlderThan = bundleOlderThan;
064 this.limit = limit;
065 wfList = new ArrayList<String>();
066 coordList = new ArrayList<String>();
067 bundleList = new ArrayList<String>();
068 wfDel = 0;
069 coordDel = 0;
070 bundleDel = 0;
071 }
072
073 /* (non-Javadoc)
074 * @see org.apache.oozie.command.XCommand#loadState()
075 */
076 @Override
077 protected void loadState() throws CommandException {
078 try {
079 jpaService = Services.get().get(JPAService.class);
080
081 if (jpaService != null) {
082 // Get the lists of workflows, coordinators, and bundles that can be purged (and have no parents)
083 int size;
084 do {
085 size = wfList.size();
086 wfList.addAll(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(wfOlderThan, wfList.size(), limit)));
087 } while(size != wfList.size());
088 do {
089 size = coordList.size();
090 coordList.addAll(jpaService.execute(
091 new CoordJobsGetForPurgeJPAExecutor(coordOlderThan, coordList.size(), limit)));
092 } while(size != coordList.size());
093 do {
094 size = bundleList.size();
095 bundleList.addAll(jpaService.execute(
096 new BundleJobsGetForPurgeJPAExecutor(bundleOlderThan, bundleList.size(), limit)));
097 } while(size != bundleList.size());
098 }
099 else {
100 throw new CommandException(ErrorCode.E0610);
101 }
102 }
103 catch (XException ex) {
104 throw new CommandException(ex);
105 }
106 }
107
108 /* (non-Javadoc)
109 * @see org.apache.oozie.command.XCommand#execute()
110 */
111 @Override
112 protected Void execute() throws CommandException {
113 LOG.debug("STARTED Purge to purge Workflow Jobs older than [{0}] days, Coordinator Jobs older than [{1}] days, and Bundle"
114 + "jobs older than [{2}] days.", wfOlderThan, coordOlderThan, bundleOlderThan);
115
116 // Process parentless workflows to purge them and their children
117 if (!wfList.isEmpty()) {
118 try {
119 processWorkflows(wfList);
120 }
121 catch (JPAExecutorException je) {
122 throw new CommandException(je);
123 }
124 }
125
126 // Processs parentless coordinators to purge them and their children
127 if (!coordList.isEmpty()) {
128 try {
129 processCoordinators(coordList);
130 }
131 catch (JPAExecutorException je) {
132 throw new CommandException(je);
133 }
134 }
135
136 // Process bundles to purge them and their children
137 if (!bundleList.isEmpty()) {
138 try {
139 processBundles(bundleList);
140 }
141 catch (JPAExecutorException je) {
142 throw new CommandException(je);
143 }
144 }
145
146 LOG.debug("ENDED Purge deleted [{0}] workflows, [{1}] coordinators, [{2}] bundles", wfDel, coordDel, bundleDel);
147 return null;
148 }
149
150 /**
151 * Process workflows to purge them and their children. Uses the processWorkflowsHelper method to help via recursion to make
152 * sure that the workflow children are deleted before their parents.
153 *
154 * @param wfs List of workflows to process
155 * @throws JPAExecutorException If a JPA executor has a problem
156 */
157 private void processWorkflows(List<String> wfs) throws JPAExecutorException {
158 List<String> wfsToPurge = processWorkflowsHelper(wfs);
159 purgeWorkflows(wfsToPurge);
160 }
161
162 /**
163 * Used by the processWorkflows method and via recursion.
164 *
165 * @param wfs List of workflows to process
166 * @return List of workflows to purge
167 * @throws JPAExecutorException If a JPA executor has a problem
168 */
169 private List<String> processWorkflowsHelper(List<String> wfs) throws JPAExecutorException {
170 // If the list is empty, then we've finished recursing
171 if (wfs.isEmpty()) {
172 return wfs;
173 }
174 List<String> subwfs = new ArrayList<String>();
175 List<String> wfsToPurge = new ArrayList<String>();
176 for (String wfId : wfs) {
177 // We only purge the workflow and its children if they are all ready to be purged
178 long numChildrenNotReady = jpaService.execute(
179 new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(wfOlderThan, wfId));
180 if (numChildrenNotReady == 0) {
181 wfsToPurge.add(wfId);
182 // Get all of the direct children for this workflow
183 List<String> children = new ArrayList<String>();
184 int size;
185 do {
186 size = children.size();
187 children.addAll(jpaService.execute(new WorkflowJobsGetFromParentIdJPAExecutor(wfId, children.size(), limit)));
188 } while (size != children.size());
189 subwfs.addAll(children);
190 }
191 }
192 // Recurse on the children we just found to process their children
193 wfsToPurge.addAll(processWorkflowsHelper(subwfs));
194 return wfsToPurge;
195 }
196
197 /**
198 * Process coordinators to purge them and their children.
199 *
200 * @param coords List of coordinators to process
201 * @throws JPAExecutorException If a JPA executor has a problem
202 */
203 private void processCoordinators(List<String> coords) throws JPAExecutorException {
204 List<String> wfsToPurge = new ArrayList<String>();
205 List<String> coordsToPurge = new ArrayList<String>();
206 for (String coordId : coords) {
207 // We only purge the coord and its children if they are all ready to be purged
208 long numChildrenNotReady = jpaService.execute(
209 new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(wfOlderThan, coordId));
210 if (numChildrenNotReady == 0) {
211 coordsToPurge.add(coordId);
212 // Get all of the direct children for this coord
213 List<String> children = new ArrayList<String>();
214 int size;
215 do {
216 size = children.size();
217 children.addAll(jpaService.execute(
218 new WorkflowJobsGetFromParentIdJPAExecutor(coordId, children.size(), limit)));
219 } while (size != children.size());
220 wfsToPurge.addAll(children);
221 }
222 }
223 // Process the children
224 processWorkflows(wfsToPurge);
225 // Now that all children have been purged, we can purge the coordinators
226 purgeCoordinators(coordsToPurge);
227 }
228
229 /**
230 * Process bundles to purge them and their children
231 *
232 * @param bundles List of bundles to process
233 * @throws JPAExecutorException If a JPA executor has a problem
234 */
235 private void processBundles(List<String> bundles) throws JPAExecutorException {
236 List<String> coordsToPurge = new ArrayList<String>();
237 List<String> bundlesToPurge = new ArrayList<String>();
238 for (Iterator<String> it = bundles.iterator(); it.hasNext(); ) {
239 String bundleId = it.next();
240 // We only purge the bundle and its children if they are all ready to be purged
241 long numChildrenNotReady = jpaService.execute(
242 new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(coordOlderThan, bundleId));
243 if (numChildrenNotReady == 0) {
244 bundlesToPurge.add(bundleId);
245 // Get all of the direct children for this bundle
246 List<String> children = new ArrayList<String>();
247 int size;
248 do {
249 size = children.size();
250 children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleId, children.size(), limit)));
251 } while (size != children.size());
252 coordsToPurge.addAll(children);
253 }
254 }
255 // Process the children
256 processCoordinators(coordsToPurge);
257 // Now that all children have been purged, we can purge the bundles
258 purgeBundles(bundlesToPurge);
259 }
260
261 /**
262 * Purge the workflows in REVERSE order in batches of size 'limit' (this must be done in reverse order so that children are
263 * purged before their parents)
264 *
265 * @param wfs List of workflows to purge
266 * @throws JPAExecutorException If a JPA executor has a problem
267 */
268 private void purgeWorkflows(List<String> wfs) throws JPAExecutorException {
269 wfDel += wfs.size();
270 Collections.reverse(wfs);
271 for (int startIndex = 0; startIndex < wfs.size(); ) {
272 int endIndex = (startIndex + limit < wfs.size()) ? (startIndex + limit) : wfs.size();
273 jpaService.execute(new WorkflowJobsDeleteJPAExecutor(wfs.subList(startIndex, endIndex)));
274 startIndex = endIndex;
275 }
276 }
277
278 /**
279 * Purge the coordinators in SOME order in batches of size 'limit' (its in reverse order only for convenience)
280 *
281 * @param coords List of coordinators to purge
282 * @throws JPAExecutorException If a JPA executor has a problem
283 */
284 private void purgeCoordinators(List<String> coords) throws JPAExecutorException {
285 coordDel += coords.size();
286 for (int startIndex = 0; startIndex < coords.size(); ) {
287 int endIndex = (startIndex + limit < coords.size()) ? (startIndex + limit) : coords.size();
288 jpaService.execute(new CoordJobsDeleteJPAExecutor(coords.subList(startIndex, endIndex)));
289 startIndex = endIndex;
290 }
291 }
292
293 /**
294 * Purge the bundles in SOME order in batches of size 'limit' (its in reverse order only for convenience)
295 *
296 * @param bundles List of bundles to purge
297 * @throws JPAExecutorException If a JPA executor has a problem
298 */
299 private void purgeBundles(List<String> bundles) throws JPAExecutorException {
300 bundleDel += bundles.size();
301 for (int startIndex = 0; startIndex < bundles.size(); ) {
302 int endIndex = (startIndex + limit < bundles.size()) ? (startIndex + limit) : bundles.size();
303 jpaService.execute(new BundleJobsDeleteJPAExecutor(bundles.subList(startIndex, endIndex)));
304 startIndex = endIndex;
305 }
306 }
307
308 /* (non-Javadoc)
309 * @see org.apache.oozie.command.XCommand#getEntityKey()
310 */
311 @Override
312 public String getEntityKey() {
313 return null;
314 }
315
316 /* (non-Javadoc)
317 * @see org.apache.oozie.command.XCommand#isLockRequired()
318 */
319 @Override
320 protected boolean isLockRequired() {
321 return false;
322 }
323
324 /* (non-Javadoc)
325 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
326 */
327 @Override
328 protected void verifyPrecondition() throws CommandException, PreconditionException {
329 }
330 }