001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.util.Date;
021    import java.util.HashSet;
022    import java.util.Map;
023    import java.util.Set;
024    import java.util.Map.Entry;
025    
026    import org.apache.oozie.CoordinatorActionBean;
027    import org.apache.oozie.CoordinatorJobBean;
028    import org.apache.oozie.ErrorCode;
029    import org.apache.oozie.XException;
030    import org.apache.oozie.client.CoordinatorJob;
031    import org.apache.oozie.client.Job;
032    import org.apache.oozie.client.OozieClient;
033    import org.apache.oozie.command.CommandException;
034    import org.apache.oozie.command.PreconditionException;
035    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
036    import org.apache.oozie.executor.jpa.CoordActionRemoveJPAExecutor;
037    import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
038    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
039    import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
040    import org.apache.oozie.executor.jpa.JPAExecutorException;
041    import org.apache.oozie.service.JPAService;
042    import org.apache.oozie.service.Services;
043    import org.apache.oozie.util.DateUtils;
044    import org.apache.oozie.util.JobUtils;
045    import org.apache.oozie.util.LogUtils;
046    import org.apache.oozie.util.ParamChecker;
047    
048    public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
049        private final String jobId;
050        private Date newEndTime = null;
051        private Integer newConcurrency = null;
052        private Date newPauseTime = null;
053        private Date oldPauseTime = null;
054        private boolean resetPauseTime = false;
055        private CoordinatorJobBean coordJob;
056        private JPAService jpaService = null;
057        private Job.Status prevStatus;
058    
059        private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
060        static {
061            ALLOWED_CHANGE_OPTIONS.add("endtime");
062            ALLOWED_CHANGE_OPTIONS.add("concurrency");
063            ALLOWED_CHANGE_OPTIONS.add("pausetime");
064        }
065    
066        /**
067         * This command is used to update the Coordinator job with the new values Update the coordinator job bean and update
068         * that to database.
069         *
070         * @param id Coordinator job id.
071         * @param changeValue This the changed value in the form key=value.
072         * @throws CommandException thrown if changeValue cannot be parsed properly.
073         */
074        public CoordChangeXCommand(String id, String changeValue) throws CommandException {
075            super("coord_change", "coord_change", 0);
076            this.jobId = ParamChecker.notEmpty(id, "id");
077            ParamChecker.notEmpty(changeValue, "value");
078    
079            validateChangeValue(changeValue);
080        }
081    
082        /**
083         * @param changeValue change value.
084         * @throws CommandException thrown if changeValue cannot be parsed properly.
085         */
086        private void validateChangeValue(String changeValue) throws CommandException {
087            Map<String, String> map = JobUtils.parseChangeValue(changeValue);
088    
089            if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) {
090                throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
091            }
092    
093            java.util.Iterator<Entry<String, String>> iter = map.entrySet().iterator();
094            while (iter.hasNext()) {
095                Entry<String, String> entry = iter.next();
096                String key = entry.getKey();
097                String value = entry.getValue();
098    
099                if (!ALLOWED_CHANGE_OPTIONS.contains(key)) {
100                    throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
101                }
102    
103                if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) {
104                    throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty");
105                }
106            }
107    
108            if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) {
109                String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
110                try {
111                    newEndTime = DateUtils.parseDateUTC(value);
112                }
113                catch (Exception ex) {
114                    throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
115                }
116            }
117    
118            if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) {
119                String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY);
120                try {
121                    newConcurrency = Integer.parseInt(value);
122                }
123                catch (NumberFormatException ex) {
124                    throw new CommandException(ErrorCode.E1015, value, "must be a valid integer");
125                }
126            }
127    
128            if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
129                String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
130                if (value.equals("")) { // this is to reset pause time to null;
131                    resetPauseTime = true;
132                }
133                else {
134                    try {
135                        newPauseTime = DateUtils.parseDateUTC(value);
136                    }
137                    catch (Exception ex) {
138                        throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
139                    }
140                }
141            }
142        }
143    
144        /**
145         * Check if new end time is valid.
146         *
147         * @param coordJob coordinator job id.
148         * @param newEndTime new end time.
149         * @throws CommandException thrown if new end time is not valid.
150         */
151        private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException {
152            // New endTime cannot be before coordinator job's start time.
153            Date startTime = coordJob.getStartTime();
154            if (newEndTime.before(startTime)) {
155                throw new CommandException(ErrorCode.E1015, newEndTime, "cannot be before coordinator job's start time ["
156                        + startTime + "]");
157            }
158    
159            // New endTime cannot be before coordinator job's last action time.
160            Date lastActionTime = coordJob.getLastActionTime();
161            if (lastActionTime != null) {
162                Date d = new Date(lastActionTime.getTime() - coordJob.getFrequency() * 60 * 1000);
163                if (!newEndTime.after(d)) {
164                    throw new CommandException(ErrorCode.E1015, newEndTime,
165                            "must be after coordinator job's last action time [" + d + "]");
166                }
167            }
168        }
169    
170        /**
171         * Check if new pause time is valid.
172         *
173         * @param coordJob coordinator job id.
174         * @param newPauseTime new pause time.
175         * @param newEndTime new end time, can be null meaning no change on end time.
176         * @throws CommandException thrown if new pause time is not valid.
177         */
178        private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime)
179                throws CommandException {
180            // New pauseTime has to be a non-past time.
181            Date d = new Date();
182            if (newPauseTime.before(d)) {
183                throw new CommandException(ErrorCode.E1015, newPauseTime, "must be a non-past time");
184            }
185        }
186    
187        /**
188         * Process lookahead created actions that become invalid because of the new pause time,
189         * These actions will be deleted from DB, also the coordinator job will be updated accordingly
190         *
191         * @param coordJob coordinator job
192         * @param newPauseTime new pause time
193         */
194        private void processLookaheadActions(CoordinatorJobBean coordJob, Date newPauseTime) throws CommandException {
195            Date lastActionTime = coordJob.getLastActionTime();
196            if (lastActionTime != null) {
197                // d is the real last action time.
198                Date d = new Date(lastActionTime.getTime() - coordJob.getFrequency() * 60 * 1000);
199                int lastActionNumber = coordJob.getLastActionNumber();
200    
201                boolean hasChanged = false;
202                while (true) {
203                    if (!newPauseTime.after(d)) {
204                        deleteAction(coordJob.getId(), lastActionNumber);
205                        d = new Date(d.getTime() - coordJob.getFrequency() * 60 * 1000);
206                        lastActionNumber = lastActionNumber - 1;
207    
208                        hasChanged = true;
209                    }
210                    else {
211                        break;
212                    }
213                }
214    
215                if (hasChanged == true) {
216                    coordJob.setLastActionNumber(lastActionNumber);
217                    Date d1 = new Date(d.getTime() + coordJob.getFrequency() * 60 * 1000);
218                    coordJob.setLastActionTime(d1);
219                    coordJob.setNextMaterializedTime(d1);
220                    coordJob.resetDoneMaterialization();
221                }
222            }
223        }
224    
225        /**
226         * Delete last action for a coordinator job
227         *
228         * @param coordJob coordinator job
229         * @param lastActionNum last action number of the coordinator job
230         */
231        private void deleteAction(String jobId, int lastActionNum) throws CommandException {
232            try {
233                CoordinatorActionBean actionBean = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, lastActionNum));
234                jpaService.execute(new CoordActionRemoveJPAExecutor(actionBean.getId()));
235            }
236            catch (JPAExecutorException e) {
237                throw new CommandException(e);
238            }
239        }
240    
241        /**
242         * Check if new end time, new concurrency, new pause time are valid.
243         *
244         * @param coordJob coordinator job id.
245         * @param newEndTime new end time.
246         * @param newConcurrency new concurrency.
247         * @param newPauseTime new pause time.
248         * @throws CommandException thrown if new values are not valid.
249         */
250        private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime)
251                throws CommandException {
252            if (coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
253                throw new CommandException(ErrorCode.E1016);
254            }
255    
256            if (newEndTime != null) {
257                checkEndTime(coordJob, newEndTime);
258            }
259    
260            if (newPauseTime != null) {
261                checkPauseTime(coordJob, newPauseTime);
262            }
263        }
264    
265        /* (non-Javadoc)
266         * @see org.apache.oozie.command.XCommand#execute()
267         */
268        @Override
269        protected Void execute() throws CommandException {
270            LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId);
271    
272            try {
273                if (newEndTime != null) {
274                    coordJob.setEndTime(newEndTime);
275                    if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
276                            || coordJob.getStatus() == CoordinatorJob.Status.RUNNING
277                            || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
278                            || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
279                        coordJob.setStatus(CoordinatorJob.Status.RUNNING);
280                        coordJob.setPending();
281                        coordJob.resetDoneMaterialization();
282                    }
283                }
284    
285                if (newConcurrency != null) {
286                    this.coordJob.setConcurrency(newConcurrency);
287                }
288    
289                if (newPauseTime != null || resetPauseTime == true) {
290                    this.coordJob.setPauseTime(newPauseTime);
291                    if (oldPauseTime != null && newPauseTime != null) {
292                        if (oldPauseTime.before(newPauseTime) && this.coordJob.getStatus() == Job.Status.PAUSED) {
293                            this.coordJob.setStatus(Job.Status.RUNNING);
294                        }
295                    }
296                    else if (oldPauseTime != null && newPauseTime == null && this.coordJob.getStatus() == Job.Status.PAUSED) {
297                        this.coordJob.setStatus(Job.Status.RUNNING);
298                    }
299    
300                    if (!resetPauseTime) {
301                        processLookaheadActions(coordJob, newPauseTime);
302                    }
303                }
304    
305                jpaService.execute(new CoordJobUpdateJPAExecutor(this.coordJob));
306    
307                return null;
308            }
309            catch (XException ex) {
310                throw new CommandException(ex);
311            }
312            finally {
313                LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId);
314                // update bundle action
315                if (coordJob.getBundleId() != null) {
316                    BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
317                    bundleStatusUpdate.call();
318                }
319            }
320        }
321    
322        /* (non-Javadoc)
323         * @see org.apache.oozie.command.XCommand#getEntityKey()
324         */
325        @Override
326        protected String getEntityKey() {
327            return this.jobId;
328        }
329    
330        /* (non-Javadoc)
331         * @see org.apache.oozie.command.XCommand#loadState()
332         */
333        @Override
334        protected void loadState() throws CommandException{
335            jpaService = Services.get().get(JPAService.class);
336    
337            if (jpaService == null) {
338                throw new CommandException(ErrorCode.E0610);
339            }
340    
341            try {
342                this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
343                oldPauseTime = coordJob.getPauseTime();
344                prevStatus = coordJob.getStatus();
345            }
346            catch (JPAExecutorException e) {
347                throw new CommandException(e);
348            }
349    
350            LogUtils.setLogInfo(this.coordJob, logInfo);
351        }
352    
353        /* (non-Javadoc)
354         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
355         */
356        @Override
357        protected void verifyPrecondition() throws CommandException,PreconditionException {
358            check(this.coordJob, newEndTime, newConcurrency, newPauseTime);
359        }
360    
361        /* (non-Javadoc)
362         * @see org.apache.oozie.command.XCommand#isLockRequired()
363         */
364        @Override
365        protected boolean isLockRequired() {
366            return true;
367        }
368    }