This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.util.List;
021
022 import org.apache.oozie.CoordinatorActionBean;
023 import org.apache.oozie.CoordinatorJobBean;
024 import org.apache.oozie.client.CoordinatorAction;
025 import org.apache.oozie.command.CommandException;
026 import org.apache.oozie.store.CoordinatorStore;
027 import org.apache.oozie.store.StoreException;
028 import org.apache.oozie.util.XLog;
029
030 public class CoordActionReadyCommand extends CoordinatorCommand<Void> {
031 private String jobId;
032 private final XLog log = XLog.getLog(getClass());
033
034 public CoordActionReadyCommand(String id) {
035 super("coord_action_ready", "coord_action_ready", 1, XLog.STD);
036 this.jobId = id;
037 }
038
039 @Override
040 /**
041 * Check for READY actions and change state to SUBMITTED by a command to submit the job to WF engine.
042 * This method checks all the actions associated with a jobId to figure out which actions
043 * to start (based on concurrency and execution order [FIFO, LIFO, LAST_ONLY])
044 *
045 * @param store Coordinator Store
046 */
047 protected Void call(CoordinatorStore store) throws StoreException, CommandException {
048 // number of actions to start (-1 means start ALL)
049 int numActionsToStart = -1;
050 // get CoordinatorJobBean for jobId
051 //CoordinatorJobBean coordJob = store.getCoordinatorJob(jobId, false);
052 CoordinatorJobBean coordJob = store.getEntityManager().find(CoordinatorJobBean.class, jobId);
053 setLogInfo(coordJob);
054 // get execution setting for this job (FIFO, LIFO, LAST_ONLY)
055 String jobExecution = coordJob.getExecution();
056 // get concurrency setting for this job
057 int jobConcurrency = coordJob.getConcurrency();
058 // if less than 0, then UNLIMITED concurrency
059 if (jobConcurrency >= 0) {
060 // count number of actions that are already RUNNING or SUBMITTED
061 // subtract from CONCURRENCY to calculate number of actions to start
062 // in WF engine
063 int numRunningJobs = store.getCoordinatorRunningActionsCount(jobId);
064 numActionsToStart = jobConcurrency - numRunningJobs;
065 if (numActionsToStart < 0) {
066 numActionsToStart = 0;
067 }
068 log.debug("concurrency=" + jobConcurrency + ", execution=" + jobExecution + ", numRunningJobs="
069 + numRunningJobs + ", numLeftover=" + numActionsToStart);
070 // no actions to start
071 if (numActionsToStart == 0) {
072 log.warn("No actions to start! for jobId=" + jobId);
073 return null;
074 }
075 }
076 // get list of actions that are READY and fit in the concurrency and
077 // execution
078 List<CoordinatorActionBean> actions = store.getCoordinatorActionsForJob(jobId, numActionsToStart, jobExecution);
079 log.debug("Number of READY actions = " + actions.size());
080 String user = coordJob.getUser();
081 String authToken = coordJob.getAuthToken();
082 // make sure auth token is not null
083 // log.denug("user=" + user + ", token=" + authToken);
084 int counter = 0;
085 for (CoordinatorActionBean action : actions) {
086 // continue if numActionsToStart is negative (no limit on number of
087 // actions), or if the counter is less than numActionsToStart
088 if ((numActionsToStart < 0) || (counter < numActionsToStart)) {
089 log.debug("Set status to SUBMITTED for id: " + action.getId());
090 // change state of action to SUBMITTED
091 action.setStatus(CoordinatorAction.Status.SUBMITTED);
092 // queue action to start action
093 queueCallable(new CoordActionStartCommand(action.getId(), user, authToken), 100);
094 store.updateCoordinatorAction(action);
095 }
096 else {
097 break;
098 }
099 counter++;
100
101 }
102 return null;
103 }
104
105 @Override
106 protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
107 log.info("STARTED CoordActionReadyCommand for jobId=" + jobId);
108 try {
109 if (lock(jobId)) {
110 call(store);
111 }
112 else {
113 queueCallable(new CoordActionReadyCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL);
114 log.warn("CoordActionReadyCommand lock was not acquired - failed jobId=" + jobId
115 + ". Requeing the same.");
116 }
117 }
118 catch (InterruptedException e) {
119 queueCallable(new CoordActionReadyCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL);
120 log.warn("CoordActionReadyCommand lock acquiring failed with exception " + e.getMessage()
121 + " for jobId=" + jobId + " Requeing the same.");
122 }
123 finally {
124 log.info("ENDED CoordActionReadyCommand for jobId=" + jobId);
125 }
126 return null;
127 }
128
129 }