001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.bundle; 019 020 import java.util.ArrayList; 021 import java.util.Date; 022 import java.util.HashSet; 023 import java.util.List; 024 import java.util.Map; 025 import java.util.Set; 026 027 import org.apache.oozie.BundleActionBean; 028 import org.apache.oozie.BundleJobBean; 029 import org.apache.oozie.ErrorCode; 030 import org.apache.oozie.XException; 031 import org.apache.oozie.client.Job; 032 import org.apache.oozie.client.OozieClient; 033 import org.apache.oozie.client.rest.JsonBean; 034 import org.apache.oozie.command.CommandException; 035 import org.apache.oozie.command.PreconditionException; 036 import org.apache.oozie.command.XCommand; 037 import org.apache.oozie.command.coord.CoordChangeXCommand; 038 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 039 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor; 040 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; 041 import org.apache.oozie.service.JPAService; 042 import org.apache.oozie.service.Services; 043 import org.apache.oozie.util.DateUtils; 044 import org.apache.oozie.util.JobUtils; 045 import org.apache.oozie.util.LogUtils; 046 import org.apache.oozie.util.ParamChecker; 047 import org.apache.oozie.util.StatusUtils; 048 049 public class BundleJobChangeXCommand extends XCommand<Void> { 050 private String jobId; 051 private String changeValue; 052 private JPAService jpaService; 053 private List<BundleActionBean> bundleActions; 054 private BundleJobBean bundleJob; 055 private Date newPauseTime = null; 056 private Date newEndTime = null; 057 boolean isChangePauseTime = false; 058 boolean isChangeEndTime = false; 059 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 060 061 private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>(); 062 static { 063 ALLOWED_CHANGE_OPTIONS.add("pausetime"); 064 ALLOWED_CHANGE_OPTIONS.add("endtime"); 065 } 066 067 /** 068 * @param id bundle job id 069 * @param changeValue change value 070 * 071 * @throws CommandException thrown if failed to change bundle 072 */ 073 public BundleJobChangeXCommand(String id, String changeValue) throws CommandException { 074 super("bundle_change", "bundle_change", 1); 075 this.jobId = ParamChecker.notEmpty(id, "id"); 076 this.changeValue = ParamChecker.notEmpty(changeValue, "changeValue"); 077 } 078 079 /** 080 * Check if new pause time is future time. 081 * 082 * @param newPauseTime new pause time. 083 * @throws CommandException thrown if new pause time is not valid. 084 */ 085 private void checkPauseTime(Date newPauseTime) throws CommandException { 086 // New pauseTime has to be a non-past time. 087 Date d = new Date(); 088 if (newPauseTime.before(d)) { 089 throw new CommandException(ErrorCode.E1317, newPauseTime, "must be a non-past time"); 090 } 091 } 092 093 /** 094 * Check if new pause time is future time. 095 * 096 * @param newEndTime new end time, can be null meaning no change on end time. 097 * @throws CommandException thrown if new end time is not valid. 098 */ 099 private void checkEndTime(Date newEndTime) throws CommandException { 100 // New endTime has to be a non-past start time. 101 Date startTime = bundleJob.getKickoffTime(); 102 if (startTime != null && newEndTime.before(startTime)) { 103 throw new CommandException(ErrorCode.E1317, newEndTime, "must be greater then kickoff time"); 104 } 105 } 106 107 /** 108 * validate if change value is valid. 109 * 110 * @param changeValue change value. 111 * @throws CommandException thrown if changeValue cannot be parsed properly. 112 */ 113 private void validateChangeValue(String changeValue) throws CommandException { 114 Map<String, String> map = JobUtils.parseChangeValue(changeValue); 115 116 if (map.size() > ALLOWED_CHANGE_OPTIONS.size() || !(map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME) || map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME))) { 117 throw new CommandException(ErrorCode.E1317, changeValue, "can only change pausetime or end time"); 118 } 119 120 if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) { 121 isChangePauseTime = true; 122 } 123 else if(map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)){ 124 isChangeEndTime = true; 125 } 126 else { 127 throw new CommandException(ErrorCode.E1317, changeValue, "should change pausetime or endtime"); 128 } 129 130 if(isChangePauseTime){ 131 String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME); 132 if (!value.equals("")) { 133 try { 134 newPauseTime = DateUtils.parseDateOozieTZ(value); 135 } 136 catch (Exception ex) { 137 throw new CommandException(ErrorCode.E1317, value, "is not a valid date"); 138 } 139 140 checkPauseTime(newPauseTime); 141 } 142 } 143 else if (isChangeEndTime){ 144 String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME); 145 if (!value.equals("")) { 146 try { 147 newEndTime = DateUtils.parseDateOozieTZ(value); 148 } 149 catch (Exception ex) { 150 throw new CommandException(ErrorCode.E1317, value, "is not a valid date"); 151 } 152 153 checkEndTime(newEndTime); 154 } 155 } 156 } 157 158 /* (non-Javadoc) 159 * @see org.apache.oozie.command.XCommand#execute() 160 */ 161 @Override 162 protected Void execute() throws CommandException { 163 try { 164 if (isChangePauseTime || isChangeEndTime) { 165 if (isChangePauseTime) { 166 bundleJob.setPauseTime(newPauseTime); 167 } 168 else if (isChangeEndTime) { 169 bundleJob.setEndTime(newEndTime); 170 if (bundleJob.getStatus() == Job.Status.SUCCEEDED) { 171 bundleJob.setStatus(Job.Status.RUNNING); 172 } 173 if (bundleJob.getStatus() == Job.Status.DONEWITHERROR || bundleJob.getStatus() == Job.Status.FAILED) { 174 bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(Job.Status.RUNNINGWITHERROR)); 175 } 176 } 177 for (BundleActionBean action : this.bundleActions) { 178 // queue coord change commands; 179 if (action.getStatus() != Job.Status.KILLED && action.getCoordId() != null) { 180 queue(new CoordChangeXCommand(action.getCoordId(), changeValue)); 181 LOG.info("Queuing CoordChangeXCommand coord job = " + action.getCoordId() + " to change " 182 + changeValue); 183 action.setPending(action.getPending() + 1); 184 updateList.add(action); 185 } 186 } 187 updateList.add(bundleJob); 188 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null)); 189 } 190 return null; 191 } 192 catch (XException ex) { 193 throw new CommandException(ex); 194 } 195 } 196 197 /* (non-Javadoc) 198 * @see org.apache.oozie.command.XCommand#getEntityKey() 199 */ 200 @Override 201 public String getEntityKey() { 202 return this.jobId; 203 } 204 205 /* (non-Javadoc) 206 * @see org.apache.oozie.command.XCommand#isLockRequired() 207 */ 208 @Override 209 protected boolean isLockRequired() { 210 return true; 211 } 212 213 /* (non-Javadoc) 214 * @see org.apache.oozie.command.XCommand#loadState() 215 */ 216 @Override 217 protected void loadState() throws CommandException { 218 try{ 219 eagerLoadState(); 220 this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId)); 221 } 222 catch(Exception Ex){ 223 throw new CommandException(ErrorCode.E1311,this.jobId); 224 } 225 } 226 227 /* (non-Javadoc) 228 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 229 */ 230 @Override 231 protected void verifyPrecondition() throws CommandException, PreconditionException { 232 } 233 234 /* (non-Javadoc) 235 * @see org.apache.oozie.command.XCommand#eagerLoadState() 236 */ 237 @Override 238 protected void eagerLoadState() throws CommandException { 239 try { 240 jpaService = Services.get().get(JPAService.class); 241 242 if (jpaService != null) { 243 this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId)); 244 LogUtils.setLogInfo(bundleJob, logInfo); 245 } 246 else { 247 throw new CommandException(ErrorCode.E0610); 248 } 249 } 250 catch (XException ex) { 251 throw new CommandException(ex); 252 } 253 } 254 255 /* (non-Javadoc) 256 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition() 257 */ 258 @Override 259 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 260 validateChangeValue(changeValue); 261 262 if (bundleJob == null) { 263 LOG.info("BundleChangeCommand not succeeded - " + "job " + jobId + " does not exist"); 264 throw new PreconditionException(ErrorCode.E1314, jobId); 265 } 266 if (isChangePauseTime) { 267 if (bundleJob.getStatus() == Job.Status.SUCCEEDED || bundleJob.getStatus() == Job.Status.FAILED 268 || bundleJob.getStatus() == Job.Status.KILLED || bundleJob.getStatus() == Job.Status.DONEWITHERROR 269 || bundleJob == null) { 270 LOG.info("BundleChangeCommand not succeeded for changing pausetime- " + "job " + jobId + " finished, status is " 271 + bundleJob.getStatusStr()); 272 throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString()); 273 } 274 } 275 else if(isChangeEndTime){ 276 if (bundleJob.getStatus() == Job.Status.KILLED || bundleJob == null) { 277 LOG.info("BundleChangeCommand not succeeded for changing endtime- " + "job " + jobId + " finished, status is " 278 + bundleJob.getStatusStr()); 279 throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString()); 280 } 281 } 282 } 283 }