001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019
020package org.apache.oozie.command.coord;
021
022import java.io.ByteArrayOutputStream;
023import java.io.IOException;
024import java.io.StringReader;
025import java.util.Date;
026
027import org.apache.commons.lang.StringUtils;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.oozie.CoordinatorJobBean;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.client.CoordinatorJob;
032import org.apache.oozie.client.OozieClient;
033import org.apache.oozie.command.CommandException;
034import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
035import org.apache.oozie.executor.jpa.JPAExecutorException;
036import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
037import org.apache.oozie.service.JPAService;
038import org.apache.oozie.service.Services;
039import org.apache.oozie.util.LogUtils;
040import org.apache.oozie.util.XConfiguration;
041import org.apache.oozie.util.XmlUtils;
042import org.eclipse.jgit.diff.DiffFormatter;
043import org.eclipse.jgit.diff.EditList;
044import org.eclipse.jgit.diff.HistogramDiff;
045import org.eclipse.jgit.diff.RawText;
046import org.eclipse.jgit.diff.RawTextComparator;
047import org.jdom.Element;
048
049/**
050 * This class provides the functionalities to update coordinator job XML and properties. It uses CoordSubmitXCommand
051 * functionality to validate XML and resolve all the variables or properties using job configurations.
052 */
053public class CoordUpdateXCommand extends CoordSubmitXCommand {
054
055    private final String jobId;
056    private boolean showDiff = true;
057    private boolean isConfChange = false;
058
059    //This properties are set in coord jobs by bundle. An update command should not overide it.
060    final static String[] bundleConfList = new String[] { OozieClient.BUNDLE_ID };
061
062    StringBuffer diff = new StringBuffer();
063    CoordinatorJobBean oldCoordJob = null;
064
065    public CoordUpdateXCommand(boolean dryrun, Configuration conf, String jobId) {
066        super(dryrun, conf);
067        this.jobId = jobId;
068        isConfChange = conf.size() == 0 ? false : true;
069    }
070
071    public CoordUpdateXCommand(boolean dryrun, Configuration conf, String jobId, boolean showDiff) {
072        super(dryrun, conf);
073        this.jobId = jobId;
074        this.showDiff = showDiff;
075        isConfChange = conf.size() == 0 ? false : true;
076    }
077
078    @Override
079    protected String storeToDB(String xmlElement, Element eJob, CoordinatorJobBean coordJob) throws CommandException {
080        check(oldCoordJob, coordJob);
081        computeDiff(eJob);
082        oldCoordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
083        if (isConfChange) {
084            oldCoordJob.setConf(XmlUtils.prettyPrint(conf).toString());
085        }
086        oldCoordJob.setMatThrottling(coordJob.getMatThrottling());
087        oldCoordJob.setOrigJobXml(xmlElement);
088        oldCoordJob.setConcurrency(coordJob.getConcurrency());
089        oldCoordJob.setExecution(coordJob.getExecution());
090        oldCoordJob.setTimeout(coordJob.getTimeout());
091        oldCoordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
092
093
094        if (!dryrun) {
095            oldCoordJob.setLastModifiedTime(new Date());
096            // Should log the changes, this should be useful for debugging.
097            LOG.info("Coord update changes : " + diff.toString());
098            try {
099                CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, oldCoordJob);
100            }
101            catch (JPAExecutorException jpaee) {
102                throw new CommandException(jpaee);
103            }
104        }
105        return jobId;
106    }
107
108    @Override
109    protected void loadState() throws CommandException {
110        super.loadState();
111        jpaService = Services.get().get(JPAService.class);
112        if (jpaService == null) {
113            throw new CommandException(ErrorCode.E0610);
114        }
115        coordJob = new CoordinatorJobBean();
116        try {
117            oldCoordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
118        }
119        catch (JPAExecutorException e) {
120            throw new CommandException(e);
121        }
122
123        LogUtils.setLogInfo(oldCoordJob);
124        if (!isConfChange || StringUtils.isEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH))) {
125            try {
126                XConfiguration jobConf = new XConfiguration(new StringReader(oldCoordJob.getConf()));
127
128                if (!isConfChange) {
129                    conf = jobConf;
130                }
131                else {
132                    for (String bundleConfKey : bundleConfList) {
133                        if (jobConf.get(bundleConfKey) != null) {
134                            conf.set(bundleConfKey, jobConf.get(bundleConfKey));
135                        }
136                    }
137                    if (StringUtils.isEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH))) {
138                        conf.set(OozieClient.COORDINATOR_APP_PATH, jobConf.get(OozieClient.COORDINATOR_APP_PATH));
139                    }
140                }
141            }
142            catch (Exception e) {
143                throw new CommandException(ErrorCode.E1023, e.getMessage(), e);
144            }
145        }
146        coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
147        setJob(coordJob);
148        LogUtils.setLogInfo(coordJob);
149
150    }
151
152    @Override
153    protected void verifyPrecondition() throws CommandException {
154        if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
155                || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR) {
156            LOG.info("Can't update coord job. Job has finished processing");
157            throw new CommandException(ErrorCode.E1023, "Can't update coord job. Job has finished processing");
158        }
159    }
160
161    /**
162     * Gets the difference of job definition and properties.
163     *
164     * @param eJob the e job
165     * @return the diff
166     */
167
168    private void computeDiff(Element eJob) {
169        try {
170            diff.append("**********Job definition changes**********").append(System.getProperty("line.separator"));
171            diff.append(getDiffinGitFormat(oldCoordJob.getJobXml(), XmlUtils.prettyPrint(eJob).toString()));
172            diff.append("******************************************").append(System.getProperty("line.separator"));
173            diff.append("**********Job conf changes****************").append(System.getProperty("line.separator"));
174            if (isConfChange) {
175                diff.append(getDiffinGitFormat(oldCoordJob.getConf(), XmlUtils.prettyPrint(conf).toString()));
176            }
177            else {
178                diff.append("No conf update requested").append(System.getProperty("line.separator"));
179            }
180            diff.append("******************************************").append(System.getProperty("line.separator"));
181        }
182        catch (IOException e) {
183            diff.append("Error computing diff. Error " + e.getMessage());
184            LOG.warn("Error computing diff.", e);
185        }
186    }
187
188    /**
189     * Get the differences in git format.
190     *
191     * @param string1 the string1
192     * @param string2 the string2
193     * @return the diff
194     * @throws IOException Signals that an I/O exception has occurred.
195     */
196    private String getDiffinGitFormat(String string1, String string2) throws IOException {
197        ByteArrayOutputStream out = new ByteArrayOutputStream();
198        RawText rt1 = new RawText(string1.getBytes());
199        RawText rt2 = new RawText(string2.getBytes());
200        EditList diffList = new EditList();
201        diffList.addAll(new HistogramDiff().diff(RawTextComparator.DEFAULT, rt1, rt2));
202        new DiffFormatter(out).format(diffList, rt1, rt2);
203        return out.toString();
204    }
205
206    @Override
207    protected String submit() throws CommandException {
208        LOG.info("STARTED Coordinator update");
209        submitJob();
210        LOG.info("ENDED Coordinator update");
211        if (showDiff) {
212            return diff.toString();
213        }
214        else {
215            return "";
216        }
217    }
218
219    /**
220     * Check. Frequency can't be changed. EndTime can't be changed. StartTime can't be changed. AppName can't be changed
221     * Timeunit can't be changed. Timezone can't be changed
222     *
223     * @param oldCoord the old coord
224     * @param newCoord the new coord
225     * @throws CommandException the command exception
226     */
227    public void check(CoordinatorJobBean oldCoord, CoordinatorJobBean newCoord) throws CommandException {
228        if (!oldCoord.getFrequency().equals(newCoord.getFrequency())) {
229            throw new CommandException(ErrorCode.E1023, "Frequency can't be changed. Old frequency = "
230                    + oldCoord.getFrequency() + " new frequency = " + newCoord.getFrequency());
231        }
232
233        if (!oldCoord.getEndTime().equals(newCoord.getEndTime())) {
234            throw new CommandException(ErrorCode.E1023, "End time can't be changed. Old end time = "
235                    + oldCoord.getEndTime() + " new end time = " + newCoord.getEndTime());
236        }
237
238        if (!oldCoord.getStartTime().equals(newCoord.getStartTime())) {
239            throw new CommandException(ErrorCode.E1023, "Start time can't be changed. Old start time = "
240                    + oldCoord.getStartTime() + " new start time = " + newCoord.getStartTime());
241        }
242
243        if (!oldCoord.getAppName().equals(newCoord.getAppName())) {
244            throw new CommandException(ErrorCode.E1023, "Coord name can't be changed. Old name = "
245                    + oldCoord.getAppName() + " new name = " + newCoord.getAppName());
246        }
247
248        if (!oldCoord.getTimeUnitStr().equals(newCoord.getTimeUnitStr())) {
249            throw new CommandException(ErrorCode.E1023, "Timeunit can't be changed. Old Timeunit = "
250                    + oldCoord.getTimeUnitStr() + " new Timeunit = " + newCoord.getTimeUnitStr());
251        }
252
253        if (!oldCoord.getTimeZone().equals(newCoord.getTimeZone())) {
254            throw new CommandException(ErrorCode.E1023, "TimeZone can't be changed. Old timeZone = "
255                    + oldCoord.getTimeZone() + " new timeZone = " + newCoord.getTimeZone());
256        }
257
258    }
259
260    @Override
261    protected void queueMaterializeTransitionXCommand(String jobId) {
262    }
263
264    @Override
265    public void notifyParent() throws CommandException {
266    }
267
268    @Override
269    protected boolean isLockRequired() {
270        return true;
271    }
272
273    @Override
274    public String getEntityKey() {
275        return jobId;
276    }
277
278    @Override
279    public void transitToNext() {
280    }
281
282    @Override
283    public String getKey() {
284        return getName() + "_" + jobId;
285    }
286
287    @Override
288    public String getDryRun(CoordinatorJobBean job) throws Exception{
289        return super.getDryRun(oldCoordJob);
290    }
291}