/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.bundle;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.XCommand;
import org.apache.oozie.command.coord.CoordChangeXCommand;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.StatusUtils;

/**
 * Command that changes the pause time or the end time of a bundle job and forwards the same change value
 * to the coordinator jobs referenced by the bundle's actions.
 */
public class BundleJobChangeXCommand extends XCommand<Void> {
    private final String jobId;
    private final String changeValue;
    private List<BundleActionBean> bundleActions;
    private BundleJobBean bundleJob;
    private Date newPauseTime = null;
    private Date newEndTime = null;
    boolean isChangePauseTime = false;
    boolean isChangeEndTime = false;
    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();

    private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
    static {
        ALLOWED_CHANGE_OPTIONS.add("pausetime");
        ALLOWED_CHANGE_OPTIONS.add("endtime");
    }

    /**
     * @param id bundle job id
     * @param changeValue change value
     *
     * @throws CommandException thrown if failed to change bundle
     */
    public BundleJobChangeXCommand(String id, String changeValue) throws CommandException {
        super("bundle_change", "bundle_change", 1);
        this.jobId = ParamChecker.notEmpty(id, "id");
        this.changeValue = ParamChecker.notEmpty(changeValue, "changeValue");
    }

    /**
     * Check that the new pause time is not in the past.
     *
     * @param newPauseTime new pause time.
     * @throws CommandException thrown with E1317 if the new pause time is before the current time.
     */
    private void checkPauseTime(Date newPauseTime) throws CommandException {
        // New pauseTime has to be a non-past time.
        Date now = new Date();
        if (newPauseTime.before(now)) {
            throw new CommandException(ErrorCode.E1317, newPauseTime, "must be a non-past time");
        }
    }

    /**
     * Check that the new end time is not before the bundle job's kickoff time.
     * <p>
     * No check is made when the loaded bundle job has no kickoff time.
     *
     * @param newEndTime new end time.
     * @throws CommandException thrown with E1317 if the new end time is before the kickoff time.
     */
    private void checkEndTime(Date newEndTime) throws CommandException {
        Date startTime = bundleJob.getKickoffTime();
        if (startTime != null && newEndTime.before(startTime)) {
            throw new CommandException(ErrorCode.E1317, newEndTime, "must be greater then kickoff time");
        }
    }

    /**
     * Parse one date from the change value.
     *
     * @param value raw date string taken from the parsed change value.
     * @return the parsed date, or <code>null</code> when the value is the empty string.
     * @throws CommandException thrown with E1317 if the value cannot be parsed as a date.
     */
    private Date parseChangeDate(String value) throws CommandException {
        if (value.isEmpty()) {
            return null;
        }
        try {
            return DateUtils.parseDateOozieTZ(value);
        }
        catch (Exception ex) {
            // Trailing Throwable is meant to be kept as the cause instead of being dropped
            // (XException varargs convention; NOTE(review): confirm against XException).
            throw new CommandException(ErrorCode.E1317, value, "is not a valid date", ex);
        }
    }

    /**
     * Validate the change value and record which change was requested.
     * <p>
     * The value is rejected when it holds more entries than there are allowed options, or when it holds
     * neither a pause time nor an end time. On success exactly one of {@link #isChangePauseTime} and
     * {@link #isChangeEndTime} is set and the matching new time is stored; an empty date string leaves the
     * new time <code>null</code>.
     * <p>
     * NOTE(review): when both keys are present only the pause time is applied to the bundle job, while the
     * unmodified change value is still forwarded to the coordinators by {@link #execute()}.
     *
     * @param changeValue change value.
     * @throws CommandException thrown if changeValue cannot be parsed properly.
     */
    private void validateChangeValue(String changeValue) throws CommandException {
        Map<String, String> map = JobUtils.parseChangeValue(changeValue);
        boolean hasPauseTime = map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME);
        boolean hasEndTime = map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME);

        if (map.size() > ALLOWED_CHANGE_OPTIONS.size() || !(hasPauseTime || hasEndTime)) {
            throw new CommandException(ErrorCode.E1317, changeValue, "can only change pausetime or end time");
        }

        // At least one of the two keys is present from here on; pause time takes precedence.
        if (hasPauseTime) {
            isChangePauseTime = true;
            newPauseTime = parseChangeDate(map.get(OozieClient.CHANGE_VALUE_PAUSETIME));
            if (newPauseTime != null) {
                checkPauseTime(newPauseTime);
            }
        }
        else {
            isChangeEndTime = true;
            newEndTime = parseChangeDate(map.get(OozieClient.CHANGE_VALUE_ENDTIME));
            if (newEndTime != null) {
                checkEndTime(newEndTime);
            }
        }
    }

    /**
     * Apply the requested change to the bundle job, run a {@link CoordChangeXCommand} for each loaded
     * bundle action, and persist the bundle job.
     * <p>
     * Failures of individual coordinator changes do not stop the loop; they are collected and, after the
     * bundle job has been persisted, reported together in a single E1320 error.
     *
     * @return always <code>null</code>.
     * @throws CommandException thrown if any coordinator change failed or the update could not be stored.
     * @see org.apache.oozie.command.XCommand#execute()
     */
    @Override
    protected Void execute() throws CommandException {
        StringBuilder changeReport = new StringBuilder();
        try {
            if (isChangePauseTime || isChangeEndTime) {
                if (isChangePauseTime) {
                    bundleJob.setPauseTime(newPauseTime);
                }
                else {
                    bundleJob.setEndTime(newEndTime);
                    // A finished job is moved back to a running state when its end time is changed.
                    if (bundleJob.getStatus() == Job.Status.SUCCEEDED) {
                        bundleJob.setStatus(Job.Status.RUNNING);
                    }
                    if (bundleJob.getStatus() == Job.Status.DONEWITHERROR
                            || bundleJob.getStatus() == Job.Status.FAILED) {
                        bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(Job.Status.RUNNINGWITHERROR));
                    }
                }
                for (BundleActionBean action : this.bundleActions) {
                    if (action.getStatus() != Job.Status.KILLED && action.getCoordId() != null) {
                        try {
                            new CoordChangeXCommand(action.getCoordId(), changeValue).call();
                        }
                        catch (Exception e) {
                            // Best effort: record the failure and continue with the remaining coordinators.
                            String errorMsg = action.getCoordId() + " : " + e.getMessage();
                            LOG.info("Change command failed " + errorMsg);
                            changeReport.append("[ ").append(errorMsg).append(" ]");
                        }
                    }
                    else {
                        // NOTE(review): this branch is also taken when the coord id is null, in which case
                        // the message reads "null : Coord is in killed state".
                        String errorMsg = action.getCoordId() + " : Coord is in killed state";
                        LOG.info("Change command failed " + errorMsg);
                        changeReport.append("[ ").append(errorMsg).append(" ]");
                    }
                }
                updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PAUSE_ENDTIME,
                        bundleJob));
                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
            }
            if (changeReport.length() > 0) {
                throw new CommandException(ErrorCode.E1320, changeReport.toString());
            }
            return null;
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return this.jobId;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Load the full bundle job and the bundle actions that {@link #execute()} iterates over.
     *
     * @throws CommandException thrown with E1311 if either query fails.
     */
    @Override
    protected void loadState() throws CommandException {
        try {
            // Look up by the validated job id rather than by an id read back from a previously loaded bean.
            this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, jobId);
            this.bundleActions = BundleActionQueryExecutor.getInstance().getList(
                    BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, jobId);
        }
        catch (JPAExecutorException ex) {
            // Trailing Throwable is meant to be kept as the cause instead of being dropped
            // (XException varargs convention; NOTE(review): confirm against XException).
            throw new CommandException(ErrorCode.E1311, this.jobId, ex);
        }
    }

    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
    }

    /**
     * Load the bundle job through the status query and set up log information for it.
     *
     * @throws CommandException thrown if the bundle job query fails.
     */
    @Override
    protected void eagerLoadState() throws CommandException {
        try {
            this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, jobId);
            LogUtils.setLogInfo(bundleJob, logInfo);
        }
        catch (JPAExecutorException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Verify that the job exists, that the change value is valid, and that the job's status allows the
     * requested change.
     * <p>
     * A pause time change is refused for SUCCEEDED, FAILED, KILLED and DONEWITHERROR jobs; an end time
     * change is refused only for KILLED jobs.
     *
     * @throws CommandException thrown with E1317 if the change value is invalid.
     * @throws PreconditionException thrown with E1314 if the job was not loaded, or with E1312 if the
     *         job status does not allow the change.
     */
    @Override
    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
        // Must come before validateChangeValue(): the end time check dereferences bundleJob.
        if (bundleJob == null) {
            LOG.info("BundleChangeCommand not succeeded - " + "job " + jobId + " does not exist");
            throw new PreconditionException(ErrorCode.E1314, jobId);
        }

        validateChangeValue(changeValue);

        Job.Status status = bundleJob.getStatus();
        if (isChangePauseTime) {
            if (status == Job.Status.SUCCEEDED || status == Job.Status.FAILED
                    || status == Job.Status.KILLED || status == Job.Status.DONEWITHERROR) {
                LOG.info("BundleChangeCommand not succeeded for changing pausetime- " + "job " + jobId
                        + " finished, status is " + bundleJob.getStatusStr());
                throw new PreconditionException(ErrorCode.E1312, jobId, status.toString());
            }
        }
        else if (isChangeEndTime) {
            if (status == Job.Status.KILLED) {
                LOG.info("BundleChangeCommand not succeeded for changing endtime- " + "job " + jobId
                        + " finished, status is " + bundleJob.getStatusStr());
                throw new PreconditionException(ErrorCode.E1312, jobId, status.toString());
            }
        }
    }
}