001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.bundle;
019    
020    import java.util.ArrayList;
021    import java.util.Date;
022    import java.util.HashSet;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.Set;
026    
027    import org.apache.oozie.BundleActionBean;
028    import org.apache.oozie.BundleJobBean;
029    import org.apache.oozie.ErrorCode;
030    import org.apache.oozie.XException;
031    import org.apache.oozie.client.Job;
032    import org.apache.oozie.client.OozieClient;
033    import org.apache.oozie.client.rest.JsonBean;
034    import org.apache.oozie.command.CommandException;
035    import org.apache.oozie.command.PreconditionException;
036    import org.apache.oozie.command.XCommand;
037    import org.apache.oozie.command.coord.CoordChangeXCommand;
038    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
039    import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
040    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
041    import org.apache.oozie.service.JPAService;
042    import org.apache.oozie.service.Services;
043    import org.apache.oozie.util.DateUtils;
044    import org.apache.oozie.util.JobUtils;
045    import org.apache.oozie.util.LogUtils;
046    import org.apache.oozie.util.ParamChecker;
047    import org.apache.oozie.util.StatusUtils;
048    
049    public class BundleJobChangeXCommand extends XCommand<Void> {
050        private String jobId;
051        private String changeValue;
052        private JPAService jpaService;
053        private List<BundleActionBean> bundleActions;
054        private BundleJobBean bundleJob;
055        private Date newPauseTime = null;
056        private Date newEndTime = null;
057        boolean isChangePauseTime = false;
058        boolean isChangeEndTime = false;
059        private List<JsonBean> updateList = new ArrayList<JsonBean>();
060    
061        private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
062        static {
063            ALLOWED_CHANGE_OPTIONS.add("pausetime");
064            ALLOWED_CHANGE_OPTIONS.add("endtime");
065        }
066    
067        /**
068         * @param id bundle job id
069         * @param changeValue change value
070         *
071         * @throws CommandException thrown if failed to change bundle
072         */
073        public BundleJobChangeXCommand(String id, String changeValue) throws CommandException {
074            super("bundle_change", "bundle_change", 1);
075            this.jobId = ParamChecker.notEmpty(id, "id");
076            this.changeValue = ParamChecker.notEmpty(changeValue, "changeValue");
077        }
078    
079        /**
080         * Check if new pause time is future time.
081         *
082         * @param newPauseTime new pause time.
083         * @throws CommandException thrown if new pause time is not valid.
084         */
085        private void checkPauseTime(Date newPauseTime) throws CommandException {
086            // New pauseTime has to be a non-past time.
087            Date d = new Date();
088            if (newPauseTime.before(d)) {
089                throw new CommandException(ErrorCode.E1317, newPauseTime, "must be a non-past time");
090            }
091        }
092    
093        /**
094         * Check if new pause time is future time.
095         *
096         * @param newEndTime new end time, can be null meaning no change on end time.
097         * @throws CommandException thrown if new end time is not valid.
098         */
099        private void checkEndTime(Date newEndTime) throws CommandException {
100            // New endTime has to be a non-past start time.
101            Date startTime = bundleJob.getKickoffTime();
102            if (startTime != null && newEndTime.before(startTime)) {
103                throw new CommandException(ErrorCode.E1317, newEndTime, "must be greater then kickoff time");
104            }
105        }
106    
107        /**
108         * validate if change value is valid.
109         *
110         * @param changeValue change value.
111         * @throws CommandException thrown if changeValue cannot be parsed properly.
112         */
113        private void validateChangeValue(String changeValue) throws CommandException {
114            Map<String, String> map = JobUtils.parseChangeValue(changeValue);
115    
116            if (map.size() > ALLOWED_CHANGE_OPTIONS.size() || !(map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME) || map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME))) {
117                throw new CommandException(ErrorCode.E1317, changeValue, "can only change pausetime or end time");
118            }
119    
120            if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
121                isChangePauseTime = true;
122            }
123            else if(map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)){
124                isChangeEndTime = true;
125            }
126            else {
127                throw new CommandException(ErrorCode.E1317, changeValue, "should change pausetime or endtime");
128            }
129    
130            if(isChangePauseTime){
131                String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
132                if (!value.equals(""))   {
133                    try {
134                        newPauseTime = DateUtils.parseDateOozieTZ(value);
135                    }
136                    catch (Exception ex) {
137                        throw new CommandException(ErrorCode.E1317, value, "is not a valid date");
138                    }
139    
140                    checkPauseTime(newPauseTime);
141                }
142            }
143            else if (isChangeEndTime){
144                String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
145                if (!value.equals(""))   {
146                    try {
147                        newEndTime = DateUtils.parseDateOozieTZ(value);
148                    }
149                    catch (Exception ex) {
150                        throw new CommandException(ErrorCode.E1317, value, "is not a valid date");
151                    }
152    
153                    checkEndTime(newEndTime);
154                }
155            }
156        }
157    
158        /* (non-Javadoc)
159         * @see org.apache.oozie.command.XCommand#execute()
160         */
161        @Override
162        protected Void execute() throws CommandException {
163            try {
164                if (isChangePauseTime || isChangeEndTime) {
165                    if (isChangePauseTime) {
166                        bundleJob.setPauseTime(newPauseTime);
167                    }
168                    else if (isChangeEndTime) {
169                        bundleJob.setEndTime(newEndTime);
170                        if (bundleJob.getStatus() == Job.Status.SUCCEEDED) {
171                            bundleJob.setStatus(Job.Status.RUNNING);
172                        }
173                        if (bundleJob.getStatus() == Job.Status.DONEWITHERROR || bundleJob.getStatus() == Job.Status.FAILED) {
174                            bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(Job.Status.RUNNINGWITHERROR));
175                        }
176                    }
177                    for (BundleActionBean action : this.bundleActions) {
178                        // queue coord change commands;
179                        if (action.getStatus() != Job.Status.KILLED && action.getCoordId() != null) {
180                            queue(new CoordChangeXCommand(action.getCoordId(), changeValue));
181                            LOG.info("Queuing CoordChangeXCommand coord job = " + action.getCoordId() + " to change "
182                                    + changeValue);
183                            action.setPending(action.getPending() + 1);
184                            updateList.add(action);
185                        }
186                    }
187                    updateList.add(bundleJob);
188                    jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
189                }
190                return null;
191            }
192            catch (XException ex) {
193                throw new CommandException(ex);
194            }
195        }
196    
197        /* (non-Javadoc)
198         * @see org.apache.oozie.command.XCommand#getEntityKey()
199         */
200        @Override
201        public String getEntityKey() {
202            return this.jobId;
203        }
204    
205        /* (non-Javadoc)
206         * @see org.apache.oozie.command.XCommand#isLockRequired()
207         */
208        @Override
209        protected boolean isLockRequired() {
210            return true;
211        }
212    
213        /* (non-Javadoc)
214         * @see org.apache.oozie.command.XCommand#loadState()
215         */
216        @Override
217        protected void loadState() throws CommandException {
218            try{
219                eagerLoadState();
220                this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
221            }
222            catch(Exception Ex){
223                throw new CommandException(ErrorCode.E1311,this.jobId);
224            }
225        }
226    
227        /* (non-Javadoc)
228         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
229         */
230        @Override
231        protected void verifyPrecondition() throws CommandException, PreconditionException {
232        }
233    
234        /* (non-Javadoc)
235         * @see org.apache.oozie.command.XCommand#eagerLoadState()
236         */
237        @Override
238        protected void eagerLoadState() throws CommandException {
239            try {
240                jpaService = Services.get().get(JPAService.class);
241    
242                if (jpaService != null) {
243                    this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
244                    LogUtils.setLogInfo(bundleJob, logInfo);
245                }
246                else {
247                    throw new CommandException(ErrorCode.E0610);
248                }
249            }
250            catch (XException ex) {
251                throw new CommandException(ex);
252            }
253        }
254    
255        /* (non-Javadoc)
256         * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
257         */
258        @Override
259        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
260            validateChangeValue(changeValue);
261    
262            if (bundleJob == null) {
263                LOG.info("BundleChangeCommand not succeeded - " + "job " + jobId + " does not exist");
264                throw new PreconditionException(ErrorCode.E1314, jobId);
265            }
266            if (isChangePauseTime) {
267                if (bundleJob.getStatus() == Job.Status.SUCCEEDED || bundleJob.getStatus() == Job.Status.FAILED
268                        || bundleJob.getStatus() == Job.Status.KILLED || bundleJob.getStatus() == Job.Status.DONEWITHERROR
269                        || bundleJob == null) {
270                    LOG.info("BundleChangeCommand not succeeded for changing pausetime- " + "job " + jobId + " finished, status is "
271                            + bundleJob.getStatusStr());
272                    throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString());
273                }
274            }
275            else if(isChangeEndTime){
276                if (bundleJob.getStatus() == Job.Status.KILLED || bundleJob == null) {
277                    LOG.info("BundleChangeCommand not succeeded for changing endtime- " + "job " + jobId + " finished, status is "
278                            + bundleJob.getStatusStr());
279                    throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString());
280                }
281            }
282        }
283    }