001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.coord;
020
021import java.text.ParseException;
022import java.util.Calendar;
023import java.util.Date;
024import java.util.Iterator;
025import java.util.List;
026
027import org.apache.oozie.CoordinatorActionBean;
028import org.apache.oozie.CoordinatorJobBean;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.client.CoordinatorAction;
031import org.apache.oozie.client.CoordinatorJob;
032import org.apache.oozie.client.Job;
033import org.apache.oozie.command.CommandException;
034import org.apache.oozie.command.PreconditionException;
035import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
036import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
037import org.apache.oozie.executor.jpa.CoordJobGetReadyActionsJPAExecutor;
038import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
039import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
040import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
041import org.apache.oozie.executor.jpa.JPAExecutorException;
042import org.apache.oozie.service.ConfigurationService;
043import org.apache.oozie.service.JPAService;
044import org.apache.oozie.service.Services;
045import org.apache.oozie.util.DateUtils;
046import org.apache.oozie.util.LogUtils;
047import org.apache.oozie.util.XLog;
048import org.jdom.JDOMException;
049
050public class CoordActionReadyXCommand extends CoordinatorXCommand<Void> {
051    private final String jobId;
052    private final XLog log = getLog();
053    private CoordinatorJobBean coordJob = null;
054    private JPAService jpaService = null;
055
056    public CoordActionReadyXCommand(String id) {
057        super("coord_action_ready", "coord_action_ready", 1);
058        this.jobId = id;
059    }
060
061    @Override
062    protected void setLogInfo() {
063        LogUtils.setLogInfo(jobId);
064    }
065
066    @Override
067    /**
068     * Check for READY actions and change state to SUBMITTED by a command to submit the job to WF engine.
069     * This method checks all the actions associated with a jobId to figure out which actions
070     * to start (based on concurrency and execution order [FIFO, LIFO, LAST_ONLY, NONE])
071     *
072     */
073    protected Void execute() throws CommandException {
074        // number of actions to start (-1 means start ALL)
075        int numActionsToStart = -1;
076
077        // get execution setting for this job (FIFO, LIFO, LAST_ONLY)
078        CoordinatorJob.Execution jobExecution = coordJob.getExecutionOrder();
079        // get concurrency setting for this job
080        int jobConcurrency = coordJob.getConcurrency();
081        // if less than 0, then UNLIMITED concurrency
082        if (jobConcurrency >= 0) {
083            // count number of actions that are already RUNNING or SUBMITTED
084            // subtract from CONCURRENCY to calculate number of actions to start
085            // in WF engine
086
087            int numRunningJobs;
088            try {
089                numRunningJobs = jpaService.execute(new CoordJobGetRunningActionsCountJPAExecutor(jobId));
090            }
091            catch (JPAExecutorException je) {
092                throw new CommandException(je);
093            }
094
095            numActionsToStart = jobConcurrency - numRunningJobs;
096            if (numActionsToStart < 0) {
097                numActionsToStart = 0;
098            }
099            log.debug("concurrency=" + jobConcurrency + ", execution=" + jobExecution + ", numRunningJobs="
100                    + numRunningJobs + ", numLeftover=" + numActionsToStart);
101            // no actions to start
102            if (numActionsToStart == 0) {
103                log.warn("No actions to start for jobId=" + jobId + " as max concurrency reached!");
104            }
105        }
106        // get list of actions that are READY and fit in the concurrency and execution
107
108        List<CoordinatorActionBean> actions;
109        try {
110            actions = jpaService.execute(new CoordJobGetReadyActionsJPAExecutor(jobId, jobExecution.name()));
111        }
112        catch (JPAExecutorException je) {
113            throw new CommandException(je);
114        }
115        log.debug("Number of READY actions = " + actions.size());
116        Date now = new Date();
117        // If we're using LAST_ONLY or NONE, we should check if any of these need to be SKIPPED instead of SUBMITTED
118        if (jobExecution.equals(CoordinatorJobBean.Execution.LAST_ONLY)) {
119            for (Iterator<CoordinatorActionBean> it = actions.iterator(); it.hasNext(); ) {
120                CoordinatorActionBean action = it.next();
121                try {
122                    Date nextNominalTime = CoordCommandUtils.computeNextNominalTime(coordJob, action);
123                    if (nextNominalTime != null) {
124                        // If the current time is after the next action's nominal time, then we've passed the window where this
125                        // action should be started; so set it to SKIPPED
126                        if (now.after(nextNominalTime)) {
127                            LOG.info("LAST_ONLY execution: Preparing to skip action [{0}] because the current time [{1}] is later "
128                                    + "than the nominal time [{2}] of the next action]", action.getId(),
129                                    DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
130                            queue(new CoordActionSkipXCommand(action, coordJob.getUser(), coordJob.getAppName()));
131                            it.remove();
132                        } else {
133                            LOG.debug("LAST_ONLY execution: Not skipping action [{0}] because the current time [{1}] is earlier "
134                                    + "than the nominal time [{2}] of the next action]", action.getId(),
135                                    DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
136                        }
137                    }
138                } catch (ParseException e) {
139                    LOG.error("Should not happen", e);
140                } catch (JDOMException e) {
141                    LOG.error("Should not happen", e);
142                }
143            }
144        }
145        else if (jobExecution.equals(CoordinatorJobBean.Execution.NONE)) {
146            for (Iterator<CoordinatorActionBean> it = actions.iterator(); it.hasNext(); ) {
147                CoordinatorActionBean action = it.next();
148                // If the current time is after the nominal time of this action plus some tolerance,
149                // then we've passed the window where this action should be started; so set it to SKIPPED
150                Calendar cal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
151                cal.setTime(action.getNominalTime());
152                int tolerance = ConfigurationService.getInt(CoordActionInputCheckXCommand.COORD_EXECUTION_NONE_TOLERANCE);
153                cal.add(Calendar.MINUTE, tolerance);
154                if (now.after(cal.getTime())) {
155                    LOG.info("NONE execution: Preparing to skip action [{0}] because the current time [{1}] is more than [{2}]"
156                                    + " minutes later than the nominal time [{3}] of the current action]", action.getId(),
157                            DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(action.getNominalTime()));
158                    queue(new CoordActionSkipXCommand(action, coordJob.getUser(), coordJob.getAppName()));
159                    it.remove();
160                } else {
161                    LOG.debug("NONE execution: Not skipping action [{0}] because the current time [{1}] is earlier than [{2}]"
162                                    + " minutes later than the nominal time [{3}] of the current action]", action.getId(),
163                            DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(action.getNominalTime()));
164                }
165            }
166        }
167
168        int counter = 0;
169        for (CoordinatorActionBean action : actions) {
170            // continue if numActionsToStart is negative (no limit on number of
171            // actions), or if the counter is less than numActionsToStart
172            if ((numActionsToStart < 0) || (counter < numActionsToStart)) {
173                log.debug("Set status to SUBMITTED for id: " + action.getId());
174                // change state of action to SUBMITTED
175                action.setStatus(CoordinatorAction.Status.SUBMITTED);
176                try {
177                    CoordActionQueryExecutor.getInstance().executeUpdate(
178                            CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
179                }
180                catch (JPAExecutorException je) {
181                    throw new CommandException(je);
182                }
183                // start action
184                new CoordActionStartXCommand(action.getId(), coordJob.getUser(), coordJob.getAppName(),
185                        action.getJobId()).call();
186            }
187            else {
188                break;
189            }
190            counter++;
191
192        }
193        return null;
194    }
195
196    @Override
197    public String getEntityKey() {
198        return jobId;
199    }
200
201    @Override
202    public String getKey() {
203        return getName() + "_" + jobId;
204    }
205
206    @Override
207    protected boolean isLockRequired() {
208        return true;
209    }
210
211    @Override
212    protected void loadState() throws CommandException {
213        jpaService = Services.get().get(JPAService.class);
214        if (jpaService == null) {
215            throw new CommandException(ErrorCode.E0610);
216        }
217        try {
218            coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_ACTION_READY, jobId);
219        }
220        catch (JPAExecutorException e) {
221            throw new CommandException(e);
222        }
223        LogUtils.setLogInfo(coordJob);
224    }
225
226    @Override
227    protected void verifyPrecondition() throws CommandException, PreconditionException {
228        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR
229                && coordJob.getStatus() != Job.Status.SUCCEEDED && coordJob.getStatus() != Job.Status.PAUSED
230                && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
231            throw new PreconditionException(ErrorCode.E1100, "[" + jobId
232                    + "]::CoordActionReady:: Ignoring job. Coordinator job is not in RUNNING state, but state="
233                    + coordJob.getStatus());
234        }
235    }
236}