This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import org.apache.oozie.util.DateUtils;
021 import org.apache.oozie.client.CoordinatorJob;
022 import org.apache.oozie.client.OozieClient;
023 import org.apache.oozie.CoordinatorActionBean;
024 import org.apache.oozie.CoordinatorJobBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.XException;
027 import org.apache.oozie.command.CommandException;
028 import org.apache.oozie.executor.jpa.CoordActionRemoveJPAExecutor;
029 import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
030 import org.apache.oozie.executor.jpa.JPAExecutorException;
031 import org.apache.oozie.service.JPAService;
032 import org.apache.oozie.service.Services;
033 import org.apache.oozie.store.CoordinatorStore;
034 import org.apache.oozie.store.StoreException;
035 import org.apache.oozie.util.JobUtils;
036 import org.apache.oozie.util.ParamChecker;
037 import org.apache.oozie.util.XLog;
038
039 import java.util.Date;
040 import java.util.HashSet;
041 import java.util.Map;
042 import java.util.Set;
043 import java.util.Map.Entry;
044
045 public class CoordChangeCommand extends CoordinatorCommand<Void> {
046 private String jobId;
047 private Date newEndTime = null;
048 private Integer newConcurrency = null;
049 private Date newPauseTime = null;
050 private boolean resetPauseTime = false;
051 private static final XLog LOG = XLog.getLog(CoordChangeCommand.class);
052
053 private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
054 static {
055 ALLOWED_CHANGE_OPTIONS.add("endtime");
056 ALLOWED_CHANGE_OPTIONS.add("concurrency");
057 ALLOWED_CHANGE_OPTIONS.add("pausetime");
058 }
059
060 public CoordChangeCommand(String id, String changeValue) throws CommandException {
061 super("coord_change", "coord_change", 0, XLog.STD);
062 this.jobId = ParamChecker.notEmpty(id, "id");
063 ParamChecker.notEmpty(changeValue, "value");
064
065 validateChangeValue(changeValue);
066 }
067
068 /**
069 * @param changeValue change value.
070 * @throws CommandException thrown if changeValue cannot be parsed properly.
071 */
072 private void validateChangeValue(String changeValue) throws CommandException {
073 Map<String, String> map = JobUtils.parseChangeValue(changeValue);
074
075 if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) {
076 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
077 }
078
079 java.util.Iterator<Entry<String, String>> iter = map.entrySet().iterator();
080 while (iter.hasNext()) {
081 Entry<String, String> entry = iter.next();
082 String key = entry.getKey();
083 String value = entry.getValue();
084
085 if (!ALLOWED_CHANGE_OPTIONS.contains(key)) {
086 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
087 }
088
089 if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) {
090 throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty");
091 }
092 }
093
094 if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) {
095 String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
096 try {
097 newEndTime = DateUtils.parseDateUTC(value);
098 }
099 catch (Exception ex) {
100 throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
101 }
102 }
103
104 if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) {
105 String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY);
106 try {
107 newConcurrency = Integer.parseInt(value);
108 }
109 catch (NumberFormatException ex) {
110 throw new CommandException(ErrorCode.E1015, value, "must be a valid integer");
111 }
112 }
113
114 if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
115 String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
116 if (value.equals("")) { // this is to reset pause time to null;
117 resetPauseTime = true;
118 }
119 else {
120 try {
121 newPauseTime = DateUtils.parseDateUTC(value);
122 }
123 catch (Exception ex) {
124 throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
125 }
126 }
127 }
128 }
129
130 /**
131 * @param coordJob coordinator job id.
132 * @param newEndTime new end time.
133 * @throws CommandException thrown if new end time is not valid.
134 */
135 private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException {
136 // New endTime cannot be before coordinator job's start time.
137 Date startTime = coordJob.getStartTime();
138 if (newEndTime.before(startTime)) {
139 throw new CommandException(ErrorCode.E1015, newEndTime, "cannot be before coordinator job's start time [" + startTime + "]");
140 }
141
142 // New endTime cannot be before coordinator job's last action time.
143 Date lastActionTime = coordJob.getLastActionTime();
144 if (lastActionTime != null) {
145 Date d = new Date(lastActionTime.getTime() - coordJob.getFrequency() * 60 * 1000);
146 if (!newEndTime.after(d)) {
147 throw new CommandException(ErrorCode.E1015, newEndTime,
148 "must be after coordinator job's last action time [" + d + "]");
149 }
150 }
151 }
152
153 /**
154 * @param coordJob coordinator job id.
155 * @param newPauseTime new pause time.
156 * @param newEndTime new end time, can be null meaning no change on end time.
157 * @throws CommandException thrown if new pause time is not valid.
158 */
159 private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime)
160 throws CommandException {
161 // New pauseTime has to be a non-past time.
162 Date d = new Date();
163 if (newPauseTime.before(d)) {
164 throw new CommandException(ErrorCode.E1015, newPauseTime, "must be a non-past time");
165 }
166 }
167
168 /**
169 * Process lookahead created actions that become invalid because of the new pause time,
170 * These actions will be deleted from DB, also the coordinator job will be updated accordingly
171 *
172 * @param coordJob coordinator job
173 * @param newPauseTime new pause time
174 */
175 private void processLookaheadActions(CoordinatorJobBean coordJob, Date newPauseTime) throws CommandException {
176 Date lastActionTime = coordJob.getLastActionTime();
177 if (lastActionTime != null) {
178 // d is the real last action time.
179 Date d = new Date(lastActionTime.getTime() - coordJob.getFrequency() * 60 * 1000);
180 int lastActionNumber = coordJob.getLastActionNumber();
181
182 boolean hasChanged = false;
183 while (true) {
184 if (!newPauseTime.after(d)) {
185 deleteAction(coordJob.getId(), lastActionNumber);
186 d = new Date(d.getTime() - coordJob.getFrequency() * 60 * 1000);
187 lastActionNumber = lastActionNumber - 1;
188
189 hasChanged = true;
190 }
191 else {
192 break;
193 }
194 }
195
196 if (hasChanged == true) {
197 coordJob.setLastActionNumber(lastActionNumber);
198 Date d1 = new Date(d.getTime() + coordJob.getFrequency() * 60 * 1000);
199 coordJob.setLastActionTime(d1);
200 coordJob.setNextMaterializedTime(d1);
201
202 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) {
203 coordJob.setStatus(CoordinatorJob.Status.RUNNING);
204 }
205 }
206 }
207 }
208
209 /**
210 * delete last action for a coordinator job
211 * @param coordJob coordinator job
212 * @param lastActionNum last action number of the coordinator job
213 */
214 private void deleteAction(String jobId, int lastActionNum) throws CommandException {
215 JPAService jpaService = Services.get().get(JPAService.class);
216 if (jpaService == null) {
217 throw new CommandException(ErrorCode.E0610);
218 }
219
220 try {
221 CoordinatorActionBean actionBean = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, lastActionNum));
222 jpaService.execute(new CoordActionRemoveJPAExecutor(actionBean.getId()));
223 }
224 catch (JPAExecutorException e) {
225 throw new CommandException(e);
226 }
227 }
228
229 /**
230 * @param coordJob coordinator job id.
231 * @param newEndTime new end time.
232 * @param newConcurrency new concurrency.
233 * @param newPauseTime new pause time.
234 * @throws CommandException thrown if new values are not valid.
235 */
236 private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime)
237 throws CommandException {
238 if (coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
239 throw new CommandException(ErrorCode.E1016);
240 }
241
242 if (newEndTime != null) {
243 checkEndTime(coordJob, newEndTime);
244 }
245
246 if (newPauseTime != null) {
247 checkPauseTime(coordJob, newPauseTime);
248 }
249 }
250
251 @Override
252 protected Void call(CoordinatorStore store) throws StoreException, CommandException {
253 try {
254 CoordinatorJobBean coordJob = store.getCoordinatorJob(jobId, false);
255 setLogInfo(coordJob);
256
257 check(coordJob, newEndTime, newConcurrency, newPauseTime);
258
259 if (newEndTime != null) {
260 coordJob.setEndTime(newEndTime);
261 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) {
262 coordJob.setStatus(CoordinatorJob.Status.RUNNING);
263 }
264 }
265
266 if (newConcurrency != null) {
267 coordJob.setConcurrency(newConcurrency);
268 }
269
270 if (newPauseTime != null || resetPauseTime == true) {
271 coordJob.setPauseTime(newPauseTime);
272 if (!resetPauseTime) {
273 processLookaheadActions(coordJob, newPauseTime);
274 }
275 }
276
277 store.updateCoordinatorJob(coordJob);
278
279 return null;
280 }
281 catch (XException ex) {
282 throw new CommandException(ex);
283 }
284 }
285
286 @Override
287 protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
288 LOG.info("STARTED CoordChangeCommand for jobId=" + jobId);
289 try {
290 if (lock(jobId)) {
291 call(store);
292 }
293 else {
294 throw new CommandException(ErrorCode.E0606, "job " + jobId
295 + " has been locked and cannot change value, please retry later");
296 }
297 }
298 catch (InterruptedException e) {
299 throw new CommandException(ErrorCode.E0606, "acquiring lock for job " + jobId + " failed "
300 + " with exception " + e.getMessage());
301 }
302 finally {
303 LOG.info("ENDED CoordChangeCommand for jobId=" + jobId);
304 }
305 return null;
306 }
307 }