/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package org.apache.oozie.command.coord;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.util.Date;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.eclipse.jgit.diff.DiffFormatter;
import org.eclipse.jgit.diff.EditList;
import org.eclipse.jgit.diff.HistogramDiff;
import org.eclipse.jgit.diff.RawText;
import org.eclipse.jgit.diff.RawTextComparator;
import org.jdom.Element;

/**
 * This class provides the functionalities to update coordinator job XML and properties. It uses CoordSubmitXCommand
 * functionality to validate XML and resolve all the variables or properties using job configurations.
 */
public class CoordUpdateXCommand extends CoordSubmitXCommand {

    /** Charset used for both encoding the diff inputs and decoding the formatted diff output. */
    private static final String DIFF_CHARSET = "UTF-8";

    private final String jobId;
    private boolean showDiff = true;
    private boolean isConfChange = false;

    // These properties are set in coord jobs by the bundle. An update command should not override them.
    final static String[] bundleConfList = new String[] { OozieClient.BUNDLE_ID };

    StringBuffer diff = new StringBuffer();
    CoordinatorJobBean oldCoordJob = null;

    /**
     * Creates an update command that returns the computed diff from {@link #submit()}.
     *
     * @param dryrun when true, the updated job is not written to the database
     * @param conf the new job configuration; an empty configuration means "no conf change requested"
     * @param jobId id of the coordinator job to update
     */
    public CoordUpdateXCommand(boolean dryrun, Configuration conf, String jobId) {
        this(dryrun, conf, jobId, true);
    }

    /**
     * Creates an update command.
     *
     * @param dryrun when true, the updated job is not written to the database
     * @param conf the new job configuration; an empty configuration means "no conf change requested"
     * @param jobId id of the coordinator job to update
     * @param showDiff whether {@link #submit()} returns the computed diff (otherwise an empty string)
     */
    public CoordUpdateXCommand(boolean dryrun, Configuration conf, String jobId, boolean showDiff) {
        super(dryrun, conf);
        this.jobId = jobId;
        this.showDiff = showDiff;
        isConfChange = conf.size() != 0;
    }

    /**
     * Validates the update against the existing job, records the diff, copies the updatable attributes from the
     * newly parsed job onto the existing job bean and, unless this is a dry run, persists the existing bean.
     *
     * @param xmlElement the original job XML as submitted
     * @param eJob the resolved job XML element
     * @param coordJob the bean built from the new definition
     * @return the id of the updated job
     * @throws CommandException if an immutable attribute changed or the database update fails
     */
    @Override
    protected String storeToDB(String xmlElement, Element eJob, CoordinatorJobBean coordJob) throws CommandException {
        check(oldCoordJob, coordJob);
        computeDiff(eJob);
        oldCoordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
        if (isConfChange) {
            oldCoordJob.setConf(XmlUtils.prettyPrint(conf).toString());
        }
        oldCoordJob.setMatThrottling(coordJob.getMatThrottling());
        oldCoordJob.setOrigJobXml(xmlElement);
        oldCoordJob.setConcurrency(coordJob.getConcurrency());
        oldCoordJob.setExecution(coordJob.getExecution());
        oldCoordJob.setTimeout(coordJob.getTimeout());
        oldCoordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());

        if (!dryrun) {
            oldCoordJob.setLastModifiedTime(new Date());
            // Should log the changes, this should be useful for debugging.
            LOG.info("Coord update changes : " + diff.toString());
            try {
                CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, oldCoordJob);
            }
            catch (JPAExecutorException jpaee) {
                throw new CommandException(jpaee);
            }
        }
        return jobId;
    }

    /**
     * Loads the existing job from the database into {@link #oldCoordJob} and prepares a fresh bean carrying the
     * effective configuration for the update.
     * <p>
     * If no conf change was requested, the stored job conf is used as-is. If a conf was supplied without an
     * application path, the bundle-owned keys and the application path are taken from the stored conf.
     *
     * @throws CommandException if the JPA service is unavailable, the job cannot be loaded, or the stored conf
     *         cannot be parsed
     */
    @Override
    protected void loadState() throws CommandException {
        super.loadState();
        jpaService = Services.get().get(JPAService.class);
        if (jpaService == null) {
            throw new CommandException(ErrorCode.E0610);
        }
        coordJob = new CoordinatorJobBean();
        try {
            oldCoordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }

        LogUtils.setLogInfo(oldCoordJob);
        if (!isConfChange || StringUtils.isEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH))) {
            try {
                XConfiguration jobConf = new XConfiguration(new StringReader(oldCoordJob.getConf()));

                if (!isConfChange) {
                    conf = jobConf;
                }
                else {
                    // NOTE(review): bundle-owned keys are only restored on this path, i.e. when the new conf
                    // omits the application path - confirm that is the intended scope.
                    for (String bundleConfKey : bundleConfList) {
                        if (jobConf.get(bundleConfKey) != null) {
                            conf.set(bundleConfKey, jobConf.get(bundleConfKey));
                        }
                    }
                    if (StringUtils.isEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH))) {
                        conf.set(OozieClient.COORDINATOR_APP_PATH, jobConf.get(OozieClient.COORDINATOR_APP_PATH));
                    }
                }
            }
            catch (Exception e) {
                throw new CommandException(ErrorCode.E1023, e.getMessage(), e);
            }
        }
        coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
        setJob(coordJob);
        LogUtils.setLogInfo(coordJob);

    }

    /**
     * Rejects the update when the existing job has already finished processing.
     *
     * @throws CommandException if the existing job is SUCCEEDED or DONEWITHERROR
     */
    @Override
    protected void verifyPrecondition() throws CommandException {
        // The status must be read from the job loaded from the database: coordJob is a bean newly created in
        // loadState() and does not carry the persisted job's status.
        if (oldCoordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
                || oldCoordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR) {
            LOG.info("Can't update coord job. Job has finished processing");
            throw new CommandException(ErrorCode.E1023, "Can't update coord job. Job has finished processing");
        }
    }

    /**
     * Appends the difference of job definition and properties between the existing job and the update to
     * {@link #diff}. An I/O failure while diffing is recorded in the diff text and logged, not propagated.
     *
     * @param eJob the resolved job XML element of the new definition
     */
    private void computeDiff(Element eJob) {
        final String lineSeparator = System.getProperty("line.separator");
        try {
            diff.append("**********Job definition changes**********").append(lineSeparator);
            diff.append(getDiffinGitFormat(oldCoordJob.getJobXml(), XmlUtils.prettyPrint(eJob).toString()));
            diff.append("******************************************").append(lineSeparator);
            diff.append("**********Job conf changes****************").append(lineSeparator);
            if (isConfChange) {
                diff.append(getDiffinGitFormat(oldCoordJob.getConf(), XmlUtils.prettyPrint(conf).toString()));
            }
            else {
                diff.append("No conf update requested").append(lineSeparator);
            }
            diff.append("******************************************").append(lineSeparator);
        }
        catch (IOException e) {
            diff.append("Error computing diff. Error " + e.getMessage());
            LOG.warn("Error computing diff.", e);
        }
    }

    /**
     * Get the differences in git format.
     * <p>
     * Both inputs are encoded, and the formatted result decoded, as UTF-8 so that the output does not depend on
     * the platform default charset. A {@code null} input is treated as empty text.
     *
     * @param string1 the old text
     * @param string2 the new text
     * @return the diff as produced by {@link DiffFormatter}
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private String getDiffinGitFormat(String string1, String string2) throws IOException {
        String oldText = (string1 == null) ? "" : string1;
        String newText = (string2 == null) ? "" : string2;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        RawText rt1 = new RawText(oldText.getBytes(DIFF_CHARSET));
        RawText rt2 = new RawText(newText.getBytes(DIFF_CHARSET));
        EditList diffList = new EditList();
        diffList.addAll(new HistogramDiff().diff(RawTextComparator.DEFAULT, rt1, rt2));
        new DiffFormatter(out).format(diffList, rt1, rt2);
        return out.toString(DIFF_CHARSET);
    }

    /**
     * Runs the update through {@code submitJob()}.
     *
     * @return the computed diff when diff output was requested, otherwise an empty string
     * @throws CommandException if the update fails
     */
    @Override
    protected String submit() throws CommandException {
        LOG.info("STARTED Coordinator update");
        submitJob();
        LOG.info("ENDED Coordinator update");
        if (showDiff) {
            return diff.toString();
        }
        else {
            return "";
        }
    }

    /**
     * Check. Frequency can't be changed. EndTime can't be changed. StartTime can't be changed. AppName can't be
     * changed. Timeunit can't be changed. Timezone can't be changed.
     *
     * @param oldCoord the old coord
     * @param newCoord the new coord
     * @throws CommandException the command exception, raised for the first attribute found to differ
     */
    public void check(CoordinatorJobBean oldCoord, CoordinatorJobBean newCoord) throws CommandException {
        if (!oldCoord.getFrequency().equals(newCoord.getFrequency())) {
            throw new CommandException(ErrorCode.E1023, "Frequency can't be changed. Old frequency = "
                    + oldCoord.getFrequency() + " new frequency = " + newCoord.getFrequency());
        }

        if (!oldCoord.getEndTime().equals(newCoord.getEndTime())) {
            throw new CommandException(ErrorCode.E1023, "End time can't be changed. Old end time = "
                    + oldCoord.getEndTime() + " new end time = " + newCoord.getEndTime());
        }

        if (!oldCoord.getStartTime().equals(newCoord.getStartTime())) {
            throw new CommandException(ErrorCode.E1023, "Start time can't be changed. Old start time = "
                    + oldCoord.getStartTime() + " new start time = " + newCoord.getStartTime());
        }

        if (!oldCoord.getAppName().equals(newCoord.getAppName())) {
            throw new CommandException(ErrorCode.E1023, "Coord name can't be changed. Old name = "
                    + oldCoord.getAppName() + " new name = " + newCoord.getAppName());
        }

        if (!oldCoord.getTimeUnitStr().equals(newCoord.getTimeUnitStr())) {
            throw new CommandException(ErrorCode.E1023, "Timeunit can't be changed. Old Timeunit = "
                    + oldCoord.getTimeUnitStr() + " new Timeunit = " + newCoord.getTimeUnitStr());
        }

        if (!oldCoord.getTimeZone().equals(newCoord.getTimeZone())) {
            throw new CommandException(ErrorCode.E1023, "TimeZone can't be changed. Old timeZone = "
                    + oldCoord.getTimeZone() + " new timeZone = " + newCoord.getTimeZone());
        }

    }

    /** No-op override: this command does nothing here. */
    @Override
    protected void queueMaterializeTransitionXCommand(String jobId) {
    }

    /** No-op override: this command does nothing here. */
    @Override
    public void notifyParent() throws CommandException {
    }

    /** This command always requires a lock. */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /** @return the id of the job being updated */
    @Override
    public String getEntityKey() {
        return jobId;
    }

    /** No-op override: this command does nothing here. */
    @Override
    public void transitToNext() {
    }

    /** @return the command name and the job id joined by an underscore */
    @Override
    public String getKey() {
        return getName() + "_" + jobId;
    }

    /**
     * Delegates to the parent dry run using the existing job bean ({@link #oldCoordJob}); the {@code job}
     * argument is not used.
     *
     * @param job ignored
     * @return the parent's dry-run output for the existing job bean
     * @throws Exception if the parent dry run fails
     */
    @Override
    public String getDryRun(CoordinatorJobBean job) throws Exception{
        return super.getDryRun(oldCoordJob);
    }
}