001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command; 019 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.Iterator; 023import java.util.List; 024 025import org.apache.oozie.WorkflowJobBean; 026import org.apache.oozie.ErrorCode; 027import org.apache.oozie.XException; 028import org.apache.oozie.executor.jpa.BundleJobsDeleteJPAExecutor; 029import org.apache.oozie.executor.jpa.BundleJobsGetForPurgeJPAExecutor; 030import org.apache.oozie.executor.jpa.CoordActionsDeleteJPAExecutor; 031import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 032import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 033import org.apache.oozie.executor.jpa.CoordJobsCountNotForPurgeFromParentIdJPAExecutor; 034import org.apache.oozie.executor.jpa.CoordJobsDeleteJPAExecutor; 035import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor; 036import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor; 037import org.apache.oozie.executor.jpa.JPAExecutorException; 038import org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor; 039import org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor; 040import org.apache.oozie.executor.jpa.WorkflowJobsDeleteJPAExecutor; 041import org.apache.oozie.executor.jpa.WorkflowJobsGetFromWorkflowParentIdJPAExecutor; 042import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor; 043import org.apache.oozie.executor.jpa.WorkflowJobsGetFromCoordParentIdJPAExecutor; 044import org.apache.oozie.service.JPAService; 045import org.apache.oozie.service.Services; 046 047/** 048 * This class is used to purge workflows, coordinators, and bundles. It takes into account the relationships between workflows and 049 * coordinators, and coordinators and bundles. It also only acts on 'limit' number of items at a time to not overtax the DB and in 050 * case something gets rolled back. Also, children are always deleted before their parents in case of a rollback. 051 */ 052public class PurgeXCommand extends XCommand<Void> { 053 private JPAService jpaService = null; 054 private int wfOlderThan; 055 private int coordOlderThan; 056 private int bundleOlderThan; 057 private boolean purgeOldCoordAction = false; 058 private final int limit; 059 private List<String> wfList; 060 private List<String> coordActionList; 061 private List<String> coordList; 062 private List<String> bundleList; 063 private int wfDel; 064 private int coordDel; 065 private int coordActionDel; 066 private int bundleDel; 067 068 public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit) { 069 this(wfOlderThan, coordOlderThan, bundleOlderThan, limit, false); 070 } 071 072 public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit, boolean purgeOldCoordAction) { 073 super("purge", "purge", 0); 074 this.wfOlderThan = wfOlderThan; 075 this.coordOlderThan = coordOlderThan; 076 this.bundleOlderThan = bundleOlderThan; 077 this.purgeOldCoordAction = purgeOldCoordAction; 078 this.limit = limit; 079 wfList = new ArrayList<String>(); 080 coordActionList = new ArrayList<String>(); 081 coordList = new ArrayList<String>(); 082 bundleList = new ArrayList<String>(); 083 wfDel = 0; 084 coordDel = 0; 085 bundleDel = 0; 086 } 087 088 /* (non-Javadoc) 089 * @see org.apache.oozie.command.XCommand#loadState() 090 */ 091 @Override 092 protected void loadState() throws CommandException { 093 try { 094 jpaService = Services.get().get(JPAService.class); 095 096 if (jpaService != null) { 097 // Get the lists of workflows, coordinators, and bundles that can be purged (and have no parents) 098 int size; 099 do { 100 size = wfList.size(); 101 wfList.addAll(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(wfOlderThan, wfList.size(), limit))); 102 } while(size != wfList.size()); 103 if (purgeOldCoordAction) { 104 LOG.debug("Purging workflows of long running coordinators is turned on"); 105 do { 106 size = coordActionList.size(); 107 long olderThan = wfOlderThan; 108 List<WorkflowJobBean> jobBeans = WorkflowJobQueryExecutor.getInstance().getList( 109 WorkflowJobQuery.GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN, olderThan, 110 coordActionList.size(), limit); 111 for (WorkflowJobBean bean : jobBeans) { 112 coordActionList.add(bean.getParentId()); 113 wfList.add(bean.getId()); 114 } 115 } while(size != coordActionList.size()); 116 } 117 do { 118 size = coordList.size(); 119 coordList.addAll(jpaService.execute( 120 new CoordJobsGetForPurgeJPAExecutor(coordOlderThan, coordList.size(), limit))); 121 } while(size != coordList.size()); 122 do { 123 size = bundleList.size(); 124 bundleList.addAll(jpaService.execute( 125 new BundleJobsGetForPurgeJPAExecutor(bundleOlderThan, bundleList.size(), limit))); 126 } while(size != bundleList.size()); 127 } 128 else { 129 throw new CommandException(ErrorCode.E0610); 130 } 131 } 132 catch (XException ex) { 133 throw new CommandException(ex); 134 } 135 } 136 137 /* (non-Javadoc) 138 * @see org.apache.oozie.command.XCommand#execute() 139 */ 140 @Override 141 protected Void execute() throws CommandException { 142 LOG.info("STARTED Purge to purge Workflow Jobs older than [{0}] days, Coordinator Jobs older than [{1}] days, and Bundle" 143 + "jobs older than [{2}] days.", wfOlderThan, coordOlderThan, bundleOlderThan); 144 145 // Process parentless workflows to purge them and their children 146 if (!wfList.isEmpty()) { 147 try { 148 processWorkflows(wfList); 149 } 150 catch (JPAExecutorException je) { 151 throw new CommandException(je); 152 } 153 } 154 155 // Process coordinator actions of long running coordinators and purge them 156 if (!coordActionList.isEmpty()) { 157 try { 158 purgeCoordActions(coordActionList); 159 } 160 catch (JPAExecutorException je) { 161 throw new CommandException(je); 162 } 163 } 164 // Processs parentless coordinators to purge them and their children 165 if (!coordList.isEmpty()) { 166 try { 167 processCoordinators(coordList); 168 } 169 catch (JPAExecutorException je) { 170 throw new CommandException(je); 171 } 172 } 173 174 // Process bundles to purge them and their children 175 if (!bundleList.isEmpty()) { 176 try { 177 processBundles(bundleList); 178 } 179 catch (JPAExecutorException je) { 180 throw new CommandException(je); 181 } 182 } 183 184 LOG.info("ENDED Purge deleted [{0}] workflows, [{1}] coordinatorActions, [{2}] coordinators, [{3}] bundles", 185 wfDel, coordActionDel, coordDel, bundleDel); 186 return null; 187 } 188 189 /** 190 * Process workflows to purge them and their children. Uses the processWorkflowsHelper method to help via recursion to make 191 * sure that the workflow children are deleted before their parents. 192 * 193 * @param wfs List of workflows to process 194 * @throws JPAExecutorException If a JPA executor has a problem 195 */ 196 private void processWorkflows(List<String> wfs) throws JPAExecutorException { 197 List<String> wfsToPurge = processWorkflowsHelper(wfs); 198 for (String id: wfsToPurge) { 199 LOG.debug("Purging workflow " + id); 200 } 201 purgeWorkflows(wfsToPurge); 202 } 203 204 /** 205 * Used by the processWorkflows method and via recursion. 206 * 207 * @param wfs List of workflows to process 208 * @return List of workflows to purge 209 * @throws JPAExecutorException If a JPA executor has a problem 210 */ 211 private List<String> processWorkflowsHelper(List<String> wfs) throws JPAExecutorException { 212 // If the list is empty, then we've finished recursing 213 if (wfs.isEmpty()) { 214 return wfs; 215 } 216 List<String> subwfs = new ArrayList<String>(); 217 List<String> wfsToPurge = new ArrayList<String>(); 218 for (String wfId : wfs) { 219 // We only purge the workflow and its children if they are all ready to be purged 220 long numChildrenNotReady = jpaService.execute( 221 new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(wfOlderThan, wfId)); 222 if (numChildrenNotReady == 0) { 223 wfsToPurge.add(wfId); 224 // Get all of the direct children for this workflow 225 List<String> children = new ArrayList<String>(); 226 int size; 227 do { 228 size = children.size(); 229 children.addAll(jpaService.execute( 230 new WorkflowJobsGetFromWorkflowParentIdJPAExecutor(wfId, children.size(), limit))); 231 } while (size != children.size()); 232 subwfs.addAll(children); 233 } 234 } 235 // Recurse on the children we just found to process their children 236 wfsToPurge.addAll(processWorkflowsHelper(subwfs)); 237 return wfsToPurge; 238 } 239 240 /** 241 * Process coordinators to purge them and their children. 242 * 243 * @param coords List of coordinators to process 244 * @throws JPAExecutorException If a JPA executor has a problem 245 */ 246 private void processCoordinators(List<String> coords) throws JPAExecutorException { 247 List<String> wfsToPurge = new ArrayList<String>(); 248 List<String> coordsToPurge = new ArrayList<String>(); 249 for (String coordId : coords) { 250 // We only purge the coord and its children if they are all ready to be purged 251 long numChildrenNotReady = jpaService.execute( 252 new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(wfOlderThan, coordId)); 253 if (numChildrenNotReady == 0) { 254 coordsToPurge.add(coordId); 255 LOG.debug("Purging coordinator " + coordId); 256 // Get all of the direct children for this coord 257 List<String> children = new ArrayList<String>(); 258 int size; 259 do { 260 size = children.size(); 261 children.addAll(jpaService.execute( 262 new WorkflowJobsGetFromCoordParentIdJPAExecutor(coordId, children.size(), limit))); 263 } while (size != children.size()); 264 wfsToPurge.addAll(children); 265 } 266 } 267 // Process the children 268 processWorkflows(wfsToPurge); 269 // Now that all children have been purged, we can purge the coordinators 270 purgeCoordinators(coordsToPurge); 271 } 272 273 /** 274 * Process bundles to purge them and their children 275 * 276 * @param bundles List of bundles to process 277 * @throws JPAExecutorException If a JPA executor has a problem 278 */ 279 private void processBundles(List<String> bundles) throws JPAExecutorException { 280 List<String> coordsToPurge = new ArrayList<String>(); 281 List<String> bundlesToPurge = new ArrayList<String>(); 282 for (Iterator<String> it = bundles.iterator(); it.hasNext(); ) { 283 String bundleId = it.next(); 284 // We only purge the bundle and its children if they are all ready to be purged 285 long numChildrenNotReady = jpaService.execute( 286 new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(coordOlderThan, bundleId)); 287 if (numChildrenNotReady == 0) { 288 bundlesToPurge.add(bundleId); 289 LOG.debug("Purging bundle " + bundleId); 290 // Get all of the direct children for this bundle 291 List<String> children = new ArrayList<String>(); 292 int size; 293 do { 294 size = children.size(); 295 children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleId, children.size(), limit))); 296 } while (size != children.size()); 297 coordsToPurge.addAll(children); 298 } 299 } 300 // Process the children 301 processCoordinators(coordsToPurge); 302 // Now that all children have been purged, we can purge the bundles 303 purgeBundles(bundlesToPurge); 304 } 305 306 /** 307 * Purge the workflows in REVERSE order in batches of size 'limit' (this must be done in reverse order so that children are 308 * purged before their parents) 309 * 310 * @param wfs List of workflows to purge 311 * @throws JPAExecutorException If a JPA executor has a problem 312 */ 313 private void purgeWorkflows(List<String> wfs) throws JPAExecutorException { 314 wfDel += wfs.size(); 315 Collections.reverse(wfs); 316 for (int startIndex = 0; startIndex < wfs.size(); ) { 317 int endIndex = (startIndex + limit < wfs.size()) ? (startIndex + limit) : wfs.size(); 318 jpaService.execute(new WorkflowJobsDeleteJPAExecutor(wfs.subList(startIndex, endIndex))); 319 startIndex = endIndex; 320 } 321 } 322 323 /** 324 * Purge coordActions of long running coordinators and purge them 325 * 326 * @param coordActions List of coordActions to purge 327 * @throws JPAExecutorException If a JPA executor has a problem 328 */ 329 private void purgeCoordActions(List<String> coordActions) throws JPAExecutorException { 330 coordActionDel = coordActions.size(); 331 for (int startIndex = 0; startIndex < coordActions.size(); ) { 332 int endIndex = (startIndex + limit < coordActions.size()) ? (startIndex + limit) : coordActions.size(); 333 jpaService.execute(new CoordActionsDeleteJPAExecutor(coordActions.subList(startIndex, endIndex))); 334 startIndex = endIndex; 335 } 336 } 337 /** 338 * Purge the coordinators in SOME order in batches of size 'limit' (its in reverse order only for convenience) 339 * 340 * @param coords List of coordinators to purge 341 * @throws JPAExecutorException If a JPA executor has a problem 342 */ 343 private void purgeCoordinators(List<String> coords) throws JPAExecutorException { 344 coordDel += coords.size(); 345 for (int startIndex = 0; startIndex < coords.size(); ) { 346 int endIndex = (startIndex + limit < coords.size()) ? (startIndex + limit) : coords.size(); 347 jpaService.execute(new CoordJobsDeleteJPAExecutor(coords.subList(startIndex, endIndex))); 348 startIndex = endIndex; 349 } 350 } 351 352 /** 353 * Purge the bundles in SOME order in batches of size 'limit' (its in reverse order only for convenience) 354 * 355 * @param bundles List of bundles to purge 356 * @throws JPAExecutorException If a JPA executor has a problem 357 */ 358 private void purgeBundles(List<String> bundles) throws JPAExecutorException { 359 bundleDel += bundles.size(); 360 for (int startIndex = 0; startIndex < bundles.size(); ) { 361 int endIndex = (startIndex + limit < bundles.size()) ? (startIndex + limit) : bundles.size(); 362 jpaService.execute(new BundleJobsDeleteJPAExecutor(bundles.subList(startIndex, endIndex))); 363 startIndex = endIndex; 364 } 365 } 366 367 /* (non-Javadoc) 368 * @see org.apache.oozie.command.XCommand#getEntityKey() 369 */ 370 @Override 371 public String getEntityKey() { 372 return null; 373 } 374 375 /* (non-Javadoc) 376 * @see org.apache.oozie.command.XCommand#isLockRequired() 377 */ 378 @Override 379 protected boolean isLockRequired() { 380 return false; 381 } 382 383 /* (non-Javadoc) 384 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 385 */ 386 @Override 387 protected void verifyPrecondition() throws CommandException, PreconditionException { 388 } 389}