001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
020    import java.util.ArrayList;
021    import java.util.Calendar;
022    import java.util.Date;
023    import java.util.HashSet;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Map.Entry;
027    import java.util.Set;
029    import org.apache.oozie.CoordinatorActionBean;
030    import org.apache.oozie.CoordinatorJobBean;
031    import org.apache.oozie.ErrorCode;
032    import org.apache.oozie.XException;
033    import org.apache.oozie.client.CoordinatorJob;
034    import org.apache.oozie.client.Job;
035    import org.apache.oozie.client.OozieClient;
036    import org.apache.oozie.client.rest.JsonBean;
037    import org.apache.oozie.command.CommandException;
038    import org.apache.oozie.command.PreconditionException;
039    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
040    import org.apache.oozie.coord.TimeUnit;
041    import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
042    import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
043    import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
044    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
045    import org.apache.oozie.executor.jpa.JPAExecutorException;
046    import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
047    import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
048    import org.apache.oozie.service.JPAService;
049    import org.apache.oozie.service.Services;
050    import org.apache.oozie.sla.SLARegistrationBean;
051    import org.apache.oozie.sla.SLASummaryBean;
052    import org.apache.oozie.sla.service.SLAService;
053    import org.apache.oozie.util.DateUtils;
054    import org.apache.oozie.util.JobUtils;
055    import org.apache.oozie.util.LogUtils;
056    import org.apache.oozie.util.ParamChecker;
057    import org.apache.oozie.util.StatusUtils;
059    public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
060        private final String jobId;
061        private Date newEndTime = null;
062        private Integer newConcurrency = null;
063        private Date newPauseTime = null;
064        private Date oldPauseTime = null;
065        private boolean resetPauseTime = false;
066        private CoordinatorJobBean coordJob;
067        private JPAService jpaService = null;
068        private Job.Status prevStatus;
069        private List<JsonBean> updateList = new ArrayList<JsonBean>();
070        private List<JsonBean> deleteList = new ArrayList<JsonBean>();
072        private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
073        static {
074            ALLOWED_CHANGE_OPTIONS.add("endtime");
075            ALLOWED_CHANGE_OPTIONS.add("concurrency");
076            ALLOWED_CHANGE_OPTIONS.add("pausetime");
077        }
079        /**
080         * This command is used to update the Coordinator job with the new values Update the coordinator job bean and update
081         * that to database.
082         *
083         * @param id Coordinator job id.
084         * @param changeValue This the changed value in the form key=value.
085         * @throws CommandException thrown if changeValue cannot be parsed properly.
086         */
087        public CoordChangeXCommand(String id, String changeValue) throws CommandException {
088            super("coord_change", "coord_change", 0);
089            this.jobId = ParamChecker.notEmpty(id, "id");
090            ParamChecker.notEmpty(changeValue, "value");
092            validateChangeValue(changeValue);
093        }
095        /**
096         * @param changeValue change value.
097         * @throws CommandException thrown if changeValue cannot be parsed properly.
098         */
099        private void validateChangeValue(String changeValue) throws CommandException {
100            Map<String, String> map = JobUtils.parseChangeValue(changeValue);
102            if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) {
103                throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
104            }
106            java.util.Iterator<Entry<String, String>> iter = map.entrySet().iterator();
107            while (iter.hasNext()) {
108                Entry<String, String> entry = iter.next();
109                String key = entry.getKey();
110                String value = entry.getValue();
112                if (!ALLOWED_CHANGE_OPTIONS.contains(key)) {
113                    throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
114                }
116                if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) {
117                    throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty");
118                }
119            }
121            if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) {
122                String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
123                try {
124                    newEndTime = DateUtils.parseDateOozieTZ(value);
125                }
126                catch (Exception ex) {
127                    throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
128                }
129            }
131            if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) {
132                String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY);
133                try {
134                    newConcurrency = Integer.parseInt(value);
135                }
136                catch (NumberFormatException ex) {
137                    throw new CommandException(ErrorCode.E1015, value, "must be a valid integer");
138                }
139            }
141            if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
142                String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
143                if (value.equals("")) { // this is to reset pause time to null;
144                    resetPauseTime = true;
145                }
146                else {
147                    try {
148                        newPauseTime = DateUtils.parseDateOozieTZ(value);
149                    }
150                    catch (Exception ex) {
151                        throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
152                    }
153                }
154            }
155        }
157        /**
158         * Returns the actual last action time(one instance before coordJob.lastActionTime)
159         * @return Date - last action time if coordJob.getLastActionTime() is not null, null otherwise
160         */
161        private Date getLastActionTime() {
162            if(coordJob.getLastActionTime() == null)
163                return null;
165            Calendar d = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
166            d.setTime(coordJob.getLastActionTime());
167            TimeUnit timeUnit = TimeUnit.valueOf(coordJob.getTimeUnitStr());
168            d.add(timeUnit.getCalendarUnit(), -Integer.valueOf(coordJob.getFrequency()));
169            return d.getTime();
170        }
172        /**
173         * Check if new end time is valid.
174         *
175         * @param coordJob coordinator job id.
176         * @param newEndTime new end time.
177         * @throws CommandException thrown if new end time is not valid.
178         */
179        private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException {
180            // New endTime cannot be before coordinator job's start time.
181            Date startTime = coordJob.getStartTime();
182            if (newEndTime.before(startTime)) {
183                throw new CommandException(ErrorCode.E1015, newEndTime, "cannot be before coordinator job's start time ["
184                        + startTime + "]");
185            }
187            // New endTime cannot be before coordinator job's last action time.
188            Date lastActionTime = getLastActionTime();
189            if (lastActionTime != null) {
190                if (!newEndTime.after(lastActionTime)) {
191                    throw new CommandException(ErrorCode.E1015, newEndTime,
192                            "must be after coordinator job's last action time [" + lastActionTime + "]");
193                }
194            }
195        }
197        /**
198         * Check if new pause time is valid.
199         *
200         * @param coordJob coordinator job id.
201         * @param newPauseTime new pause time.
202         * @param newEndTime new end time, can be null meaning no change on end time.
203         * @throws CommandException thrown if new pause time is not valid.
204         */
205        private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime)
206                throws CommandException {
207            // New pauseTime has to be a non-past time.
208            Date d = new Date();
209            if (newPauseTime.before(d)) {
210                throw new CommandException(ErrorCode.E1015, newPauseTime, "must be a non-past time");
211            }
212        }
214        /**
215         * Process lookahead created actions that become invalid because of the new pause time,
216         * These actions will be deleted from DB, also the coordinator job will be updated accordingly
217         *
218         * @param coordJob coordinator job
219         * @param newPauseTime new pause time
220         */
221        private void processLookaheadActions(CoordinatorJobBean coordJob, Date newPauseTime) throws CommandException {
222            Date lastActionTime = coordJob.getLastActionTime();
223            if (lastActionTime != null) {
224                // d is the real last action time.
225                Calendar d = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
226                d.setTime(getLastActionTime());
227                TimeUnit timeUnit = TimeUnit.valueOf(coordJob.getTimeUnitStr());
229                int lastActionNumber = coordJob.getLastActionNumber();
231                boolean hasChanged = false;
232                while (true) {
233                    if (!newPauseTime.after(d.getTime())) {
234                        deleteAction(lastActionNumber);
235                        d.add(timeUnit.getCalendarUnit(), -Integer.valueOf(coordJob.getFrequency()));
236                        lastActionNumber = lastActionNumber - 1;
238                        hasChanged = true;
239                    }
240                    else {
241                        break;
242                    }
243                }
245                if (hasChanged == true) {
246                    coordJob.setLastActionNumber(lastActionNumber);
247                    d.add(timeUnit.getCalendarUnit(), Integer.valueOf(coordJob.getFrequency()));
248                    Date d1 = d.getTime();
249                    coordJob.setLastActionTime(d1);
250                    coordJob.setNextMaterializedTime(d1);
251                    coordJob.resetDoneMaterialization();
252                }
253            }
254        }
256        /**
257         * Delete coordinator action
258         *
259         * @param actionNum coordinator action number
260         */
261        private void deleteAction(int actionNum) throws CommandException {
262            try {
263                String actionId = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, actionNum));
264                CoordinatorActionBean bean = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
265                // delete SLA registration entry (if any) for action
266                if (SLAService.isEnabled()) {
267                    Services.get().get(SLAService.class).removeRegistration(actionId);
268                }
269                SLARegistrationBean slaReg = jpaService.execute(new SLARegistrationGetJPAExecutor(actionId));
270                if (slaReg != null) {
271                    LOG.debug("Deleting registration bean corresponding to action " + slaReg.getId());
272                    deleteList.add(slaReg);
273                }
274                SLASummaryBean slaSummaryBean = jpaService.execute(new SLASummaryGetJPAExecutor(actionId));
275                if (slaSummaryBean != null) {
276                    LOG.debug("Deleting summary bean corresponding to action " + slaSummaryBean.getId());
277                    deleteList.add(slaSummaryBean);
278                }
279                deleteList.add(bean);
280            }
281            catch (JPAExecutorException e) {
282                throw new CommandException(e);
283            }
284        }
286        /**
287         * Check if new end time, new concurrency, new pause time are valid.
288         *
289         * @param coordJob coordinator job id.
290         * @param newEndTime new end time.
291         * @param newConcurrency new concurrency.
292         * @param newPauseTime new pause time.
293         * @throws CommandException thrown if new values are not valid.
294         */
295        private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime)
296                throws CommandException {
297            if (coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
298                throw new CommandException(ErrorCode.E1016);
299            }
301            if (newEndTime != null) {
302                checkEndTime(coordJob, newEndTime);
303            }
305            if (newPauseTime != null) {
306                checkPauseTime(coordJob, newPauseTime);
307            }
308        }
310        /* (non-Javadoc)
311         * @see org.apache.oozie.command.XCommand#execute()
312         */
313        @Override
314        protected Void execute() throws CommandException {
315            LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId);
317            try {
318                if (newEndTime != null) {
319                    coordJob.setEndTime(newEndTime);
320                    if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED){
321                        coordJob.setStatus(CoordinatorJob.Status.RUNNING);
322                    }
323                    if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
324                            || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
325                        // Check for backward compatibility for Oozie versions (3.2 and before)
326                        // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
327                        // PAUSEDWITHERROR is not supported
328                        coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
329                    }
330                    coordJob.setPending();
331                    coordJob.resetDoneMaterialization();
332                }
334                if (newConcurrency != null) {
335                    this.coordJob.setConcurrency(newConcurrency);
336                }
338                if (newPauseTime != null || resetPauseTime == true) {
339                    this.coordJob.setPauseTime(newPauseTime);
340                    if (oldPauseTime != null && newPauseTime != null) {
341                        if (oldPauseTime.before(newPauseTime)) {
342                            if (this.coordJob.getStatus() == Job.Status.PAUSED) {
343                                this.coordJob.setStatus(Job.Status.RUNNING);
344                            }
345                            else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
346                                this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
347                            }
348                        }
349                    }
350                    else if (oldPauseTime != null && newPauseTime == null) {
351                        if (this.coordJob.getStatus() == Job.Status.PAUSED) {
352                            this.coordJob.setStatus(Job.Status.RUNNING);
353                        }
354                        else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
355                            this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
356                        }
357                    }
358                    if (!resetPauseTime) {
359                        processLookaheadActions(coordJob, newPauseTime);
360                    }
361                }
363                if (coordJob.getNextMaterializedTime() != null && coordJob.getEndTime().compareTo(coordJob.getNextMaterializedTime()) <= 0) {
364                    LOG.info("[" + coordJob.getId() + "]: all actions have been materialized, job status = " + coordJob.getStatus()
365                            + ", set pending to true");
366                    // set doneMaterialization to true when materialization is done
367                    coordJob.setDoneMaterialization();
368                }
370                updateList.add(coordJob);
371                jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, false));
373                return null;
374            }
375            catch (XException ex) {
376                throw new CommandException(ex);
377            }
378            finally {
379                LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId);
380                // update bundle action
381                if (coordJob.getBundleId() != null) {
382                    BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
383                    bundleStatusUpdate.call();
384                }
385            }
386        }
388        /* (non-Javadoc)
389         * @see org.apache.oozie.command.XCommand#getEntityKey()
390         */
391        @Override
392        public String getEntityKey() {
393            return this.jobId;
394        }
396        /* (non-Javadoc)
397         * @see org.apache.oozie.command.XCommand#loadState()
398         */
399        @Override
400        protected void loadState() throws CommandException{
401            jpaService = Services.get().get(JPAService.class);
403            if (jpaService == null) {
404                throw new CommandException(ErrorCode.E0610);
405            }
407            try {
408                this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
409                oldPauseTime = coordJob.getPauseTime();
410                prevStatus = coordJob.getStatus();
411            }
412            catch (JPAExecutorException e) {
413                throw new CommandException(e);
414            }
416            LogUtils.setLogInfo(this.coordJob, logInfo);
417        }
419        /* (non-Javadoc)
420         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
421         */
422        @Override
423        protected void verifyPrecondition() throws CommandException,PreconditionException {
424            check(this.coordJob, newEndTime, newConcurrency, newPauseTime);
425        }
427        /* (non-Javadoc)
428         * @see org.apache.oozie.command.XCommand#isLockRequired()
429         */
430        @Override
431        protected boolean isLockRequired() {
432            return true;
433        }
434    }