001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import java.io.ByteArrayOutputStream; 022import java.io.IOException; 023import java.io.StringReader; 024import java.util.Date; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.oozie.CoordinatorJobBean; 027import org.apache.oozie.ErrorCode; 028import org.apache.oozie.client.CoordinatorJob; 029import org.apache.oozie.client.OozieClient; 030import org.apache.oozie.command.CommandException; 031import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 032import org.apache.oozie.executor.jpa.JPAExecutorException; 033import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 034import org.apache.oozie.service.JPAService; 035import org.apache.oozie.service.Services; 036import org.apache.oozie.util.LogUtils; 037import org.apache.oozie.util.XConfiguration; 038import org.apache.oozie.util.XmlUtils; 039import org.eclipse.jgit.diff.DiffFormatter; 040import org.eclipse.jgit.diff.EditList; 041import org.eclipse.jgit.diff.HistogramDiff; 042import org.eclipse.jgit.diff.RawText; 043import org.eclipse.jgit.diff.RawTextComparator; 044import org.jdom.Element; 045 046/** 047 * This class provides the functionalities to update coordinator job XML and properties. It uses CoordSubmitXCommand 048 * functionality to validate XML and resolve all the variables or properties using job configurations. 049 */ 050public class CoordUpdateXCommand extends CoordSubmitXCommand { 051 052 private final String jobId; 053 private boolean showDiff = true; 054 private boolean isConfChange = false; 055 056 StringBuffer diff = new StringBuffer(); 057 CoordinatorJobBean oldCoordJob = null; 058 059 public CoordUpdateXCommand(boolean dryrun, Configuration conf, String jobId) { 060 super(dryrun, conf); 061 this.jobId = jobId; 062 isConfChange = conf.size() == 0 ? false : true; 063 } 064 065 public CoordUpdateXCommand(boolean dryrun, Configuration conf, String jobId, boolean showDiff) { 066 super(dryrun, conf); 067 this.jobId = jobId; 068 this.showDiff = showDiff; 069 isConfChange = conf.size() == 0 ? false : true; 070 } 071 072 @Override 073 protected String storeToDB(String xmlElement, Element eJob, CoordinatorJobBean coordJob) throws CommandException { 074 check(oldCoordJob, coordJob); 075 computeDiff(eJob); 076 oldCoordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH)); 077 if (isConfChange) { 078 oldCoordJob.setConf(XmlUtils.prettyPrint(conf).toString()); 079 } 080 oldCoordJob.setMatThrottling(coordJob.getMatThrottling()); 081 oldCoordJob.setOrigJobXml(xmlElement); 082 oldCoordJob.setConcurrency(coordJob.getConcurrency()); 083 oldCoordJob.setExecution(coordJob.getExecution()); 084 oldCoordJob.setTimeout(coordJob.getTimeout()); 085 oldCoordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString()); 086 087 088 if (!dryrun) { 089 oldCoordJob.setLastModifiedTime(new Date()); 090 // Should log the changes, this should be useful for debugging. 091 LOG.info("Coord update changes : " + diff.toString()); 092 try { 093 CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, oldCoordJob); 094 } 095 catch (JPAExecutorException jpaee) { 096 throw new CommandException(jpaee); 097 } 098 } 099 return jobId; 100 } 101 102 @Override 103 protected void loadState() throws CommandException { 104 super.loadState(); 105 jpaService = Services.get().get(JPAService.class); 106 if (jpaService == null) { 107 throw new CommandException(ErrorCode.E0610); 108 } 109 coordJob = new CoordinatorJobBean(); 110 try { 111 oldCoordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId); 112 } 113 catch (JPAExecutorException e) { 114 throw new CommandException(e); 115 } 116 117 LogUtils.setLogInfo(oldCoordJob, logInfo); 118 if (!isConfChange) { 119 try { 120 conf = new XConfiguration(new StringReader(oldCoordJob.getConf())); 121 } 122 catch (Exception e) { 123 throw new CommandException(ErrorCode.E1023, e.getMessage(), e); 124 } 125 } 126 coordJob.setConf(XmlUtils.prettyPrint(conf).toString()); 127 setJob(coordJob); 128 } 129 130 @Override 131 protected void verifyPrecondition() throws CommandException { 132 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED 133 || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR) { 134 LOG.info("Can't update coord job. Job has finished processing"); 135 throw new CommandException(ErrorCode.E1023, "Can't update coord job. Job has finished processing"); 136 } 137 } 138 139 /** 140 * Gets the difference of job definition and properties. 141 * 142 * @param eJob the e job 143 * @return the diff 144 */ 145 146 private void computeDiff(Element eJob) { 147 try { 148 diff.append("**********Job definition changes**********").append(System.getProperty("line.separator")); 149 diff.append(getDiffinGitFormat(oldCoordJob.getJobXml(), XmlUtils.prettyPrint(eJob).toString())); 150 diff.append("******************************************").append(System.getProperty("line.separator")); 151 diff.append("**********Job conf changes****************").append(System.getProperty("line.separator")); 152 if (isConfChange) { 153 diff.append(getDiffinGitFormat(oldCoordJob.getConf(), XmlUtils.prettyPrint(conf).toString())); 154 } 155 else { 156 diff.append("No conf update requested").append(System.getProperty("line.separator")); 157 } 158 diff.append("******************************************").append(System.getProperty("line.separator")); 159 } 160 catch (IOException e) { 161 diff.append("Error computing diff. Error " + e.getMessage()); 162 LOG.warn("Error computing diff.", e); 163 } 164 } 165 166 /** 167 * Get the differences in git format. 168 * 169 * @param string1 the string1 170 * @param string2 the string2 171 * @return the diff 172 * @throws IOException Signals that an I/O exception has occurred. 173 */ 174 private String getDiffinGitFormat(String string1, String string2) throws IOException { 175 ByteArrayOutputStream out = new ByteArrayOutputStream(); 176 RawText rt1 = new RawText(string1.getBytes()); 177 RawText rt2 = new RawText(string2.getBytes()); 178 EditList diffList = new EditList(); 179 diffList.addAll(new HistogramDiff().diff(RawTextComparator.DEFAULT, rt1, rt2)); 180 new DiffFormatter(out).format(diffList, rt1, rt2); 181 return out.toString(); 182 } 183 184 @Override 185 protected String submit() throws CommandException { 186 LOG.info("STARTED Coordinator update"); 187 submitJob(); 188 LOG.info("ENDED Coordinator update"); 189 if (showDiff) { 190 return diff.toString(); 191 } 192 else { 193 return ""; 194 } 195 } 196 197 /** 198 * Check. Frequency can't be changed. EndTime can't be changed. StartTime can't be changed. AppName can't be changed 199 * Timeunit can't be changed. Timezone can't be changed 200 * 201 * @param oldCoord the old coord 202 * @param newCoord the new coord 203 * @throws CommandException the command exception 204 */ 205 public void check(CoordinatorJobBean oldCoord, CoordinatorJobBean newCoord) throws CommandException { 206 if (!oldCoord.getFrequency().equals(newCoord.getFrequency())) { 207 throw new CommandException(ErrorCode.E1023, "Frequency can't be changed. Old frequency = " 208 + oldCoord.getFrequency() + " new frequency = " + newCoord.getFrequency()); 209 } 210 211 if (!oldCoord.getEndTime().equals(newCoord.getEndTime())) { 212 throw new CommandException(ErrorCode.E1023, "End time can't be changed. Old end time = " 213 + oldCoord.getEndTime() + " new end time = " + newCoord.getEndTime()); 214 } 215 216 if (!oldCoord.getStartTime().equals(newCoord.getStartTime())) { 217 throw new CommandException(ErrorCode.E1023, "Start time can't be changed. Old start time = " 218 + oldCoord.getStartTime() + " new start time = " + newCoord.getStartTime()); 219 } 220 221 if (!oldCoord.getAppName().equals(newCoord.getAppName())) { 222 throw new CommandException(ErrorCode.E1023, "Coord name can't be changed. Old name = " 223 + oldCoord.getAppName() + " new name = " + newCoord.getAppName()); 224 } 225 226 if (!oldCoord.getTimeUnitStr().equals(newCoord.getTimeUnitStr())) { 227 throw new CommandException(ErrorCode.E1023, "Timeunit can't be changed. Old Timeunit = " 228 + oldCoord.getTimeUnitStr() + " new Timeunit = " + newCoord.getTimeUnitStr()); 229 } 230 231 if (!oldCoord.getTimeZone().equals(newCoord.getTimeZone())) { 232 throw new CommandException(ErrorCode.E1023, "TimeZone can't be changed. Old timeZone = " 233 + oldCoord.getTimeZone() + " new timeZone = " + newCoord.getTimeZone()); 234 } 235 236 } 237 238 @Override 239 protected void queueMaterializeTransitionXCommand(String jobId) { 240 } 241 242 @Override 243 public void notifyParent() throws CommandException { 244 } 245 246 @Override 247 protected boolean isLockRequired() { 248 return true; 249 } 250 251 @Override 252 public String getEntityKey() { 253 return jobId; 254 } 255 256 @Override 257 public void transitToNext() { 258 } 259 260 @Override 261 public String getKey() { 262 return getName() + "_" + jobId; 263 } 264 265 @Override 266 public String getDryRun(CoordinatorJobBean job) throws Exception{ 267 return super.getDryRun(oldCoordJob); 268 } 269}