/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.WorkflowsInfo;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.OperationType;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.executor.jpa.WorkflowsJobGetJPAExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;

import java.util.List;
import java.util.Map;

/**
 * Applies one operation (kill, suspend or resume) to every workflow job returned by a filter query.
 * <p>
 * The matching jobs are loaded in {@link #loadState()}, a dedicated per-job command is invoked for each job whose
 * current status allows the requested operation, and the query is then run a second time so that the returned
 * {@link WorkflowsInfo} is read after the per-job commands have been called.
 */
public class BulkWorkflowXCommand extends WorkflowXCommand<WorkflowsInfo> {
    private final Map<String, List<String>> filter;
    private final int start;
    private final int len;
    private WorkflowsInfo workflowsInfo;
    private final OperationType operation;

    /**
     * Constructor taking the filter information and the operation to apply.
     *
     * @param filter Can be name, status, user, group and combination of these
     * @param start starting from this index in the list of workflows matching the filter, jobs are operated on
     * @param length number of workflows to be operated on from the list of workflows matching the filter and
     *        starting from index "start".
     * @param operation the operation to apply to each selected workflow; {@code Kill}, {@code Suspend} and
     *        {@code Resume} are handled, anything else is rejected during {@link #execute()}
     */
    public BulkWorkflowXCommand(Map<String, List<String>> filter, int start, int length, OperationType operation) {
        // NOTE(review): the command is registered as "bulkkill" for every operation, including suspend and
        // resume. Left unchanged because the name may be used elsewhere (e.g. instrumentation) - confirm.
        super("bulkkill", "bulkkill", 1, true);
        this.filter = filter;
        this.start = start;
        this.len = length;
        this.operation = operation;
    }

    /**
     * Invokes the per-job command matching the requested operation on every loaded workflow whose status permits
     * it, then re-runs the filter query and returns its result.
     * <ul>
     * <li>{@code Kill} is applied to jobs in PREP, RUNNING, SUSPENDED or FAILED status.</li>
     * <li>{@code Suspend} is applied to jobs in RUNNING status.</li>
     * <li>{@code Resume} is applied to jobs in SUSPENDED status.</li>
     * </ul>
     * Jobs in any other status are skipped silently.
     *
     * @return the workflows matching the filter, as read after the per-job commands were called
     * @throws CommandException with its original error code when a per-job command, the reload, or the operation
     *         check fails with a {@code CommandException} (E1102 for an unsupported operation); with E0725 when
     *         any other exception is raised
     * @see org.apache.oozie.command.XCommand#execute()
     */
    @Override
    protected WorkflowsInfo execute() throws CommandException {
        try {
            List<WorkflowJobBean> workflows = this.workflowsInfo.getWorkflows();
            for (WorkflowJobBean job : workflows) {
                switch (operation) {
                    case Kill:
                        if (job.getStatus() == WorkflowJob.Status.PREP
                                || job.getStatus() == WorkflowJob.Status.RUNNING
                                || job.getStatus() == WorkflowJob.Status.SUSPENDED
                                || job.getStatus() == WorkflowJob.Status.FAILED) {
                            new KillXCommand(job.getId()).call();
                        }
                        break;
                    case Suspend:
                        if (job.getStatus() == WorkflowJob.Status.RUNNING) {
                            new SuspendXCommand(job.getId()).call();
                        }
                        break;
                    case Resume:
                        if (job.getStatus() == WorkflowJob.Status.SUSPENDED) {
                            new ResumeXCommand(job.getId()).call();
                        }
                        break;
                    default:
                        throw new CommandException(ErrorCode.E1102, operation);
                }
            }
            // Query again so the result is read after the per-job commands above have been called.
            loadJobs();
            return this.workflowsInfo;
        }
        catch (CommandException ex) {
            // Already carries a specific error code (e.g. E1102 from the switch, or the code set by loadJobs()
            // or a per-job command); re-wrapping it as E0725 would hide the real cause from the caller.
            throw ex;
        }
        catch (Exception ex) {
            throw new CommandException(ErrorCode.E0725, ex.getMessage(), ex);
        }
    }

    /**
     * This command is not tied to a single entity, so it reports no entity key.
     *
     * @return always {@code null}
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return null;
    }

    /**
     * This command does not request a lock for itself.
     *
     * @return always {@code false}
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return false;
    }

    /**
     * Loads the workflows matching the filter so that {@link #execute()} can iterate over them.
     *
     * @throws CommandException if the workflows cannot be loaded
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    protected void loadState() throws CommandException {
        loadJobs();
    }

    /**
     * No precondition is checked by this command.
     *
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
    }

    /**
     * Runs the filter query through the {@link JPAService} and stores the result in {@code workflowsInfo}.
     *
     * @throws CommandException with E0610 when no {@link JPAService} is registered, or with E0603 when the lookup
     *         or the query raises any other exception
     */
    private void loadJobs() throws CommandException {
        try {
            JPAService jpaService = Services.get().get(JPAService.class);
            if (jpaService != null) {
                this.workflowsInfo = jpaService.execute(
                        new WorkflowsJobGetJPAExecutor(this.filter, this.start, this.len));
            }
            else {
                throw new CommandException(ErrorCode.E0610);
            }
        }
        catch (CommandException ex) {
            // The E0610 raised above must reach the caller as-is instead of being re-wrapped as E0603.
            throw ex;
        }
        catch (Exception ex) {
            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
        }
    }
}