001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command; 020 021import org.apache.oozie.ErrorCode; 022import org.apache.oozie.WorkflowJobBean; 023import org.apache.oozie.XException; 024import org.apache.oozie.executor.jpa.BundleJobsDeleteJPAExecutor; 025import org.apache.oozie.executor.jpa.BundleJobsGetForPurgeJPAExecutor; 026import org.apache.oozie.executor.jpa.CoordActionsDeleteJPAExecutor; 027import org.apache.oozie.executor.jpa.CoordActionsGetFromCoordJobIdJPAExecutor; 028import org.apache.oozie.executor.jpa.CoordJobsCountNotForPurgeFromParentIdJPAExecutor; 029import org.apache.oozie.executor.jpa.CoordJobsDeleteJPAExecutor; 030import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor; 031import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor; 032import org.apache.oozie.executor.jpa.JPAExecutorException; 033import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 034import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 035import org.apache.oozie.executor.jpa.WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor; 036import org.apache.oozie.executor.jpa.WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor; 037import org.apache.oozie.executor.jpa.WorkflowJobsDeleteJPAExecutor; 038import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor; 039import org.apache.oozie.service.JPAService; 040import org.apache.oozie.service.Services; 041import org.eclipse.jgit.util.StringUtils; 042 043import java.util.ArrayList; 044import java.util.Collection; 045import java.util.Collections; 046import java.util.Iterator; 047import java.util.List; 048/** 049 * This class is used to purge workflows, coordinators, and bundles. It takes into account the relationships between workflows and 050 * coordinators, and coordinators and bundles. It also only acts on 'limit' number of items at a time to not overtax the DB and in 051 * case something gets rolled back. Also, children are always deleted before their parents in case of a rollback. 052 */ 053public class PurgeXCommand extends XCommand<Void> { 054 private JPAService jpaService = null; 055 private int wfOlderThan; 056 private int coordOlderThan; 057 private int bundleOlderThan; 058 private boolean purgeOldCoordAction = false; 059 private final int limit; 060 private List<String> wfList; 061 private List<String> coordActionList; 062 private List<String> coordList; 063 private List<String> bundleList; 064 private int wfDel; 065 private int coordDel; 066 private int coordActionDel; 067 private int bundleDel; 068 private static final long DAY_IN_MS = 24 * 60 * 60 * 1000; 069 070 public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit) { 071 this(wfOlderThan, coordOlderThan, bundleOlderThan, limit, false); 072 } 073 074 public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit, boolean purgeOldCoordAction) { 075 super("purge", "purge", 0); 076 this.wfOlderThan = wfOlderThan; 077 this.coordOlderThan = coordOlderThan; 078 this.bundleOlderThan = bundleOlderThan; 079 this.purgeOldCoordAction = purgeOldCoordAction; 080 this.limit = limit; 081 wfList = new ArrayList<String>(); 082 coordActionList = new ArrayList<String>(); 083 coordList = new ArrayList<String>(); 084 bundleList = new ArrayList<String>(); 085 wfDel = 0; 086 coordDel = 0; 087 bundleDel = 0; 088 } 089 090 /* (non-Javadoc) 091 * @see org.apache.oozie.command.XCommand#loadState() 092 */ 093 @Override 094 protected void loadState() throws CommandException { 095 try { 096 jpaService = Services.get().get(JPAService.class); 097 098 if (jpaService != null) { 099 // Get the lists of workflows, coordinators, and bundles that can be purged (and have no parents) 100 int size; 101 do { 102 size = wfList.size(); 103 wfList.addAll(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(wfOlderThan, wfList.size(), limit))); 104 } while(size != wfList.size()); 105 if (purgeOldCoordAction) { 106 LOG.debug("Purging workflows of long running coordinators is turned on"); 107 do { 108 size = coordActionList.size(); 109 long olderThan = wfOlderThan; 110 List<WorkflowJobBean> jobBeans = WorkflowJobQueryExecutor.getInstance().getList( 111 WorkflowJobQuery.GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN, olderThan, 112 coordActionList.size(), limit); 113 for (WorkflowJobBean bean : jobBeans) { 114 coordActionList.add(bean.getParentId()); 115 wfList.add(bean.getId()); 116 } 117 } while(size != coordActionList.size()); 118 } 119 do { 120 size = coordList.size(); 121 coordList.addAll(jpaService.execute( 122 new CoordJobsGetForPurgeJPAExecutor(coordOlderThan, coordList.size(), limit))); 123 } while(size != coordList.size()); 124 do { 125 size = bundleList.size(); 126 bundleList.addAll(jpaService.execute( 127 new BundleJobsGetForPurgeJPAExecutor(bundleOlderThan, bundleList.size(), limit))); 128 } while(size != bundleList.size()); 129 } 130 else { 131 throw new CommandException(ErrorCode.E0610); 132 } 133 } 134 catch (XException ex) { 135 throw new CommandException(ex); 136 } 137 } 138 139 /* (non-Javadoc) 140 * @see org.apache.oozie.command.XCommand#execute() 141 */ 142 @Override 143 protected Void execute() throws CommandException { 144 LOG.info("STARTED Purge to purge Workflow Jobs older than [{0}] days, Coordinator Jobs older than [{1}] days, and Bundle" 145 + "jobs older than [{2}] days.", wfOlderThan, coordOlderThan, bundleOlderThan); 146 147 // Process parentless workflows to purge them and their children 148 if (!wfList.isEmpty()) { 149 try { 150 processWorkflows(wfList); 151 } 152 catch (JPAExecutorException je) { 153 throw new CommandException(je); 154 } 155 } 156 157 // Process coordinator actions of long running coordinators and purge them 158 if (!coordActionList.isEmpty()) { 159 try { 160 purgeCoordActions(coordActionList); 161 } 162 catch (JPAExecutorException je) { 163 throw new CommandException(je); 164 } 165 } 166 // Processs parentless coordinators to purge them and their children 167 if (!coordList.isEmpty()) { 168 try { 169 processCoordinators(coordList); 170 } 171 catch (JPAExecutorException je) { 172 throw new CommandException(je); 173 } 174 } 175 176 // Process bundles to purge them and their children 177 if (!bundleList.isEmpty()) { 178 try { 179 processBundles(bundleList); 180 } 181 catch (JPAExecutorException je) { 182 throw new CommandException(je); 183 } 184 } 185 186 LOG.info("ENDED Purge deleted [{0}] workflows, [{1}] coordinatorActions, [{2}] coordinators, [{3}] bundles", 187 wfDel, coordActionDel, coordDel, bundleDel); 188 return null; 189 } 190 191 /** 192 * Process workflows to purge them and their children. Uses the processWorkflowsHelper method to help via recursion to make 193 * sure that the workflow children are deleted before their parents. 194 * 195 * @param wfs List of workflows to process 196 * @throws JPAExecutorException If a JPA executor has a problem 197 */ 198 private void processWorkflows(List<String> wfs) throws JPAExecutorException { 199 List<String> wfsToPurge = processWorkflowsHelper(wfs); 200 purgeWorkflows(wfsToPurge); 201 } 202 203 /** 204 * Used by the processWorkflows method and via recursion. 205 * 206 * @param wfs List of workflows to process 207 * @return List of workflows to purge 208 * @throws JPAExecutorException If a JPA executor has a problem 209 */ 210 private List<String> processWorkflowsHelper(List<String> wfs) throws JPAExecutorException { 211 // If the list is empty, then we've finished recursing 212 if (wfs.isEmpty()) { 213 return wfs; 214 } 215 List<String> subwfs = new ArrayList<String>(); 216 List<String> wfsToPurge = new ArrayList<String>(); 217 for (String wfId : wfs) { 218 int size; 219 List<WorkflowJobBean> swfBeanList = new ArrayList<WorkflowJobBean>(); 220 do { 221 size = swfBeanList.size(); 222 swfBeanList.addAll(jpaService.execute( 223 new WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfId, swfBeanList.size(), limit))); 224 } while (size != swfBeanList.size()); 225 226 // Checking if sub workflow is ready to purge 227 List<String> children = fetchTerminatedWorkflow(swfBeanList); 228 229 // if all sub workflow ready to purge add them all and add current workflow 230 if(children.size() == swfBeanList.size()) { 231 subwfs.addAll(children); 232 wfsToPurge.add(wfId); 233 } 234 } 235 // Recurse on the children we just found to process their children 236 wfsToPurge.addAll(processWorkflowsHelper(subwfs)); 237 return wfsToPurge; 238 } 239 240 /** 241 * This method will return all terminate workflow ids from wfBeanlist for purge. 242 * @param wfBeanList 243 * @return workflows to purge 244 */ 245 private List<String> fetchTerminatedWorkflow(List<WorkflowJobBean> wfBeanList) { 246 List<String> children = new ArrayList<String>(); 247 long wfOlderThanMS = System.currentTimeMillis() - (wfOlderThan * DAY_IN_MS); 248 for (WorkflowJobBean wfjBean : wfBeanList) { 249 if (wfjBean.inTerminalState() && wfjBean.getEndTime().getTime() < wfOlderThanMS) { 250 children.add(wfjBean.getId()); 251 } 252 } 253 return children; 254 } 255 256 /** 257 * Process coordinators to purge them and their children. 258 * 259 * @param coords List of coordinators to process 260 * @throws JPAExecutorException If a JPA executor has a problem 261 */ 262 private void processCoordinators(List<String> coords) throws JPAExecutorException { 263 List<String> wfsToPurge = new ArrayList<String>(); 264 List<String> actionsToPurge = new ArrayList<String>(); 265 List<String> coordsToPurge = new ArrayList<String>(); 266 for (String coordId : coords) { 267 // Get all of the direct workflowChildren for this coord 268 List<WorkflowJobBean> wfjBeanList = new ArrayList<WorkflowJobBean>(); 269 int size; 270 do { 271 size = wfjBeanList.size(); 272 wfjBeanList.addAll(jpaService.execute( 273 new WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coordId, wfjBeanList.size(), limit))); 274 } while (size != wfjBeanList.size()); 275 276 // Checking if workflow is ready to purge 277 List<String> workflowChildren = fetchTerminatedWorkflow(wfjBeanList); 278 279 // if all workflow are ready to purge add them and add the coordinator and their actions 280 if(workflowChildren.size() == wfjBeanList.size()) { 281 LOG.debug("Purging coordinator " + coordId); 282 wfsToPurge.addAll(workflowChildren); 283 coordsToPurge.add(coordId); 284 // Get all of the direct actionChildren for this coord 285 List<String> actionChildren = new ArrayList<String>(); 286 do { 287 size = actionChildren.size(); 288 actionChildren.addAll(jpaService.execute( 289 new CoordActionsGetFromCoordJobIdJPAExecutor(coordId, actionChildren.size(), limit))); 290 } while (size != actionChildren.size()); 291 actionsToPurge.addAll(actionChildren); 292 } 293 } 294 // Process the children workflow 295 processWorkflows(wfsToPurge); 296 // Process the children action 297 purgeCoordActions(actionsToPurge); 298 // Now that all children have been purged, we can purge the coordinators 299 purgeCoordinators(coordsToPurge); 300 } 301 302 /** 303 * Process bundles to purge them and their children 304 * 305 * @param bundles List of bundles to process 306 * @throws JPAExecutorException If a JPA executor has a problem 307 */ 308 private void processBundles(List<String> bundles) throws JPAExecutorException { 309 List<String> coordsToPurge = new ArrayList<String>(); 310 List<String> bundlesToPurge = new ArrayList<String>(); 311 for (Iterator<String> it = bundles.iterator(); it.hasNext(); ) { 312 String bundleId = it.next(); 313 // We only purge the bundle and its children if they are all ready to be purged 314 long numChildrenNotReady = jpaService.execute( 315 new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(coordOlderThan, bundleId)); 316 if (numChildrenNotReady == 0) { 317 bundlesToPurge.add(bundleId); 318 LOG.debug("Purging bundle " + bundleId); 319 // Get all of the direct children for this bundle 320 List<String> children = new ArrayList<String>(); 321 int size; 322 do { 323 size = children.size(); 324 children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleId, children.size(), limit))); 325 } while (size != children.size()); 326 coordsToPurge.addAll(children); 327 } 328 } 329 // Process the children 330 processCoordinators(coordsToPurge); 331 // Now that all children have been purged, we can purge the bundles 332 purgeBundles(bundlesToPurge); 333 } 334 335 /** 336 * Purge the workflows in REVERSE order in batches of size 'limit' (this must be done in reverse order so that children are 337 * purged before their parents) 338 * 339 * @param wfs List of workflows to purge 340 * @throws JPAExecutorException If a JPA executor has a problem 341 */ 342 private void purgeWorkflows(List<String> wfs) throws JPAExecutorException { 343 wfDel += wfs.size(); 344 //To delete sub-workflows before deleting parent workflows 345 Collections.reverse(wfs); 346 for (int startIndex = 0; startIndex < wfs.size(); ) { 347 int endIndex = (startIndex + limit < wfs.size()) ? (startIndex + limit) : wfs.size(); 348 List<String> wfsForDelete = wfs.subList(startIndex, endIndex); 349 LOG.debug("Deleting workflows: " + StringUtils.join(wfsForDelete, ",")); 350 jpaService.execute(new WorkflowJobsDeleteJPAExecutor(wfsForDelete)); 351 startIndex = endIndex; 352 } 353 } 354 355 /** 356 * Purge coordActions of long running coordinators and purge them 357 * 358 * @param coordActions List of coordActions to purge 359 * @throws JPAExecutorException If a JPA executor has a problem 360 */ 361 private void purgeCoordActions(List<String> coordActions) throws JPAExecutorException { 362 coordActionDel = coordActions.size(); 363 for (int startIndex = 0; startIndex < coordActions.size(); ) { 364 int endIndex = (startIndex + limit < coordActions.size()) ? (startIndex + limit) : coordActions.size(); 365 List<String> coordActionsForDelete = coordActions.subList(startIndex, endIndex); 366 LOG.debug("Deleting coordinator actions: " + StringUtils.join(coordActionsForDelete, ",")); 367 jpaService.execute(new CoordActionsDeleteJPAExecutor(coordActionsForDelete)); 368 startIndex = endIndex; 369 } 370 } 371 /** 372 * Purge the coordinators in SOME order in batches of size 'limit' (its in reverse order only for convenience) 373 * 374 * @param coords List of coordinators to purge 375 * @throws JPAExecutorException If a JPA executor has a problem 376 */ 377 private void purgeCoordinators(List<String> coords) throws JPAExecutorException { 378 coordDel += coords.size(); 379 for (int startIndex = 0; startIndex < coords.size(); ) { 380 int endIndex = (startIndex + limit < coords.size()) ? (startIndex + limit) : coords.size(); 381 List<String> coordsForDelete = coords.subList(startIndex, endIndex); 382 LOG.debug("Deleting coordinators: " + StringUtils.join(coordsForDelete, ",")); 383 jpaService.execute(new CoordJobsDeleteJPAExecutor(coordsForDelete)); 384 startIndex = endIndex; 385 } 386 } 387 388 /** 389 * Purge the bundles in SOME order in batches of size 'limit' (its in reverse order only for convenience) 390 * 391 * @param bundles List of bundles to purge 392 * @throws JPAExecutorException If a JPA executor has a problem 393 */ 394 private void purgeBundles(List<String> bundles) throws JPAExecutorException { 395 bundleDel += bundles.size(); 396 for (int startIndex = 0; startIndex < bundles.size(); ) { 397 int endIndex = (startIndex + limit < bundles.size()) ? (startIndex + limit) : bundles.size(); 398 Collection<String> bundlesForDelete = bundles.subList(startIndex, endIndex); 399 LOG.debug("Deleting bundles: " + StringUtils.join(bundlesForDelete, ",")); 400 jpaService.execute(new BundleJobsDeleteJPAExecutor(bundlesForDelete)); 401 startIndex = endIndex; 402 } 403 } 404 405 /* (non-Javadoc) 406 * @see org.apache.oozie.command.XCommand#getEntityKey() 407 */ 408 @Override 409 public String getEntityKey() { 410 return null; 411 } 412 413 /* (non-Javadoc) 414 * @see org.apache.oozie.command.XCommand#isLockRequired() 415 */ 416 @Override 417 protected boolean isLockRequired() { 418 return false; 419 } 420 421 /* (non-Javadoc) 422 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 423 */ 424 @Override 425 protected void verifyPrecondition() throws CommandException, PreconditionException { 426 } 427}