001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command; 020 021import org.apache.oozie.ErrorCode; 022import org.apache.oozie.WorkflowJobBean; 023import org.apache.oozie.XException; 024import org.apache.oozie.executor.jpa.BundleJobsDeleteJPAExecutor; 025import org.apache.oozie.executor.jpa.BundleJobsGetForPurgeJPAExecutor; 026import org.apache.oozie.executor.jpa.CoordActionsDeleteJPAExecutor; 027import org.apache.oozie.executor.jpa.CoordActionsGetFromCoordJobIdJPAExecutor; 028import org.apache.oozie.executor.jpa.CoordJobsCountNotForPurgeFromParentIdJPAExecutor; 029import org.apache.oozie.executor.jpa.CoordJobsDeleteJPAExecutor; 030import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor; 031import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor; 032import org.apache.oozie.executor.jpa.JPAExecutorException; 033import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 034import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 035import org.apache.oozie.executor.jpa.WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor; 036import org.apache.oozie.executor.jpa.WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor; 037import org.apache.oozie.executor.jpa.WorkflowJobsDeleteJPAExecutor; 038import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor; 039import org.apache.oozie.service.JPAService; 040import org.apache.oozie.service.Services; 041import org.eclipse.jgit.util.StringUtils; 042 043import java.util.ArrayList; 044import java.util.Collection; 045import java.util.Collections; 046import java.util.Date; 047import java.util.Iterator; 048import java.util.List; 049 050/** 051 * This class is used to purge workflows, coordinators, and bundles. It takes into account the relationships between workflows and 052 * coordinators, and coordinators and bundles. It also only acts on 'limit' number of items at a time to not overtax the DB and in 053 * case something gets rolled back. Also, children are always deleted before their parents in case of a rollback. 054 */ 055public class PurgeXCommand extends XCommand<Void> { 056 private JPAService jpaService = null; 057 private int wfOlderThan; 058 private int coordOlderThan; 059 private int bundleOlderThan; 060 private boolean purgeOldCoordAction = false; 061 private final int limit; 062 private List<String> wfList; 063 private List<String> coordActionList; 064 private List<String> coordList; 065 private List<String> bundleList; 066 private int wfDel; 067 private int coordDel; 068 private int coordActionDel; 069 private int bundleDel; 070 private static final long DAY_IN_MS = 24 * 60 * 60 * 1000; 071 072 public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit) { 073 this(wfOlderThan, coordOlderThan, bundleOlderThan, limit, false); 074 } 075 076 public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit, boolean purgeOldCoordAction) { 077 super("purge", "purge", 0); 078 this.wfOlderThan = wfOlderThan; 079 this.coordOlderThan = coordOlderThan; 080 this.bundleOlderThan = bundleOlderThan; 081 this.purgeOldCoordAction = purgeOldCoordAction; 082 this.limit = limit; 083 wfList = new ArrayList<String>(); 084 coordActionList = new ArrayList<String>(); 085 coordList = new ArrayList<String>(); 086 bundleList = new ArrayList<String>(); 087 wfDel = 0; 088 coordDel = 0; 089 bundleDel = 0; 090 } 091 092 /* (non-Javadoc) 093 * @see org.apache.oozie.command.XCommand#loadState() 094 */ 095 @Override 096 protected void loadState() throws CommandException { 097 try { 098 jpaService = Services.get().get(JPAService.class); 099 100 if (jpaService != null) { 101 // Get the lists of workflows, coordinators, and bundles that can be purged (and have no parents) 102 int size; 103 do { 104 size = wfList.size(); 105 wfList.addAll(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(wfOlderThan, wfList.size(), limit))); 106 } while(size != wfList.size()); 107 if (purgeOldCoordAction) { 108 LOG.debug("Purging workflows of long running coordinators is turned on"); 109 do { 110 size = coordActionList.size(); 111 long olderThan = wfOlderThan; 112 List<WorkflowJobBean> jobBeans = WorkflowJobQueryExecutor.getInstance().getList( 113 WorkflowJobQuery.GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN, olderThan, 114 coordActionList.size(), limit); 115 for (WorkflowJobBean bean : jobBeans) { 116 coordActionList.add(bean.getParentId()); 117 wfList.add(bean.getId()); 118 } 119 } while(size != coordActionList.size()); 120 } 121 do { 122 size = coordList.size(); 123 coordList.addAll(jpaService.execute( 124 new CoordJobsGetForPurgeJPAExecutor(coordOlderThan, coordList.size(), limit))); 125 } while(size != coordList.size()); 126 do { 127 size = bundleList.size(); 128 bundleList.addAll(jpaService.execute( 129 new BundleJobsGetForPurgeJPAExecutor(bundleOlderThan, bundleList.size(), limit))); 130 } while(size != bundleList.size()); 131 } 132 else { 133 throw new CommandException(ErrorCode.E0610); 134 } 135 } 136 catch (XException ex) { 137 throw new CommandException(ex); 138 } 139 } 140 141 /* (non-Javadoc) 142 * @see org.apache.oozie.command.XCommand#execute() 143 */ 144 @Override 145 protected Void execute() throws CommandException { 146 LOG.info("STARTED Purge to purge Workflow Jobs older than [{0}] days, Coordinator Jobs older than [{1}] days, and Bundle" 147 + " jobs older than [{2}] days.", wfOlderThan, coordOlderThan, bundleOlderThan); 148 149 // Process parentless workflows to purge them and their children 150 if (!wfList.isEmpty()) { 151 try { 152 processWorkflows(wfList); 153 } 154 catch (JPAExecutorException je) { 155 throw new CommandException(je); 156 } 157 } 158 159 // Process coordinator actions of long running coordinators and purge them 160 if (!coordActionList.isEmpty()) { 161 try { 162 purgeCoordActions(coordActionList); 163 } 164 catch (JPAExecutorException je) { 165 throw new CommandException(je); 166 } 167 } 168 // Processs parentless coordinators to purge them and their children 169 if (!coordList.isEmpty()) { 170 try { 171 processCoordinators(coordList); 172 } 173 catch (JPAExecutorException je) { 174 throw new CommandException(je); 175 } 176 } 177 178 // Process bundles to purge them and their children 179 if (!bundleList.isEmpty()) { 180 try { 181 processBundles(bundleList); 182 } 183 catch (JPAExecutorException je) { 184 throw new CommandException(je); 185 } 186 } 187 188 LOG.info("ENDED Purge deleted [{0}] workflows, [{1}] coordinatorActions, [{2}] coordinators, [{3}] bundles", 189 wfDel, coordActionDel, coordDel, bundleDel); 190 return null; 191 } 192 193 /** 194 * Process workflows to purge them and their children. Uses the processWorkflowsHelper method to help via recursion to make 195 * sure that the workflow children are deleted before their parents. 196 * 197 * @param wfs List of workflows to process 198 * @throws JPAExecutorException If a JPA executor has a problem 199 */ 200 private void processWorkflows(List<String> wfs) throws JPAExecutorException { 201 List<String> wfsToPurge = processWorkflowsHelper(wfs); 202 purgeWorkflows(wfsToPurge); 203 } 204 205 /** 206 * Used by the processWorkflows method and via recursion. 207 * 208 * @param wfs List of workflows to process 209 * @return List of workflows to purge 210 * @throws JPAExecutorException If a JPA executor has a problem 211 */ 212 private List<String> processWorkflowsHelper(List<String> wfs) throws JPAExecutorException { 213 // If the list is empty, then we've finished recursing 214 if (wfs.isEmpty()) { 215 return wfs; 216 } 217 List<String> subwfs = new ArrayList<String>(); 218 List<String> wfsToPurge = new ArrayList<String>(); 219 for (String wfId : wfs) { 220 int size; 221 List<WorkflowJobBean> swfBeanList = new ArrayList<WorkflowJobBean>(); 222 do { 223 size = swfBeanList.size(); 224 swfBeanList.addAll(jpaService.execute( 225 new WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfId, swfBeanList.size(), limit))); 226 } while (size != swfBeanList.size()); 227 228 // Checking if sub workflow is ready to purge 229 List<String> children = fetchTerminatedWorkflow(swfBeanList); 230 231 // if all sub workflow ready to purge add them all and add current workflow 232 if(children.size() == swfBeanList.size()) { 233 subwfs.addAll(children); 234 wfsToPurge.add(wfId); 235 } 236 } 237 // Recurse on the children we just found to process their children 238 wfsToPurge.addAll(processWorkflowsHelper(subwfs)); 239 return wfsToPurge; 240 } 241 242 /** 243 * This method will return all terminate workflow ids from wfBeanlist for purge. 244 * @param wfBeanList 245 * @return workflows to purge 246 */ 247 private List<String> fetchTerminatedWorkflow(List<WorkflowJobBean> wfBeanList) { 248 List<String> children = new ArrayList<String>(); 249 long wfOlderThanMS = System.currentTimeMillis() - (wfOlderThan * DAY_IN_MS); 250 for (WorkflowJobBean wfjBean : wfBeanList) { 251 final Date wfEndTime = wfjBean.getEndTime(); 252 final boolean isFinished = wfjBean.inTerminalState(); 253 if (isFinished && wfEndTime != null && wfEndTime.getTime() < wfOlderThanMS) { 254 children.add(wfjBean.getId()); 255 } 256 else { 257 final Date lastModificationTime = wfjBean.getLastModifiedTime(); 258 if (isFinished && lastModificationTime != null && lastModificationTime.getTime() < wfOlderThanMS) { 259 children.add(wfjBean.getId()); 260 } 261 } 262 } 263 return children; 264 } 265 266 /** 267 * Process coordinators to purge them and their children. 268 * 269 * @param coords List of coordinators to process 270 * @throws JPAExecutorException If a JPA executor has a problem 271 */ 272 private void processCoordinators(List<String> coords) throws JPAExecutorException { 273 List<String> wfsToPurge = new ArrayList<String>(); 274 List<String> actionsToPurge = new ArrayList<String>(); 275 List<String> coordsToPurge = new ArrayList<String>(); 276 for (String coordId : coords) { 277 // Get all of the direct workflowChildren for this coord 278 List<WorkflowJobBean> wfjBeanList = new ArrayList<WorkflowJobBean>(); 279 int size; 280 do { 281 size = wfjBeanList.size(); 282 wfjBeanList.addAll(jpaService.execute( 283 new WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coordId, wfjBeanList.size(), limit))); 284 } while (size != wfjBeanList.size()); 285 286 // Checking if workflow is ready to purge 287 List<String> workflowChildren = fetchTerminatedWorkflow(wfjBeanList); 288 289 // if all workflow are ready to purge add them and add the coordinator and their actions 290 if(workflowChildren.size() == wfjBeanList.size()) { 291 LOG.debug("Purging coordinator " + coordId); 292 wfsToPurge.addAll(workflowChildren); 293 coordsToPurge.add(coordId); 294 // Get all of the direct actionChildren for this coord 295 List<String> actionChildren = new ArrayList<String>(); 296 do { 297 size = actionChildren.size(); 298 actionChildren.addAll(jpaService.execute( 299 new CoordActionsGetFromCoordJobIdJPAExecutor(coordId, actionChildren.size(), limit))); 300 } while (size != actionChildren.size()); 301 actionsToPurge.addAll(actionChildren); 302 } 303 } 304 // Process the children workflow 305 processWorkflows(wfsToPurge); 306 // Process the children action 307 purgeCoordActions(actionsToPurge); 308 // Now that all children have been purged, we can purge the coordinators 309 purgeCoordinators(coordsToPurge); 310 } 311 312 /** 313 * Process bundles to purge them and their children 314 * 315 * @param bundles List of bundles to process 316 * @throws JPAExecutorException If a JPA executor has a problem 317 */ 318 private void processBundles(List<String> bundles) throws JPAExecutorException { 319 List<String> coordsToPurge = new ArrayList<String>(); 320 List<String> bundlesToPurge = new ArrayList<String>(); 321 for (Iterator<String> it = bundles.iterator(); it.hasNext(); ) { 322 String bundleId = it.next(); 323 // We only purge the bundle and its children if they are all ready to be purged 324 long numChildrenNotReady = jpaService.execute( 325 new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(coordOlderThan, bundleId)); 326 if (numChildrenNotReady == 0) { 327 bundlesToPurge.add(bundleId); 328 LOG.debug("Purging bundle " + bundleId); 329 // Get all of the direct children for this bundle 330 List<String> children = new ArrayList<String>(); 331 int size; 332 do { 333 size = children.size(); 334 children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleId, children.size(), limit))); 335 } while (size != children.size()); 336 coordsToPurge.addAll(children); 337 } 338 } 339 // Process the children 340 processCoordinators(coordsToPurge); 341 // Now that all children have been purged, we can purge the bundles 342 purgeBundles(bundlesToPurge); 343 } 344 345 /** 346 * Purge the workflows in REVERSE order in batches of size 'limit' (this must be done in reverse order so that children are 347 * purged before their parents) 348 * 349 * @param wfs List of workflows to purge 350 * @throws JPAExecutorException If a JPA executor has a problem 351 */ 352 private void purgeWorkflows(List<String> wfs) throws JPAExecutorException { 353 wfDel += wfs.size(); 354 //To delete sub-workflows before deleting parent workflows 355 Collections.reverse(wfs); 356 for (int startIndex = 0; startIndex < wfs.size(); ) { 357 int endIndex = (startIndex + limit < wfs.size()) ? (startIndex + limit) : wfs.size(); 358 List<String> wfsForDelete = wfs.subList(startIndex, endIndex); 359 LOG.debug("Deleting workflows: " + StringUtils.join(wfsForDelete, ",")); 360 jpaService.execute(new WorkflowJobsDeleteJPAExecutor(wfsForDelete)); 361 startIndex = endIndex; 362 } 363 } 364 365 /** 366 * Purge coordActions of long running coordinators and purge them 367 * 368 * @param coordActions List of coordActions to purge 369 * @throws JPAExecutorException If a JPA executor has a problem 370 */ 371 private void purgeCoordActions(List<String> coordActions) throws JPAExecutorException { 372 coordActionDel = coordActions.size(); 373 for (int startIndex = 0; startIndex < coordActions.size(); ) { 374 int endIndex = (startIndex + limit < coordActions.size()) ? (startIndex + limit) : coordActions.size(); 375 List<String> coordActionsForDelete = coordActions.subList(startIndex, endIndex); 376 LOG.debug("Deleting coordinator actions: " + StringUtils.join(coordActionsForDelete, ",")); 377 jpaService.execute(new CoordActionsDeleteJPAExecutor(coordActionsForDelete)); 378 startIndex = endIndex; 379 } 380 } 381 /** 382 * Purge the coordinators in SOME order in batches of size 'limit' (its in reverse order only for convenience) 383 * 384 * @param coords List of coordinators to purge 385 * @throws JPAExecutorException If a JPA executor has a problem 386 */ 387 private void purgeCoordinators(List<String> coords) throws JPAExecutorException { 388 coordDel += coords.size(); 389 for (int startIndex = 0; startIndex < coords.size(); ) { 390 int endIndex = (startIndex + limit < coords.size()) ? (startIndex + limit) : coords.size(); 391 List<String> coordsForDelete = coords.subList(startIndex, endIndex); 392 LOG.debug("Deleting coordinators: " + StringUtils.join(coordsForDelete, ",")); 393 jpaService.execute(new CoordJobsDeleteJPAExecutor(coordsForDelete)); 394 startIndex = endIndex; 395 } 396 } 397 398 /** 399 * Purge the bundles in SOME order in batches of size 'limit' (its in reverse order only for convenience) 400 * 401 * @param bundles List of bundles to purge 402 * @throws JPAExecutorException If a JPA executor has a problem 403 */ 404 private void purgeBundles(List<String> bundles) throws JPAExecutorException { 405 bundleDel += bundles.size(); 406 for (int startIndex = 0; startIndex < bundles.size(); ) { 407 int endIndex = (startIndex + limit < bundles.size()) ? (startIndex + limit) : bundles.size(); 408 Collection<String> bundlesForDelete = bundles.subList(startIndex, endIndex); 409 LOG.debug("Deleting bundles: " + StringUtils.join(bundlesForDelete, ",")); 410 jpaService.execute(new BundleJobsDeleteJPAExecutor(bundlesForDelete)); 411 startIndex = endIndex; 412 } 413 } 414 415 /* (non-Javadoc) 416 * @see org.apache.oozie.command.XCommand#getEntityKey() 417 */ 418 @Override 419 public String getEntityKey() { 420 return "purge_command"; 421 } 422 423 /* (non-Javadoc) 424 * @see org.apache.oozie.command.XCommand#isLockRequired() 425 */ 426 @Override 427 protected boolean isLockRequired() { 428 return true; 429 } 430 431 /* (non-Javadoc) 432 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 433 */ 434 @Override 435 protected void verifyPrecondition() throws CommandException, PreconditionException { 436 } 437}