This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.util.ArrayList;
021 import java.util.Calendar;
022 import java.util.Date;
023 import java.util.HashSet;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Map.Entry;
027 import java.util.Set;
028
029 import org.apache.oozie.CoordinatorActionBean;
030 import org.apache.oozie.CoordinatorJobBean;
031 import org.apache.oozie.ErrorCode;
032 import org.apache.oozie.XException;
033 import org.apache.oozie.client.CoordinatorJob;
034 import org.apache.oozie.client.Job;
035 import org.apache.oozie.client.OozieClient;
036 import org.apache.oozie.client.rest.JsonBean;
037 import org.apache.oozie.command.CommandException;
038 import org.apache.oozie.command.PreconditionException;
039 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
040 import org.apache.oozie.coord.TimeUnit;
041 import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
042 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
043 import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
044 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
045 import org.apache.oozie.executor.jpa.JPAExecutorException;
046 import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
047 import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
048 import org.apache.oozie.service.JPAService;
049 import org.apache.oozie.service.Services;
050 import org.apache.oozie.sla.SLARegistrationBean;
051 import org.apache.oozie.sla.SLASummaryBean;
052 import org.apache.oozie.sla.service.SLAService;
053 import org.apache.oozie.util.DateUtils;
054 import org.apache.oozie.util.JobUtils;
055 import org.apache.oozie.util.LogUtils;
056 import org.apache.oozie.util.ParamChecker;
057 import org.apache.oozie.util.StatusUtils;
058
059 public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
060 private final String jobId;
061 private Date newEndTime = null;
062 private Integer newConcurrency = null;
063 private Date newPauseTime = null;
064 private Date oldPauseTime = null;
065 private boolean resetPauseTime = false;
066 private CoordinatorJobBean coordJob;
067 private JPAService jpaService = null;
068 private Job.Status prevStatus;
069 private List<JsonBean> updateList = new ArrayList<JsonBean>();
070 private List<JsonBean> deleteList = new ArrayList<JsonBean>();
071
072 private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
073 static {
074 ALLOWED_CHANGE_OPTIONS.add("endtime");
075 ALLOWED_CHANGE_OPTIONS.add("concurrency");
076 ALLOWED_CHANGE_OPTIONS.add("pausetime");
077 }
078
079 /**
080 * This command is used to update the Coordinator job with the new values Update the coordinator job bean and update
081 * that to database.
082 *
083 * @param id Coordinator job id.
084 * @param changeValue This the changed value in the form key=value.
085 * @throws CommandException thrown if changeValue cannot be parsed properly.
086 */
087 public CoordChangeXCommand(String id, String changeValue) throws CommandException {
088 super("coord_change", "coord_change", 0);
089 this.jobId = ParamChecker.notEmpty(id, "id");
090 ParamChecker.notEmpty(changeValue, "value");
091
092 validateChangeValue(changeValue);
093 }
094
095 /**
096 * @param changeValue change value.
097 * @throws CommandException thrown if changeValue cannot be parsed properly.
098 */
099 private void validateChangeValue(String changeValue) throws CommandException {
100 Map<String, String> map = JobUtils.parseChangeValue(changeValue);
101
102 if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) {
103 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
104 }
105
106 java.util.Iterator<Entry<String, String>> iter = map.entrySet().iterator();
107 while (iter.hasNext()) {
108 Entry<String, String> entry = iter.next();
109 String key = entry.getKey();
110 String value = entry.getValue();
111
112 if (!ALLOWED_CHANGE_OPTIONS.contains(key)) {
113 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
114 }
115
116 if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) {
117 throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty");
118 }
119 }
120
121 if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) {
122 String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
123 try {
124 newEndTime = DateUtils.parseDateOozieTZ(value);
125 }
126 catch (Exception ex) {
127 throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
128 }
129 }
130
131 if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) {
132 String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY);
133 try {
134 newConcurrency = Integer.parseInt(value);
135 }
136 catch (NumberFormatException ex) {
137 throw new CommandException(ErrorCode.E1015, value, "must be a valid integer");
138 }
139 }
140
141 if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
142 String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
143 if (value.equals("")) { // this is to reset pause time to null;
144 resetPauseTime = true;
145 }
146 else {
147 try {
148 newPauseTime = DateUtils.parseDateOozieTZ(value);
149 }
150 catch (Exception ex) {
151 throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
152 }
153 }
154 }
155 }
156
157 /**
158 * Returns the actual last action time(one instance before coordJob.lastActionTime)
159 * @return Date - last action time if coordJob.getLastActionTime() is not null, null otherwise
160 */
161 private Date getLastActionTime() {
162 if(coordJob.getLastActionTime() == null)
163 return null;
164
165 Calendar d = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
166 d.setTime(coordJob.getLastActionTime());
167 TimeUnit timeUnit = TimeUnit.valueOf(coordJob.getTimeUnitStr());
168 d.add(timeUnit.getCalendarUnit(), -Integer.valueOf(coordJob.getFrequency()));
169 return d.getTime();
170 }
171
172 /**
173 * Check if new end time is valid.
174 *
175 * @param coordJob coordinator job id.
176 * @param newEndTime new end time.
177 * @throws CommandException thrown if new end time is not valid.
178 */
179 private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException {
180 // New endTime cannot be before coordinator job's start time.
181 Date startTime = coordJob.getStartTime();
182 if (newEndTime.before(startTime)) {
183 throw new CommandException(ErrorCode.E1015, newEndTime, "cannot be before coordinator job's start time ["
184 + startTime + "]");
185 }
186
187 // New endTime cannot be before coordinator job's last action time.
188 Date lastActionTime = getLastActionTime();
189 if (lastActionTime != null) {
190 if (!newEndTime.after(lastActionTime)) {
191 throw new CommandException(ErrorCode.E1015, newEndTime,
192 "must be after coordinator job's last action time [" + lastActionTime + "]");
193 }
194 }
195 }
196
197 /**
198 * Check if new pause time is valid.
199 *
200 * @param coordJob coordinator job id.
201 * @param newPauseTime new pause time.
202 * @param newEndTime new end time, can be null meaning no change on end time.
203 * @throws CommandException thrown if new pause time is not valid.
204 */
205 private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime)
206 throws CommandException {
207 // New pauseTime has to be a non-past time.
208 Date d = new Date();
209 if (newPauseTime.before(d)) {
210 throw new CommandException(ErrorCode.E1015, newPauseTime, "must be a non-past time");
211 }
212 }
213
214 /**
215 * Process lookahead created actions that become invalid because of the new pause time,
216 * These actions will be deleted from DB, also the coordinator job will be updated accordingly
217 *
218 * @param coordJob coordinator job
219 * @param newPauseTime new pause time
220 */
221 private void processLookaheadActions(CoordinatorJobBean coordJob, Date newPauseTime) throws CommandException {
222 Date lastActionTime = coordJob.getLastActionTime();
223 if (lastActionTime != null) {
224 // d is the real last action time.
225 Calendar d = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
226 d.setTime(getLastActionTime());
227 TimeUnit timeUnit = TimeUnit.valueOf(coordJob.getTimeUnitStr());
228
229 int lastActionNumber = coordJob.getLastActionNumber();
230
231 boolean hasChanged = false;
232 while (true) {
233 if (!newPauseTime.after(d.getTime())) {
234 deleteAction(lastActionNumber);
235 d.add(timeUnit.getCalendarUnit(), -Integer.valueOf(coordJob.getFrequency()));
236 lastActionNumber = lastActionNumber - 1;
237
238 hasChanged = true;
239 }
240 else {
241 break;
242 }
243 }
244
245 if (hasChanged == true) {
246 coordJob.setLastActionNumber(lastActionNumber);
247 d.add(timeUnit.getCalendarUnit(), Integer.valueOf(coordJob.getFrequency()));
248 Date d1 = d.getTime();
249 coordJob.setLastActionTime(d1);
250 coordJob.setNextMaterializedTime(d1);
251 coordJob.resetDoneMaterialization();
252 }
253 }
254 }
255
256 /**
257 * Delete coordinator action
258 *
259 * @param actionNum coordinator action number
260 */
261 private void deleteAction(int actionNum) throws CommandException {
262 try {
263 String actionId = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, actionNum));
264 CoordinatorActionBean bean = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
265 // delete SLA registration entry (if any) for action
266 if (SLAService.isEnabled()) {
267 Services.get().get(SLAService.class).removeRegistration(actionId);
268 }
269 SLARegistrationBean slaReg = jpaService.execute(new SLARegistrationGetJPAExecutor(actionId));
270 if (slaReg != null) {
271 LOG.debug("Deleting registration bean corresponding to action " + slaReg.getId());
272 deleteList.add(slaReg);
273 }
274 SLASummaryBean slaSummaryBean = jpaService.execute(new SLASummaryGetJPAExecutor(actionId));
275 if (slaSummaryBean != null) {
276 LOG.debug("Deleting summary bean corresponding to action " + slaSummaryBean.getId());
277 deleteList.add(slaSummaryBean);
278 }
279 deleteList.add(bean);
280 }
281 catch (JPAExecutorException e) {
282 throw new CommandException(e);
283 }
284 }
285
286 /**
287 * Check if new end time, new concurrency, new pause time are valid.
288 *
289 * @param coordJob coordinator job id.
290 * @param newEndTime new end time.
291 * @param newConcurrency new concurrency.
292 * @param newPauseTime new pause time.
293 * @throws CommandException thrown if new values are not valid.
294 */
295 private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime)
296 throws CommandException {
297 if (coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
298 throw new CommandException(ErrorCode.E1016);
299 }
300
301 if (newEndTime != null) {
302 checkEndTime(coordJob, newEndTime);
303 }
304
305 if (newPauseTime != null) {
306 checkPauseTime(coordJob, newPauseTime);
307 }
308 }
309
310 /* (non-Javadoc)
311 * @see org.apache.oozie.command.XCommand#execute()
312 */
313 @Override
314 protected Void execute() throws CommandException {
315 LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId);
316
317 try {
318 if (newEndTime != null) {
319 coordJob.setEndTime(newEndTime);
320 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED){
321 coordJob.setStatus(CoordinatorJob.Status.RUNNING);
322 }
323 if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
324 || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
325 // Check for backward compatibility for Oozie versions (3.2 and before)
326 // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
327 // PAUSEDWITHERROR is not supported
328 coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
329 }
330 coordJob.setPending();
331 coordJob.resetDoneMaterialization();
332 }
333
334 if (newConcurrency != null) {
335 this.coordJob.setConcurrency(newConcurrency);
336 }
337
338 if (newPauseTime != null || resetPauseTime == true) {
339 this.coordJob.setPauseTime(newPauseTime);
340 if (oldPauseTime != null && newPauseTime != null) {
341 if (oldPauseTime.before(newPauseTime)) {
342 if (this.coordJob.getStatus() == Job.Status.PAUSED) {
343 this.coordJob.setStatus(Job.Status.RUNNING);
344 }
345 else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
346 this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
347 }
348 }
349 }
350 else if (oldPauseTime != null && newPauseTime == null) {
351 if (this.coordJob.getStatus() == Job.Status.PAUSED) {
352 this.coordJob.setStatus(Job.Status.RUNNING);
353 }
354 else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
355 this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
356 }
357 }
358 if (!resetPauseTime) {
359 processLookaheadActions(coordJob, newPauseTime);
360 }
361 }
362
363 if (coordJob.getNextMaterializedTime() != null && coordJob.getEndTime().compareTo(coordJob.getNextMaterializedTime()) <= 0) {
364 LOG.info("[" + coordJob.getId() + "]: all actions have been materialized, job status = " + coordJob.getStatus()
365 + ", set pending to true");
366 // set doneMaterialization to true when materialization is done
367 coordJob.setDoneMaterialization();
368 }
369
370 updateList.add(coordJob);
371 jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, false));
372
373 return null;
374 }
375 catch (XException ex) {
376 throw new CommandException(ex);
377 }
378 finally {
379 LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId);
380 // update bundle action
381 if (coordJob.getBundleId() != null) {
382 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
383 bundleStatusUpdate.call();
384 }
385 }
386 }
387
388 /* (non-Javadoc)
389 * @see org.apache.oozie.command.XCommand#getEntityKey()
390 */
391 @Override
392 public String getEntityKey() {
393 return this.jobId;
394 }
395
396 /* (non-Javadoc)
397 * @see org.apache.oozie.command.XCommand#loadState()
398 */
399 @Override
400 protected void loadState() throws CommandException{
401 jpaService = Services.get().get(JPAService.class);
402
403 if (jpaService == null) {
404 throw new CommandException(ErrorCode.E0610);
405 }
406
407 try {
408 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
409 oldPauseTime = coordJob.getPauseTime();
410 prevStatus = coordJob.getStatus();
411 }
412 catch (JPAExecutorException e) {
413 throw new CommandException(e);
414 }
415
416 LogUtils.setLogInfo(this.coordJob, logInfo);
417 }
418
419 /* (non-Javadoc)
420 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
421 */
422 @Override
423 protected void verifyPrecondition() throws CommandException,PreconditionException {
424 check(this.coordJob, newEndTime, newConcurrency, newPauseTime);
425 }
426
427 /* (non-Javadoc)
428 * @see org.apache.oozie.command.XCommand#isLockRequired()
429 */
430 @Override
431 protected boolean isLockRequired() {
432 return true;
433 }
434 }