/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.command.sla;

import java.util.Date;

import org.apache.oozie.XException;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.event.SLAEvent.EventStatus;
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.XCommand;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.SLACalcStatus;
import org.apache.oozie.sla.SLASummaryBean;

/**
 * Base command that evaluates the SLA state of a single job and persists the result.
 * <p>
 * On execution the command asks the subclass to refresh the job information, evaluates the
 * start, duration and end SLAs (queueing an event for each newly decided status unless alerts
 * are disabled for the job), and finally writes the resulting status to the SLA summary table.
 * <p>
 * Progress is tracked in the "event processed" value of the {@link SLACalcStatus}: this class
 * adds 1 once the start SLA has been decided, 2 for the duration SLA and 4 for the end SLA, and
 * sets the value to 8 once the job has ended.
 */
public abstract class SLAJobEventXCommand extends XCommand<Void> {

    static final String SLA_LOCK_PREFIX = "sla_";

    /** Added to the event-processed value once the start SLA needs no further evaluation. */
    private static final int START_PROCESSED = 1;
    /** Added to the event-processed value once the duration SLA needs no further evaluation. */
    private static final int DURATION_PROCESSED = 2;
    /** Added to the event-processed value once the end SLA needs no further evaluation. */
    private static final int END_PROCESSED = 4;
    /** Event-processed value set after a job has ended; such entries are not evaluated again. */
    private static final int ALL_PROCESSED = 8;

    private final long lockTimeOut;
    JPAService jpaService = Services.get().get(JPAService.class);
    SLACalcStatus slaCalc;
    private boolean isEnded = false;
    private boolean isEndMiss = false;

    /**
     * Creates the command.
     *
     * @param slaCalc the SLA calculation state of the job this command works on
     * @param lockTimeOut the value returned by {@link #getLockTimeOut()}
     */
    public SLAJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
        super("SLA.job.event", "SLA.job.event", 1);
        this.slaCalc = slaCalc;
        this.lockTimeOut = lockTimeOut;
    }

    @Override
    protected boolean isLockRequired() {
        return true;
    }

    @Override
    protected boolean isReQueueRequired() {
        return false;
    }

    /**
     * Returns the lock key for this command: {@link #SLA_LOCK_PREFIX} followed by the SLA id.
     */
    @Override
    public String getEntityKey() {
        return SLA_LOCK_PREFIX + slaCalc.getId();
    }

    protected long getLockTimeOut() {
        return lockTimeOut;
    }

    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        // Nothing to verify.
    }

    /**
     * Refreshes the job information, evaluates the SLAs for an ended or a still running job
     * (depending on the ended flag) and stores the outcome in the database.
     *
     * @return always {@code null}
     * @throws CommandException if writing the SLA summary fails
     */
    @Override
    protected Void execute() throws CommandException {
        updateJobInfo();
        if (isEnded) {
            processForEnd();
        }
        else {
            processForRunning();
        }
        try {
            writeToDB();
        }
        catch (XException e) {
            throw new CommandException(e);
        }
        return null;
    }

    /**
     * Hook invoked first by {@link #execute()}, before the ended flag is consulted.
     * <p>
     * NOTE(review): implementations presumably refresh {@code slaCalc} from the current job
     * state and call {@link #setEnded(boolean)} / {@link #setEndMiss(boolean)} - confirm
     * against the subclasses.
     */
    protected abstract void updateJobInfo();

    /**
     * Tells whether events may be queued for the given SLA object.
     *
     * @param slaObj the SLA object to check
     * @return {@code false} if the SLA config map of {@code slaObj} contains the
     *         {@link OozieClient#SLA_DISABLE_ALERT} key, {@code true} otherwise
     */
    private boolean shouldAlert(SLACalcStatus slaObj) {
        return !slaObj.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT);
    }

    /**
     * Hands the event over to the {@link EventHandlerService}.
     *
     * @param event the event to queue
     */
    private void queueEvent(SLACalcStatus event) {
        Services.get().get(EventHandlerService.class).queueEvent(event);
    }

    /**
     * Queues a copy of the given SLA state as an event, unless alerts are disabled for it.
     *
     * @param status the SLA state to publish
     */
    private void queueAlertIfEnabled(SLACalcStatus status) {
        if (shouldAlert(status)) {
            // A copy is queued so that later changes to the live object do not alter the event.
            queueEvent(new SLACalcStatus(status));
        }
    }

    /**
     * Sets START_MISS or START_MET by comparing the expected and the actual start time, then
     * queues the event. Callers must ensure that both times are non-null.
     */
    private void processStartSLA() {
        boolean startedLate = slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime();
        slaCalc.setEventStatus(startedLate ? EventStatus.START_MISS : EventStatus.START_MET);
        queueAlertIfEnabled(slaCalc);
    }

    /**
     * Sets DURATION_MISS or DURATION_MET and queues the event. Does nothing when no duration
     * SLA is defined.
     *
     * @param expected the expected duration, or -1 if no duration SLA is defined
     * @param actual the actual duration
     * @param slaCalc the SLA state to update
     */
    private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) {
        if (expected == -1) {
            return;
        }
        slaCalc.setEventStatus(actual > expected ? EventStatus.DURATION_MISS : EventStatus.DURATION_MET);
        queueAlertIfEnabled(slaCalc);
    }

    /**
     * Writes the SLA status, event status, actual times, job status and event-processed value
     * to the SLA summary table.
     *
     * @throws JPAExecutorException if the update fails
     */
    private void writeToDB() throws JPAExecutorException {
        byte eventProc = slaCalc.getEventProcessed();
        // no more processing, no transfer to history set
        if (eventProc >= ALL_PROCESSED) {
            slaCalc.setEventProcessed(ALL_PROCESSED);
        }

        SLASummaryBean slaSummaryBean = new SLASummaryBean();
        slaSummaryBean.setId(slaCalc.getId());
        // NOTE(review): the bean stores the value read before the clamp above, while the debug
        // message below reports the clamped value. They differ only when the incoming value
        // exceeds 8 - confirm which one is meant to be persisted.
        slaSummaryBean.setEventProcessed(eventProc);
        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
        slaSummaryBean.setActualStart(slaCalc.getActualStart());
        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
        slaSummaryBean.setLastModifiedTime(new Date());

        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
                slaSummaryBean);

        LOG.debug(" Stored SLA SummaryBean Job [{0}] eventProc = [{1}], status = [{2}]", slaCalc.getId(),
                slaCalc.getEventProcessed(), slaCalc.getJobStatus());
    }

    /**
     * Evaluates the SLAs of a job that has ended.
     * <p>
     * The overall SLA status becomes MISS or MET according to {@link #isEndMiss()}. Start and
     * duration events that were not decided while the job was running are emitted now (only if
     * the job actually started), followed by the end event. Finally the event-processed value
     * is set to 8.
     */
    private void processForEnd() {
        byte eventProc = slaCalc.getEventProcessed();

        LOG.debug("Job {0} has ended. endtime = [{1}]", slaCalc.getId(), slaCalc.getActualEnd());
        slaCalc.setSLAStatus(isEndMiss() ? SLAStatus.MISS : SLAStatus.MET);

        if (eventProc != ALL_PROCESSED && slaCalc.getActualStart() != null) {
            boolean startPending = (eventProc & START_PROCESSED) == 0;
            if (startPending && slaCalc.getExpectedStart() != null) {
                processStartSLA();
            }
            slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime());
            if ((eventProc & DURATION_PROCESSED) == 0) {
                processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc);
            }
        }

        // Any value below 4 means the end SLA has not been decided yet.
        if (eventProc < END_PROCESSED) {
            slaCalc.setEventStatus(isEndMiss() ? EventStatus.END_MISS : EventStatus.END_MET);
            queueAlertIfEnabled(slaCalc);
        }
        slaCalc.setEventProcessed(ALL_PROCESSED);
    }

    /**
     * Evaluates the SLAs of a job that has not ended.
     * <p>
     * Each of the start, duration and end SLAs is marked as processed as soon as it is either
     * not defined or its outcome is known; an event is queued for every outcome decided here.
     * SLAs whose outcome cannot be decided yet are left for a later run.
     */
    private void processForRunning() {
        byte eventProc = slaCalc.getEventProcessed();

        if (eventProc != ALL_PROCESSED && slaCalc.getActualStart() != null) {
            slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
        }

        // Start SLA
        if (eventProc != ALL_PROCESSED && (eventProc & START_PROCESSED) == 0) {
            if (slaCalc.getExpectedStart() == null) {
                // No start SLA defined.
                eventProc += START_PROCESSED;
            }
            else if (slaCalc.getActualStart() != null) {
                processStartSLA();
                eventProc += START_PROCESSED;
            }
            else if (slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) {
                // Not started yet although the expected start time has passed.
                slaCalc.setEventStatus(EventStatus.START_MISS);
                queueAlertIfEnabled(slaCalc);
                eventProc += START_PROCESSED;
            }
        }

        // Duration SLA
        if (eventProc != ALL_PROCESSED && (eventProc & DURATION_PROCESSED) == 0) {
            if (slaCalc.getExpectedDuration() == -1) {
                // No duration SLA defined.
                eventProc += DURATION_PROCESSED;
            }
            else if (slaCalc.getActualStart() != null
                    && System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc
                            .getExpectedDuration()) {
                // Still running and already over the expected duration.
                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
                queueAlertIfEnabled(slaCalc);
                eventProc += DURATION_PROCESSED;
            }
        }

        // End SLA
        if (eventProc < END_PROCESSED) {
            if (slaCalc.getExpectedEnd() == null) {
                // No end SLA defined.
                eventProc += END_PROCESSED;
            }
            else if (slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
                // Still running although the expected end time has passed.
                slaCalc.setEventStatus(EventStatus.END_MISS);
                slaCalc.setSLAStatus(SLAStatus.MISS);
                queueAlertIfEnabled(slaCalc);
                eventProc += END_PROCESSED;
            }
        }
        slaCalc.setEventProcessed(eventProc);
    }

    /**
     * @return whether the job is flagged as ended
     */
    public boolean isEnded() {
        return isEnded;
    }

    /**
     * @param isEnded {@code true} to have {@link #execute()} treat the job as ended
     */
    public void setEnded(boolean isEnded) {
        this.isEnded = isEnded;
    }

    /**
     * @return whether the job is flagged as having missed its end SLA
     */
    public boolean isEndMiss() {
        return isEndMiss;
    }

    /**
     * @param isEndMiss {@code true} if the ended job missed its end SLA
     */
    public void setEndMiss(boolean isEndMiss) {
        this.isEndMiss = isEndMiss;
    }

}