001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.util.ArrayList;
021    import java.util.Calendar;
022    import java.util.Date;
023    import java.util.HashSet;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Map.Entry;
027    import java.util.Set;
028    
029    import org.apache.oozie.CoordinatorActionBean;
030    import org.apache.oozie.CoordinatorJobBean;
031    import org.apache.oozie.ErrorCode;
032    import org.apache.oozie.XException;
033    import org.apache.oozie.client.CoordinatorJob;
034    import org.apache.oozie.client.Job;
035    import org.apache.oozie.client.OozieClient;
036    import org.apache.oozie.client.rest.JsonBean;
037    import org.apache.oozie.command.CommandException;
038    import org.apache.oozie.command.PreconditionException;
039    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
040    import org.apache.oozie.coord.TimeUnit;
041    import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
042    import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
043    import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
044    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
045    import org.apache.oozie.executor.jpa.JPAExecutorException;
046    import org.apache.oozie.service.JPAService;
047    import org.apache.oozie.service.Services;
048    import org.apache.oozie.util.DateUtils;
049    import org.apache.oozie.util.JobUtils;
050    import org.apache.oozie.util.LogUtils;
051    import org.apache.oozie.util.ParamChecker;
052    import org.apache.oozie.util.StatusUtils;
053    
054    public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
055        private final String jobId;
056        private Date newEndTime = null;
057        private Integer newConcurrency = null;
058        private Date newPauseTime = null;
059        private Date oldPauseTime = null;
060        private boolean resetPauseTime = false;
061        private CoordinatorJobBean coordJob;
062        private JPAService jpaService = null;
063        private Job.Status prevStatus;
064        private List<JsonBean> updateList = new ArrayList<JsonBean>();
065        private List<JsonBean> deleteList = new ArrayList<JsonBean>();
066    
067        private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
068        static {
069            ALLOWED_CHANGE_OPTIONS.add("endtime");
070            ALLOWED_CHANGE_OPTIONS.add("concurrency");
071            ALLOWED_CHANGE_OPTIONS.add("pausetime");
072        }
073    
074        /**
075         * This command is used to update the Coordinator job with the new values Update the coordinator job bean and update
076         * that to database.
077         *
078         * @param id Coordinator job id.
079         * @param changeValue This the changed value in the form key=value.
080         * @throws CommandException thrown if changeValue cannot be parsed properly.
081         */
082        public CoordChangeXCommand(String id, String changeValue) throws CommandException {
083            super("coord_change", "coord_change", 0);
084            this.jobId = ParamChecker.notEmpty(id, "id");
085            ParamChecker.notEmpty(changeValue, "value");
086    
087            validateChangeValue(changeValue);
088        }
089    
090        /**
091         * @param changeValue change value.
092         * @throws CommandException thrown if changeValue cannot be parsed properly.
093         */
094        private void validateChangeValue(String changeValue) throws CommandException {
095            Map<String, String> map = JobUtils.parseChangeValue(changeValue);
096    
097            if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) {
098                throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
099            }
100    
101            java.util.Iterator<Entry<String, String>> iter = map.entrySet().iterator();
102            while (iter.hasNext()) {
103                Entry<String, String> entry = iter.next();
104                String key = entry.getKey();
105                String value = entry.getValue();
106    
107                if (!ALLOWED_CHANGE_OPTIONS.contains(key)) {
108                    throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
109                }
110    
111                if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) {
112                    throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty");
113                }
114            }
115    
116            if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) {
117                String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
118                try {
119                    newEndTime = DateUtils.parseDateOozieTZ(value);
120                }
121                catch (Exception ex) {
122                    throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
123                }
124            }
125    
126            if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) {
127                String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY);
128                try {
129                    newConcurrency = Integer.parseInt(value);
130                }
131                catch (NumberFormatException ex) {
132                    throw new CommandException(ErrorCode.E1015, value, "must be a valid integer");
133                }
134            }
135    
136            if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
137                String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
138                if (value.equals("")) { // this is to reset pause time to null;
139                    resetPauseTime = true;
140                }
141                else {
142                    try {
143                        newPauseTime = DateUtils.parseDateOozieTZ(value);
144                    }
145                    catch (Exception ex) {
146                        throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
147                    }
148                }
149            }
150        }
151    
152        /**
153         * Returns the actual last action time(one instance before coordJob.lastActionTime)
154         * @return Date - last action time if coordJob.getLastActionTime() is not null, null otherwise
155         */
156        private Date getLastActionTime() {
157            if(coordJob.getLastActionTime() == null)
158                return null;
159    
160            Calendar d = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
161            d.setTime(coordJob.getLastActionTime());
162            TimeUnit timeUnit = TimeUnit.valueOf(coordJob.getTimeUnitStr());
163            d.add(timeUnit.getCalendarUnit(), -coordJob.getFrequency());
164            return d.getTime();
165        }
166    
167        /**
168         * Check if new end time is valid.
169         *
170         * @param coordJob coordinator job id.
171         * @param newEndTime new end time.
172         * @throws CommandException thrown if new end time is not valid.
173         */
174        private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException {
175            // New endTime cannot be before coordinator job's start time.
176            Date startTime = coordJob.getStartTime();
177            if (newEndTime.before(startTime)) {
178                throw new CommandException(ErrorCode.E1015, newEndTime, "cannot be before coordinator job's start time ["
179                        + startTime + "]");
180            }
181    
182            // New endTime cannot be before coordinator job's last action time.
183            Date lastActionTime = getLastActionTime();
184            if (lastActionTime != null) {
185                if (!newEndTime.after(lastActionTime)) {
186                    throw new CommandException(ErrorCode.E1015, newEndTime,
187                            "must be after coordinator job's last action time [" + lastActionTime + "]");
188                }
189            }
190        }
191    
192        /**
193         * Check if new pause time is valid.
194         *
195         * @param coordJob coordinator job id.
196         * @param newPauseTime new pause time.
197         * @param newEndTime new end time, can be null meaning no change on end time.
198         * @throws CommandException thrown if new pause time is not valid.
199         */
200        private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime)
201                throws CommandException {
202            // New pauseTime has to be a non-past time.
203            Date d = new Date();
204            if (newPauseTime.before(d)) {
205                throw new CommandException(ErrorCode.E1015, newPauseTime, "must be a non-past time");
206            }
207        }
208    
209        /**
210         * Process lookahead created actions that become invalid because of the new pause time,
211         * These actions will be deleted from DB, also the coordinator job will be updated accordingly
212         *
213         * @param coordJob coordinator job
214         * @param newPauseTime new pause time
215         */
216        private void processLookaheadActions(CoordinatorJobBean coordJob, Date newPauseTime) throws CommandException {
217            Date lastActionTime = coordJob.getLastActionTime();
218            if (lastActionTime != null) {
219                // d is the real last action time.
220                Calendar d = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
221                d.setTime(getLastActionTime());
222                TimeUnit timeUnit = TimeUnit.valueOf(coordJob.getTimeUnitStr());
223    
224                int lastActionNumber = coordJob.getLastActionNumber();
225    
226                boolean hasChanged = false;
227                while (true) {
228                    if (!newPauseTime.after(d.getTime())) {
229                        deleteAction(lastActionNumber);
230                        d.add(timeUnit.getCalendarUnit(), -coordJob.getFrequency());
231                        lastActionNumber = lastActionNumber - 1;
232    
233                        hasChanged = true;
234                    }
235                    else {
236                        break;
237                    }
238                }
239    
240                if (hasChanged == true) {
241                    coordJob.setLastActionNumber(lastActionNumber);
242                    d.add(timeUnit.getCalendarUnit(), coordJob.getFrequency());
243                    Date d1 = d.getTime();
244                    coordJob.setLastActionTime(d1);
245                    coordJob.setNextMaterializedTime(d1);
246                    coordJob.resetDoneMaterialization();
247                }
248            }
249        }
250    
251        /**
252         * Delete coordinator action
253         *
254         * @param actionNum coordinator action number
255         */
256        private void deleteAction(int actionNum) throws CommandException {
257            try {
258                String actionId = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, actionNum));
259                CoordinatorActionBean bean = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
260                deleteList.add(bean);
261            } catch (JPAExecutorException e) {
262                throw new CommandException(e);
263            }
264        }
265    
266        /**
267         * Check if new end time, new concurrency, new pause time are valid.
268         *
269         * @param coordJob coordinator job id.
270         * @param newEndTime new end time.
271         * @param newConcurrency new concurrency.
272         * @param newPauseTime new pause time.
273         * @throws CommandException thrown if new values are not valid.
274         */
275        private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime)
276                throws CommandException {
277            if (coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
278                throw new CommandException(ErrorCode.E1016);
279            }
280    
281            if (newEndTime != null) {
282                checkEndTime(coordJob, newEndTime);
283            }
284    
285            if (newPauseTime != null) {
286                checkPauseTime(coordJob, newPauseTime);
287            }
288        }
289    
290        /* (non-Javadoc)
291         * @see org.apache.oozie.command.XCommand#execute()
292         */
293        @Override
294        protected Void execute() throws CommandException {
295            LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId);
296    
297            try {
298                if (newEndTime != null) {
299                    coordJob.setEndTime(newEndTime);
300                    if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED){
301                        coordJob.setStatus(CoordinatorJob.Status.RUNNING);
302                    }
303                    if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
304                            || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
305                        // Check for backward compatibility for Oozie versions (3.2 and before)
306                        // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
307                        // PAUSEDWITHERROR is not supported
308                        coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
309                    }
310                    coordJob.setPending();
311                    coordJob.resetDoneMaterialization();
312                }
313    
314                if (newConcurrency != null) {
315                    this.coordJob.setConcurrency(newConcurrency);
316                }
317    
318                if (newPauseTime != null || resetPauseTime == true) {
319                    this.coordJob.setPauseTime(newPauseTime);
320                    if (oldPauseTime != null && newPauseTime != null) {
321                        if (oldPauseTime.before(newPauseTime)) {
322                            if (this.coordJob.getStatus() == Job.Status.PAUSED) {
323                                this.coordJob.setStatus(Job.Status.RUNNING);
324                            }
325                            else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
326                                this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
327                            }
328                        }
329                    }
330                    else if (oldPauseTime != null && newPauseTime == null) {
331                        if (this.coordJob.getStatus() == Job.Status.PAUSED) {
332                            this.coordJob.setStatus(Job.Status.RUNNING);
333                        }
334                        else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
335                            this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
336                        }
337                    }
338                    if (!resetPauseTime) {
339                        processLookaheadActions(coordJob, newPauseTime);
340                    }
341                }
342    
343                if (coordJob.getNextMaterializedTime() != null && coordJob.getEndTime().compareTo(coordJob.getNextMaterializedTime()) <= 0) {
344                    LOG.info("[" + coordJob.getId() + "]: all actions have been materialized, job status = " + coordJob.getStatus()
345                            + ", set pending to true");
346                    // set doneMaterialization to true when materialization is done
347                    coordJob.setDoneMaterialization();
348                }
349    
350                updateList.add(coordJob);
351                jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, false));
352    
353                return null;
354            }
355            catch (XException ex) {
356                throw new CommandException(ex);
357            }
358            finally {
359                LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId);
360                // update bundle action
361                if (coordJob.getBundleId() != null) {
362                    BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
363                    bundleStatusUpdate.call();
364                }
365            }
366        }
367    
368        /* (non-Javadoc)
369         * @see org.apache.oozie.command.XCommand#getEntityKey()
370         */
371        @Override
372        public String getEntityKey() {
373            return this.jobId;
374        }
375    
376        /* (non-Javadoc)
377         * @see org.apache.oozie.command.XCommand#loadState()
378         */
379        @Override
380        protected void loadState() throws CommandException{
381            jpaService = Services.get().get(JPAService.class);
382    
383            if (jpaService == null) {
384                throw new CommandException(ErrorCode.E0610);
385            }
386    
387            try {
388                this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
389                oldPauseTime = coordJob.getPauseTime();
390                prevStatus = coordJob.getStatus();
391            }
392            catch (JPAExecutorException e) {
393                throw new CommandException(e);
394            }
395    
396            LogUtils.setLogInfo(this.coordJob, logInfo);
397        }
398    
399        /* (non-Javadoc)
400         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
401         */
402        @Override
403        protected void verifyPrecondition() throws CommandException,PreconditionException {
404            check(this.coordJob, newEndTime, newConcurrency, newPauseTime);
405        }
406    
407        /* (non-Javadoc)
408         * @see org.apache.oozie.command.XCommand#isLockRequired()
409         */
410        @Override
411        protected boolean isLockRequired() {
412            return true;
413        }
414    }