001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.util.List;
021
022import org.apache.oozie.CoordinatorActionBean;
023import org.apache.oozie.CoordinatorJobBean;
024import org.apache.oozie.ErrorCode;
025import org.apache.oozie.client.CoordinatorAction;
026import org.apache.oozie.client.Job;
027import org.apache.oozie.command.CommandException;
028import org.apache.oozie.command.PreconditionException;
029import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
030import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
031import org.apache.oozie.executor.jpa.CoordJobGetReadyActionsJPAExecutor;
032import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
033import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
034import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
035import org.apache.oozie.executor.jpa.JPAExecutorException;
036import org.apache.oozie.service.JPAService;
037import org.apache.oozie.service.Services;
038import org.apache.oozie.util.LogUtils;
039import org.apache.oozie.util.XLog;
040
041public class CoordActionReadyXCommand extends CoordinatorXCommand<Void> {
042    private final String jobId;
043    private final XLog log = getLog();
044    private CoordinatorJobBean coordJob = null;
045    private JPAService jpaService = null;
046
047    public CoordActionReadyXCommand(String id) {
048        super("coord_action_ready", "coord_action_ready", 1);
049        this.jobId = id;
050    }
051
052    @Override
053    /**
054     * Check for READY actions and change state to SUBMITTED by a command to submit the job to WF engine.
055     * This method checks all the actions associated with a jobId to figure out which actions
056     * to start (based on concurrency and execution order [FIFO, LIFO, LAST_ONLY, NONE])
057     *
058     */
059    protected Void execute() throws CommandException {
060        // number of actions to start (-1 means start ALL)
061        int numActionsToStart = -1;
062
063        // get execution setting for this job (FIFO, LIFO, LAST_ONLY)
064        String jobExecution = coordJob.getExecution();
065        // get concurrency setting for this job
066        int jobConcurrency = coordJob.getConcurrency();
067        // if less than 0, then UNLIMITED concurrency
068        if (jobConcurrency >= 0) {
069            // count number of actions that are already RUNNING or SUBMITTED
070            // subtract from CONCURRENCY to calculate number of actions to start
071            // in WF engine
072
073            int numRunningJobs;
074            try {
075                numRunningJobs = jpaService.execute(new CoordJobGetRunningActionsCountJPAExecutor(jobId));
076            }
077            catch (JPAExecutorException je) {
078                throw new CommandException(je);
079            }
080
081            numActionsToStart = jobConcurrency - numRunningJobs;
082            if (numActionsToStart < 0) {
083                numActionsToStart = 0;
084            }
085            log.debug("concurrency=" + jobConcurrency + ", execution=" + jobExecution + ", numRunningJobs="
086                    + numRunningJobs + ", numLeftover=" + numActionsToStart);
087            // no actions to start
088            if (numActionsToStart == 0) {
089                log.warn("No actions to start for jobId=" + jobId + " as max concurrency reached!");
090                return null;
091            }
092        }
093        // get list of actions that are READY and fit in the concurrency and execution
094
095        List<CoordinatorActionBean> actions;
096        try {
097            actions = jpaService.execute(new CoordJobGetReadyActionsJPAExecutor(jobId, numActionsToStart, jobExecution));
098        }
099        catch (JPAExecutorException je) {
100            throw new CommandException(je);
101        }
102        log.debug("Number of READY actions = " + actions.size());
103        // make sure auth token is not null
104        // log.denug("user=" + user + ", token=" + authToken);
105        int counter = 0;
106        for (CoordinatorActionBean action : actions) {
107            // continue if numActionsToStart is negative (no limit on number of
108            // actions), or if the counter is less than numActionsToStart
109            if ((numActionsToStart < 0) || (counter < numActionsToStart)) {
110                log.debug("Set status to SUBMITTED for id: " + action.getId());
111                // change state of action to SUBMITTED
112                action.setStatus(CoordinatorAction.Status.SUBMITTED);
113                try {
114                    CoordActionQueryExecutor.getInstance().executeUpdate(
115                            CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
116                }
117                catch (JPAExecutorException je) {
118                    throw new CommandException(je);
119                }
120                // start action
121                new CoordActionStartXCommand(action.getId(), coordJob.getUser(), coordJob.getAppName(),
122                        action.getJobId()).call(getEntityKey());
123            }
124            else {
125                break;
126            }
127            counter++;
128
129        }
130        return null;
131    }
132
133    @Override
134    public String getEntityKey() {
135        return jobId;
136    }
137
138    @Override
139    public String getKey() {
140        return getName() + "_" + jobId;
141    }
142
143    @Override
144    protected boolean isLockRequired() {
145        return true;
146    }
147
148    @Override
149    protected void loadState() throws CommandException {
150        jpaService = Services.get().get(JPAService.class);
151        if (jpaService == null) {
152            throw new CommandException(ErrorCode.E0610);
153        }
154        try {
155            coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_ACTION_READY, jobId);
156        }
157        catch (JPAExecutorException e) {
158            throw new CommandException(e);
159        }
160        LogUtils.setLogInfo(coordJob, logInfo);
161    }
162
163    @Override
164    protected void verifyPrecondition() throws CommandException, PreconditionException {
165        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR
166                && coordJob.getStatus() != Job.Status.SUCCEEDED && coordJob.getStatus() != Job.Status.PAUSED
167                && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
168            throw new PreconditionException(ErrorCode.E1100, "[" + jobId
169                    + "]::CoordActionReady:: Ignoring job. Coordinator job is not in RUNNING state, but state="
170                    + coordJob.getStatus());
171        }
172    }
173}