001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.sla;
020
021import java.util.Date;
022
023import org.apache.oozie.XException;
024import org.apache.oozie.command.CommandException;
025import org.apache.oozie.command.PreconditionException;
026import org.apache.oozie.command.XCommand;
027import org.apache.oozie.executor.jpa.JPAExecutorException;
028import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
029import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
030import org.apache.oozie.sla.SLASummaryBean;
031
032public abstract class SLAJobHistoryXCommand extends XCommand<Boolean> {
033
034    protected String jobId;
035
036    public SLAJobHistoryXCommand(String jobId) {
037        super("SLAJobHistoryXCommand", "SLAJobHistoryXCommand", 1);
038        this.jobId = jobId;
039
040    }
041
042    @Override
043    protected void verifyPrecondition() throws CommandException, PreconditionException {
044    }
045
046    @Override
047    protected boolean isLockRequired() {
048        return true;
049    }
050
051    @Override
052    protected boolean isReQueueRequired() {
053        return false;
054    }
055
056    @Override
057    public String getEntityKey() {
058        return SLAJobEventXCommand.SLA_LOCK_PREFIX + jobId;
059    }
060
061    protected long getLockTimeOut() {
062        return 0L;
063    }
064
065    protected Boolean execute() throws CommandException {
066        if (isJobEnded()) {
067            try {
068                updateSLASummary();
069            }
070            catch (XException e) {
071                throw new CommandException(e);
072            }
073            return true;
074        }
075        else {
076            LOG.debug("Job [{0}] is not finished", jobId);
077        }
078        return false;
079
080    }
081
082    /**
083     * Checks if is job ended.
084     *
085     * @return true, if is job ended
086     */
087    protected abstract boolean isJobEnded();
088
089    /**
090     * Update SLASummary
091     *
092     */
093    protected abstract void updateSLASummary() throws CommandException, XException;
094
095    /**
096     * Update sla summary.
097     *
098     * @param id the id
099     * @param startTime the start time
100     * @param endTime the end time
101     * @param status the status
102     * @throws JPAExecutorException the JPA executor exception
103     */
104    protected void updateSLASummary(String id, Date startTime, Date endTime, String status) throws JPAExecutorException {
105        SLASummaryBean sla = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, id);
106        if (sla.getJobStatus().equals(status) && sla.getEventProcessed() == 8) {
107            LOG.debug("SLA job is already updated", sla.getId(), sla.getEventProcessed(), sla.getJobStatus());
108            return;
109        }
110        if (sla != null) {
111            sla.setActualStart(startTime);
112            sla.setActualEnd(endTime);
113            if (startTime != null && endTime != null) {
114                sla.setActualDuration(endTime.getTime() - startTime.getTime());
115            }
116            sla.setLastModifiedTime(new Date());
117            sla.setEventProcessed(8);
118            sla.setJobStatus(status);
119            SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
120                    sla);
121            LOG.debug(" Stored SLA SummaryBean Job [{0}] eventProc = [{1}], status = [{2}]", sla.getId(),
122                    sla.getEventProcessed(), sla.getJobStatus());
123
124        }
125    }
126
127}