/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018
019package org.apache.oozie.command.coord;
020
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Map.Entry;
027import java.util.Set;
028
029import org.apache.commons.lang.StringUtils;
030import org.apache.oozie.CoordinatorActionBean;
031import org.apache.oozie.CoordinatorJobBean;
032import org.apache.oozie.ErrorCode;
033import org.apache.oozie.XException;
034import org.apache.oozie.client.CoordinatorAction;
035import org.apache.oozie.client.CoordinatorJob;
036import org.apache.oozie.client.Job;
037import org.apache.oozie.client.OozieClient;
038import org.apache.oozie.client.rest.JsonBean;
039import org.apache.oozie.command.CommandException;
040import org.apache.oozie.command.PreconditionException;
041import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
042import org.apache.oozie.executor.jpa.BatchQueryExecutor;
043import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
044import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
045import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
046import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
047import org.apache.oozie.executor.jpa.JPAExecutorException;
048import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
049import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
050import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
051import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
052import org.apache.oozie.service.JPAService;
053import org.apache.oozie.service.Services;
054import org.apache.oozie.sla.SLARegistrationBean;
055import org.apache.oozie.sla.SLASummaryBean;
056import org.apache.oozie.sla.service.SLAService;
057import org.apache.oozie.util.DateUtils;
058import org.apache.oozie.util.JobUtils;
059import org.apache.oozie.util.LogUtils;
060import org.apache.oozie.util.ParamChecker;
061import org.apache.oozie.util.StatusUtils;
062
063public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
064    private final String jobId;
065    private Date newEndTime = null;
066    private Integer oldConcurrency = null;
067    private Integer newConcurrency = null;
068    private Date newPauseTime = null;
069    private Date oldPauseTime = null;
070    private boolean resetPauseTime = false;
071    private CoordinatorJob.Status jobStatus = null;
072    private CoordinatorJobBean coordJob;
073    private JPAService jpaService = null;
074    private Job.Status prevStatus;
075    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
076    private List<JsonBean> deleteList = new ArrayList<JsonBean>();
077
078    private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
079    static {
080        ALLOWED_CHANGE_OPTIONS.add("endtime");
081        ALLOWED_CHANGE_OPTIONS.add("concurrency");
082        ALLOWED_CHANGE_OPTIONS.add("pausetime");
083        ALLOWED_CHANGE_OPTIONS.add(OozieClient.CHANGE_VALUE_STATUS);
084
085    }
086
087    /**
088     * This command is used to update the Coordinator job with the new values Update the coordinator job bean and update
089     * that to database.
090     *
091     * @param id Coordinator job id.
092     * @param changeValue This the changed value in the form key=value.
093     * @throws CommandException thrown if changeValue cannot be parsed properly.
094     */
095    public CoordChangeXCommand(String id, String changeValue) throws CommandException {
096        super("coord_change", "coord_change", 0);
097        this.jobId = ParamChecker.notEmpty(id, "id");
098        ParamChecker.notEmpty(changeValue, "value");
099
100        validateChangeValue(changeValue);
101    }
102
103    @Override
104    protected void setLogInfo() {
105        LogUtils.setLogInfo(jobId);
106    }
107
108    /**
109     * @param changeValue change value.
110     * @throws CommandException thrown if changeValue cannot be parsed properly.
111     */
112    private void validateChangeValue(String changeValue) throws CommandException {
113        Map<String, String> map = JobUtils.parseChangeValue(changeValue);
114
115        if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) {
116            throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime|status");
117        }
118
119        java.util.Iterator<Entry<String, String>> iter = map.entrySet().iterator();
120        while (iter.hasNext()) {
121            Entry<String, String> entry = iter.next();
122            String key = entry.getKey();
123            String value = entry.getValue();
124
125            if (!ALLOWED_CHANGE_OPTIONS.contains(key)) {
126                throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime|status");
127            }
128
129            if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) {
130                throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty");
131            }
132        }
133
134        if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) {
135            String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
136            try {
137                newEndTime = DateUtils.parseDateOozieTZ(value);
138            }
139            catch (Exception ex) {
140                throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
141            }
142        }
143
144        if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) {
145            String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY);
146            try {
147                newConcurrency = Integer.parseInt(value);
148            }
149            catch (NumberFormatException ex) {
150                throw new CommandException(ErrorCode.E1015, value, "must be a valid integer");
151            }
152        }
153
154        if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
155            String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
156            if (value.equals("")) { // this is to reset pause time to null;
157                resetPauseTime = true;
158            }
159            else {
160                try {
161                    newPauseTime = DateUtils.parseDateOozieTZ(value);
162                }
163                catch (Exception ex) {
164                    throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
165                }
166            }
167        }
168
169        if (map.containsKey(OozieClient.CHANGE_VALUE_STATUS)) {
170            String value = map.get(OozieClient.CHANGE_VALUE_STATUS);
171            if (!StringUtils.isEmpty(value)) {
172                jobStatus = CoordinatorJob.Status.valueOf(value);
173            }
174        }
175    }
176
177    /**
178     * Check if new end time is valid.
179     *
180     * @param coordJob coordinator job id.
181     * @param newEndTime new end time.
182     * @throws CommandException thrown if new end time is not valid.
183     */
184    private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException {
185        //It's ok to set end date before start date.
186    }
187
188    /**
189     * Check if new pause time is valid.
190     *
191     * @param coordJob coordinator job id.
192     * @param newPauseTime new pause time.
193     * @param newEndTime new end time, can be null meaning no change on end time.
194     * @throws CommandException thrown if new pause time is not valid.
195     */
196    private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime)
197            throws CommandException {
198        //no check.
199    }
200
201    /**
202     * Check if status change is valid.
203     *
204     * @param coordJob the coord job
205     * @param jobStatus the job status
206     * @throws CommandException the command exception
207     */
208    private void checkStatusChange(CoordinatorJobBean coordJob, CoordinatorJob.Status jobStatus)
209            throws CommandException {
210        if (!jobStatus.equals(CoordinatorJob.Status.RUNNING) && !jobStatus.equals(CoordinatorJob.Status.IGNORED)) {
211            throw new CommandException(ErrorCode.E1015, jobStatus, " must be RUNNING or IGNORED");
212        }
213
214        if (jobStatus.equals(CoordinatorJob.Status.RUNNING)) {
215            if (!(coordJob.getStatus().equals(CoordinatorJob.Status.FAILED) || coordJob.getStatus().equals(
216                    CoordinatorJob.Status.KILLED) || coordJob.getStatus().equals(CoordinatorJob.Status.IGNORED))) {
217                throw new CommandException(ErrorCode.E1015, jobStatus,
218                        " Only FAILED, KILLED, IGNORED job can be changed to RUNNING. Current job status is "
219                                + coordJob.getStatus());
220            }
221        }
222        else {
223            if (!(coordJob.getStatus().equals(CoordinatorJob.Status.FAILED) || coordJob.getStatus().equals(
224                    CoordinatorJob.Status.KILLED))
225                    || coordJob.isPending()) {
226                throw new CommandException(ErrorCode.E1015, jobStatus,
227                        " Only FAILED or KILLED non-pending job can be changed to IGNORED. Current job status is "
228                                + coordJob.getStatus() + " and pending status is " + coordJob.isPending());
229            }
230        }
231    }
232
233    /**
234     * Process lookahead created actions that become invalid because of the new pause time,
235     * These actions will be deleted from DB, also the coordinator job will be updated accordingly
236     *
237     * @param coordJob coordinator job
238     * @param newPauseTime new pause time
239     * @throws JPAExecutorException, CommandException
240     */
241    private void processLookaheadActions(CoordinatorJobBean coordJob, Date newTime) throws CommandException,
242            JPAExecutorException {
243        int lastActionNumber = coordJob.getLastActionNumber();
244        Date  lastActionTime = null;
245        Date  tempDate = null;
246
247        while ((tempDate = deleteAction(lastActionNumber, newTime)) != null) {
248            lastActionNumber--;
249            lastActionTime = tempDate;
250        }
251        if (lastActionTime != null) {
252            LOG.debug("New pause/end date is : " + newTime + " and last action number is : " + lastActionNumber);
253            coordJob.setLastActionNumber(lastActionNumber);
254            coordJob.setLastActionTime(lastActionTime);
255            coordJob.setNextMaterializedTime(lastActionTime);
256            coordJob.resetDoneMaterialization();
257        }
258    }
259
260    /**
261     * Delete coordinator action
262     *
263     * @param actionNum coordinator action number
264     */
265    private Date deleteAction(int actionNum, Date afterDate) throws CommandException {
266        try {
267            if (actionNum <= 0) {
268                return null;
269            }
270
271            String actionId = jobId + "@" + actionNum;
272            CoordinatorActionBean bean = CoordActionQueryExecutor.getInstance().getIfExist(
273                    CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION, actionId);
274            if (bean == null) {
275                return null;
276            }
277            if (afterDate.compareTo(bean.getNominalTime()) <= 0) {
278                if (bean.getStatus() == CoordinatorAction.Status.WAITING
279                        || bean.getStatus() == CoordinatorAction.Status.READY) {
280                    // delete SLA registration entry (if any) for action
281                    if (SLAService.isEnabled()) {
282                        Services.get().get(SLAService.class).removeRegistration(actionId);
283                    }
284                    SLARegistrationBean slaReg = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, actionId);
285                    if (slaReg != null) {
286                        LOG.debug("Deleting registration bean corresponding to action " + slaReg.getId());
287                        deleteList.add(slaReg);
288                    }
289                    SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(
290                            SLASummaryQuery.GET_SLA_SUMMARY, actionId);
291                    if (slaSummaryBean != null) {
292                        LOG.debug("Deleting summary bean corresponding to action " + slaSummaryBean.getId());
293                        deleteList.add(slaSummaryBean);
294                    }
295                    deleteList.add(bean);
296                }
297                else {
298                    throw new CommandException(ErrorCode.E1022, bean.getId());
299                }
300                return bean.getNominalTime();
301            }
302            else {
303                return null;
304            }
305
306        }
307        catch (JPAExecutorException e) {
308            throw new CommandException(e);
309        }
310    }
311
312    /**
313     * Check if new end time, new concurrency, new pause time are valid.
314     *
315     * @param coordJob coordinator job id.
316     * @param newEndTime new end time.
317     * @param newConcurrency new concurrency.
318     * @param newPauseTime new pause time.
319     * @throws CommandException thrown if new values are not valid.
320     */
321    private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime,
322            CoordinatorJob.Status jobStatus) throws CommandException {
323
324        if (coordJob.getStatus() == CoordinatorJob.Status.KILLED
325                || coordJob.getStatus() == CoordinatorJob.Status.IGNORED) {
326            if (jobStatus == null || (newEndTime != null || newConcurrency != null || newPauseTime != null)) {
327                throw new CommandException(ErrorCode.E1016);
328            }
329        }
330
331        if (newEndTime != null) {
332            checkEndTime(coordJob, newEndTime);
333        }
334
335        if (newPauseTime != null) {
336            checkPauseTime(coordJob, newPauseTime);
337        }
338        if (jobStatus != null) {
339            checkStatusChange(coordJob, jobStatus);
340        }
341    }
342
343    /* (non-Javadoc)
344     * @see org.apache.oozie.command.XCommand#execute()
345     */
346    @Override
347    protected Void execute() throws CommandException {
348        LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId);
349
350        try {
351            oldConcurrency = this.coordJob.getConcurrency();
352            if (newEndTime != null) {
353                // during coord materialization, nextMaterializedTime is set to
354                // startTime + n(actions materialized) * frequency and this can be AFTER endTime,
355                // while doneMaterialization is true. Hence the following checks
356                // for newEndTime being in the middle of endTime and nextMatdTime.
357                // Since job is already done materialization so no need to change
358                boolean dontChange = coordJob.getEndTime().before(newEndTime)
359                        && coordJob.getNextMaterializedTime() != null
360                        && coordJob.getNextMaterializedTime().after(newEndTime);
361                if (!dontChange) {
362                    coordJob.setEndTime(newEndTime);
363                    // OOZIE-1703, we should SUCCEEDED the coord, if it's in PREP and new endtime is before start time
364                    if (coordJob.getStartTime().compareTo(newEndTime) >= 0) {
365                        if (coordJob.getStatus() != CoordinatorJob.Status.PREP) {
366                            processLookaheadActions(coordJob, newEndTime);
367                        }
368                        if (coordJob.getStatus() == CoordinatorJob.Status.PREP
369                                || coordJob.getStatus() == CoordinatorJob.Status.RUNNING) {
370                            LOG.info("Changing coord status to SUCCEEDED, because it's in " + coordJob.getStatus()
371                                    + " and new end time is before start time. Startime is " + coordJob.getStartTime()
372                                    + " and new end time is " + newEndTime);
373
374                            coordJob.setStatus(CoordinatorJob.Status.SUCCEEDED);
375                            coordJob.resetPending();
376                        }
377                        coordJob.setDoneMaterialization();
378                    }
379                    else {
380                        // move it to running iff new end time is after starttime.
381                        if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) {
382                            coordJob.setStatus(CoordinatorJob.Status.RUNNING);
383                        }
384                        if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
385                                || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
386                            // Check for backward compatibility for Oozie versions (3.2 and before)
387                            // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
388                            // PAUSEDWITHERROR is not supported
389                            coordJob.setStatus(StatusUtils
390                                    .getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
391                        }
392                        coordJob.setPending();
393                        coordJob.resetDoneMaterialization();
394                        processLookaheadActions(coordJob, newEndTime);
395                    }
396                }
397
398                else {
399                    LOG.info("Didn't change endtime. Endtime is in between coord end time and next materialization time."
400                            + "Coord endTime = " + DateUtils.formatDateOozieTZ(newEndTime)
401                            + " next materialization time ="
402                            + DateUtils.formatDateOozieTZ(coordJob.getNextMaterializedTime()));
403                }
404            }
405
406            if (newConcurrency != null) {
407                this.coordJob.setConcurrency(newConcurrency);
408            }
409
410            if (newPauseTime != null || resetPauseTime == true) {
411                this.coordJob.setPauseTime(newPauseTime);
412                if (oldPauseTime != null && newPauseTime != null) {
413                    if (oldPauseTime.before(newPauseTime)) {
414                        if (this.coordJob.getStatus() == Job.Status.PAUSED) {
415                            this.coordJob.setStatus(Job.Status.RUNNING);
416                        }
417                        else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
418                            this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
419                        }
420                    }
421                }
422                else if (oldPauseTime != null && newPauseTime == null) {
423                    if (this.coordJob.getStatus() == Job.Status.PAUSED) {
424                        this.coordJob.setStatus(Job.Status.RUNNING);
425                    }
426                    else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
427                        this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
428                    }
429                }
430                if (!resetPauseTime) {
431                    processLookaheadActions(coordJob, newPauseTime);
432                }
433            }
434            if (jobStatus != null) {
435                coordJob.setStatus(jobStatus);
436                LOG.info("Coord status is changed to " + jobStatus + " from " + prevStatus);
437                if (jobStatus.equals(CoordinatorJob.Status.RUNNING)) {
438                    coordJob.setPending();
439                    if (coordJob.getNextMaterializedTime() != null
440                            && coordJob.getEndTime().after(coordJob.getNextMaterializedTime())) {
441                        coordJob.resetDoneMaterialization();
442                    }
443                } else if (jobStatus.equals(CoordinatorJob.Status.IGNORED)) {
444                    coordJob.resetPending();
445                    coordJob.setDoneMaterialization();
446                }
447            }
448
449            if (coordJob.getNextMaterializedTime() != null && coordJob.getEndTime().compareTo(coordJob.getNextMaterializedTime()) <= 0) {
450                LOG.info("[" + coordJob.getId() + "]: all actions have been materialized, job status = " + coordJob.getStatus()
451                        + ", set pending to true");
452                // set doneMaterialization to true when materialization is done
453                coordJob.setDoneMaterialization();
454            }
455
456            coordJob.setLastModifiedTime(new Date());
457            updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_CHANGE, coordJob));
458            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, deleteList);
459
460            if (newConcurrency != null && newConcurrency > oldConcurrency) {
461                queue(new CoordActionReadyXCommand(jobId));
462            }
463
464            return null;
465        }
466        catch (XException ex) {
467            throw new CommandException(ex);
468        }
469        finally {
470            LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId);
471            // update bundle action
472            if (coordJob.getBundleId() != null) {
473                //ignore pending as it'sync command
474                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus, true);
475                bundleStatusUpdate.call();
476            }
477        }
478    }
479
480    /* (non-Javadoc)
481     * @see org.apache.oozie.command.XCommand#getEntityKey()
482     */
483    @Override
484    public String getEntityKey() {
485        return this.jobId;
486    }
487
488    /* (non-Javadoc)
489     * @see org.apache.oozie.command.XCommand#loadState()
490     */
491    @Override
492    protected void loadState() throws CommandException{
493        jpaService = Services.get().get(JPAService.class);
494
495        if (jpaService == null) {
496            throw new CommandException(ErrorCode.E0610);
497        }
498
499        try {
500            this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
501            oldPauseTime = coordJob.getPauseTime();
502            prevStatus = coordJob.getStatus();
503        }
504        catch (JPAExecutorException e) {
505            throw new CommandException(e);
506        }
507
508        LogUtils.setLogInfo(this.coordJob);
509    }
510
511    /* (non-Javadoc)
512     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
513     */
514    @Override
515    protected void verifyPrecondition() throws CommandException,PreconditionException {
516        check(this.coordJob, newEndTime, newConcurrency, newPauseTime, jobStatus);
517    }
518
519    /* (non-Javadoc)
520     * @see org.apache.oozie.command.XCommand#isLockRequired()
521     */
522    @Override
523    protected boolean isLockRequired() {
524        return true;
525    }
526}