This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.util.ArrayList;
021 import java.util.Date;
022 import java.util.HashSet;
023 import java.util.List;
024 import java.util.Map;
025 import java.util.Set;
026
027 import org.apache.oozie.BundleActionBean;
028 import org.apache.oozie.BundleJobBean;
029 import org.apache.oozie.ErrorCode;
030 import org.apache.oozie.XException;
031 import org.apache.oozie.client.Job;
032 import org.apache.oozie.client.OozieClient;
033 import org.apache.oozie.client.rest.JsonBean;
034 import org.apache.oozie.command.CommandException;
035 import org.apache.oozie.command.PreconditionException;
036 import org.apache.oozie.command.XCommand;
037 import org.apache.oozie.command.coord.CoordChangeXCommand;
038 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
039 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
040 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
041 import org.apache.oozie.service.JPAService;
042 import org.apache.oozie.service.Services;
043 import org.apache.oozie.util.DateUtils;
044 import org.apache.oozie.util.JobUtils;
045 import org.apache.oozie.util.LogUtils;
046 import org.apache.oozie.util.ParamChecker;
047 import org.apache.oozie.util.StatusUtils;
048
049 public class BundleJobChangeXCommand extends XCommand<Void> {
050 private String jobId;
051 private String changeValue;
052 private JPAService jpaService;
053 private List<BundleActionBean> bundleActions;
054 private BundleJobBean bundleJob;
055 private Date newPauseTime = null;
056 private Date newEndTime = null;
057 boolean isChangePauseTime = false;
058 boolean isChangeEndTime = false;
059 private List<JsonBean> updateList = new ArrayList<JsonBean>();
060
061 private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
062 static {
063 ALLOWED_CHANGE_OPTIONS.add("pausetime");
064 ALLOWED_CHANGE_OPTIONS.add("endtime");
065 }
066
067 /**
068 * @param id bundle job id
069 * @param changeValue change value
070 *
071 * @throws CommandException thrown if failed to change bundle
072 */
073 public BundleJobChangeXCommand(String id, String changeValue) throws CommandException {
074 super("bundle_change", "bundle_change", 1);
075 this.jobId = ParamChecker.notEmpty(id, "id");
076 this.changeValue = ParamChecker.notEmpty(changeValue, "changeValue");
077 }
078
079 /**
080 * Check if new pause time is future time.
081 *
082 * @param newPauseTime new pause time.
083 * @throws CommandException thrown if new pause time is not valid.
084 */
085 private void checkPauseTime(Date newPauseTime) throws CommandException {
086 // New pauseTime has to be a non-past time.
087 Date d = new Date();
088 if (newPauseTime.before(d)) {
089 throw new CommandException(ErrorCode.E1317, newPauseTime, "must be a non-past time");
090 }
091 }
092
093 /**
094 * Check if new pause time is future time.
095 *
096 * @param newEndTime new end time, can be null meaning no change on end time.
097 * @throws CommandException thrown if new end time is not valid.
098 */
099 private void checkEndTime(Date newEndTime) throws CommandException {
100 // New endTime has to be a non-past start time.
101 Date startTime = bundleJob.getKickoffTime();
102 if (startTime != null && newEndTime.before(startTime)) {
103 throw new CommandException(ErrorCode.E1317, newEndTime, "must be greater then kickoff time");
104 }
105 }
106
107 /**
108 * validate if change value is valid.
109 *
110 * @param changeValue change value.
111 * @throws CommandException thrown if changeValue cannot be parsed properly.
112 */
113 private void validateChangeValue(String changeValue) throws CommandException {
114 Map<String, String> map = JobUtils.parseChangeValue(changeValue);
115
116 if (map.size() > ALLOWED_CHANGE_OPTIONS.size() || !(map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME) || map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME))) {
117 throw new CommandException(ErrorCode.E1317, changeValue, "can only change pausetime or end time");
118 }
119
120 if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
121 isChangePauseTime = true;
122 }
123 else if(map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)){
124 isChangeEndTime = true;
125 }
126 else {
127 throw new CommandException(ErrorCode.E1317, changeValue, "should change pausetime or endtime");
128 }
129
130 if(isChangePauseTime){
131 String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
132 if (!value.equals("")) {
133 try {
134 newPauseTime = DateUtils.parseDateOozieTZ(value);
135 }
136 catch (Exception ex) {
137 throw new CommandException(ErrorCode.E1317, value, "is not a valid date");
138 }
139
140 checkPauseTime(newPauseTime);
141 }
142 }
143 else if (isChangeEndTime){
144 String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
145 if (!value.equals("")) {
146 try {
147 newEndTime = DateUtils.parseDateOozieTZ(value);
148 }
149 catch (Exception ex) {
150 throw new CommandException(ErrorCode.E1317, value, "is not a valid date");
151 }
152
153 checkEndTime(newEndTime);
154 }
155 }
156 }
157
158 /* (non-Javadoc)
159 * @see org.apache.oozie.command.XCommand#execute()
160 */
161 @Override
162 protected Void execute() throws CommandException {
163 try {
164 if (isChangePauseTime || isChangeEndTime) {
165 if (isChangePauseTime) {
166 bundleJob.setPauseTime(newPauseTime);
167 }
168 else if (isChangeEndTime) {
169 bundleJob.setEndTime(newEndTime);
170 if (bundleJob.getStatus() == Job.Status.SUCCEEDED) {
171 bundleJob.setStatus(Job.Status.RUNNING);
172 }
173 if (bundleJob.getStatus() == Job.Status.DONEWITHERROR || bundleJob.getStatus() == Job.Status.FAILED) {
174 bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(Job.Status.RUNNINGWITHERROR));
175 }
176 }
177 for (BundleActionBean action : this.bundleActions) {
178 // queue coord change commands;
179 if (action.getStatus() != Job.Status.KILLED && action.getCoordId() != null) {
180 queue(new CoordChangeXCommand(action.getCoordId(), changeValue));
181 LOG.info("Queuing CoordChangeXCommand coord job = " + action.getCoordId() + " to change "
182 + changeValue);
183 action.setPending(action.getPending() + 1);
184 updateList.add(action);
185 }
186 }
187 updateList.add(bundleJob);
188 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
189 }
190 return null;
191 }
192 catch (XException ex) {
193 throw new CommandException(ex);
194 }
195 }
196
197 /* (non-Javadoc)
198 * @see org.apache.oozie.command.XCommand#getEntityKey()
199 */
200 @Override
201 public String getEntityKey() {
202 return this.jobId;
203 }
204
205 /* (non-Javadoc)
206 * @see org.apache.oozie.command.XCommand#isLockRequired()
207 */
208 @Override
209 protected boolean isLockRequired() {
210 return true;
211 }
212
213 /* (non-Javadoc)
214 * @see org.apache.oozie.command.XCommand#loadState()
215 */
216 @Override
217 protected void loadState() throws CommandException {
218 try{
219 eagerLoadState();
220 this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
221 }
222 catch(Exception Ex){
223 throw new CommandException(ErrorCode.E1311,this.jobId);
224 }
225 }
226
227 /* (non-Javadoc)
228 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
229 */
230 @Override
231 protected void verifyPrecondition() throws CommandException, PreconditionException {
232 }
233
234 /* (non-Javadoc)
235 * @see org.apache.oozie.command.XCommand#eagerLoadState()
236 */
237 @Override
238 protected void eagerLoadState() throws CommandException {
239 try {
240 jpaService = Services.get().get(JPAService.class);
241
242 if (jpaService != null) {
243 this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
244 LogUtils.setLogInfo(bundleJob, logInfo);
245 }
246 else {
247 throw new CommandException(ErrorCode.E0610);
248 }
249 }
250 catch (XException ex) {
251 throw new CommandException(ex);
252 }
253 }
254
255 /* (non-Javadoc)
256 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
257 */
258 @Override
259 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
260 validateChangeValue(changeValue);
261
262 if (bundleJob == null) {
263 LOG.info("BundleChangeCommand not succeeded - " + "job " + jobId + " does not exist");
264 throw new PreconditionException(ErrorCode.E1314, jobId);
265 }
266 if (isChangePauseTime) {
267 if (bundleJob.getStatus() == Job.Status.SUCCEEDED || bundleJob.getStatus() == Job.Status.FAILED
268 || bundleJob.getStatus() == Job.Status.KILLED || bundleJob.getStatus() == Job.Status.DONEWITHERROR
269 || bundleJob == null) {
270 LOG.info("BundleChangeCommand not succeeded for changing pausetime- " + "job " + jobId + " finished, status is "
271 + bundleJob.getStatusStr());
272 throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString());
273 }
274 }
275 else if(isChangeEndTime){
276 if (bundleJob.getStatus() == Job.Status.KILLED || bundleJob == null) {
277 LOG.info("BundleChangeCommand not succeeded for changing endtime- " + "job " + jobId + " finished, status is "
278 + bundleJob.getStatusStr());
279 throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString());
280 }
281 }
282 }
283 }