001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import org.apache.oozie.CoordinatorJobBean;
021import org.apache.oozie.CoordinatorJobInfo;
022import org.apache.oozie.ErrorCode;
023import org.apache.oozie.client.CoordinatorJob;
024import org.apache.oozie.client.Job;
025import org.apache.oozie.command.CommandException;
026import org.apache.oozie.command.OperationType;
027import org.apache.oozie.command.PreconditionException;
028import org.apache.oozie.command.XCommand;
029import org.apache.oozie.executor.jpa.CoordJobInfoGetJPAExecutor;
030import org.apache.oozie.service.JPAService;
031import org.apache.oozie.service.Services;
032
033import java.util.List;
034import java.util.Map;
035
036public class BulkCoordXCommand extends XCommand<CoordinatorJobInfo> {
037    private Map<String, List<String>> filter;
038    private final int start;
039    private final int len;
040    private CoordinatorJobInfo coordinatorJobInfo;
041    private OperationType operation;
042
043    /**
044     * The constructor for BulkCoordXCommand
045     *
046     * @param filter the filter string
047     * @param start start location for paging
048     * @param length total length to get
049     */
050    public BulkCoordXCommand(Map<String, List<String>> filter, int start, int length, OperationType operation) {
051        super("bulkcoord" + operation, "bulkcoord" + operation, 1);
052        this.filter = filter;
053        this.start = start;
054        this.len = length;
055        this.operation = operation;
056    }
057
058    /* (non-Javadoc)
059    * @see org.apache.oozie.command.XCommand#isLockRequired()
060    */
061    @Override
062    protected boolean isLockRequired() {
063        return false;
064    }
065
066    /* (non-Javadoc)
067     * @see org.apache.oozie.command.XCommand#getEntityKey()
068     */
069    @Override
070    public String getEntityKey() {
071        return null;
072    }
073
074    /* (non-Javadoc)
075    * @see org.apache.oozie.command.XCommand#loadState()
076    */
077    @Override
078    protected void loadState() throws CommandException {
079        loadJobs();
080    }
081
082    /* (non-Javadoc)
083    * @see org.apache.oozie.command.XCommand#verifyPrecondition()
084    */
085    @Override
086    protected void verifyPrecondition() throws CommandException, PreconditionException {
087    }
088
089    /* (non-Javadoc)
090     * @see org.apache.oozie.command.XCommand#execute()
091     */
092    @Override
093    protected CoordinatorJobInfo execute() throws CommandException {
094        List<CoordinatorJobBean> jobs = this.coordinatorJobInfo.getCoordJobs();
095        for (CoordinatorJobBean job : jobs) {
096            switch (operation) {
097                case Kill:
098                    if (job.getStatus() != CoordinatorJob.Status.SUCCEEDED
099                            && job.getStatus() != CoordinatorJob.Status.FAILED
100                            && job.getStatus() != CoordinatorJob.Status.DONEWITHERROR
101                            && job.getStatus() != CoordinatorJob.Status.KILLED
102                            && job.getStatus() != CoordinatorJob.Status.IGNORED) {
103                        new CoordKillXCommand(job.getId()).call();
104                    }
105                    break;
106                case Suspend:
107                    if (job.getStatus() != CoordinatorJob.Status.SUCCEEDED
108                            && job.getStatus() != CoordinatorJob.Status.FAILED
109                            && job.getStatus() != CoordinatorJob.Status.KILLED
110                            && job.getStatus() != CoordinatorJob.Status.IGNORED) {
111                        new CoordSuspendXCommand(job.getId()).call();
112                    }
113                    break;
114                case Resume:
115                    if (job.getStatus() == CoordinatorJob.Status.SUSPENDED ||
116                            job.getStatus() == CoordinatorJob.Status.SUSPENDEDWITHERROR ||
117                            job.getStatus() == Job.Status.PREPSUSPENDED) {
118                        new CoordResumeXCommand(job.getId()).call();
119                    }
120                    break;
121                default:
122                    throw new CommandException(ErrorCode.E1102, operation);
123            }
124        }
125        loadJobs();
126        return this.coordinatorJobInfo;
127    }
128
129    private void loadJobs() throws CommandException {
130        try {
131            JPAService jpaService = Services.get().get(JPAService.class);
132            if (jpaService != null) {
133                this.coordinatorJobInfo = jpaService.execute(new CoordJobInfoGetJPAExecutor(filter, start, len));
134            }
135            else {
136                throw new CommandException(ErrorCode.E0610);
137            }
138        }
139        catch (Exception ex) {
140            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
141        }
142    }
143}