001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.bundle;
019
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Map;
025import java.util.Set;
026
027import org.apache.oozie.BundleActionBean;
028import org.apache.oozie.BundleJobBean;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.XException;
031import org.apache.oozie.client.Job;
032import org.apache.oozie.client.OozieClient;
033import org.apache.oozie.command.CommandException;
034import org.apache.oozie.command.PreconditionException;
035import org.apache.oozie.command.XCommand;
036import org.apache.oozie.command.coord.CoordChangeXCommand;
037import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
038import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
039import org.apache.oozie.executor.jpa.BatchQueryExecutor;
040import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
041import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
042import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
043import org.apache.oozie.executor.jpa.JPAExecutorException;
044import org.apache.oozie.util.DateUtils;
045import org.apache.oozie.util.JobUtils;
046import org.apache.oozie.util.LogUtils;
047import org.apache.oozie.util.ParamChecker;
048import org.apache.oozie.util.StatusUtils;
049
050public class BundleJobChangeXCommand extends XCommand<Void> {
051    private String jobId;
052    private String changeValue;
053    private List<BundleActionBean> bundleActions;
054    private BundleJobBean bundleJob;
055    private Date newPauseTime = null;
056    private Date newEndTime = null;
057    boolean isChangePauseTime = false;
058    boolean isChangeEndTime = false;
059    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
060
061    private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
062    static {
063        ALLOWED_CHANGE_OPTIONS.add("pausetime");
064        ALLOWED_CHANGE_OPTIONS.add("endtime");
065    }
066
067    /**
068     * @param id bundle job id
069     * @param changeValue change value
070     *
071     * @throws CommandException thrown if failed to change bundle
072     */
073    public BundleJobChangeXCommand(String id, String changeValue) throws CommandException {
074        super("bundle_change", "bundle_change", 1);
075        this.jobId = ParamChecker.notEmpty(id, "id");
076        this.changeValue = ParamChecker.notEmpty(changeValue, "changeValue");
077    }
078
079    /**
080     * Check if new pause time is future time.
081     *
082     * @param newPauseTime new pause time.
083     * @throws CommandException thrown if new pause time is not valid.
084     */
085    private void checkPauseTime(Date newPauseTime) throws CommandException {
086        // New pauseTime has to be a non-past time.
087        Date d = new Date();
088        if (newPauseTime.before(d)) {
089            throw new CommandException(ErrorCode.E1317, newPauseTime, "must be a non-past time");
090        }
091    }
092
093    /**
094     * Check if new pause time is future time.
095     *
096     * @param newEndTime new end time, can be null meaning no change on end time.
097     * @throws CommandException thrown if new end time is not valid.
098     */
099    private void checkEndTime(Date newEndTime) throws CommandException {
100        // New endTime has to be a non-past start time.
101        Date startTime = bundleJob.getKickoffTime();
102        if (startTime != null && newEndTime.before(startTime)) {
103            throw new CommandException(ErrorCode.E1317, newEndTime, "must be greater then kickoff time");
104        }
105    }
106
107    /**
108     * validate if change value is valid.
109     *
110     * @param changeValue change value.
111     * @throws CommandException thrown if changeValue cannot be parsed properly.
112     */
113    private void validateChangeValue(String changeValue) throws CommandException {
114        Map<String, String> map = JobUtils.parseChangeValue(changeValue);
115
116        if (map.size() > ALLOWED_CHANGE_OPTIONS.size()
117                || !(map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME) || map
118                        .containsKey(OozieClient.CHANGE_VALUE_ENDTIME))) {
119            throw new CommandException(ErrorCode.E1317, changeValue, "can only change pausetime or end time");
120        }
121
122        if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
123            isChangePauseTime = true;
124        }
125        else if(map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)){
126            isChangeEndTime = true;
127        }
128        else {
129            throw new CommandException(ErrorCode.E1317, changeValue, "should change pausetime or endtime");
130        }
131
132        if(isChangePauseTime){
133            String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
134            if (!value.equals(""))   {
135                try {
136                    newPauseTime = DateUtils.parseDateOozieTZ(value);
137                }
138                catch (Exception ex) {
139                    throw new CommandException(ErrorCode.E1317, value, "is not a valid date");
140                }
141
142                checkPauseTime(newPauseTime);
143            }
144        }
145        else if (isChangeEndTime){
146            String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
147            if (!value.equals(""))   {
148                try {
149                    newEndTime = DateUtils.parseDateOozieTZ(value);
150                }
151                catch (Exception ex) {
152                    throw new CommandException(ErrorCode.E1317, value, "is not a valid date");
153                }
154
155                checkEndTime(newEndTime);
156            }
157        }
158    }
159
160    /* (non-Javadoc)
161     * @see org.apache.oozie.command.XCommand#execute()
162     */
163    @Override
164    protected Void execute() throws CommandException {
165        StringBuffer changeReport = new StringBuffer();
166        try {
167            if (isChangePauseTime || isChangeEndTime) {
168                if (isChangePauseTime) {
169                    bundleJob.setPauseTime(newPauseTime);
170                }
171                else if (isChangeEndTime) {
172                    bundleJob.setEndTime(newEndTime);
173                    if (bundleJob.getStatus() == Job.Status.SUCCEEDED) {
174                        bundleJob.setStatus(Job.Status.RUNNING);
175                    }
176                    if (bundleJob.getStatus() == Job.Status.DONEWITHERROR || bundleJob.getStatus() == Job.Status.FAILED) {
177                        bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(Job.Status.RUNNINGWITHERROR));
178                    }
179                }
180                for (BundleActionBean action : this.bundleActions) {
181                    // queue coord change commands;
182                    if (action.getStatus() != Job.Status.KILLED && action.getCoordId() != null) {
183                        try {
184                            new CoordChangeXCommand(action.getCoordId(), changeValue).call();
185                        }
186                        catch (Exception e) {
187                            String errorMsg = action.getCoordId() + " : " + e.getMessage();
188                            LOG.info("Change command failed " + errorMsg);
189                            changeReport.append("[ ").append(errorMsg).append(" ]");
190                        }
191                    }
192                    else {
193                        String errorMsg = action.getCoordId() + " : Coord is in killed state";
194                        LOG.info("Change command failed " + errorMsg);
195                        changeReport.append("[ ").append(errorMsg).append(" ]");
196                    }
197                }
198                updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PAUSE_ENDTIME,
199                        bundleJob));
200                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
201            }
202            if(!changeReport.toString().isEmpty()){
203                throw new CommandException(ErrorCode.E1320, changeReport.toString());
204            }
205            return null;
206        }
207        catch (XException ex) {
208            throw new CommandException(ex);
209        }
210    }
211
212    /* (non-Javadoc)
213     * @see org.apache.oozie.command.XCommand#getEntityKey()
214     */
215    @Override
216    public String getEntityKey() {
217        return this.jobId;
218    }
219
220    /* (non-Javadoc)
221     * @see org.apache.oozie.command.XCommand#isLockRequired()
222     */
223    @Override
224    protected boolean isLockRequired() {
225        return true;
226    }
227
228    @Override
229    protected void loadState() throws CommandException {
230        try {
231            this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, bundleJob.getId());
232            this.bundleActions = BundleActionQueryExecutor.getInstance().getList(
233                    BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, bundleJob.getId());
234        }
235        catch (JPAExecutorException Ex) {
236            throw new CommandException(ErrorCode.E1311, this.jobId);
237        }
238    }
239
240    @Override
241    protected void verifyPrecondition() throws CommandException, PreconditionException {
242    }
243
244    @Override
245    protected void eagerLoadState() throws CommandException {
246        try {
247            this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, jobId);
248            LogUtils.setLogInfo(bundleJob, logInfo);
249        }
250        catch (JPAExecutorException ex) {
251            throw new CommandException(ex);
252        }
253    }
254
255    @Override
256    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
257        validateChangeValue(changeValue);
258
259        if (bundleJob == null) {
260            LOG.info("BundleChangeCommand not succeeded - " + "job " + jobId + " does not exist");
261            throw new PreconditionException(ErrorCode.E1314, jobId);
262        }
263        if (isChangePauseTime) {
264            if (bundleJob.getStatus() == Job.Status.SUCCEEDED || bundleJob.getStatus() == Job.Status.FAILED
265                    || bundleJob.getStatus() == Job.Status.KILLED || bundleJob.getStatus() == Job.Status.DONEWITHERROR) {
266                LOG.info("BundleChangeCommand not succeeded for changing pausetime- " + "job " + jobId + " finished, status is "
267                        + bundleJob.getStatusStr());
268                throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString());
269            }
270        }
271        else if(isChangeEndTime){
272            if (bundleJob.getStatus() == Job.Status.KILLED) {
273                LOG.info("BundleChangeCommand not succeeded for changing endtime- " + "job " + jobId + " finished, status is "
274                        + bundleJob.getStatusStr());
275                throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString());
276            }
277        }
278    }
279}