This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.util.List;
021
022 import org.apache.oozie.CoordinatorActionBean;
023 import org.apache.oozie.CoordinatorJobBean;
024 import org.apache.oozie.ErrorCode;
025 import org.apache.oozie.client.CoordinatorAction;
026 import org.apache.oozie.client.Job;
027 import org.apache.oozie.command.CommandException;
028 import org.apache.oozie.command.PreconditionException;
029 import org.apache.oozie.executor.jpa.CoordJobGetReadyActionsJPAExecutor;
030 import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
031 import org.apache.oozie.executor.jpa.JPAExecutorException;
032 import org.apache.oozie.service.JPAService;
033 import org.apache.oozie.service.Services;
034 import org.apache.oozie.util.LogUtils;
035 import org.apache.oozie.util.XLog;
036
037 public class CoordActionReadyXCommand extends CoordinatorXCommand<Void> {
038 private final String jobId;
039 private final XLog log = getLog();
040 private CoordinatorJobBean coordJob = null;
041 private JPAService jpaService = null;
042
043 public CoordActionReadyXCommand(String id) {
044 super("coord_action_ready", "coord_action_ready", 1);
045 this.jobId = id;
046 }
047
048 @Override
049 /**
050 * Check for READY actions and change state to SUBMITTED by a command to submit the job to WF engine.
051 * This method checks all the actions associated with a jobId to figure out which actions
052 * to start (based on concurrency and execution order [FIFO, LIFO, LAST_ONLY])
053 *
054 */
055 protected Void execute() throws CommandException {
056 // number of actions to start (-1 means start ALL)
057 int numActionsToStart = -1;
058
059 // get execution setting for this job (FIFO, LIFO, LAST_ONLY)
060 String jobExecution = coordJob.getExecution();
061 // get concurrency setting for this job
062 int jobConcurrency = coordJob.getConcurrency();
063 // if less than 0, then UNLIMITED concurrency
064 if (jobConcurrency >= 0) {
065 // count number of actions that are already RUNNING or SUBMITTED
066 // subtract from CONCURRENCY to calculate number of actions to start
067 // in WF engine
068
069 int numRunningJobs;
070 try {
071 numRunningJobs = jpaService.execute(new CoordJobGetRunningActionsCountJPAExecutor(jobId));
072 }
073 catch (JPAExecutorException je) {
074 throw new CommandException(je);
075 }
076
077 numActionsToStart = jobConcurrency - numRunningJobs;
078 if (numActionsToStart < 0) {
079 numActionsToStart = 0;
080 }
081 log.debug("concurrency=" + jobConcurrency + ", execution=" + jobExecution + ", numRunningJobs="
082 + numRunningJobs + ", numLeftover=" + numActionsToStart);
083 // no actions to start
084 if (numActionsToStart == 0) {
085 log.warn("No actions to start! for jobId=" + jobId);
086 return null;
087 }
088 }
089 // get list of actions that are READY and fit in the concurrency and execution
090
091 List<CoordinatorActionBean> actions;
092 try {
093 actions = jpaService.execute(new CoordJobGetReadyActionsJPAExecutor(jobId, numActionsToStart, jobExecution));
094 }
095 catch (JPAExecutorException je) {
096 throw new CommandException(je);
097 }
098 log.debug("Number of READY actions = " + actions.size());
099 String user = coordJob.getUser();
100 String authToken = coordJob.getAuthToken();
101 // make sure auth token is not null
102 // log.denug("user=" + user + ", token=" + authToken);
103 int counter = 0;
104 for (CoordinatorActionBean action : actions) {
105 // continue if numActionsToStart is negative (no limit on number of
106 // actions), or if the counter is less than numActionsToStart
107 if ((numActionsToStart < 0) || (counter < numActionsToStart)) {
108 log.debug("Set status to SUBMITTED for id: " + action.getId());
109 // change state of action to SUBMITTED
110 action.setStatus(CoordinatorAction.Status.SUBMITTED);
111 // queue action to start action
112 queue(new CoordActionStartXCommand(action.getId(), user, authToken, action.getJobId()), 100);
113 try {
114 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(action));
115 }
116 catch (JPAExecutorException je) {
117 throw new CommandException(je);
118 }
119 }
120 else {
121 break;
122 }
123 counter++;
124
125 }
126 return null;
127 }
128
129 @Override
130 public String getEntityKey() {
131 return jobId;
132 }
133
134 @Override
135 protected boolean isLockRequired() {
136 return true;
137 }
138
139 @Override
140 protected void loadState() throws CommandException {
141 jpaService = Services.get().get(JPAService.class);
142 if (jpaService == null) {
143 throw new CommandException(ErrorCode.E0610);
144 }
145 try {
146 coordJob = jpaService.execute(new org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor(jobId));
147 }
148 catch (JPAExecutorException e) {
149 throw new CommandException(e);
150 }
151 LogUtils.setLogInfo(coordJob, logInfo);
152 }
153
154 @Override
155 protected void verifyPrecondition() throws CommandException, PreconditionException {
156 if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR && coordJob.getStatus() != Job.Status.SUCCEEDED && coordJob.getStatus() != Job.Status.PAUSED && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
157 throw new PreconditionException(ErrorCode.E1100, "[" + jobId
158 + "]::CoordActionReady:: Ignoring job. Coordinator job is not in RUNNING state, but state="
159 + coordJob.getStatus());
160 }
161 }
162 }