001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.bundle;
019
020import org.apache.oozie.BundleJobBean;
021import org.apache.oozie.BundleJobInfo;
022import org.apache.oozie.ErrorCode;
023import org.apache.oozie.client.Job;
024import org.apache.oozie.command.CommandException;
025import org.apache.oozie.command.OperationType;
026import org.apache.oozie.command.PreconditionException;
027import org.apache.oozie.command.XCommand;
028import org.apache.oozie.executor.jpa.BundleJobInfoGetJPAExecutor;
029import org.apache.oozie.service.JPAService;
030import org.apache.oozie.service.Services;
031
032import java.util.List;
033import java.util.Map;
034
035public class BulkBundleXCommand extends XCommand<BundleJobInfo> {
036    private Map<String, List<String>> filter;
037    private final int start;
038    private final int len;
039    private BundleJobInfo bundleJobInfo;
040    private OperationType operation;
041
042    /**
043     * The constructor for BulkBundleXCommand
044     *
045     * @param filter the filter string
046     * @param start start location for paging
047     * @param length total length to get
048     * @param operation the type of operation to perform, it can be kill, suspend or resume
049     */
050    public BulkBundleXCommand(Map<String, List<String>> filter, int start, int length, OperationType operation) {
051        super("bulkbundle" + operation, "bulkbundle" + operation, 1);
052        this.filter = filter;
053        this.start = start;
054        this.len = length;
055        this.operation = operation;
056    }
057
058    /* (non-Javadoc)
059     * @see org.apache.oozie.command.XCommand#isLockRequired()
060     */
061    @Override
062    protected boolean isLockRequired() {
063        return false;
064    }
065
066    /* (non-Javadoc)
067     * @see org.apache.oozie.command.XCommand#getEntityKey()
068     */
069    @Override
070    public String getEntityKey() {
071        return null;
072    }
073
074    /* (non-Javadoc)
075     * @see org.apache.oozie.command.XCommand#loadState()
076     */
077    @Override
078    protected void loadState() throws CommandException {
079        loadBundleJobs();
080    }
081
082    /* (non-Javadoc)
083     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
084     */
085    @Override
086    protected void verifyPrecondition() throws CommandException, PreconditionException {
087    }
088
089    /* (non-Javadoc)
090     * @see org.apache.oozie.command.XCommand#execute()
091     */
092    @Override
093    protected BundleJobInfo execute() throws CommandException {
094        List<BundleJobBean> jobs = this.bundleJobInfo.getBundleJobs();
095        for (BundleJobBean job : jobs) {
096            switch (operation) {
097                case Kill:
098                    if (job.getStatus() != Job.Status.SUCCEEDED
099                            && job.getStatus() != Job.Status.FAILED
100                            && job.getStatus() != Job.Status.DONEWITHERROR
101                            && job.getStatus() != Job.Status.KILLED) {
102                        new BundleKillXCommand(job.getId()).call();
103                    }
104                    break;
105                case Suspend:
106                    if (job.getStatus() != Job.Status.SUCCEEDED
107                            && job.getStatus() != Job.Status.FAILED
108                            && job.getStatus() != Job.Status.KILLED
109                            && job.getStatus() != Job.Status.DONEWITHERROR) {
110                        new BundleJobSuspendXCommand(job.getId()).call();
111                    }
112                    break;
113                case Resume:
114                    if (job.getStatus() == Job.Status.SUSPENDED
115                            || job.getStatus() == Job.Status.SUSPENDEDWITHERROR
116                            || job.getStatus() == Job.Status.PREPSUSPENDED) {
117                        new BundleJobResumeXCommand(job.getId()).call();
118                    }
119                    break;
120                default:
121                    throw new CommandException(ErrorCode.E1102, operation);
122            }
123        }
124        loadBundleJobs();
125        return this.bundleJobInfo;
126    }
127
128    private void loadBundleJobs() throws CommandException {
129        try {
130            JPAService jpaService = Services.get().get(JPAService.class);
131            if (jpaService != null) {
132                this.bundleJobInfo = jpaService.execute(new BundleJobInfoGetJPAExecutor(filter, start, len));
133            }
134            else {
135                throw new CommandException(ErrorCode.E0610);
136            }
137        }
138        catch (Exception ex) {
139            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
140        }
141    }
142}