001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.coord;
020
021import java.io.ByteArrayOutputStream;
022import java.io.IOException;
023import java.io.StringReader;
024import java.util.Date;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.oozie.CoordinatorJobBean;
027import org.apache.oozie.ErrorCode;
028import org.apache.oozie.client.CoordinatorJob;
029import org.apache.oozie.client.OozieClient;
030import org.apache.oozie.command.CommandException;
031import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
032import org.apache.oozie.executor.jpa.JPAExecutorException;
033import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
034import org.apache.oozie.service.JPAService;
035import org.apache.oozie.service.Services;
036import org.apache.oozie.util.LogUtils;
037import org.apache.oozie.util.XConfiguration;
038import org.apache.oozie.util.XmlUtils;
039import org.eclipse.jgit.diff.DiffFormatter;
040import org.eclipse.jgit.diff.EditList;
041import org.eclipse.jgit.diff.HistogramDiff;
042import org.eclipse.jgit.diff.RawText;
043import org.eclipse.jgit.diff.RawTextComparator;
044import org.jdom.Element;
045
046/**
047 * This class provides the functionalities to update coordinator job XML and properties. It uses CoordSubmitXCommand
048 * functionality to validate XML and resolve all the variables or properties using job configurations.
049 */
050public class CoordUpdateXCommand extends CoordSubmitXCommand {
051
052    private final String jobId;
053    private boolean showDiff = true;
054    private boolean isConfChange = false;
055
056    StringBuffer diff = new StringBuffer();
057    CoordinatorJobBean oldCoordJob = null;
058
059    public CoordUpdateXCommand(boolean dryrun, Configuration conf, String jobId) {
060        super(dryrun, conf);
061        this.jobId = jobId;
062        isConfChange = conf.size() == 0 ? false : true;
063    }
064
065    public CoordUpdateXCommand(boolean dryrun, Configuration conf, String jobId, boolean showDiff) {
066        super(dryrun, conf);
067        this.jobId = jobId;
068        this.showDiff = showDiff;
069        isConfChange = conf.size() == 0 ? false : true;
070    }
071
072    @Override
073    protected String storeToDB(String xmlElement, Element eJob, CoordinatorJobBean coordJob) throws CommandException {
074        check(oldCoordJob, coordJob);
075        computeDiff(eJob);
076        oldCoordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
077        if (isConfChange) {
078            oldCoordJob.setConf(XmlUtils.prettyPrint(conf).toString());
079        }
080        oldCoordJob.setMatThrottling(coordJob.getMatThrottling());
081        oldCoordJob.setOrigJobXml(xmlElement);
082        oldCoordJob.setConcurrency(coordJob.getConcurrency());
083        oldCoordJob.setExecution(coordJob.getExecution());
084        oldCoordJob.setTimeout(coordJob.getTimeout());
085        oldCoordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
086
087
088        if (!dryrun) {
089            oldCoordJob.setLastModifiedTime(new Date());
090            // Should log the changes, this should be useful for debugging.
091            LOG.info("Coord update changes : " + diff.toString());
092            try {
093                CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, oldCoordJob);
094            }
095            catch (JPAExecutorException jpaee) {
096                throw new CommandException(jpaee);
097            }
098        }
099        return jobId;
100    }
101
102    @Override
103    protected void loadState() throws CommandException {
104        super.loadState();
105        jpaService = Services.get().get(JPAService.class);
106        if (jpaService == null) {
107            throw new CommandException(ErrorCode.E0610);
108        }
109        coordJob = new CoordinatorJobBean();
110        try {
111            oldCoordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
112        }
113        catch (JPAExecutorException e) {
114            throw new CommandException(e);
115        }
116
117        LogUtils.setLogInfo(oldCoordJob, logInfo);
118        if (!isConfChange) {
119            try {
120                conf = new XConfiguration(new StringReader(oldCoordJob.getConf()));
121            }
122            catch (Exception e) {
123                throw new CommandException(ErrorCode.E1023, e.getMessage(), e);
124            }
125        }
126        coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
127        setJob(coordJob);
128    }
129
130    @Override
131    protected void verifyPrecondition() throws CommandException {
132        if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
133                || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR) {
134            LOG.info("Can't update coord job. Job has finished processing");
135            throw new CommandException(ErrorCode.E1023, "Can't update coord job. Job has finished processing");
136        }
137    }
138
139    /**
140     * Gets the difference of job definition and properties.
141     *
142     * @param eJob the e job
143     * @return the diff
144     */
145
146    private void computeDiff(Element eJob) {
147        try {
148            diff.append("**********Job definition changes**********").append(System.getProperty("line.separator"));
149            diff.append(getDiffinGitFormat(oldCoordJob.getJobXml(), XmlUtils.prettyPrint(eJob).toString()));
150            diff.append("******************************************").append(System.getProperty("line.separator"));
151            diff.append("**********Job conf changes****************").append(System.getProperty("line.separator"));
152            if (isConfChange) {
153                diff.append(getDiffinGitFormat(oldCoordJob.getConf(), XmlUtils.prettyPrint(conf).toString()));
154            }
155            else {
156                diff.append("No conf update requested").append(System.getProperty("line.separator"));
157            }
158            diff.append("******************************************").append(System.getProperty("line.separator"));
159        }
160        catch (IOException e) {
161            diff.append("Error computing diff. Error " + e.getMessage());
162            LOG.warn("Error computing diff.", e);
163        }
164    }
165
166    /**
167     * Get the differences in git format.
168     *
169     * @param string1 the string1
170     * @param string2 the string2
171     * @return the diff
172     * @throws IOException Signals that an I/O exception has occurred.
173     */
174    private String getDiffinGitFormat(String string1, String string2) throws IOException {
175        ByteArrayOutputStream out = new ByteArrayOutputStream();
176        RawText rt1 = new RawText(string1.getBytes());
177        RawText rt2 = new RawText(string2.getBytes());
178        EditList diffList = new EditList();
179        diffList.addAll(new HistogramDiff().diff(RawTextComparator.DEFAULT, rt1, rt2));
180        new DiffFormatter(out).format(diffList, rt1, rt2);
181        return out.toString();
182    }
183
184    @Override
185    protected String submit() throws CommandException {
186        LOG.info("STARTED Coordinator update");
187        submitJob();
188        LOG.info("ENDED Coordinator update");
189        if (showDiff) {
190            return diff.toString();
191        }
192        else {
193            return "";
194        }
195    }
196
197    /**
198     * Check. Frequency can't be changed. EndTime can't be changed. StartTime can't be changed. AppName can't be changed
199     * Timeunit can't be changed. Timezone can't be changed
200     *
201     * @param oldCoord the old coord
202     * @param newCoord the new coord
203     * @throws CommandException the command exception
204     */
205    public void check(CoordinatorJobBean oldCoord, CoordinatorJobBean newCoord) throws CommandException {
206        if (!oldCoord.getFrequency().equals(newCoord.getFrequency())) {
207            throw new CommandException(ErrorCode.E1023, "Frequency can't be changed. Old frequency = "
208                    + oldCoord.getFrequency() + " new frequency = " + newCoord.getFrequency());
209        }
210
211        if (!oldCoord.getEndTime().equals(newCoord.getEndTime())) {
212            throw new CommandException(ErrorCode.E1023, "End time can't be changed. Old end time = "
213                    + oldCoord.getEndTime() + " new end time = " + newCoord.getEndTime());
214        }
215
216        if (!oldCoord.getStartTime().equals(newCoord.getStartTime())) {
217            throw new CommandException(ErrorCode.E1023, "Start time can't be changed. Old start time = "
218                    + oldCoord.getStartTime() + " new start time = " + newCoord.getStartTime());
219        }
220
221        if (!oldCoord.getAppName().equals(newCoord.getAppName())) {
222            throw new CommandException(ErrorCode.E1023, "Coord name can't be changed. Old name = "
223                    + oldCoord.getAppName() + " new name = " + newCoord.getAppName());
224        }
225
226        if (!oldCoord.getTimeUnitStr().equals(newCoord.getTimeUnitStr())) {
227            throw new CommandException(ErrorCode.E1023, "Timeunit can't be changed. Old Timeunit = "
228                    + oldCoord.getTimeUnitStr() + " new Timeunit = " + newCoord.getTimeUnitStr());
229        }
230
231        if (!oldCoord.getTimeZone().equals(newCoord.getTimeZone())) {
232            throw new CommandException(ErrorCode.E1023, "TimeZone can't be changed. Old timeZone = "
233                    + oldCoord.getTimeZone() + " new timeZone = " + newCoord.getTimeZone());
234        }
235
236    }
237
238    @Override
239    protected void queueMaterializeTransitionXCommand(String jobId) {
240    }
241
242    @Override
243    public void notifyParent() throws CommandException {
244    }
245
246    @Override
247    protected boolean isLockRequired() {
248        return true;
249    }
250
251    @Override
252    public String getEntityKey() {
253        return jobId;
254    }
255
256    @Override
257    public void transitToNext() {
258    }
259
260    @Override
261    public String getKey() {
262        return getName() + "_" + jobId;
263    }
264
265    @Override
266    public String getDryRun(CoordinatorJobBean job) throws Exception{
267        return super.getDryRun(oldCoordJob);
268    }
269}