001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import java.text.ParseException; 022import java.util.Calendar; 023import java.util.Date; 024import java.util.Iterator; 025import java.util.List; 026 027import org.apache.oozie.CoordinatorActionBean; 028import org.apache.oozie.CoordinatorJobBean; 029import org.apache.oozie.ErrorCode; 030import org.apache.oozie.client.CoordinatorAction; 031import org.apache.oozie.client.CoordinatorJob; 032import org.apache.oozie.client.Job; 033import org.apache.oozie.command.CommandException; 034import org.apache.oozie.command.PreconditionException; 035import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 036import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 037import org.apache.oozie.executor.jpa.CoordJobGetReadyActionsJPAExecutor; 038import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor; 039import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 040import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 041import org.apache.oozie.executor.jpa.JPAExecutorException; 042import org.apache.oozie.service.ConfigurationService; 043import org.apache.oozie.service.JPAService; 044import org.apache.oozie.service.Services; 045import org.apache.oozie.util.DateUtils; 046import org.apache.oozie.util.LogUtils; 047import org.apache.oozie.util.XLog; 048import org.jdom.JDOMException; 049 050public class CoordActionReadyXCommand extends CoordinatorXCommand<Void> { 051 private final String jobId; 052 private final XLog log = getLog(); 053 private CoordinatorJobBean coordJob = null; 054 private JPAService jpaService = null; 055 056 public CoordActionReadyXCommand(String id) { 057 super("coord_action_ready", "coord_action_ready", 1); 058 this.jobId = id; 059 } 060 061 @Override 062 protected void setLogInfo() { 063 LogUtils.setLogInfo(jobId); 064 } 065 066 @Override 067 /** 068 * Check for READY actions and change state to SUBMITTED by a command to submit the job to WF engine. 069 * This method checks all the actions associated with a jobId to figure out which actions 070 * to start (based on concurrency and execution order [FIFO, LIFO, LAST_ONLY, NONE]) 071 * 072 */ 073 protected Void execute() throws CommandException { 074 // number of actions to start (-1 means start ALL) 075 int numActionsToStart = -1; 076 077 // get execution setting for this job (FIFO, LIFO, LAST_ONLY) 078 CoordinatorJob.Execution jobExecution = coordJob.getExecutionOrder(); 079 // get concurrency setting for this job 080 int jobConcurrency = coordJob.getConcurrency(); 081 // if less than 0, then UNLIMITED concurrency 082 if (jobConcurrency >= 0) { 083 // count number of actions that are already RUNNING or SUBMITTED 084 // subtract from CONCURRENCY to calculate number of actions to start 085 // in WF engine 086 087 int numRunningJobs; 088 try { 089 numRunningJobs = jpaService.execute(new CoordJobGetRunningActionsCountJPAExecutor(jobId)); 090 } 091 catch (JPAExecutorException je) { 092 throw new CommandException(je); 093 } 094 095 numActionsToStart = jobConcurrency - numRunningJobs; 096 if (numActionsToStart < 0) { 097 numActionsToStart = 0; 098 } 099 log.debug("concurrency=" + jobConcurrency + ", execution=" + jobExecution + ", numRunningJobs=" 100 + numRunningJobs + ", numLeftover=" + numActionsToStart); 101 // no actions to start 102 if (numActionsToStart == 0) { 103 log.warn("No actions to start for jobId=" + jobId + " as max concurrency reached!"); 104 } 105 } 106 // get list of actions that are READY and fit in the concurrency and execution 107 108 List<CoordinatorActionBean> actions; 109 try { 110 actions = jpaService.execute(new CoordJobGetReadyActionsJPAExecutor(jobId, jobExecution.name())); 111 } 112 catch (JPAExecutorException je) { 113 throw new CommandException(je); 114 } 115 log.debug("Number of READY actions = " + actions.size()); 116 Date now = new Date(); 117 // If we're using LAST_ONLY or NONE, we should check if any of these need to be SKIPPED instead of SUBMITTED 118 if (jobExecution.equals(CoordinatorJobBean.Execution.LAST_ONLY)) { 119 for (Iterator<CoordinatorActionBean> it = actions.iterator(); it.hasNext(); ) { 120 CoordinatorActionBean action = it.next(); 121 try { 122 Date nextNominalTime = CoordCommandUtils.computeNextNominalTime(coordJob, action); 123 if (nextNominalTime != null) { 124 // If the current time is after the next action's nominal time, then we've passed the window where this 125 // action should be started; so set it to SKIPPED 126 if (now.after(nextNominalTime)) { 127 LOG.info("LAST_ONLY execution: Preparing to skip action [{0}] because the current time [{1}] is later " 128 + "than the nominal time [{2}] of the next action]", action.getId(), 129 DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime)); 130 queue(new CoordActionSkipXCommand(action, coordJob.getUser(), coordJob.getAppName())); 131 it.remove(); 132 } else { 133 LOG.debug("LAST_ONLY execution: Not skipping action [{0}] because the current time [{1}] is earlier " 134 + "than the nominal time [{2}] of the next action]", action.getId(), 135 DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime)); 136 } 137 } 138 } catch (ParseException e) { 139 LOG.error("Should not happen", e); 140 } catch (JDOMException e) { 141 LOG.error("Should not happen", e); 142 } 143 } 144 } 145 else if (jobExecution.equals(CoordinatorJobBean.Execution.NONE)) { 146 for (Iterator<CoordinatorActionBean> it = actions.iterator(); it.hasNext(); ) { 147 CoordinatorActionBean action = it.next(); 148 // If the current time is after the nominal time of this action plus some tolerance, 149 // then we've passed the window where this action should be started; so set it to SKIPPED 150 Calendar cal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone())); 151 cal.setTime(action.getNominalTime()); 152 int tolerance = ConfigurationService.getInt(CoordActionInputCheckXCommand.COORD_EXECUTION_NONE_TOLERANCE); 153 cal.add(Calendar.MINUTE, tolerance); 154 if (now.after(cal.getTime())) { 155 LOG.info("NONE execution: Preparing to skip action [{0}] because the current time [{1}] is more than [{2}]" 156 + " minutes later than the nominal time [{3}] of the current action]", action.getId(), 157 DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(action.getNominalTime())); 158 queue(new CoordActionSkipXCommand(action, coordJob.getUser(), coordJob.getAppName())); 159 it.remove(); 160 } else { 161 LOG.debug("NONE execution: Not skipping action [{0}] because the current time [{1}] is earlier than [{2}]" 162 + " minutes later than the nominal time [{3}] of the current action]", action.getId(), 163 DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(action.getNominalTime())); 164 } 165 } 166 } 167 168 int counter = 0; 169 for (CoordinatorActionBean action : actions) { 170 // continue if numActionsToStart is negative (no limit on number of 171 // actions), or if the counter is less than numActionsToStart 172 if ((numActionsToStart < 0) || (counter < numActionsToStart)) { 173 log.debug("Set status to SUBMITTED for id: " + action.getId()); 174 // change state of action to SUBMITTED 175 action.setStatus(CoordinatorAction.Status.SUBMITTED); 176 try { 177 CoordActionQueryExecutor.getInstance().executeUpdate( 178 CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action); 179 } 180 catch (JPAExecutorException je) { 181 throw new CommandException(je); 182 } 183 // start action 184 new CoordActionStartXCommand(action.getId(), coordJob.getUser(), coordJob.getAppName(), 185 action.getJobId()).call(); 186 } 187 else { 188 break; 189 } 190 counter++; 191 192 } 193 return null; 194 } 195 196 @Override 197 public String getEntityKey() { 198 return jobId; 199 } 200 201 @Override 202 public String getKey() { 203 return getName() + "_" + jobId; 204 } 205 206 @Override 207 protected boolean isLockRequired() { 208 return true; 209 } 210 211 @Override 212 protected void loadState() throws CommandException { 213 jpaService = Services.get().get(JPAService.class); 214 if (jpaService == null) { 215 throw new CommandException(ErrorCode.E0610); 216 } 217 try { 218 coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_ACTION_READY, jobId); 219 } 220 catch (JPAExecutorException e) { 221 throw new CommandException(e); 222 } 223 LogUtils.setLogInfo(coordJob); 224 } 225 226 @Override 227 protected void verifyPrecondition() throws CommandException, PreconditionException { 228 if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR 229 && coordJob.getStatus() != Job.Status.SUCCEEDED && coordJob.getStatus() != Job.Status.PAUSED 230 && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) { 231 throw new PreconditionException(ErrorCode.E1100, "[" + jobId 232 + "]::CoordActionReady:: Ignoring job. Coordinator job is not in RUNNING state, but state=" 233 + coordJob.getStatus()); 234 } 235 } 236}