001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.bundle; 020 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027 028import org.apache.oozie.BundleActionBean; 029import org.apache.oozie.BundleJobBean; 030import org.apache.oozie.ErrorCode; 031import org.apache.oozie.XException; 032import org.apache.oozie.client.Job; 033import org.apache.oozie.client.OozieClient; 034import org.apache.oozie.command.CommandException; 035import org.apache.oozie.command.PreconditionException; 036import org.apache.oozie.command.XCommand; 037import org.apache.oozie.command.coord.CoordChangeXCommand; 038import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; 039import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; 040import org.apache.oozie.executor.jpa.BatchQueryExecutor; 041import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 042import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; 043import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; 044import org.apache.oozie.executor.jpa.JPAExecutorException; 045import org.apache.oozie.util.DateUtils; 046import org.apache.oozie.util.JobUtils; 047import org.apache.oozie.util.LogUtils; 048import org.apache.oozie.util.ParamChecker; 049import org.apache.oozie.util.StatusUtils; 050 051public class BundleJobChangeXCommand extends XCommand<Void> { 052 private String jobId; 053 private String changeValue; 054 private List<BundleActionBean> bundleActions; 055 private BundleJobBean bundleJob; 056 private Date newPauseTime = null; 057 private Date newEndTime = null; 058 boolean isChangePauseTime = false; 059 boolean isChangeEndTime = false; 060 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 061 062 private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>(); 063 static { 064 ALLOWED_CHANGE_OPTIONS.add("pausetime"); 065 ALLOWED_CHANGE_OPTIONS.add("endtime"); 066 } 067 068 /** 069 * @param id bundle job id 070 * @param changeValue change value 071 * 072 * @throws CommandException thrown if failed to change bundle 073 */ 074 public BundleJobChangeXCommand(String id, String changeValue) throws CommandException { 075 super("bundle_change", "bundle_change", 1); 076 this.jobId = ParamChecker.notEmpty(id, "id"); 077 this.changeValue = ParamChecker.notEmpty(changeValue, "changeValue"); 078 } 079 080 /** 081 * Check if new pause time is future time. 082 * 083 * @param newPauseTime new pause time. 084 * @throws CommandException thrown if new pause time is not valid. 085 */ 086 private void checkPauseTime(Date newPauseTime) throws CommandException { 087 // New pauseTime has to be a non-past time. 088 Date d = new Date(); 089 if (newPauseTime.before(d)) { 090 throw new CommandException(ErrorCode.E1317, newPauseTime, "must be a non-past time"); 091 } 092 } 093 094 /** 095 * Check if new pause time is future time. 096 * 097 * @param newEndTime new end time, can be null meaning no change on end time. 098 * @throws CommandException thrown if new end time is not valid. 099 */ 100 private void checkEndTime(Date newEndTime) throws CommandException { 101 // New endTime has to be a non-past start time. 102 Date startTime = bundleJob.getKickoffTime(); 103 if (startTime != null && newEndTime.before(startTime)) { 104 throw new CommandException(ErrorCode.E1317, newEndTime, "must be greater then kickoff time"); 105 } 106 } 107 108 /** 109 * validate if change value is valid. 110 * 111 * @param changeValue change value. 112 * @throws CommandException thrown if changeValue cannot be parsed properly. 113 */ 114 private void validateChangeValue(String changeValue) throws CommandException { 115 Map<String, String> map = JobUtils.parseChangeValue(changeValue); 116 117 if (map.size() > ALLOWED_CHANGE_OPTIONS.size() 118 || !(map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME) || map 119 .containsKey(OozieClient.CHANGE_VALUE_ENDTIME))) { 120 throw new CommandException(ErrorCode.E1317, changeValue, "can only change pausetime or end time"); 121 } 122 123 if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) { 124 isChangePauseTime = true; 125 } 126 else if(map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)){ 127 isChangeEndTime = true; 128 } 129 else { 130 throw new CommandException(ErrorCode.E1317, changeValue, "should change pausetime or endtime"); 131 } 132 133 if(isChangePauseTime){ 134 String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME); 135 if (!value.equals("")) { 136 try { 137 newPauseTime = DateUtils.parseDateOozieTZ(value); 138 } 139 catch (Exception ex) { 140 throw new CommandException(ErrorCode.E1317, value, "is not a valid date"); 141 } 142 143 checkPauseTime(newPauseTime); 144 } 145 } 146 else if (isChangeEndTime){ 147 String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME); 148 if (!value.equals("")) { 149 try { 150 newEndTime = DateUtils.parseDateOozieTZ(value); 151 } 152 catch (Exception ex) { 153 throw new CommandException(ErrorCode.E1317, value, "is not a valid date"); 154 } 155 156 checkEndTime(newEndTime); 157 } 158 } 159 } 160 161 /* (non-Javadoc) 162 * @see org.apache.oozie.command.XCommand#execute() 163 */ 164 @Override 165 protected Void execute() throws CommandException { 166 StringBuffer changeReport = new StringBuffer(); 167 try { 168 if (isChangePauseTime || isChangeEndTime) { 169 if (isChangePauseTime) { 170 bundleJob.setPauseTime(newPauseTime); 171 } 172 else if (isChangeEndTime) { 173 bundleJob.setEndTime(newEndTime); 174 if (bundleJob.getStatus() == Job.Status.SUCCEEDED) { 175 bundleJob.setStatus(Job.Status.RUNNING); 176 } 177 if (bundleJob.getStatus() == Job.Status.DONEWITHERROR || bundleJob.getStatus() == Job.Status.FAILED) { 178 bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(Job.Status.RUNNINGWITHERROR)); 179 } 180 } 181 for (BundleActionBean action : this.bundleActions) { 182 // queue coord change commands; 183 if (action.getStatus() != Job.Status.KILLED && action.getCoordId() != null) { 184 try { 185 new CoordChangeXCommand(action.getCoordId(), changeValue).call(); 186 } 187 catch (Exception e) { 188 String errorMsg = action.getCoordId() + " : " + e.getMessage(); 189 LOG.info("Change command failed " + errorMsg); 190 changeReport.append("[ ").append(errorMsg).append(" ]"); 191 } 192 } 193 else { 194 String errorMsg = action.getCoordId() + " : Coord is in killed state"; 195 LOG.info("Change command failed " + errorMsg); 196 changeReport.append("[ ").append(errorMsg).append(" ]"); 197 } 198 } 199 updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PAUSE_ENDTIME, 200 bundleJob)); 201 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 202 } 203 if(!changeReport.toString().isEmpty()){ 204 throw new CommandException(ErrorCode.E1320, changeReport.toString()); 205 } 206 return null; 207 } 208 catch (XException ex) { 209 throw new CommandException(ex); 210 } 211 } 212 213 /* (non-Javadoc) 214 * @see org.apache.oozie.command.XCommand#getEntityKey() 215 */ 216 @Override 217 public String getEntityKey() { 218 return this.jobId; 219 } 220 221 /* (non-Javadoc) 222 * @see org.apache.oozie.command.XCommand#isLockRequired() 223 */ 224 @Override 225 protected boolean isLockRequired() { 226 return true; 227 } 228 229 @Override 230 protected void loadState() throws CommandException { 231 try { 232 this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, bundleJob.getId()); 233 this.bundleActions = BundleActionQueryExecutor.getInstance().getList( 234 BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, bundleJob.getId()); 235 } 236 catch (JPAExecutorException Ex) { 237 throw new CommandException(ErrorCode.E1311, this.jobId); 238 } 239 } 240 241 @Override 242 protected void verifyPrecondition() throws CommandException, PreconditionException { 243 } 244 245 @Override 246 protected void eagerLoadState() throws CommandException { 247 try { 248 this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, jobId); 249 LogUtils.setLogInfo(bundleJob); 250 } 251 catch (JPAExecutorException ex) { 252 throw new CommandException(ex); 253 } 254 } 255 256 @Override 257 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 258 validateChangeValue(changeValue); 259 260 if (bundleJob == null) { 261 LOG.info("BundleChangeCommand not succeeded - " + "job " + jobId + " does not exist"); 262 throw new PreconditionException(ErrorCode.E1314, jobId); 263 } 264 if (isChangePauseTime) { 265 if (bundleJob.getStatus() == Job.Status.SUCCEEDED || bundleJob.getStatus() == Job.Status.FAILED 266 || bundleJob.getStatus() == Job.Status.KILLED || bundleJob.getStatus() == Job.Status.DONEWITHERROR) { 267 LOG.info("BundleChangeCommand not succeeded for changing pausetime- " + "job " + jobId + " finished, status is " 268 + bundleJob.getStatusStr()); 269 throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString()); 270 } 271 } 272 else if(isChangeEndTime){ 273 if (bundleJob.getStatus() == Job.Status.KILLED) { 274 LOG.info("BundleChangeCommand not succeeded for changing endtime- " + "job " + jobId + " finished, status is " 275 + bundleJob.getStatusStr()); 276 throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString()); 277 } 278 } 279 } 280}