001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Map;
025import java.util.Map.Entry;
026import java.util.Set;
027
028import org.apache.commons.lang.StringUtils;
029import org.apache.oozie.CoordinatorActionBean;
030import org.apache.oozie.CoordinatorJobBean;
031import org.apache.oozie.ErrorCode;
032import org.apache.oozie.XException;
033import org.apache.oozie.client.CoordinatorAction;
034import org.apache.oozie.client.CoordinatorJob;
035import org.apache.oozie.client.Job;
036import org.apache.oozie.client.OozieClient;
037import org.apache.oozie.client.rest.JsonBean;
038import org.apache.oozie.command.CommandException;
039import org.apache.oozie.command.PreconditionException;
040import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
041import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
042import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
043import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
044import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
045import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
046import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
047import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
048import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
049import org.apache.oozie.executor.jpa.JPAExecutorException;
050import org.apache.oozie.executor.jpa.BatchQueryExecutor;
051import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
052import org.apache.oozie.service.JPAService;
053import org.apache.oozie.service.Services;
054import org.apache.oozie.sla.SLARegistrationBean;
055import org.apache.oozie.sla.SLASummaryBean;
056import org.apache.oozie.sla.service.SLAService;
057import org.apache.oozie.util.DateUtils;
058import org.apache.oozie.util.JobUtils;
059import org.apache.oozie.util.LogUtils;
060import org.apache.oozie.util.ParamChecker;
061import org.apache.oozie.util.StatusUtils;
062
063public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
064    private final String jobId;
065    private Date newEndTime = null;
066    private Integer newConcurrency = null;
067    private Date newPauseTime = null;
068    private Date oldPauseTime = null;
069    private boolean resetPauseTime = false;
070    private CoordinatorJob.Status jobStatus = null;
071    private CoordinatorJobBean coordJob;
072    private JPAService jpaService = null;
073    private Job.Status prevStatus;
074    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
075    private List<JsonBean> deleteList = new ArrayList<JsonBean>();
076
077    private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
078    static {
079        ALLOWED_CHANGE_OPTIONS.add("endtime");
080        ALLOWED_CHANGE_OPTIONS.add("concurrency");
081        ALLOWED_CHANGE_OPTIONS.add("pausetime");
082        ALLOWED_CHANGE_OPTIONS.add(OozieClient.CHANGE_VALUE_STATUS);
083
084    }
085
086    /**
087     * This command is used to update the Coordinator job with the new values Update the coordinator job bean and update
088     * that to database.
089     *
090     * @param id Coordinator job id.
091     * @param changeValue This the changed value in the form key=value.
092     * @throws CommandException thrown if changeValue cannot be parsed properly.
093     */
094    public CoordChangeXCommand(String id, String changeValue) throws CommandException {
095        super("coord_change", "coord_change", 0);
096        this.jobId = ParamChecker.notEmpty(id, "id");
097        ParamChecker.notEmpty(changeValue, "value");
098
099        validateChangeValue(changeValue);
100    }
101
102    /**
103     * @param changeValue change value.
104     * @throws CommandException thrown if changeValue cannot be parsed properly.
105     */
106    private void validateChangeValue(String changeValue) throws CommandException {
107        Map<String, String> map = JobUtils.parseChangeValue(changeValue);
108
109        if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) {
110            throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime|status");
111        }
112
113        java.util.Iterator<Entry<String, String>> iter = map.entrySet().iterator();
114        while (iter.hasNext()) {
115            Entry<String, String> entry = iter.next();
116            String key = entry.getKey();
117            String value = entry.getValue();
118
119            if (!ALLOWED_CHANGE_OPTIONS.contains(key)) {
120                throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime|status");
121            }
122
123            if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) {
124                throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty");
125            }
126        }
127
128        if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) {
129            String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
130            try {
131                newEndTime = DateUtils.parseDateOozieTZ(value);
132            }
133            catch (Exception ex) {
134                throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
135            }
136        }
137
138        if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) {
139            String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY);
140            try {
141                newConcurrency = Integer.parseInt(value);
142            }
143            catch (NumberFormatException ex) {
144                throw new CommandException(ErrorCode.E1015, value, "must be a valid integer");
145            }
146        }
147
148        if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
149            String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
150            if (value.equals("")) { // this is to reset pause time to null;
151                resetPauseTime = true;
152            }
153            else {
154                try {
155                    newPauseTime = DateUtils.parseDateOozieTZ(value);
156                }
157                catch (Exception ex) {
158                    throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
159                }
160            }
161        }
162
163        if (map.containsKey(OozieClient.CHANGE_VALUE_STATUS)) {
164            String value = map.get(OozieClient.CHANGE_VALUE_STATUS);
165            if (!StringUtils.isEmpty(value)) {
166                jobStatus = CoordinatorJob.Status.valueOf(value);
167            }
168        }
169    }
170
171    /**
172     * Check if new end time is valid.
173     *
174     * @param coordJob coordinator job id.
175     * @param newEndTime new end time.
176     * @throws CommandException thrown if new end time is not valid.
177     */
178    private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException {
179        //It's ok to set end date before start date.
180    }
181
182    /**
183     * Check if new pause time is valid.
184     *
185     * @param coordJob coordinator job id.
186     * @param newPauseTime new pause time.
187     * @param newEndTime new end time, can be null meaning no change on end time.
188     * @throws CommandException thrown if new pause time is not valid.
189     */
190    private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime)
191            throws CommandException {
192        //no check.
193    }
194
195    /**
196     * Check if status change is valid.
197     *
198     * @param coordJob the coord job
199     * @param jobStatus the job status
200     * @throws CommandException the command exception
201     */
202    private void checkStatusChange(CoordinatorJobBean coordJob, CoordinatorJob.Status jobStatus)
203            throws CommandException {
204        if (!jobStatus.equals(CoordinatorJob.Status.RUNNING) && !jobStatus.equals(CoordinatorJob.Status.IGNORED)) {
205            throw new CommandException(ErrorCode.E1015, jobStatus, " must be RUNNING or IGNORED");
206        }
207
208        if (jobStatus.equals(CoordinatorJob.Status.RUNNING)) {
209            if (!(coordJob.getStatus().equals(CoordinatorJob.Status.FAILED) || coordJob.getStatus().equals(
210                    CoordinatorJob.Status.KILLED) || coordJob.getStatus().equals(CoordinatorJob.Status.IGNORED))) {
211                throw new CommandException(ErrorCode.E1015, jobStatus,
212                        " Only FAILED, KILLED, IGNORED job can be changed to RUNNING. Current job status is "
213                                + coordJob.getStatus());
214            }
215        }
216        else {
217            if (!(coordJob.getStatus().equals(CoordinatorJob.Status.FAILED) || coordJob.getStatus().equals(
218                    CoordinatorJob.Status.KILLED))
219                    || coordJob.isPending()) {
220                throw new CommandException(ErrorCode.E1015, jobStatus,
221                        " Only FAILED or KILLED non-pending job can be changed to IGNORED. Current job status is "
222                                + coordJob.getStatus() + " and pending status is " + coordJob.isPending());
223            }
224        }
225    }
226
227    /**
228     * Process lookahead created actions that become invalid because of the new pause time,
229     * These actions will be deleted from DB, also the coordinator job will be updated accordingly
230     *
231     * @param coordJob coordinator job
232     * @param newPauseTime new pause time
233     * @throws JPAExecutorException, CommandException
234     */
235    private void processLookaheadActions(CoordinatorJobBean coordJob, Date newTime) throws CommandException,
236            JPAExecutorException {
237        int lastActionNumber = coordJob.getLastActionNumber();
238        Date  lastActionTime = null;
239        Date  tempDate = null;
240
241        while ((tempDate = deleteAction(lastActionNumber, newTime)) != null) {
242            lastActionNumber--;
243            lastActionTime = tempDate;
244        }
245        if (lastActionTime != null) {
246            LOG.debug("New pause/end date is : " + newTime + " and last action number is : " + lastActionNumber);
247            coordJob.setLastActionNumber(lastActionNumber);
248            coordJob.setLastActionTime(lastActionTime);
249            coordJob.setNextMaterializedTime(lastActionTime);
250            coordJob.resetDoneMaterialization();
251        }
252    }
253
254    /**
255     * Delete coordinator action
256     *
257     * @param actionNum coordinator action number
258     */
259    private Date deleteAction(int actionNum, Date afterDate) throws CommandException {
260        try {
261            if (actionNum <= 0) {
262                return null;
263            }
264
265            String actionId = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, actionNum));
266            CoordinatorActionBean bean = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
267            if (afterDate.compareTo(bean.getNominalTime()) <= 0) {
268                // delete SLA registration entry (if any) for action
269                if (SLAService.isEnabled()) {
270                    Services.get().get(SLAService.class).removeRegistration(actionId);
271                }
272                SLARegistrationBean slaReg = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, actionId);
273                if (slaReg != null) {
274                    LOG.debug("Deleting registration bean corresponding to action " + slaReg.getId());
275                    deleteList.add(slaReg);
276                }
277                SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(
278                        SLASummaryQuery.GET_SLA_SUMMARY, actionId);
279                if (slaSummaryBean != null) {
280                    LOG.debug("Deleting summary bean corresponding to action " + slaSummaryBean.getId());
281                    deleteList.add(slaSummaryBean);
282                }
283                if (bean.getStatus() == CoordinatorAction.Status.WAITING
284                        || bean.getStatus() == CoordinatorAction.Status.READY) {
285                    deleteList.add(bean);
286                }
287                else {
288                    throw new CommandException(ErrorCode.E1022, bean.getId());
289                }
290                return bean.getNominalTime();
291            }
292            else {
293                return null;
294            }
295
296        }
297        catch (JPAExecutorException e) {
298            throw new CommandException(e);
299        }
300    }
301
302    /**
303     * Check if new end time, new concurrency, new pause time are valid.
304     *
305     * @param coordJob coordinator job id.
306     * @param newEndTime new end time.
307     * @param newConcurrency new concurrency.
308     * @param newPauseTime new pause time.
309     * @throws CommandException thrown if new values are not valid.
310     */
311    private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime,
312            CoordinatorJob.Status jobStatus) throws CommandException {
313
314        if (coordJob.getStatus() == CoordinatorJob.Status.KILLED
315                || coordJob.getStatus() == CoordinatorJob.Status.IGNORED) {
316            if (jobStatus == null || (newEndTime != null || newConcurrency != null || newPauseTime != null)) {
317                throw new CommandException(ErrorCode.E1016);
318            }
319        }
320
321        if (newEndTime != null) {
322            checkEndTime(coordJob, newEndTime);
323        }
324
325        if (newPauseTime != null) {
326            checkPauseTime(coordJob, newPauseTime);
327        }
328        if (jobStatus != null) {
329            checkStatusChange(coordJob, jobStatus);
330        }
331    }
332
333    /* (non-Javadoc)
334     * @see org.apache.oozie.command.XCommand#execute()
335     */
336    @Override
337    protected Void execute() throws CommandException {
338        LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId);
339
340        try {
341            if (newEndTime != null) {
342                // during coord materialization, nextMaterializedTime is set to
343                // startTime + n(actions materialized) * frequency and this can be AFTER endTime,
344                // while doneMaterialization is true. Hence the following checks
345                // for newEndTime being in the middle of endTime and nextMatdTime.
346                // Since job is already done materialization so no need to change
347                boolean dontChange = coordJob.getEndTime().before(newEndTime)
348                        && coordJob.getNextMaterializedTime() != null
349                        && coordJob.getNextMaterializedTime().after(newEndTime);
350                if (!dontChange) {
351                    coordJob.setEndTime(newEndTime);
352                    // OOZIE-1703, we should SUCCEEDED the coord, if it's in PREP and new endtime is before start time
353                    if (coordJob.getStartTime().compareTo(newEndTime) >= 0) {
354                        if (coordJob.getStatus() != CoordinatorJob.Status.PREP) {
355                            processLookaheadActions(coordJob, newEndTime);
356                        }
357                        if (coordJob.getStatus() == CoordinatorJob.Status.PREP
358                                || coordJob.getStatus() == CoordinatorJob.Status.RUNNING) {
359                            LOG.info("Changing coord status to SUCCEEDED, because it's in " + coordJob.getStatus()
360                                    + " and new end time is before start time. Startime is " + coordJob.getStartTime()
361                                    + " and new end time is " + newEndTime);
362
363                            coordJob.setStatus(CoordinatorJob.Status.SUCCEEDED);
364                            coordJob.resetPending();
365                        }
366                        coordJob.setDoneMaterialization();
367                    }
368                    else {
369                        // move it to running iff new end time is after starttime.
370                        if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) {
371                            coordJob.setStatus(CoordinatorJob.Status.RUNNING);
372                        }
373                        if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
374                                || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
375                            // Check for backward compatibility for Oozie versions (3.2 and before)
376                            // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
377                            // PAUSEDWITHERROR is not supported
378                            coordJob.setStatus(StatusUtils
379                                    .getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
380                        }
381                        coordJob.setPending();
382                        coordJob.resetDoneMaterialization();
383                        processLookaheadActions(coordJob, newEndTime);
384                    }
385                }
386
387                else {
388                    LOG.info("Didn't change endtime. Endtime is in between coord end time and next materialization time."
389                            + "Coord endTime = " + DateUtils.formatDateOozieTZ(newEndTime)
390                            + " next materialization time ="
391                            + DateUtils.formatDateOozieTZ(coordJob.getNextMaterializedTime()));
392                }
393            }
394
395            if (newConcurrency != null) {
396                this.coordJob.setConcurrency(newConcurrency);
397            }
398
399            if (newPauseTime != null || resetPauseTime == true) {
400                this.coordJob.setPauseTime(newPauseTime);
401                if (oldPauseTime != null && newPauseTime != null) {
402                    if (oldPauseTime.before(newPauseTime)) {
403                        if (this.coordJob.getStatus() == Job.Status.PAUSED) {
404                            this.coordJob.setStatus(Job.Status.RUNNING);
405                        }
406                        else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
407                            this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
408                        }
409                    }
410                }
411                else if (oldPauseTime != null && newPauseTime == null) {
412                    if (this.coordJob.getStatus() == Job.Status.PAUSED) {
413                        this.coordJob.setStatus(Job.Status.RUNNING);
414                    }
415                    else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
416                        this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
417                    }
418                }
419                if (!resetPauseTime) {
420                    processLookaheadActions(coordJob, newPauseTime);
421                }
422            }
423            if (jobStatus != null) {
424                coordJob.setStatus(jobStatus);
425                LOG.info("Coord status is changed to " + jobStatus + " from " + prevStatus);
426                if (jobStatus.equals(CoordinatorJob.Status.RUNNING)) {
427                    coordJob.setPending();
428                    if (coordJob.getNextMaterializedTime() != null
429                            && coordJob.getEndTime().after(coordJob.getNextMaterializedTime())) {
430                        coordJob.resetDoneMaterialization();
431                    }
432                } else if (jobStatus.equals(CoordinatorJob.Status.IGNORED)) {
433                    coordJob.resetPending();
434                    coordJob.setDoneMaterialization();
435                }
436            }
437
438            if (coordJob.getNextMaterializedTime() != null && coordJob.getEndTime().compareTo(coordJob.getNextMaterializedTime()) <= 0) {
439                LOG.info("[" + coordJob.getId() + "]: all actions have been materialized, job status = " + coordJob.getStatus()
440                        + ", set pending to true");
441                // set doneMaterialization to true when materialization is done
442                coordJob.setDoneMaterialization();
443            }
444
445            coordJob.setLastModifiedTime(new Date());
446            updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_CHANGE, coordJob));
447            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, deleteList);
448
449            return null;
450        }
451        catch (XException ex) {
452            throw new CommandException(ex);
453        }
454        finally {
455            LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId);
456            // update bundle action
457            if (coordJob.getBundleId() != null) {
458                //ignore pending as it'sync command
459                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus, true);
460                bundleStatusUpdate.call();
461            }
462        }
463    }
464
465    /* (non-Javadoc)
466     * @see org.apache.oozie.command.XCommand#getEntityKey()
467     */
468    @Override
469    public String getEntityKey() {
470        return this.jobId;
471    }
472
473    /* (non-Javadoc)
474     * @see org.apache.oozie.command.XCommand#loadState()
475     */
476    @Override
477    protected void loadState() throws CommandException{
478        jpaService = Services.get().get(JPAService.class);
479
480        if (jpaService == null) {
481            throw new CommandException(ErrorCode.E0610);
482        }
483
484        try {
485            this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
486            oldPauseTime = coordJob.getPauseTime();
487            prevStatus = coordJob.getStatus();
488        }
489        catch (JPAExecutorException e) {
490            throw new CommandException(e);
491        }
492
493        LogUtils.setLogInfo(this.coordJob, logInfo);
494    }
495
496    /* (non-Javadoc)
497     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
498     */
499    @Override
500    protected void verifyPrecondition() throws CommandException,PreconditionException {
501        check(this.coordJob, newEndTime, newConcurrency, newPauseTime, jobStatus);
502    }
503
504    /* (non-Javadoc)
505     * @see org.apache.oozie.command.XCommand#isLockRequired()
506     */
507    @Override
508    protected boolean isLockRequired() {
509        return true;
510    }
511}