001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.util.List;
021    
022    import org.apache.oozie.CoordinatorActionBean;
023    import org.apache.oozie.CoordinatorJobBean;
024    import org.apache.oozie.client.CoordinatorAction;
025    import org.apache.oozie.command.CommandException;
026    import org.apache.oozie.store.CoordinatorStore;
027    import org.apache.oozie.store.StoreException;
028    import org.apache.oozie.util.XLog;
029    
030    public class CoordActionReadyCommand extends CoordinatorCommand<Void> {
031        private String jobId;
032        private final XLog log = XLog.getLog(getClass());
033    
034        public CoordActionReadyCommand(String id) {
035            super("coord_action_ready", "coord_action_ready", 1, XLog.STD);
036            this.jobId = id;
037        }
038    
039        @Override
040        /**
041         * Check for READY actions and change state to SUBMITTED by a command to submit the job to WF engine.
042         * This method checks all the actions associated with a jobId to figure out which actions
043         * to start (based on concurrency and execution order [FIFO, LIFO, LAST_ONLY])
044         *
045         * @param store Coordinator Store
046         */
047        protected Void call(CoordinatorStore store) throws StoreException, CommandException {
048            // number of actions to start (-1 means start ALL)
049            int numActionsToStart = -1;
050            // get CoordinatorJobBean for jobId
051            //CoordinatorJobBean coordJob = store.getCoordinatorJob(jobId, false);
052            CoordinatorJobBean coordJob = store.getEntityManager().find(CoordinatorJobBean.class, jobId);
053            setLogInfo(coordJob);
054            // get execution setting for this job (FIFO, LIFO, LAST_ONLY)
055            String jobExecution = coordJob.getExecution();
056            // get concurrency setting for this job
057            int jobConcurrency = coordJob.getConcurrency();
058            // if less than 0, then UNLIMITED concurrency
059            if (jobConcurrency >= 0) {
060                // count number of actions that are already RUNNING or SUBMITTED
061                // subtract from CONCURRENCY to calculate number of actions to start
062                // in WF engine
063                int numRunningJobs = store.getCoordinatorRunningActionsCount(jobId);
064                numActionsToStart = jobConcurrency - numRunningJobs;
065                if (numActionsToStart < 0) {
066                    numActionsToStart = 0;
067                }
068                log.debug("concurrency=" + jobConcurrency + ", execution=" + jobExecution + ", numRunningJobs="
069                        + numRunningJobs + ", numLeftover=" + numActionsToStart);
070                // no actions to start
071                if (numActionsToStart == 0) {
072                    log.warn("No actions to start! for jobId=" + jobId);
073                    return null;
074                }
075            }
076            // get list of actions that are READY and fit in the concurrency and
077            // execution
078            List<CoordinatorActionBean> actions = store.getCoordinatorActionsForJob(jobId, numActionsToStart, jobExecution);
079            log.debug("Number of READY actions = " + actions.size());
080            String user = coordJob.getUser();
081            String authToken = coordJob.getAuthToken();
082            // make sure auth token is not null
083            // log.denug("user=" + user + ", token=" + authToken);
084            int counter = 0;
085            for (CoordinatorActionBean action : actions) {
086                // continue if numActionsToStart is negative (no limit on number of
087                // actions), or if the counter is less than numActionsToStart
088                if ((numActionsToStart < 0) || (counter < numActionsToStart)) {
089                    log.debug("Set status to SUBMITTED for id: " + action.getId());
090                    // change state of action to SUBMITTED
091                    action.setStatus(CoordinatorAction.Status.SUBMITTED);
092                    // queue action to start action
093                    queueCallable(new CoordActionStartCommand(action.getId(), user, authToken), 100);
094                    store.updateCoordinatorAction(action);
095                }
096                else {
097                    break;
098                }
099                counter++;
100    
101            }
102            return null;
103        }
104    
105        @Override
106        protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
107            log.info("STARTED CoordActionReadyCommand for jobId=" + jobId);
108            try {
109                if (lock(jobId)) {
110                    call(store);
111                }
112                else {
113                    queueCallable(new CoordActionReadyCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL);
114                    log.warn("CoordActionReadyCommand lock was not acquired - failed jobId=" + jobId
115                            + ". Requeing the same.");
116                }
117            }
118            catch (InterruptedException e) {
119                queueCallable(new CoordActionReadyCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL);
120                log.warn("CoordActionReadyCommand lock acquiring failed with exception " + e.getMessage()
121                        + " for jobId=" + jobId + " Requeing the same.");
122            }
123            finally {
124                log.info("ENDED CoordActionReadyCommand for jobId=" + jobId);
125            }
126            return null;
127        }
128    
129    }