001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import org.apache.oozie.util.DateUtils; 021 import org.apache.oozie.client.CoordinatorJob; 022 import org.apache.oozie.client.OozieClient; 023 import org.apache.oozie.CoordinatorActionBean; 024 import org.apache.oozie.CoordinatorJobBean; 025 import org.apache.oozie.ErrorCode; 026 import org.apache.oozie.XException; 027 import org.apache.oozie.command.CommandException; 028 import org.apache.oozie.executor.jpa.CoordActionRemoveJPAExecutor; 029 import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor; 030 import org.apache.oozie.executor.jpa.JPAExecutorException; 031 import org.apache.oozie.service.JPAService; 032 import org.apache.oozie.service.Services; 033 import org.apache.oozie.store.CoordinatorStore; 034 import org.apache.oozie.store.StoreException; 035 import org.apache.oozie.util.JobUtils; 036 import org.apache.oozie.util.ParamChecker; 037 import org.apache.oozie.util.XLog; 038 039 import java.util.Date; 040 import java.util.HashSet; 041 import java.util.Map; 042 import java.util.Set; 043 import java.util.Map.Entry; 044 045 public class CoordChangeCommand extends CoordinatorCommand<Void> { 046 private String jobId; 047 private Date newEndTime = null; 048 private Integer newConcurrency = null; 049 private Date newPauseTime = null; 050 private boolean resetPauseTime = false; 051 private static final XLog LOG = XLog.getLog(CoordChangeCommand.class); 052 053 private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>(); 054 static { 055 ALLOWED_CHANGE_OPTIONS.add("endtime"); 056 ALLOWED_CHANGE_OPTIONS.add("concurrency"); 057 ALLOWED_CHANGE_OPTIONS.add("pausetime"); 058 } 059 060 public CoordChangeCommand(String id, String changeValue) throws CommandException { 061 super("coord_change", "coord_change", 0, XLog.STD); 062 this.jobId = ParamChecker.notEmpty(id, "id"); 063 ParamChecker.notEmpty(changeValue, "value"); 064 065 validateChangeValue(changeValue); 066 } 067 068 /** 069 * @param changeValue change value. 070 * @throws CommandException thrown if changeValue cannot be parsed properly. 071 */ 072 private void validateChangeValue(String changeValue) throws CommandException { 073 Map<String, String> map = JobUtils.parseChangeValue(changeValue); 074 075 if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) { 076 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime"); 077 } 078 079 java.util.Iterator<Entry<String, String>> iter = map.entrySet().iterator(); 080 while (iter.hasNext()) { 081 Entry<String, String> entry = iter.next(); 082 String key = entry.getKey(); 083 String value = entry.getValue(); 084 085 if (!ALLOWED_CHANGE_OPTIONS.contains(key)) { 086 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime"); 087 } 088 089 if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) { 090 throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty"); 091 } 092 } 093 094 if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) { 095 String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME); 096 try { 097 newEndTime = DateUtils.parseDateUTC(value); 098 } 099 catch (Exception ex) { 100 throw new CommandException(ErrorCode.E1015, value, "must be a valid date"); 101 } 102 } 103 104 if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) { 105 String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY); 106 try { 107 newConcurrency = Integer.parseInt(value); 108 } 109 catch (NumberFormatException ex) { 110 throw new CommandException(ErrorCode.E1015, value, "must be a valid integer"); 111 } 112 } 113 114 if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) { 115 String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME); 116 if (value.equals("")) { // this is to reset pause time to null; 117 resetPauseTime = true; 118 } 119 else { 120 try { 121 newPauseTime = DateUtils.parseDateUTC(value); 122 } 123 catch (Exception ex) { 124 throw new CommandException(ErrorCode.E1015, value, "must be a valid date"); 125 } 126 } 127 } 128 } 129 130 /** 131 * @param coordJob coordinator job id. 132 * @param newEndTime new end time. 133 * @throws CommandException thrown if new end time is not valid. 134 */ 135 private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException { 136 // New endTime cannot be before coordinator job's start time. 137 Date startTime = coordJob.getStartTime(); 138 if (newEndTime.before(startTime)) { 139 throw new CommandException(ErrorCode.E1015, newEndTime, "cannot be before coordinator job's start time [" + startTime + "]"); 140 } 141 142 // New endTime cannot be before coordinator job's last action time. 143 Date lastActionTime = coordJob.getLastActionTime(); 144 if (lastActionTime != null) { 145 Date d = new Date(lastActionTime.getTime() - coordJob.getFrequency() * 60 * 1000); 146 if (!newEndTime.after(d)) { 147 throw new CommandException(ErrorCode.E1015, newEndTime, 148 "must be after coordinator job's last action time [" + d + "]"); 149 } 150 } 151 } 152 153 /** 154 * @param coordJob coordinator job id. 155 * @param newPauseTime new pause time. 156 * @param newEndTime new end time, can be null meaning no change on end time. 157 * @throws CommandException thrown if new pause time is not valid. 158 */ 159 private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime) 160 throws CommandException { 161 // New pauseTime has to be a non-past time. 162 Date d = new Date(); 163 if (newPauseTime.before(d)) { 164 throw new CommandException(ErrorCode.E1015, newPauseTime, "must be a non-past time"); 165 } 166 } 167 168 /** 169 * Process lookahead created actions that become invalid because of the new pause time, 170 * These actions will be deleted from DB, also the coordinator job will be updated accordingly 171 * 172 * @param coordJob coordinator job 173 * @param newPauseTime new pause time 174 */ 175 private void processLookaheadActions(CoordinatorJobBean coordJob, Date newPauseTime) throws CommandException { 176 Date lastActionTime = coordJob.getLastActionTime(); 177 if (lastActionTime != null) { 178 // d is the real last action time. 179 Date d = new Date(lastActionTime.getTime() - coordJob.getFrequency() * 60 * 1000); 180 int lastActionNumber = coordJob.getLastActionNumber(); 181 182 boolean hasChanged = false; 183 while (true) { 184 if (!newPauseTime.after(d)) { 185 deleteAction(coordJob.getId(), lastActionNumber); 186 d = new Date(d.getTime() - coordJob.getFrequency() * 60 * 1000); 187 lastActionNumber = lastActionNumber - 1; 188 189 hasChanged = true; 190 } 191 else { 192 break; 193 } 194 } 195 196 if (hasChanged == true) { 197 coordJob.setLastActionNumber(lastActionNumber); 198 Date d1 = new Date(d.getTime() + coordJob.getFrequency() * 60 * 1000); 199 coordJob.setLastActionTime(d1); 200 coordJob.setNextMaterializedTime(d1); 201 202 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) { 203 coordJob.setStatus(CoordinatorJob.Status.RUNNING); 204 } 205 } 206 } 207 } 208 209 /** 210 * delete last action for a coordinator job 211 * @param coordJob coordinator job 212 * @param lastActionNum last action number of the coordinator job 213 */ 214 private void deleteAction(String jobId, int lastActionNum) throws CommandException { 215 JPAService jpaService = Services.get().get(JPAService.class); 216 if (jpaService == null) { 217 throw new CommandException(ErrorCode.E0610); 218 } 219 220 try { 221 CoordinatorActionBean actionBean = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, lastActionNum)); 222 jpaService.execute(new CoordActionRemoveJPAExecutor(actionBean.getId())); 223 } 224 catch (JPAExecutorException e) { 225 throw new CommandException(e); 226 } 227 } 228 229 /** 230 * @param coordJob coordinator job id. 231 * @param newEndTime new end time. 232 * @param newConcurrency new concurrency. 233 * @param newPauseTime new pause time. 234 * @throws CommandException thrown if new values are not valid. 235 */ 236 private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime) 237 throws CommandException { 238 if (coordJob.getStatus() == CoordinatorJob.Status.KILLED) { 239 throw new CommandException(ErrorCode.E1016); 240 } 241 242 if (newEndTime != null) { 243 checkEndTime(coordJob, newEndTime); 244 } 245 246 if (newPauseTime != null) { 247 checkPauseTime(coordJob, newPauseTime); 248 } 249 } 250 251 @Override 252 protected Void call(CoordinatorStore store) throws StoreException, CommandException { 253 try { 254 CoordinatorJobBean coordJob = store.getCoordinatorJob(jobId, false); 255 setLogInfo(coordJob); 256 257 check(coordJob, newEndTime, newConcurrency, newPauseTime); 258 259 if (newEndTime != null) { 260 coordJob.setEndTime(newEndTime); 261 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) { 262 coordJob.setStatus(CoordinatorJob.Status.RUNNING); 263 } 264 } 265 266 if (newConcurrency != null) { 267 coordJob.setConcurrency(newConcurrency); 268 } 269 270 if (newPauseTime != null || resetPauseTime == true) { 271 coordJob.setPauseTime(newPauseTime); 272 if (!resetPauseTime) { 273 processLookaheadActions(coordJob, newPauseTime); 274 } 275 } 276 277 store.updateCoordinatorJob(coordJob); 278 279 return null; 280 } 281 catch (XException ex) { 282 throw new CommandException(ex); 283 } 284 } 285 286 @Override 287 protected Void execute(CoordinatorStore store) throws StoreException, CommandException { 288 LOG.info("STARTED CoordChangeCommand for jobId=" + jobId); 289 try { 290 if (lock(jobId)) { 291 call(store); 292 } 293 else { 294 throw new CommandException(ErrorCode.E0606, "job " + jobId 295 + " has been locked and cannot change value, please retry later"); 296 } 297 } 298 catch (InterruptedException e) { 299 throw new CommandException(ErrorCode.E0606, "acquiring lock for job " + jobId + " failed " 300 + " with exception " + e.getMessage()); 301 } 302 finally { 303 LOG.info("ENDED CoordChangeCommand for jobId=" + jobId); 304 } 305 return null; 306 } 307 }