This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.util.ArrayList;
021 import java.util.Calendar;
022 import java.util.Date;
023 import java.util.HashSet;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Map.Entry;
027 import java.util.Set;
028
029 import org.apache.oozie.CoordinatorActionBean;
030 import org.apache.oozie.CoordinatorJobBean;
031 import org.apache.oozie.ErrorCode;
032 import org.apache.oozie.XException;
033 import org.apache.oozie.client.CoordinatorJob;
034 import org.apache.oozie.client.Job;
035 import org.apache.oozie.client.OozieClient;
036 import org.apache.oozie.client.rest.JsonBean;
037 import org.apache.oozie.command.CommandException;
038 import org.apache.oozie.command.PreconditionException;
039 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
040 import org.apache.oozie.coord.TimeUnit;
041 import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
042 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
043 import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
044 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
045 import org.apache.oozie.executor.jpa.JPAExecutorException;
046 import org.apache.oozie.service.JPAService;
047 import org.apache.oozie.service.Services;
048 import org.apache.oozie.util.DateUtils;
049 import org.apache.oozie.util.JobUtils;
050 import org.apache.oozie.util.LogUtils;
051 import org.apache.oozie.util.ParamChecker;
052 import org.apache.oozie.util.StatusUtils;
053
054 public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
055 private final String jobId;
056 private Date newEndTime = null;
057 private Integer newConcurrency = null;
058 private Date newPauseTime = null;
059 private Date oldPauseTime = null;
060 private boolean resetPauseTime = false;
061 private CoordinatorJobBean coordJob;
062 private JPAService jpaService = null;
063 private Job.Status prevStatus;
064 private List<JsonBean> updateList = new ArrayList<JsonBean>();
065 private List<JsonBean> deleteList = new ArrayList<JsonBean>();
066
067 private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
068 static {
069 ALLOWED_CHANGE_OPTIONS.add("endtime");
070 ALLOWED_CHANGE_OPTIONS.add("concurrency");
071 ALLOWED_CHANGE_OPTIONS.add("pausetime");
072 }
073
074 /**
075 * This command is used to update the Coordinator job with the new values Update the coordinator job bean and update
076 * that to database.
077 *
078 * @param id Coordinator job id.
079 * @param changeValue This the changed value in the form key=value.
080 * @throws CommandException thrown if changeValue cannot be parsed properly.
081 */
082 public CoordChangeXCommand(String id, String changeValue) throws CommandException {
083 super("coord_change", "coord_change", 0);
084 this.jobId = ParamChecker.notEmpty(id, "id");
085 ParamChecker.notEmpty(changeValue, "value");
086
087 validateChangeValue(changeValue);
088 }
089
090 /**
091 * @param changeValue change value.
092 * @throws CommandException thrown if changeValue cannot be parsed properly.
093 */
094 private void validateChangeValue(String changeValue) throws CommandException {
095 Map<String, String> map = JobUtils.parseChangeValue(changeValue);
096
097 if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) {
098 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
099 }
100
101 java.util.Iterator<Entry<String, String>> iter = map.entrySet().iterator();
102 while (iter.hasNext()) {
103 Entry<String, String> entry = iter.next();
104 String key = entry.getKey();
105 String value = entry.getValue();
106
107 if (!ALLOWED_CHANGE_OPTIONS.contains(key)) {
108 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
109 }
110
111 if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) {
112 throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty");
113 }
114 }
115
116 if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) {
117 String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
118 try {
119 newEndTime = DateUtils.parseDateOozieTZ(value);
120 }
121 catch (Exception ex) {
122 throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
123 }
124 }
125
126 if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) {
127 String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY);
128 try {
129 newConcurrency = Integer.parseInt(value);
130 }
131 catch (NumberFormatException ex) {
132 throw new CommandException(ErrorCode.E1015, value, "must be a valid integer");
133 }
134 }
135
136 if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
137 String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
138 if (value.equals("")) { // this is to reset pause time to null;
139 resetPauseTime = true;
140 }
141 else {
142 try {
143 newPauseTime = DateUtils.parseDateOozieTZ(value);
144 }
145 catch (Exception ex) {
146 throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
147 }
148 }
149 }
150 }
151
152 /**
153 * Returns the actual last action time(one instance before coordJob.lastActionTime)
154 * @return Date - last action time if coordJob.getLastActionTime() is not null, null otherwise
155 */
156 private Date getLastActionTime() {
157 if(coordJob.getLastActionTime() == null)
158 return null;
159
160 Calendar d = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
161 d.setTime(coordJob.getLastActionTime());
162 TimeUnit timeUnit = TimeUnit.valueOf(coordJob.getTimeUnitStr());
163 d.add(timeUnit.getCalendarUnit(), -coordJob.getFrequency());
164 return d.getTime();
165 }
166
167 /**
168 * Check if new end time is valid.
169 *
170 * @param coordJob coordinator job id.
171 * @param newEndTime new end time.
172 * @throws CommandException thrown if new end time is not valid.
173 */
174 private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException {
175 // New endTime cannot be before coordinator job's start time.
176 Date startTime = coordJob.getStartTime();
177 if (newEndTime.before(startTime)) {
178 throw new CommandException(ErrorCode.E1015, newEndTime, "cannot be before coordinator job's start time ["
179 + startTime + "]");
180 }
181
182 // New endTime cannot be before coordinator job's last action time.
183 Date lastActionTime = getLastActionTime();
184 if (lastActionTime != null) {
185 if (!newEndTime.after(lastActionTime)) {
186 throw new CommandException(ErrorCode.E1015, newEndTime,
187 "must be after coordinator job's last action time [" + lastActionTime + "]");
188 }
189 }
190 }
191
192 /**
193 * Check if new pause time is valid.
194 *
195 * @param coordJob coordinator job id.
196 * @param newPauseTime new pause time.
197 * @param newEndTime new end time, can be null meaning no change on end time.
198 * @throws CommandException thrown if new pause time is not valid.
199 */
200 private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime)
201 throws CommandException {
202 // New pauseTime has to be a non-past time.
203 Date d = new Date();
204 if (newPauseTime.before(d)) {
205 throw new CommandException(ErrorCode.E1015, newPauseTime, "must be a non-past time");
206 }
207 }
208
209 /**
210 * Process lookahead created actions that become invalid because of the new pause time,
211 * These actions will be deleted from DB, also the coordinator job will be updated accordingly
212 *
213 * @param coordJob coordinator job
214 * @param newPauseTime new pause time
215 */
216 private void processLookaheadActions(CoordinatorJobBean coordJob, Date newPauseTime) throws CommandException {
217 Date lastActionTime = coordJob.getLastActionTime();
218 if (lastActionTime != null) {
219 // d is the real last action time.
220 Calendar d = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
221 d.setTime(getLastActionTime());
222 TimeUnit timeUnit = TimeUnit.valueOf(coordJob.getTimeUnitStr());
223
224 int lastActionNumber = coordJob.getLastActionNumber();
225
226 boolean hasChanged = false;
227 while (true) {
228 if (!newPauseTime.after(d.getTime())) {
229 deleteAction(lastActionNumber);
230 d.add(timeUnit.getCalendarUnit(), -coordJob.getFrequency());
231 lastActionNumber = lastActionNumber - 1;
232
233 hasChanged = true;
234 }
235 else {
236 break;
237 }
238 }
239
240 if (hasChanged == true) {
241 coordJob.setLastActionNumber(lastActionNumber);
242 d.add(timeUnit.getCalendarUnit(), coordJob.getFrequency());
243 Date d1 = d.getTime();
244 coordJob.setLastActionTime(d1);
245 coordJob.setNextMaterializedTime(d1);
246 coordJob.resetDoneMaterialization();
247 }
248 }
249 }
250
251 /**
252 * Delete coordinator action
253 *
254 * @param actionNum coordinator action number
255 */
256 private void deleteAction(int actionNum) throws CommandException {
257 try {
258 String actionId = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, actionNum));
259 CoordinatorActionBean bean = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
260 deleteList.add(bean);
261 } catch (JPAExecutorException e) {
262 throw new CommandException(e);
263 }
264 }
265
266 /**
267 * Check if new end time, new concurrency, new pause time are valid.
268 *
269 * @param coordJob coordinator job id.
270 * @param newEndTime new end time.
271 * @param newConcurrency new concurrency.
272 * @param newPauseTime new pause time.
273 * @throws CommandException thrown if new values are not valid.
274 */
275 private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime)
276 throws CommandException {
277 if (coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
278 throw new CommandException(ErrorCode.E1016);
279 }
280
281 if (newEndTime != null) {
282 checkEndTime(coordJob, newEndTime);
283 }
284
285 if (newPauseTime != null) {
286 checkPauseTime(coordJob, newPauseTime);
287 }
288 }
289
290 /* (non-Javadoc)
291 * @see org.apache.oozie.command.XCommand#execute()
292 */
293 @Override
294 protected Void execute() throws CommandException {
295 LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId);
296
297 try {
298 if (newEndTime != null) {
299 coordJob.setEndTime(newEndTime);
300 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED){
301 coordJob.setStatus(CoordinatorJob.Status.RUNNING);
302 }
303 if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
304 || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
305 // Check for backward compatibility for Oozie versions (3.2 and before)
306 // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
307 // PAUSEDWITHERROR is not supported
308 coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
309 }
310 coordJob.setPending();
311 coordJob.resetDoneMaterialization();
312 }
313
314 if (newConcurrency != null) {
315 this.coordJob.setConcurrency(newConcurrency);
316 }
317
318 if (newPauseTime != null || resetPauseTime == true) {
319 this.coordJob.setPauseTime(newPauseTime);
320 if (oldPauseTime != null && newPauseTime != null) {
321 if (oldPauseTime.before(newPauseTime)) {
322 if (this.coordJob.getStatus() == Job.Status.PAUSED) {
323 this.coordJob.setStatus(Job.Status.RUNNING);
324 }
325 else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
326 this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
327 }
328 }
329 }
330 else if (oldPauseTime != null && newPauseTime == null) {
331 if (this.coordJob.getStatus() == Job.Status.PAUSED) {
332 this.coordJob.setStatus(Job.Status.RUNNING);
333 }
334 else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
335 this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
336 }
337 }
338 if (!resetPauseTime) {
339 processLookaheadActions(coordJob, newPauseTime);
340 }
341 }
342
343 if (coordJob.getNextMaterializedTime() != null && coordJob.getEndTime().compareTo(coordJob.getNextMaterializedTime()) <= 0) {
344 LOG.info("[" + coordJob.getId() + "]: all actions have been materialized, job status = " + coordJob.getStatus()
345 + ", set pending to true");
346 // set doneMaterialization to true when materialization is done
347 coordJob.setDoneMaterialization();
348 }
349
350 updateList.add(coordJob);
351 jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, false));
352
353 return null;
354 }
355 catch (XException ex) {
356 throw new CommandException(ex);
357 }
358 finally {
359 LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId);
360 // update bundle action
361 if (coordJob.getBundleId() != null) {
362 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
363 bundleStatusUpdate.call();
364 }
365 }
366 }
367
368 /* (non-Javadoc)
369 * @see org.apache.oozie.command.XCommand#getEntityKey()
370 */
371 @Override
372 public String getEntityKey() {
373 return this.jobId;
374 }
375
376 /* (non-Javadoc)
377 * @see org.apache.oozie.command.XCommand#loadState()
378 */
379 @Override
380 protected void loadState() throws CommandException{
381 jpaService = Services.get().get(JPAService.class);
382
383 if (jpaService == null) {
384 throw new CommandException(ErrorCode.E0610);
385 }
386
387 try {
388 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
389 oldPauseTime = coordJob.getPauseTime();
390 prevStatus = coordJob.getStatus();
391 }
392 catch (JPAExecutorException e) {
393 throw new CommandException(e);
394 }
395
396 LogUtils.setLogInfo(this.coordJob, logInfo);
397 }
398
399 /* (non-Javadoc)
400 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
401 */
402 @Override
403 protected void verifyPrecondition() throws CommandException,PreconditionException {
404 check(this.coordJob, newEndTime, newConcurrency, newPauseTime);
405 }
406
407 /* (non-Javadoc)
408 * @see org.apache.oozie.command.XCommand#isLockRequired()
409 */
410 @Override
411 protected boolean isLockRequired() {
412 return true;
413 }
414 }