001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import org.apache.oozie.util.DateUtils;
021    import org.apache.oozie.client.CoordinatorJob;
022    import org.apache.oozie.client.OozieClient;
023    import org.apache.oozie.CoordinatorActionBean;
024    import org.apache.oozie.CoordinatorJobBean;
025    import org.apache.oozie.ErrorCode;
026    import org.apache.oozie.XException;
027    import org.apache.oozie.command.CommandException;
028    import org.apache.oozie.executor.jpa.CoordActionRemoveJPAExecutor;
029    import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
030    import org.apache.oozie.executor.jpa.JPAExecutorException;
031    import org.apache.oozie.service.JPAService;
032    import org.apache.oozie.service.Services;
033    import org.apache.oozie.store.CoordinatorStore;
034    import org.apache.oozie.store.StoreException;
035    import org.apache.oozie.util.JobUtils;
036    import org.apache.oozie.util.ParamChecker;
037    import org.apache.oozie.util.XLog;
038    
039    import java.util.Date;
040    import java.util.HashSet;
041    import java.util.Map;
042    import java.util.Set;
043    import java.util.Map.Entry;
044    
045    public class CoordChangeCommand extends CoordinatorCommand<Void> {
046        private String jobId;
047        private Date newEndTime = null;
048        private Integer newConcurrency = null;
049        private Date newPauseTime = null;
050        private boolean resetPauseTime = false;
051        private static final XLog LOG = XLog.getLog(CoordChangeCommand.class);    
052        
053        private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
054        static {
055            ALLOWED_CHANGE_OPTIONS.add("endtime");
056            ALLOWED_CHANGE_OPTIONS.add("concurrency");
057            ALLOWED_CHANGE_OPTIONS.add("pausetime");
058        }
059    
060        public CoordChangeCommand(String id, String changeValue) throws CommandException {
061            super("coord_change", "coord_change", 0, XLog.STD);
062            this.jobId = ParamChecker.notEmpty(id, "id");
063            ParamChecker.notEmpty(changeValue, "value");
064    
065            validateChangeValue(changeValue);
066        }
067    
068        /**
069         * @param changeValue change value.
070         * @throws CommandException thrown if changeValue cannot be parsed properly.
071         */
072        private void validateChangeValue(String changeValue) throws CommandException {
073            Map<String, String> map = JobUtils.parseChangeValue(changeValue);
074    
075            if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) {
076                throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
077            }
078    
079            java.util.Iterator<Entry<String, String>> iter = map.entrySet().iterator();
080            while (iter.hasNext()) {
081                Entry<String, String> entry = iter.next();
082                String key = entry.getKey();
083                String value = entry.getValue();
084    
085                if (!ALLOWED_CHANGE_OPTIONS.contains(key)) {
086                    throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime");
087                }
088    
089                if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) {
090                    throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty");
091                }
092            }
093    
094            if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) {
095                String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
096                try {
097                    newEndTime = DateUtils.parseDateUTC(value);
098                }
099                catch (Exception ex) {
100                    throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
101                }
102            }
103    
104            if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) {
105                String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY);
106                try {
107                    newConcurrency = Integer.parseInt(value);
108                }
109                catch (NumberFormatException ex) {
110                    throw new CommandException(ErrorCode.E1015, value, "must be a valid integer");
111                }
112            }
113    
114            if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
115                String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
116                if (value.equals("")) { // this is to reset pause time to null;
117                    resetPauseTime = true;
118                }
119                else {
120                    try {
121                        newPauseTime = DateUtils.parseDateUTC(value);
122                    }
123                    catch (Exception ex) {
124                        throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
125                    }
126                }
127            }
128        }
129    
130        /**
131         * @param coordJob coordinator job id.
132         * @param newEndTime new end time.
133         * @throws CommandException thrown if new end time is not valid.
134         */
135        private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException {
136            // New endTime cannot be before coordinator job's start time.
137            Date startTime = coordJob.getStartTime();
138            if (newEndTime.before(startTime)) {
139                throw new CommandException(ErrorCode.E1015, newEndTime, "cannot be before coordinator job's start time [" + startTime + "]");
140            }
141    
142            // New endTime cannot be before coordinator job's last action time.
143            Date lastActionTime = coordJob.getLastActionTime();
144            if (lastActionTime != null) {
145                Date d = new Date(lastActionTime.getTime() - coordJob.getFrequency() * 60 * 1000);
146                if (!newEndTime.after(d)) {
147                    throw new CommandException(ErrorCode.E1015, newEndTime,
148                            "must be after coordinator job's last action time [" + d + "]");
149                }
150            }
151        }
152    
153        /**
154         * @param coordJob coordinator job id.
155         * @param newPauseTime new pause time.
156         * @param newEndTime new end time, can be null meaning no change on end time.
157         * @throws CommandException thrown if new pause time is not valid.
158         */
159        private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime)
160                throws CommandException {
161            // New pauseTime has to be a non-past time.
162            Date d = new Date();
163            if (newPauseTime.before(d)) {
164                throw new CommandException(ErrorCode.E1015, newPauseTime, "must be a non-past time");            
165            }
166        }
167        
168        /**
169         * Process lookahead created actions that become invalid because of the new pause time,
170         * These actions will be deleted from DB, also the coordinator job will be updated accordingly
171         * 
172         * @param coordJob coordinator job
173         * @param newPauseTime new pause time
174         */
175        private void processLookaheadActions(CoordinatorJobBean coordJob, Date newPauseTime) throws CommandException {
176            Date lastActionTime = coordJob.getLastActionTime();
177            if (lastActionTime != null) {
178                // d is the real last action time.
179                Date d = new Date(lastActionTime.getTime() - coordJob.getFrequency() * 60 * 1000);
180                int lastActionNumber = coordJob.getLastActionNumber();
181                
182                boolean hasChanged = false;
183                while (true) {
184                    if (!newPauseTime.after(d)) {
185                        deleteAction(coordJob.getId(), lastActionNumber);
186                        d = new Date(d.getTime() - coordJob.getFrequency() * 60 * 1000);
187                        lastActionNumber = lastActionNumber - 1;
188                        
189                        hasChanged = true;
190                    }
191                    else {
192                        break;
193                    }
194                }
195                
196                if (hasChanged == true) {
197                    coordJob.setLastActionNumber(lastActionNumber);
198                    Date d1 = new Date(d.getTime() + coordJob.getFrequency() * 60 * 1000);
199                    coordJob.setLastActionTime(d1);
200                    coordJob.setNextMaterializedTime(d1);
201                    
202                    if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) {
203                        coordJob.setStatus(CoordinatorJob.Status.RUNNING);
204                    }
205                }
206            }
207        }
208    
209        /**
210         * delete last action for a coordinator job
211         * @param coordJob coordinator job
212         * @param lastActionNum last action number of the coordinator job
213         */
214        private void deleteAction(String jobId, int lastActionNum) throws CommandException {
215            JPAService jpaService = Services.get().get(JPAService.class);
216            if (jpaService == null) {
217                throw new CommandException(ErrorCode.E0610);
218            }
219            
220            try {
221                CoordinatorActionBean actionBean = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, lastActionNum));
222                jpaService.execute(new CoordActionRemoveJPAExecutor(actionBean.getId()));
223            }
224            catch (JPAExecutorException e) {
225                throw new CommandException(e);
226            }
227        }
228    
229        /**
230         * @param coordJob coordinator job id.
231         * @param newEndTime new end time.
232         * @param newConcurrency new concurrency.
233         * @param newPauseTime new pause time.
234         * @throws CommandException thrown if new values are not valid.
235         */
236        private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime)
237                throws CommandException {
238            if (coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
239                throw new CommandException(ErrorCode.E1016);
240            }
241    
242            if (newEndTime != null) {
243                checkEndTime(coordJob, newEndTime);
244            }
245    
246            if (newPauseTime != null) {
247                checkPauseTime(coordJob, newPauseTime);
248            }
249        }
250    
251        @Override
252        protected Void call(CoordinatorStore store) throws StoreException, CommandException {
253            try {
254                CoordinatorJobBean coordJob = store.getCoordinatorJob(jobId, false);
255                setLogInfo(coordJob);
256    
257                check(coordJob, newEndTime, newConcurrency, newPauseTime);
258    
259                if (newEndTime != null) {
260                    coordJob.setEndTime(newEndTime);
261                    if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) {
262                        coordJob.setStatus(CoordinatorJob.Status.RUNNING);
263                    }
264                }
265    
266                if (newConcurrency != null) {
267                    coordJob.setConcurrency(newConcurrency);
268                }
269    
270                if (newPauseTime != null || resetPauseTime == true) {
271                    coordJob.setPauseTime(newPauseTime);
272                    if (!resetPauseTime) {
273                        processLookaheadActions(coordJob, newPauseTime);
274                    }
275                }
276    
277                store.updateCoordinatorJob(coordJob);
278    
279                return null;
280            }
281            catch (XException ex) {
282                throw new CommandException(ex);
283            }
284        }
285    
286        @Override
287        protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
288            LOG.info("STARTED CoordChangeCommand for jobId=" + jobId);
289            try {
290                if (lock(jobId)) {
291                    call(store);
292                }
293                else {
294                    throw new CommandException(ErrorCode.E0606, "job " + jobId
295                            + " has been locked and cannot change value, please retry later");
296                }
297            }
298            catch (InterruptedException e) {
299                throw new CommandException(ErrorCode.E0606, "acquiring lock for job " + jobId + " failed "
300                        + " with exception " + e.getMessage());
301            }
302            finally {
303                LOG.info("ENDED CoordChangeCommand for jobId=" + jobId);
304            }
305            return null;
306        }
307    }