001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.sla;
020
021import java.util.Date;
022
023import org.apache.oozie.XException;
024import org.apache.oozie.client.OozieClient;
025import org.apache.oozie.client.event.SLAEvent.EventStatus;
026import org.apache.oozie.client.event.SLAEvent.SLAStatus;
027import org.apache.oozie.command.CommandException;
028import org.apache.oozie.command.PreconditionException;
029import org.apache.oozie.command.XCommand;
030import org.apache.oozie.executor.jpa.JPAExecutorException;
031import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
032import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
033import org.apache.oozie.service.EventHandlerService;
034import org.apache.oozie.service.JPAService;
035import org.apache.oozie.service.Services;
036import org.apache.oozie.sla.SLACalcStatus;
037import org.apache.oozie.sla.SLASummaryBean;
038
039public abstract class SLAJobEventXCommand extends XCommand<Void> {
040    private long lockTimeOut = 0 ;
041    JPAService jpaService = Services.get().get(JPAService.class);
042    SLACalcStatus slaCalc;
043    final static String SLA_LOCK_PREFIX = "sla_";
044    private boolean isEnded = false;
045    private boolean isEndMiss = false;
046
047    public SLAJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
048        super("SLA.job.event", "SLA.job.event", 1);
049        this.slaCalc = slaCalc;
050        this.lockTimeOut = lockTimeOut;
051    }
052
053    @Override
054    protected boolean isLockRequired() {
055        return true;
056    }
057
058    @Override
059    protected boolean isReQueueRequired() {
060        return false;
061    }
062
063    @Override
064    public String getEntityKey() {
065        return SLA_LOCK_PREFIX + slaCalc.getId();
066    }
067
068    protected long getLockTimeOut() {
069        return lockTimeOut;
070    }
071
072    @Override
073    protected void verifyPrecondition() throws CommandException, PreconditionException {
074    }
075
076
077    @Override
078    protected Void execute() throws CommandException {
079        updateJobInfo();
080        if (isEnded) {
081            processForEnd();
082        }
083        else {
084            processForRunning();
085        }
086        try {
087            writeToDB();
088        }
089        catch (XException e) {
090            throw new CommandException(e);
091        }
092        return null;
093    }
094
095    /**
096     * Verify job.
097     */
098    protected abstract void updateJobInfo();
099
100    /**
101     * Should alert.
102     *
103     * @param slaObj the sla obj
104     * @return true, if successful
105     */
106    private boolean shouldAlert(SLACalcStatus slaObj) {
107        return !slaObj.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT);
108    }
109
110    /**
111     * Queue event.
112     *
113     * @param event the event
114     */
115    private void queueEvent(SLACalcStatus event) {
116        Services.get().get(EventHandlerService.class).queueEvent(event);
117    }
118
119    /**
120     * Process duration sla.
121     *
122     * @param expected the expected
123     * @param actual the actual
124     * @param slaCalc the sla calc
125     */
126    private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) {
127        if (expected != -1) {
128            if (actual > expected) {
129                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
130            }
131            else if (actual <= expected) {
132                slaCalc.setEventStatus(EventStatus.DURATION_MET);
133            }
134            if (shouldAlert(slaCalc)) {
135                queueEvent(new SLACalcStatus(slaCalc));
136            }
137        }
138    }
139
140
141    /**
142     * WriteSLA object to DB.
143     *
144     * @throws JPAExecutorException the JPA executor exception
145     */
146    private void writeToDB() throws JPAExecutorException {
147        byte eventProc = slaCalc.getEventProcessed();
148        // no more processing, no transfer to history set
149        if (slaCalc.getEventProcessed() >= 8) {
150            slaCalc.setEventProcessed(8);
151        }
152
153        SLASummaryBean slaSummaryBean = new SLASummaryBean();
154        slaSummaryBean.setId(slaCalc.getId());
155        slaSummaryBean.setEventProcessed(eventProc);
156        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
157        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
158        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
159        slaSummaryBean.setActualStart(slaCalc.getActualStart());
160        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
161        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
162        slaSummaryBean.setLastModifiedTime(new Date());
163
164        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
165                slaSummaryBean);
166
167        LOG.debug(" Stored SLA SummaryBean Job [{0}] eventProc = [{1}], status = [{2}]", slaCalc.getId(),
168                slaCalc.getEventProcessed(), slaCalc.getJobStatus());
169
170    }
171
172    /**
173     * Process for end.
174     */
175    private void processForEnd() {
176        byte eventProc = slaCalc.getEventProcessed();
177
178        LOG.debug("Job {0} has ended. endtime = [{1}]", slaCalc.getId(), slaCalc.getActualEnd());
179        if (isEndMiss()) {
180            slaCalc.setSLAStatus(SLAStatus.MISS);
181        }
182        else {
183            slaCalc.setSLAStatus(SLAStatus.MET);
184        }
185        if (eventProc != 8 && slaCalc.getActualStart() != null) {
186            if ((eventProc & 1) == 0) {
187                if (slaCalc.getExpectedStart() != null) {
188                    if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
189                        slaCalc.setEventStatus(EventStatus.START_MISS);
190                    }
191                    else {
192                        slaCalc.setEventStatus(EventStatus.START_MET);
193                    }
194                    if (shouldAlert(slaCalc)) {
195                        queueEvent(new SLACalcStatus(slaCalc));
196                    }
197                }
198            }
199            slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime());
200            if (((eventProc >> 1) & 1) == 0) {
201                processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc);
202            }
203        }
204        if (eventProc != 8 && eventProc < 4) {
205            if (isEndMiss()) {
206                slaCalc.setEventStatus(EventStatus.END_MISS);
207            }
208            else {
209                slaCalc.setEventStatus(EventStatus.END_MET);
210            }
211            if (shouldAlert(slaCalc)) {
212                queueEvent(new SLACalcStatus(slaCalc));
213            }
214        }
215        slaCalc.setEventProcessed(8);
216    }
217
218    /**
219     * Process for running.
220     */
221    private void processForRunning() {
222        byte eventProc = slaCalc.getEventProcessed();
223
224        if (eventProc != 8 && slaCalc.getActualStart() != null) {
225            slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
226        }
227        if (eventProc != 8 && (eventProc & 1) == 0) {
228            if (slaCalc.getExpectedStart() == null) {
229                eventProc++;
230            }
231            else if (slaCalc.getActualStart() != null) {
232                if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
233                    slaCalc.setEventStatus(EventStatus.START_MISS);
234                }
235                else {
236                    slaCalc.setEventStatus(EventStatus.START_MET);
237                }
238                if (shouldAlert(slaCalc)) {
239                    queueEvent(new SLACalcStatus(slaCalc));
240                }
241                eventProc++;
242            }
243            else if (slaCalc.getExpectedStart() != null
244                    && slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) {
245                slaCalc.setEventStatus(EventStatus.START_MISS);
246                if (shouldAlert(slaCalc)) {
247                    queueEvent(new SLACalcStatus(slaCalc));
248                }
249                eventProc++;
250            }
251
252        }
253        if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
254            if (slaCalc.getExpectedDuration() == -1) {
255                eventProc += 2;
256            }
257            else if (slaCalc.getActualStart() != null && slaCalc.getExpectedDuration() != -1) {
258                if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) {
259                    slaCalc.setEventStatus(EventStatus.DURATION_MISS);
260                    if (shouldAlert(slaCalc)) {
261                        queueEvent(new SLACalcStatus(slaCalc));
262                    }
263                    eventProc += 2;
264                }
265            }
266        }
267        if (eventProc < 4) {
268            if (slaCalc.getExpectedEnd() != null) {
269                if (slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
270                    slaCalc.setEventStatus(EventStatus.END_MISS);
271                    slaCalc.setSLAStatus(SLAStatus.MISS);
272                    if (shouldAlert(slaCalc)) {
273                        queueEvent(new SLACalcStatus(slaCalc));
274                    }
275                    eventProc += 4;
276                }
277            }
278            else {
279                eventProc += 4;
280            }
281        }
282        slaCalc.setEventProcessed(eventProc);
283    }
284
285    public boolean isEnded() {
286        return isEnded;
287    }
288
289    public void setEnded(boolean isEnded) {
290        this.isEnded = isEnded;
291    }
292
293    public boolean isEndMiss() {
294        return isEndMiss;
295    }
296
297    public void setEndMiss(boolean isEndMiss) {
298        this.isEndMiss = isEndMiss;
299    }
300
301}