001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.bundle; 019 020 import java.util.Date; 021 import java.util.HashSet; 022 import java.util.List; 023 import java.util.Map; 024 import java.util.Set; 025 026 import org.apache.oozie.BundleActionBean; 027 import org.apache.oozie.BundleJobBean; 028 import org.apache.oozie.ErrorCode; 029 import org.apache.oozie.XException; 030 import org.apache.oozie.client.Job; 031 import org.apache.oozie.client.OozieClient; 032 import org.apache.oozie.command.CommandException; 033 import org.apache.oozie.command.PreconditionException; 034 import org.apache.oozie.command.XCommand; 035 import org.apache.oozie.command.coord.CoordChangeXCommand; 036 import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor; 037 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor; 038 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; 039 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor; 040 import org.apache.oozie.service.JPAService; 041 import org.apache.oozie.service.Services; 042 import org.apache.oozie.util.DateUtils; 043 import org.apache.oozie.util.JobUtils; 044 import org.apache.oozie.util.LogUtils; 045 import org.apache.oozie.util.ParamChecker; 046 047 public class BundleJobChangeXCommand extends XCommand<Void> { 048 private String jobId; 049 private String changeValue; 050 private JPAService jpaService; 051 private List<BundleActionBean> bundleActions; 052 private BundleJobBean bundleJob; 053 private Date newPauseTime = null; 054 private Date newEndTime = null; 055 boolean isChangePauseTime = false; 056 boolean isChangeEndTime = false; 057 058 private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>(); 059 static { 060 ALLOWED_CHANGE_OPTIONS.add("pausetime"); 061 ALLOWED_CHANGE_OPTIONS.add("endtime"); 062 } 063 064 /** 065 * @param id bundle job id 066 * @param changeValue change value 067 * 068 * @throws CommandException thrown if failed to change bundle 069 */ 070 public BundleJobChangeXCommand(String id, String changeValue) throws CommandException { 071 super("bundle_change", "bundle_change", 1); 072 this.jobId = ParamChecker.notEmpty(id, "id"); 073 this.changeValue = ParamChecker.notEmpty(changeValue, "changeValue"); 074 } 075 076 /** 077 * Check if new pause time is future time. 078 * 079 * @param newPauseTime new pause time. 080 * @throws CommandException thrown if new pause time is not valid. 081 */ 082 private void checkPauseTime(Date newPauseTime) throws CommandException { 083 // New pauseTime has to be a non-past time. 084 Date d = new Date(); 085 if (newPauseTime.before(d)) { 086 throw new CommandException(ErrorCode.E1317, newPauseTime, "must be a non-past time"); 087 } 088 } 089 090 /** 091 * Check if new pause time is future time. 092 * 093 * @param newEndTime new end time, can be null meaning no change on end time. 094 * @throws CommandException thrown if new end time is not valid. 095 */ 096 private void checkEndTime(Date newEndTime) throws CommandException { 097 // New endTime has to be a non-past start time. 098 Date startTime = bundleJob.getKickoffTime(); 099 if (startTime != null && newEndTime.before(startTime)) { 100 throw new CommandException(ErrorCode.E1317, newEndTime, "must be greater then kickoff time"); 101 } 102 } 103 104 /** 105 * validate if change value is valid. 106 * 107 * @param changeValue change value. 108 * @throws CommandException thrown if changeValue cannot be parsed properly. 109 */ 110 private void validateChangeValue(String changeValue) throws CommandException { 111 Map<String, String> map = JobUtils.parseChangeValue(changeValue); 112 113 if (map.size() > ALLOWED_CHANGE_OPTIONS.size() || !(map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME) || map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME))) { 114 throw new CommandException(ErrorCode.E1317, changeValue, "can only change pausetime or end time"); 115 } 116 117 if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) { 118 isChangePauseTime = true; 119 } 120 else if(map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)){ 121 isChangeEndTime = true; 122 } 123 else { 124 throw new CommandException(ErrorCode.E1317, changeValue, "should change pausetime or endtime"); 125 } 126 127 if(isChangePauseTime){ 128 String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME); 129 if (!value.equals("")) { 130 try { 131 newPauseTime = DateUtils.parseDateUTC(value); 132 } 133 catch (Exception ex) { 134 throw new CommandException(ErrorCode.E1317, value, "is not a valid date"); 135 } 136 137 checkPauseTime(newPauseTime); 138 } 139 } 140 else if (isChangeEndTime){ 141 String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME); 142 if (!value.equals("")) { 143 try { 144 newEndTime = DateUtils.parseDateUTC(value); 145 } 146 catch (Exception ex) { 147 throw new CommandException(ErrorCode.E1317, value, "is not a valid date"); 148 } 149 150 checkEndTime(newEndTime); 151 } 152 } 153 } 154 155 /* (non-Javadoc) 156 * @see org.apache.oozie.command.XCommand#execute() 157 */ 158 @Override 159 protected Void execute() throws CommandException { 160 try { 161 if (isChangePauseTime || isChangeEndTime) { 162 if (isChangePauseTime) { 163 bundleJob.setPauseTime(newPauseTime); 164 } 165 else if (isChangeEndTime) { 166 bundleJob.setEndTime(newEndTime); 167 bundleJob.setStatus(Job.Status.RUNNING); 168 } 169 for (BundleActionBean action : this.bundleActions) { 170 // queue coord change commands; 171 if (action.getStatus() != Job.Status.KILLED && action.getCoordId() != null) { 172 queue(new CoordChangeXCommand(action.getCoordId(), changeValue)); 173 LOG.info("Queuing CoordChangeXCommand coord job = " + action.getCoordId() + " to change " 174 + changeValue); 175 action.setPending(action.getPending() + 1); 176 jpaService.execute(new BundleActionUpdateJPAExecutor(action)); 177 } 178 } 179 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob)); 180 } 181 return null; 182 } 183 catch (XException ex) { 184 throw new CommandException(ex); 185 } 186 } 187 188 /* (non-Javadoc) 189 * @see org.apache.oozie.command.XCommand#getEntityKey() 190 */ 191 @Override 192 protected String getEntityKey() { 193 return this.jobId; 194 } 195 196 /* (non-Javadoc) 197 * @see org.apache.oozie.command.XCommand#isLockRequired() 198 */ 199 @Override 200 protected boolean isLockRequired() { 201 return true; 202 } 203 204 /* (non-Javadoc) 205 * @see org.apache.oozie.command.XCommand#loadState() 206 */ 207 @Override 208 protected void loadState() throws CommandException { 209 try{ 210 eagerLoadState(); 211 this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId)); 212 } 213 catch(Exception Ex){ 214 throw new CommandException(ErrorCode.E1311,this.jobId); 215 } 216 } 217 218 /* (non-Javadoc) 219 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 220 */ 221 @Override 222 protected void verifyPrecondition() throws CommandException, PreconditionException { 223 } 224 225 /* (non-Javadoc) 226 * @see org.apache.oozie.command.XCommand#eagerLoadState() 227 */ 228 @Override 229 protected void eagerLoadState() throws CommandException { 230 try { 231 jpaService = Services.get().get(JPAService.class); 232 233 if (jpaService != null) { 234 this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId)); 235 LogUtils.setLogInfo(bundleJob, logInfo); 236 } 237 else { 238 throw new CommandException(ErrorCode.E0610); 239 } 240 } 241 catch (XException ex) { 242 throw new CommandException(ex); 243 } 244 } 245 246 /* (non-Javadoc) 247 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition() 248 */ 249 @Override 250 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 251 validateChangeValue(changeValue); 252 253 if (bundleJob == null) { 254 LOG.info("BundleChangeCommand not succeeded - " + "job " + jobId + " does not exist"); 255 throw new PreconditionException(ErrorCode.E1314, jobId); 256 } 257 if (isChangePauseTime) { 258 if (bundleJob.getStatus() == Job.Status.SUCCEEDED || bundleJob.getStatus() == Job.Status.FAILED 259 || bundleJob.getStatus() == Job.Status.KILLED || bundleJob.getStatus() == Job.Status.DONEWITHERROR 260 || bundleJob == null) { 261 LOG.info("BundleChangeCommand not succeeded for changing pausetime- " + "job " + jobId + " finished, status is " 262 + bundleJob.getStatusStr()); 263 throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString()); 264 } 265 } 266 else if(isChangeEndTime){ 267 if (bundleJob.getStatus() == Job.Status.KILLED || bundleJob == null) { 268 LOG.info("BundleChangeCommand not succeeded for changing endtime- " + "job " + jobId + " finished, status is " 269 + bundleJob.getStatusStr()); 270 throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString()); 271 } 272 } 273 } 274 }