001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command; 019 020 import java.util.ArrayList; 021 import java.util.Collections; 022 import java.util.Iterator; 023 import java.util.List; 024 025 import org.apache.oozie.ErrorCode; 026 import org.apache.oozie.XException; 027 import org.apache.oozie.executor.jpa.BundleJobsDeleteJPAExecutor; 028 import org.apache.oozie.executor.jpa.BundleJobsGetForPurgeJPAExecutor; 029 import org.apache.oozie.executor.jpa.CoordJobsCountNotForPurgeFromParentIdJPAExecutor; 030 import org.apache.oozie.executor.jpa.CoordJobsDeleteJPAExecutor; 031 import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor; 032 import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor; 033 import org.apache.oozie.executor.jpa.JPAExecutorException; 034 import org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor; 035 import org.apache.oozie.executor.jpa.WorkflowJobsDeleteJPAExecutor; 036 import org.apache.oozie.executor.jpa.WorkflowJobsGetFromParentIdJPAExecutor; 037 import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor; 038 import org.apache.oozie.service.JPAService; 039 import org.apache.oozie.service.Services; 040 041 /** 042 * This class is used to purge workflows, coordinators, and bundles. It takes into account the relationships between workflows and 043 * coordinators, and coordinators and bundles. It also only acts on 'limit' number of items at a time to not overtax the DB and in 044 * case something gets rolled back. Also, children are always deleted before their parents in case of a rollback. 045 */ 046 public class PurgeXCommand extends XCommand<Void> { 047 private JPAService jpaService = null; 048 private int wfOlderThan; 049 private int coordOlderThan; 050 private int bundleOlderThan; 051 private final int limit; 052 private List<String> wfList; 053 private List<String> coordList; 054 private List<String> bundleList; 055 private int wfDel; 056 private int coordDel; 057 private int bundleDel; 058 059 public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit) { 060 super("purge", "purge", 0); 061 this.wfOlderThan = wfOlderThan; 062 this.coordOlderThan = coordOlderThan; 063 this.bundleOlderThan = bundleOlderThan; 064 this.limit = limit; 065 wfList = new ArrayList<String>(); 066 coordList = new ArrayList<String>(); 067 bundleList = new ArrayList<String>(); 068 wfDel = 0; 069 coordDel = 0; 070 bundleDel = 0; 071 } 072 073 /* (non-Javadoc) 074 * @see org.apache.oozie.command.XCommand#loadState() 075 */ 076 @Override 077 protected void loadState() throws CommandException { 078 try { 079 jpaService = Services.get().get(JPAService.class); 080 081 if (jpaService != null) { 082 // Get the lists of workflows, coordinators, and bundles that can be purged (and have no parents) 083 int size; 084 do { 085 size = wfList.size(); 086 wfList.addAll(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(wfOlderThan, wfList.size(), limit))); 087 } while(size != wfList.size()); 088 do { 089 size = coordList.size(); 090 coordList.addAll(jpaService.execute( 091 new CoordJobsGetForPurgeJPAExecutor(coordOlderThan, coordList.size(), limit))); 092 } while(size != coordList.size()); 093 do { 094 size = bundleList.size(); 095 bundleList.addAll(jpaService.execute( 096 new BundleJobsGetForPurgeJPAExecutor(bundleOlderThan, bundleList.size(), limit))); 097 } while(size != bundleList.size()); 098 } 099 else { 100 throw new CommandException(ErrorCode.E0610); 101 } 102 } 103 catch (XException ex) { 104 throw new CommandException(ex); 105 } 106 } 107 108 /* (non-Javadoc) 109 * @see org.apache.oozie.command.XCommand#execute() 110 */ 111 @Override 112 protected Void execute() throws CommandException { 113 LOG.debug("STARTED Purge to purge Workflow Jobs older than [{0}] days, Coordinator Jobs older than [{1}] days, and Bundle" 114 + "jobs older than [{2}] days.", wfOlderThan, coordOlderThan, bundleOlderThan); 115 116 // Process parentless workflows to purge them and their children 117 if (!wfList.isEmpty()) { 118 try { 119 processWorkflows(wfList); 120 } 121 catch (JPAExecutorException je) { 122 throw new CommandException(je); 123 } 124 } 125 126 // Processs parentless coordinators to purge them and their children 127 if (!coordList.isEmpty()) { 128 try { 129 processCoordinators(coordList); 130 } 131 catch (JPAExecutorException je) { 132 throw new CommandException(je); 133 } 134 } 135 136 // Process bundles to purge them and their children 137 if (!bundleList.isEmpty()) { 138 try { 139 processBundles(bundleList); 140 } 141 catch (JPAExecutorException je) { 142 throw new CommandException(je); 143 } 144 } 145 146 LOG.debug("ENDED Purge deleted [{0}] workflows, [{1}] coordinators, [{2}] bundles", wfDel, coordDel, bundleDel); 147 return null; 148 } 149 150 /** 151 * Process workflows to purge them and their children. Uses the processWorkflowsHelper method to help via recursion to make 152 * sure that the workflow children are deleted before their parents. 153 * 154 * @param wfs List of workflows to process 155 * @throws JPAExecutorException If a JPA executor has a problem 156 */ 157 private void processWorkflows(List<String> wfs) throws JPAExecutorException { 158 List<String> wfsToPurge = processWorkflowsHelper(wfs); 159 purgeWorkflows(wfsToPurge); 160 } 161 162 /** 163 * Used by the processWorkflows method and via recursion. 164 * 165 * @param wfs List of workflows to process 166 * @return List of workflows to purge 167 * @throws JPAExecutorException If a JPA executor has a problem 168 */ 169 private List<String> processWorkflowsHelper(List<String> wfs) throws JPAExecutorException { 170 // If the list is empty, then we've finished recursing 171 if (wfs.isEmpty()) { 172 return wfs; 173 } 174 List<String> subwfs = new ArrayList<String>(); 175 List<String> wfsToPurge = new ArrayList<String>(); 176 for (String wfId : wfs) { 177 // We only purge the workflow and its children if they are all ready to be purged 178 long numChildrenNotReady = jpaService.execute( 179 new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(wfOlderThan, wfId)); 180 if (numChildrenNotReady == 0) { 181 wfsToPurge.add(wfId); 182 // Get all of the direct children for this workflow 183 List<String> children = new ArrayList<String>(); 184 int size; 185 do { 186 size = children.size(); 187 children.addAll(jpaService.execute(new WorkflowJobsGetFromParentIdJPAExecutor(wfId, children.size(), limit))); 188 } while (size != children.size()); 189 subwfs.addAll(children); 190 } 191 } 192 // Recurse on the children we just found to process their children 193 wfsToPurge.addAll(processWorkflowsHelper(subwfs)); 194 return wfsToPurge; 195 } 196 197 /** 198 * Process coordinators to purge them and their children. 199 * 200 * @param coords List of coordinators to process 201 * @throws JPAExecutorException If a JPA executor has a problem 202 */ 203 private void processCoordinators(List<String> coords) throws JPAExecutorException { 204 List<String> wfsToPurge = new ArrayList<String>(); 205 List<String> coordsToPurge = new ArrayList<String>(); 206 for (String coordId : coords) { 207 // We only purge the coord and its children if they are all ready to be purged 208 long numChildrenNotReady = jpaService.execute( 209 new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(wfOlderThan, coordId)); 210 if (numChildrenNotReady == 0) { 211 coordsToPurge.add(coordId); 212 // Get all of the direct children for this coord 213 List<String> children = new ArrayList<String>(); 214 int size; 215 do { 216 size = children.size(); 217 children.addAll(jpaService.execute( 218 new WorkflowJobsGetFromParentIdJPAExecutor(coordId, children.size(), limit))); 219 } while (size != children.size()); 220 wfsToPurge.addAll(children); 221 } 222 } 223 // Process the children 224 processWorkflows(wfsToPurge); 225 // Now that all children have been purged, we can purge the coordinators 226 purgeCoordinators(coordsToPurge); 227 } 228 229 /** 230 * Process bundles to purge them and their children 231 * 232 * @param bundles List of bundles to process 233 * @throws JPAExecutorException If a JPA executor has a problem 234 */ 235 private void processBundles(List<String> bundles) throws JPAExecutorException { 236 List<String> coordsToPurge = new ArrayList<String>(); 237 List<String> bundlesToPurge = new ArrayList<String>(); 238 for (Iterator<String> it = bundles.iterator(); it.hasNext(); ) { 239 String bundleId = it.next(); 240 // We only purge the bundle and its children if they are all ready to be purged 241 long numChildrenNotReady = jpaService.execute( 242 new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(coordOlderThan, bundleId)); 243 if (numChildrenNotReady == 0) { 244 bundlesToPurge.add(bundleId); 245 // Get all of the direct children for this bundle 246 List<String> children = new ArrayList<String>(); 247 int size; 248 do { 249 size = children.size(); 250 children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleId, children.size(), limit))); 251 } while (size != children.size()); 252 coordsToPurge.addAll(children); 253 } 254 } 255 // Process the children 256 processCoordinators(coordsToPurge); 257 // Now that all children have been purged, we can purge the bundles 258 purgeBundles(bundlesToPurge); 259 } 260 261 /** 262 * Purge the workflows in REVERSE order in batches of size 'limit' (this must be done in reverse order so that children are 263 * purged before their parents) 264 * 265 * @param wfs List of workflows to purge 266 * @throws JPAExecutorException If a JPA executor has a problem 267 */ 268 private void purgeWorkflows(List<String> wfs) throws JPAExecutorException { 269 wfDel += wfs.size(); 270 Collections.reverse(wfs); 271 for (int startIndex = 0; startIndex < wfs.size(); ) { 272 int endIndex = (startIndex + limit < wfs.size()) ? (startIndex + limit) : wfs.size(); 273 jpaService.execute(new WorkflowJobsDeleteJPAExecutor(wfs.subList(startIndex, endIndex))); 274 startIndex = endIndex; 275 } 276 } 277 278 /** 279 * Purge the coordinators in SOME order in batches of size 'limit' (its in reverse order only for convenience) 280 * 281 * @param coords List of coordinators to purge 282 * @throws JPAExecutorException If a JPA executor has a problem 283 */ 284 private void purgeCoordinators(List<String> coords) throws JPAExecutorException { 285 coordDel += coords.size(); 286 for (int startIndex = 0; startIndex < coords.size(); ) { 287 int endIndex = (startIndex + limit < coords.size()) ? (startIndex + limit) : coords.size(); 288 jpaService.execute(new CoordJobsDeleteJPAExecutor(coords.subList(startIndex, endIndex))); 289 startIndex = endIndex; 290 } 291 } 292 293 /** 294 * Purge the bundles in SOME order in batches of size 'limit' (its in reverse order only for convenience) 295 * 296 * @param bundles List of bundles to purge 297 * @throws JPAExecutorException If a JPA executor has a problem 298 */ 299 private void purgeBundles(List<String> bundles) throws JPAExecutorException { 300 bundleDel += bundles.size(); 301 for (int startIndex = 0; startIndex < bundles.size(); ) { 302 int endIndex = (startIndex + limit < bundles.size()) ? (startIndex + limit) : bundles.size(); 303 jpaService.execute(new BundleJobsDeleteJPAExecutor(bundles.subList(startIndex, endIndex))); 304 startIndex = endIndex; 305 } 306 } 307 308 /* (non-Javadoc) 309 * @see org.apache.oozie.command.XCommand#getEntityKey() 310 */ 311 @Override 312 public String getEntityKey() { 313 return null; 314 } 315 316 /* (non-Javadoc) 317 * @see org.apache.oozie.command.XCommand#isLockRequired() 318 */ 319 @Override 320 protected boolean isLockRequired() { 321 return false; 322 } 323 324 /* (non-Javadoc) 325 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 326 */ 327 @Override 328 protected void verifyPrecondition() throws CommandException, PreconditionException { 329 } 330 }