This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.util.Date;
021 import java.util.HashSet;
022 import java.util.Map;
023 import java.util.Set;
024 import java.util.Map.Entry;
025
026 import org.apache.oozie.CoordinatorActionBean;
027 import org.apache.oozie.CoordinatorJobBean;
028 import org.apache.oozie.ErrorCode;
029 import org.apache.oozie.XException;
030 import org.apache.oozie.client.CoordinatorJob;
031 import org.apache.oozie.client.Job;
032 import org.apache.oozie.client.OozieClient;
033 import org.apache.oozie.command.CommandException;
034 import org.apache.oozie.command.PreconditionException;
035 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
036 import org.apache.oozie.executor.jpa.CoordActionRemoveJPAExecutor;
037 import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
038 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
039 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
040 import org.apache.oozie.executor.jpa.JPAExecutorException;
041 import org.apache.oozie.service.JPAService;
042 import org.apache.oozie.service.Services;
043 import org.apache.oozie.util.DateUtils;
044 import org.apache.oozie.util.JobUtils;
045 import org.apache.oozie.util.LogUtils;
046 import org.apache.oozie.util.ParamChecker;
047
048 public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
049 private final String jobId;
050 private Date newEndTime = null;
051 private Integer newConcurrency = null;
052 private Date newPauseTime = null;
053 private Date oldPauseTime = null;
054 private boolean resetPauseTime = false;
055 private CoordinatorJobBean coordJob;
056 private JPAService jpaService = null;
057 private Job.Status prevStatus;
058
059 private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
060 static {
061 ALLOWED_CHANGE_OPTIONS.add("endtime");
062 ALLOWED_CHANGE_OPTIONS.add("concurrency");
063 ALLOWED_CHANGE_OPTIONS.add("pausetime");
064 }
065
066 /**
067 * This command is used to update the Coordinator job with the new values Update the coordinator job bean and update
068 * that to database.
069 *
070 * @param id Coordinator job id.
071 * @param changeValue This the changed value in the form key=value.
072 * @throws CommandException thrown if changeValue cannot be parsed properly.
073 */
074 public CoordChangeXCommand(String id, String changeValue) throws CommandException {
075 super("coord_change", "coord_change", 0);
076 this.jobId = ParamChecker.notEmpty(id, "id");
077 ParamChecker.notEmpty(changeValue, "value");
078
079 validateChangeValue(changeValue);
080 }
081
082 /**
083 * @param changeValue change value.
084 * @throws CommandException thrown if changeValue cannot be parsed properly.
085 */
086 private void validateChangeValue(String changeValue) throws CommandException {
087 Map<String, String> map = JobUtils.parseChangeValue(changeValue);
088
089 if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) {
090 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
091 }
092
093 java.util.Iterator<Entry<String, String>> iter = map.entrySet().iterator();
094 while (iter.hasNext()) {
095 Entry<String, String> entry = iter.next();
096 String key = entry.getKey();
097 String value = entry.getValue();
098
099 if (!ALLOWED_CHANGE_OPTIONS.contains(key)) {
100 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
101 }
102
103 if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) {
104 throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty");
105 }
106 }
107
108 if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) {
109 String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
110 try {
111 newEndTime = DateUtils.parseDateUTC(value);
112 }
113 catch (Exception ex) {
114 throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
115 }
116 }
117
118 if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) {
119 String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY);
120 try {
121 newConcurrency = Integer.parseInt(value);
122 }
123 catch (NumberFormatException ex) {
124 throw new CommandException(ErrorCode.E1015, value, "must be a valid integer");
125 }
126 }
127
128 if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
129 String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
130 if (value.equals("")) { // this is to reset pause time to null;
131 resetPauseTime = true;
132 }
133 else {
134 try {
135 newPauseTime = DateUtils.parseDateUTC(value);
136 }
137 catch (Exception ex) {
138 throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
139 }
140 }
141 }
142 }
143
144 /**
145 * Check if new end time is valid.
146 *
147 * @param coordJob coordinator job id.
148 * @param newEndTime new end time.
149 * @throws CommandException thrown if new end time is not valid.
150 */
151 private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException {
152 // New endTime cannot be before coordinator job's start time.
153 Date startTime = coordJob.getStartTime();
154 if (newEndTime.before(startTime)) {
155 throw new CommandException(ErrorCode.E1015, newEndTime, "cannot be before coordinator job's start time ["
156 + startTime + "]");
157 }
158
159 // New endTime cannot be before coordinator job's last action time.
160 Date lastActionTime = coordJob.getLastActionTime();
161 if (lastActionTime != null) {
162 Date d = new Date(lastActionTime.getTime() - coordJob.getFrequency() * 60 * 1000);
163 if (!newEndTime.after(d)) {
164 throw new CommandException(ErrorCode.E1015, newEndTime,
165 "must be after coordinator job's last action time [" + d + "]");
166 }
167 }
168 }
169
170 /**
171 * Check if new pause time is valid.
172 *
173 * @param coordJob coordinator job id.
174 * @param newPauseTime new pause time.
175 * @param newEndTime new end time, can be null meaning no change on end time.
176 * @throws CommandException thrown if new pause time is not valid.
177 */
178 private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime)
179 throws CommandException {
180 // New pauseTime has to be a non-past time.
181 Date d = new Date();
182 if (newPauseTime.before(d)) {
183 throw new CommandException(ErrorCode.E1015, newPauseTime, "must be a non-past time");
184 }
185 }
186
187 /**
188 * Process lookahead created actions that become invalid because of the new pause time,
189 * These actions will be deleted from DB, also the coordinator job will be updated accordingly
190 *
191 * @param coordJob coordinator job
192 * @param newPauseTime new pause time
193 */
194 private void processLookaheadActions(CoordinatorJobBean coordJob, Date newPauseTime) throws CommandException {
195 Date lastActionTime = coordJob.getLastActionTime();
196 if (lastActionTime != null) {
197 // d is the real last action time.
198 Date d = new Date(lastActionTime.getTime() - coordJob.getFrequency() * 60 * 1000);
199 int lastActionNumber = coordJob.getLastActionNumber();
200
201 boolean hasChanged = false;
202 while (true) {
203 if (!newPauseTime.after(d)) {
204 deleteAction(coordJob.getId(), lastActionNumber);
205 d = new Date(d.getTime() - coordJob.getFrequency() * 60 * 1000);
206 lastActionNumber = lastActionNumber - 1;
207
208 hasChanged = true;
209 }
210 else {
211 break;
212 }
213 }
214
215 if (hasChanged == true) {
216 coordJob.setLastActionNumber(lastActionNumber);
217 Date d1 = new Date(d.getTime() + coordJob.getFrequency() * 60 * 1000);
218 coordJob.setLastActionTime(d1);
219 coordJob.setNextMaterializedTime(d1);
220 coordJob.resetDoneMaterialization();
221 }
222 }
223 }
224
225 /**
226 * Delete last action for a coordinator job
227 *
228 * @param coordJob coordinator job
229 * @param lastActionNum last action number of the coordinator job
230 */
231 private void deleteAction(String jobId, int lastActionNum) throws CommandException {
232 try {
233 String coordActionId = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, lastActionNum));
234 jpaService.execute(new CoordActionRemoveJPAExecutor(coordActionId));
235 }
236 catch (JPAExecutorException e) {
237 throw new CommandException(e);
238 }
239 }
240
241 /**
242 * Check if new end time, new concurrency, new pause time are valid.
243 *
244 * @param coordJob coordinator job id.
245 * @param newEndTime new end time.
246 * @param newConcurrency new concurrency.
247 * @param newPauseTime new pause time.
248 * @throws CommandException thrown if new values are not valid.
249 */
250 private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime)
251 throws CommandException {
252 if (coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
253 throw new CommandException(ErrorCode.E1016);
254 }
255
256 if (newEndTime != null) {
257 checkEndTime(coordJob, newEndTime);
258 }
259
260 if (newPauseTime != null) {
261 checkPauseTime(coordJob, newPauseTime);
262 }
263 }
264
265 /* (non-Javadoc)
266 * @see org.apache.oozie.command.XCommand#execute()
267 */
268 @Override
269 protected Void execute() throws CommandException {
270 LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId);
271
272 try {
273 if (newEndTime != null) {
274 coordJob.setEndTime(newEndTime);
275 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
276 || coordJob.getStatus() == CoordinatorJob.Status.RUNNING
277 || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
278 || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
279 coordJob.setStatus(CoordinatorJob.Status.RUNNING);
280 coordJob.setPending();
281 coordJob.resetDoneMaterialization();
282 }
283 }
284
285 if (newConcurrency != null) {
286 this.coordJob.setConcurrency(newConcurrency);
287 }
288
289 if (newPauseTime != null || resetPauseTime == true) {
290 this.coordJob.setPauseTime(newPauseTime);
291 if (oldPauseTime != null && newPauseTime != null) {
292 if (oldPauseTime.before(newPauseTime) && this.coordJob.getStatus() == Job.Status.PAUSED) {
293 this.coordJob.setStatus(Job.Status.RUNNING);
294 }
295 }
296 else if (oldPauseTime != null && newPauseTime == null && this.coordJob.getStatus() == Job.Status.PAUSED) {
297 this.coordJob.setStatus(Job.Status.RUNNING);
298 }
299
300 if (!resetPauseTime) {
301 processLookaheadActions(coordJob, newPauseTime);
302 }
303 }
304
305 jpaService.execute(new CoordJobUpdateJPAExecutor(this.coordJob));
306
307 return null;
308 }
309 catch (XException ex) {
310 throw new CommandException(ex);
311 }
312 finally {
313 LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId);
314 // update bundle action
315 if (coordJob.getBundleId() != null) {
316 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
317 bundleStatusUpdate.call();
318 }
319 }
320 }
321
322 /* (non-Javadoc)
323 * @see org.apache.oozie.command.XCommand#getEntityKey()
324 */
325 @Override
326 public String getEntityKey() {
327 return this.jobId;
328 }
329
330 /* (non-Javadoc)
331 * @see org.apache.oozie.command.XCommand#loadState()
332 */
333 @Override
334 protected void loadState() throws CommandException{
335 jpaService = Services.get().get(JPAService.class);
336
337 if (jpaService == null) {
338 throw new CommandException(ErrorCode.E0610);
339 }
340
341 try {
342 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
343 oldPauseTime = coordJob.getPauseTime();
344 prevStatus = coordJob.getStatus();
345 }
346 catch (JPAExecutorException e) {
347 throw new CommandException(e);
348 }
349
350 LogUtils.setLogInfo(this.coordJob, logInfo);
351 }
352
353 /* (non-Javadoc)
354 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
355 */
356 @Override
357 protected void verifyPrecondition() throws CommandException,PreconditionException {
358 check(this.coordJob, newEndTime, newConcurrency, newPauseTime);
359 }
360
361 /* (non-Javadoc)
362 * @see org.apache.oozie.command.XCommand#isLockRequired()
363 */
364 @Override
365 protected boolean isLockRequired() {
366 return true;
367 }
368 }