001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.bundle;
020
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027
028import org.apache.oozie.BundleActionBean;
029import org.apache.oozie.BundleJobBean;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.XException;
032import org.apache.oozie.client.Job;
033import org.apache.oozie.client.OozieClient;
034import org.apache.oozie.command.CommandException;
035import org.apache.oozie.command.PreconditionException;
036import org.apache.oozie.command.XCommand;
037import org.apache.oozie.command.coord.CoordChangeXCommand;
038import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
039import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
040import org.apache.oozie.executor.jpa.BatchQueryExecutor;
041import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
042import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
043import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
044import org.apache.oozie.executor.jpa.JPAExecutorException;
045import org.apache.oozie.util.DateUtils;
046import org.apache.oozie.util.JobUtils;
047import org.apache.oozie.util.LogUtils;
048import org.apache.oozie.util.ParamChecker;
049import org.apache.oozie.util.StatusUtils;
050
051public class BundleJobChangeXCommand extends XCommand<Void> {
052    private String jobId;
053    private String changeValue;
054    private List<BundleActionBean> bundleActions;
055    private BundleJobBean bundleJob;
056    private Date newPauseTime = null;
057    private Date newEndTime = null;
058    boolean isChangePauseTime = false;
059    boolean isChangeEndTime = false;
060    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
061
062    private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
063    static {
064        ALLOWED_CHANGE_OPTIONS.add("pausetime");
065        ALLOWED_CHANGE_OPTIONS.add("endtime");
066    }
067
068    /**
069     * @param id bundle job id
070     * @param changeValue change value
071     *
072     * @throws CommandException thrown if failed to change bundle
073     */
074    public BundleJobChangeXCommand(String id, String changeValue) throws CommandException {
075        super("bundle_change", "bundle_change", 1);
076        this.jobId = ParamChecker.notEmpty(id, "id");
077        this.changeValue = ParamChecker.notEmpty(changeValue, "changeValue");
078    }
079
080    /**
081     * Check if new pause time is future time.
082     *
083     * @param newPauseTime new pause time.
084     * @throws CommandException thrown if new pause time is not valid.
085     */
086    private void checkPauseTime(Date newPauseTime) throws CommandException {
087        // New pauseTime has to be a non-past time.
088        Date d = new Date();
089        if (newPauseTime.before(d)) {
090            throw new CommandException(ErrorCode.E1317, newPauseTime, "must be a non-past time");
091        }
092    }
093
094    /**
095     * Check if new pause time is future time.
096     *
097     * @param newEndTime new end time, can be null meaning no change on end time.
098     * @throws CommandException thrown if new end time is not valid.
099     */
100    private void checkEndTime(Date newEndTime) throws CommandException {
101        // New endTime has to be a non-past start time.
102        Date startTime = bundleJob.getKickoffTime();
103        if (startTime != null && newEndTime.before(startTime)) {
104            throw new CommandException(ErrorCode.E1317, newEndTime, "must be greater then kickoff time");
105        }
106    }
107
108    /**
109     * validate if change value is valid.
110     *
111     * @param changeValue change value.
112     * @throws CommandException thrown if changeValue cannot be parsed properly.
113     */
114    private void validateChangeValue(String changeValue) throws CommandException {
115        Map<String, String> map = JobUtils.parseChangeValue(changeValue);
116
117        if (map.size() > ALLOWED_CHANGE_OPTIONS.size()
118                || !(map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME) || map
119                        .containsKey(OozieClient.CHANGE_VALUE_ENDTIME))) {
120            throw new CommandException(ErrorCode.E1317, changeValue, "can only change pausetime or end time");
121        }
122
123        if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
124            isChangePauseTime = true;
125        }
126        else if(map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)){
127            isChangeEndTime = true;
128        }
129        else {
130            throw new CommandException(ErrorCode.E1317, changeValue, "should change pausetime or endtime");
131        }
132
133        if(isChangePauseTime){
134            String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
135            if (!value.equals(""))   {
136                try {
137                    newPauseTime = DateUtils.parseDateOozieTZ(value);
138                }
139                catch (Exception ex) {
140                    throw new CommandException(ErrorCode.E1317, value, "is not a valid date");
141                }
142
143                checkPauseTime(newPauseTime);
144            }
145        }
146        else if (isChangeEndTime){
147            String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
148            if (!value.equals(""))   {
149                try {
150                    newEndTime = DateUtils.parseDateOozieTZ(value);
151                }
152                catch (Exception ex) {
153                    throw new CommandException(ErrorCode.E1317, value, "is not a valid date");
154                }
155
156                checkEndTime(newEndTime);
157            }
158        }
159    }
160
161    /* (non-Javadoc)
162     * @see org.apache.oozie.command.XCommand#execute()
163     */
164    @Override
165    protected Void execute() throws CommandException {
166        StringBuffer changeReport = new StringBuffer();
167        try {
168            if (isChangePauseTime || isChangeEndTime) {
169                if (isChangePauseTime) {
170                    bundleJob.setPauseTime(newPauseTime);
171                }
172                else if (isChangeEndTime) {
173                    bundleJob.setEndTime(newEndTime);
174                    if (bundleJob.getStatus() == Job.Status.SUCCEEDED) {
175                        bundleJob.setStatus(Job.Status.RUNNING);
176                    }
177                    if (bundleJob.getStatus() == Job.Status.DONEWITHERROR || bundleJob.getStatus() == Job.Status.FAILED) {
178                        bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(Job.Status.RUNNINGWITHERROR));
179                    }
180                }
181                for (BundleActionBean action : this.bundleActions) {
182                    // queue coord change commands;
183                    if (action.getStatus() != Job.Status.KILLED && action.getCoordId() != null) {
184                        try {
185                            new CoordChangeXCommand(action.getCoordId(), changeValue).call();
186                        }
187                        catch (Exception e) {
188                            String errorMsg = action.getCoordId() + " : " + e.getMessage();
189                            LOG.info("Change command failed " + errorMsg);
190                            changeReport.append("[ ").append(errorMsg).append(" ]");
191                        }
192                    }
193                    else {
194                        String errorMsg = action.getCoordId() + " : Coord is in killed state";
195                        LOG.info("Change command failed " + errorMsg);
196                        changeReport.append("[ ").append(errorMsg).append(" ]");
197                    }
198                }
199                updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PAUSE_ENDTIME,
200                        bundleJob));
201                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
202            }
203            if(!changeReport.toString().isEmpty()){
204                throw new CommandException(ErrorCode.E1320, changeReport.toString());
205            }
206            return null;
207        }
208        catch (XException ex) {
209            throw new CommandException(ex);
210        }
211    }
212
213    /* (non-Javadoc)
214     * @see org.apache.oozie.command.XCommand#getEntityKey()
215     */
216    @Override
217    public String getEntityKey() {
218        return this.jobId;
219    }
220
221    /* (non-Javadoc)
222     * @see org.apache.oozie.command.XCommand#isLockRequired()
223     */
224    @Override
225    protected boolean isLockRequired() {
226        return true;
227    }
228
229    @Override
230    protected void loadState() throws CommandException {
231        try {
232            this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, bundleJob.getId());
233            this.bundleActions = BundleActionQueryExecutor.getInstance().getList(
234                    BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, bundleJob.getId());
235        }
236        catch (JPAExecutorException Ex) {
237            throw new CommandException(ErrorCode.E1311, this.jobId);
238        }
239    }
240
241    @Override
242    protected void verifyPrecondition() throws CommandException, PreconditionException {
243    }
244
245    @Override
246    protected void eagerLoadState() throws CommandException {
247        try {
248            this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, jobId);
249            LogUtils.setLogInfo(bundleJob);
250        }
251        catch (JPAExecutorException ex) {
252            throw new CommandException(ex);
253        }
254    }
255
256    @Override
257    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
258        validateChangeValue(changeValue);
259
260        if (bundleJob == null) {
261            LOG.info("BundleChangeCommand not succeeded - " + "job " + jobId + " does not exist");
262            throw new PreconditionException(ErrorCode.E1314, jobId);
263        }
264        if (isChangePauseTime) {
265            if (bundleJob.getStatus() == Job.Status.SUCCEEDED || bundleJob.getStatus() == Job.Status.FAILED
266                    || bundleJob.getStatus() == Job.Status.KILLED || bundleJob.getStatus() == Job.Status.DONEWITHERROR) {
267                LOG.info("BundleChangeCommand not succeeded for changing pausetime- " + "job " + jobId + " finished, status is "
268                        + bundleJob.getStatusStr());
269                throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString());
270            }
271        }
272        else if(isChangeEndTime){
273            if (bundleJob.getStatus() == Job.Status.KILLED) {
274                LOG.info("BundleChangeCommand not succeeded for changing endtime- " + "job " + jobId + " finished, status is "
275                        + bundleJob.getStatusStr());
276                throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString());
277            }
278        }
279    }
280}