This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.util.Date;
021 import java.util.HashSet;
022 import java.util.List;
023 import java.util.Map;
024 import java.util.Set;
025
026 import org.apache.oozie.BundleActionBean;
027 import org.apache.oozie.BundleJobBean;
028 import org.apache.oozie.ErrorCode;
029 import org.apache.oozie.XException;
030 import org.apache.oozie.client.Job;
031 import org.apache.oozie.client.OozieClient;
032 import org.apache.oozie.command.CommandException;
033 import org.apache.oozie.command.PreconditionException;
034 import org.apache.oozie.command.XCommand;
035 import org.apache.oozie.command.coord.CoordChangeXCommand;
036 import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
037 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
038 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
039 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
040 import org.apache.oozie.service.JPAService;
041 import org.apache.oozie.service.Services;
042 import org.apache.oozie.util.DateUtils;
043 import org.apache.oozie.util.JobUtils;
044 import org.apache.oozie.util.LogUtils;
045 import org.apache.oozie.util.ParamChecker;
046
047 public class BundleJobChangeXCommand extends XCommand<Void> {
048 private String jobId;
049 private String changeValue;
050 private JPAService jpaService;
051 private List<BundleActionBean> bundleActions;
052 private BundleJobBean bundleJob;
053 private Date newPauseTime = null;
054 private Date newEndTime = null;
055 boolean isChangePauseTime = false;
056 boolean isChangeEndTime = false;
057
058 private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
059 static {
060 ALLOWED_CHANGE_OPTIONS.add("pausetime");
061 ALLOWED_CHANGE_OPTIONS.add("endtime");
062 }
063
064 /**
065 * @param id bundle job id
066 * @param changeValue change value
067 *
068 * @throws CommandException thrown if failed to change bundle
069 */
070 public BundleJobChangeXCommand(String id, String changeValue) throws CommandException {
071 super("bundle_change", "bundle_change", 1);
072 this.jobId = ParamChecker.notEmpty(id, "id");
073 this.changeValue = ParamChecker.notEmpty(changeValue, "changeValue");
074 }
075
076 /**
077 * Check if new pause time is future time.
078 *
079 * @param newPauseTime new pause time.
080 * @throws CommandException thrown if new pause time is not valid.
081 */
082 private void checkPauseTime(Date newPauseTime) throws CommandException {
083 // New pauseTime has to be a non-past time.
084 Date d = new Date();
085 if (newPauseTime.before(d)) {
086 throw new CommandException(ErrorCode.E1317, newPauseTime, "must be a non-past time");
087 }
088 }
089
090 /**
091 * Check if new pause time is future time.
092 *
093 * @param newEndTime new end time, can be null meaning no change on end time.
094 * @throws CommandException thrown if new end time is not valid.
095 */
096 private void checkEndTime(Date newEndTime) throws CommandException {
097 // New endTime has to be a non-past start time.
098 Date startTime = bundleJob.getKickoffTime();
099 if (startTime != null && newEndTime.before(startTime)) {
100 throw new CommandException(ErrorCode.E1317, newEndTime, "must be greater then kickoff time");
101 }
102 }
103
104 /**
105 * validate if change value is valid.
106 *
107 * @param changeValue change value.
108 * @throws CommandException thrown if changeValue cannot be parsed properly.
109 */
110 private void validateChangeValue(String changeValue) throws CommandException {
111 Map<String, String> map = JobUtils.parseChangeValue(changeValue);
112
113 if (map.size() > ALLOWED_CHANGE_OPTIONS.size() || !(map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME) || map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME))) {
114 throw new CommandException(ErrorCode.E1317, changeValue, "can only change pausetime or end time");
115 }
116
117 if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
118 isChangePauseTime = true;
119 }
120 else if(map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)){
121 isChangeEndTime = true;
122 }
123 else {
124 throw new CommandException(ErrorCode.E1317, changeValue, "should change pausetime or endtime");
125 }
126
127 if(isChangePauseTime){
128 String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
129 if (!value.equals("")) {
130 try {
131 newPauseTime = DateUtils.parseDateUTC(value);
132 }
133 catch (Exception ex) {
134 throw new CommandException(ErrorCode.E1317, value, "is not a valid date");
135 }
136
137 checkPauseTime(newPauseTime);
138 }
139 }
140 else if (isChangeEndTime){
141 String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
142 if (!value.equals("")) {
143 try {
144 newEndTime = DateUtils.parseDateUTC(value);
145 }
146 catch (Exception ex) {
147 throw new CommandException(ErrorCode.E1317, value, "is not a valid date");
148 }
149
150 checkEndTime(newEndTime);
151 }
152 }
153 }
154
155 /* (non-Javadoc)
156 * @see org.apache.oozie.command.XCommand#execute()
157 */
158 @Override
159 protected Void execute() throws CommandException {
160 try {
161 if (isChangePauseTime || isChangeEndTime) {
162 if (isChangePauseTime) {
163 bundleJob.setPauseTime(newPauseTime);
164 }
165 else if (isChangeEndTime) {
166 bundleJob.setEndTime(newEndTime);
167 bundleJob.setStatus(Job.Status.RUNNING);
168 }
169 for (BundleActionBean action : this.bundleActions) {
170 // queue coord change commands;
171 if (action.getStatus() != Job.Status.KILLED && action.getCoordId() != null) {
172 queue(new CoordChangeXCommand(action.getCoordId(), changeValue));
173 LOG.info("Queuing CoordChangeXCommand coord job = " + action.getCoordId() + " to change "
174 + changeValue);
175 action.setPending(action.getPending() + 1);
176 jpaService.execute(new BundleActionUpdateJPAExecutor(action));
177 }
178 }
179 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
180 }
181 return null;
182 }
183 catch (XException ex) {
184 throw new CommandException(ex);
185 }
186 }
187
188 /* (non-Javadoc)
189 * @see org.apache.oozie.command.XCommand#getEntityKey()
190 */
191 @Override
192 protected String getEntityKey() {
193 return this.jobId;
194 }
195
196 /* (non-Javadoc)
197 * @see org.apache.oozie.command.XCommand#isLockRequired()
198 */
199 @Override
200 protected boolean isLockRequired() {
201 return true;
202 }
203
204 /* (non-Javadoc)
205 * @see org.apache.oozie.command.XCommand#loadState()
206 */
207 @Override
208 protected void loadState() throws CommandException {
209 try{
210 eagerLoadState();
211 this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
212 }
213 catch(Exception Ex){
214 throw new CommandException(ErrorCode.E1311,this.jobId);
215 }
216 }
217
218 /* (non-Javadoc)
219 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
220 */
221 @Override
222 protected void verifyPrecondition() throws CommandException, PreconditionException {
223 }
224
225 /* (non-Javadoc)
226 * @see org.apache.oozie.command.XCommand#eagerLoadState()
227 */
228 @Override
229 protected void eagerLoadState() throws CommandException {
230 try {
231 jpaService = Services.get().get(JPAService.class);
232
233 if (jpaService != null) {
234 this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
235 LogUtils.setLogInfo(bundleJob, logInfo);
236 }
237 else {
238 throw new CommandException(ErrorCode.E0610);
239 }
240 }
241 catch (XException ex) {
242 throw new CommandException(ex);
243 }
244 }
245
246 /* (non-Javadoc)
247 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
248 */
249 @Override
250 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
251 validateChangeValue(changeValue);
252
253 if (bundleJob == null) {
254 LOG.info("BundleChangeCommand not succeeded - " + "job " + jobId + " does not exist");
255 throw new PreconditionException(ErrorCode.E1314, jobId);
256 }
257 if (isChangePauseTime) {
258 if (bundleJob.getStatus() == Job.Status.SUCCEEDED || bundleJob.getStatus() == Job.Status.FAILED
259 || bundleJob.getStatus() == Job.Status.KILLED || bundleJob.getStatus() == Job.Status.DONEWITHERROR
260 || bundleJob == null) {
261 LOG.info("BundleChangeCommand not succeeded for changing pausetime- " + "job " + jobId + " finished, status is "
262 + bundleJob.getStatusStr());
263 throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString());
264 }
265 }
266 else if(isChangeEndTime){
267 if (bundleJob.getStatus() == Job.Status.KILLED || bundleJob == null) {
268 LOG.info("BundleChangeCommand not succeeded for changing endtime- " + "job " + jobId + " finished, status is "
269 + bundleJob.getStatusStr());
270 throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString());
271 }
272 }
273 }
274 }