001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.oozie.sla.listener; 020 021 import java.util.ArrayList; 022 import java.util.HashSet; 023 import java.util.List; 024 import java.util.Properties; 025 import java.util.Set; 026 import java.util.concurrent.TimeUnit; 027 import java.util.concurrent.atomic.AtomicInteger; 028 029 import javax.mail.Address; 030 import javax.mail.Message; 031 import javax.mail.MessagingException; 032 import javax.mail.NoSuchProviderException; 033 import javax.mail.SendFailedException; 034 import javax.mail.Session; 035 import javax.mail.Transport; 036 import javax.mail.internet.AddressException; 037 import javax.mail.internet.InternetAddress; 038 import javax.mail.internet.MimeMessage; 039 import javax.mail.internet.MimeMessage.RecipientType; 040 041 import org.apache.hadoop.conf.Configuration; 042 import org.apache.oozie.action.email.EmailActionExecutor; 043 import org.apache.oozie.action.email.EmailActionExecutor.JavaMailAuthenticator; 044 import org.apache.oozie.client.event.SLAEvent; 045 import org.apache.oozie.sla.listener.SLAEventListener; 046 import org.apache.oozie.sla.service.SLAService; 047 import org.apache.oozie.util.XLog; 048 049 import com.google.common.annotations.VisibleForTesting; 050 import com.google.common.cache.CacheBuilder; 051 import com.google.common.cache.CacheLoader; 052 import com.google.common.cache.LoadingCache; 053 054 public class SLAEmailEventListener extends SLAEventListener { 055 056 public static final String SMTP_CONNECTION_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "smtp.connectiontimeout"; 057 public static final String SMTP_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "smtp.timeout"; 058 public static final String BLACKLIST_CACHE_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "blacklist.cachetimeout"; 059 public static final String BLACKLIST_FAIL_COUNT = EmailActionExecutor.CONF_PREFIX + "blacklist.failcount"; 060 public static final String OOZIE_BASE_URL = "oozie.base.url"; 061 private Session session; 062 private String oozieBaseUrl; 063 private InternetAddress fromAddr; 064 private String ADDRESS_SEPARATOR = ","; 065 private LoadingCache<String, AtomicInteger> blackList; 066 private int blacklistFailCount; 067 private final String BLACKLIST_CACHE_TIMEOUT_DEFAULT = "1800"; // in sec. default to 30 min 068 private final String BLACKLIST_FAIL_COUNT_DEFAULT = "2"; // stop sending when fail count equals or exceeds 069 private final String SMTP_HOST_DEFAULT = "localhost"; 070 private final String SMTP_PORT_DEFAULT = "25"; 071 private final boolean SMTP_AUTH_DEFAULT = false; 072 private final String SMTP_SOURCE_DEFAULT = "oozie@localhost"; 073 private final String SMTP_CONNECTION_TIMEOUT_DEFAULT = "5000"; 074 private final String SMTP_TIMEOUT_DEFAULT = "5000"; 075 private static XLog LOG = XLog.getLog(SLAEmailEventListener.class); 076 private Set<SLAEvent.EventStatus> alertEvents; 077 public static String EMAIL_BODY_FIELD_SEPARATER = " - "; 078 public static String EMAIL_BODY_FIELD_INDENT = " "; 079 public static String EMAIL_BODY_HEADER_SEPARATER = ":"; 080 081 public enum EmailField { 082 EVENT_STATUS("SLA Status"), APP_TYPE("App Type"), APP_NAME("App Name"), USER("User"), JOBID("Job ID"), PARENT_JOBID( 083 "Parent Job ID"), JOB_URL("Job URL"), PARENT_JOB_URL("Parent Job URL"), NOMINAL_TIME("Nominal Time"), 084 EXPECTED_START_TIME("Expected Start Time"), ACTUAL_START_TIME("Actual Start Time"), 085 EXPECTED_END_TIME("Expected End Time"), ACTUAL_END_TIME("Actual End Time"), EXPECTED_DURATION("Expected Duration (in mins)"), 086 ACTUAL_DURATION("Actual Duration (in mins)"), NOTIFICATION_MESSAGE("Notification Message"), UPSTREAM_APPS("Upstream Apps"), 087 JOB_STATUS("Job Status"); 088 private String name; 089 090 private EmailField(String name) { 091 this.name = name; 092 } 093 094 public String toString() { 095 return name; 096 } 097 }; 098 099 @Override 100 public void init(Configuration conf) throws Exception { 101 102 oozieBaseUrl = conf.get(OOZIE_BASE_URL); 103 // Get SMTP properties from the configuration used in Email Action 104 String smtpHost = conf.get(EmailActionExecutor.EMAIL_SMTP_HOST, SMTP_HOST_DEFAULT); 105 String smtpPort = conf.get(EmailActionExecutor.EMAIL_SMTP_PORT, SMTP_PORT_DEFAULT); 106 Boolean smtpAuth = conf.getBoolean(EmailActionExecutor.EMAIL_SMTP_AUTH, SMTP_AUTH_DEFAULT); 107 String smtpUser = conf.get(EmailActionExecutor.EMAIL_SMTP_USER, ""); 108 String smtpPassword = conf.get(EmailActionExecutor.EMAIL_SMTP_PASS, ""); 109 String smtpConnectTimeout = conf.get(SMTP_CONNECTION_TIMEOUT, SMTP_CONNECTION_TIMEOUT_DEFAULT); 110 String smtpTimeout = conf.get(SMTP_TIMEOUT, SMTP_TIMEOUT_DEFAULT); 111 112 int blacklistTimeOut = Integer.valueOf(conf.get(BLACKLIST_CACHE_TIMEOUT, BLACKLIST_CACHE_TIMEOUT_DEFAULT)); 113 blacklistFailCount = Integer.valueOf(conf.get(BLACKLIST_FAIL_COUNT, BLACKLIST_FAIL_COUNT_DEFAULT)); 114 115 // blacklist email addresses causing SendFailedException with cache timeout 116 blackList = CacheBuilder.newBuilder() 117 .expireAfterWrite(blacklistTimeOut, TimeUnit.SECONDS) 118 .build(new CacheLoader<String, AtomicInteger>() { 119 public AtomicInteger load(String key) throws Exception { 120 return new AtomicInteger(); 121 } 122 }); 123 124 // Set SMTP properties 125 Properties properties = new Properties(); 126 properties.setProperty("mail.smtp.host", smtpHost); 127 properties.setProperty("mail.smtp.port", smtpPort); 128 properties.setProperty("mail.smtp.auth", smtpAuth.toString()); 129 properties.setProperty("mail.smtp.connectiontimeout", smtpConnectTimeout); 130 properties.setProperty("mail.smtp.timeout", smtpTimeout); 131 132 try { 133 fromAddr = new InternetAddress(conf.get("oozie.email.from.address", SMTP_SOURCE_DEFAULT)); 134 } 135 catch (AddressException ae) { 136 LOG.error("Bad Source Address specified in oozie.email.from.address", ae); 137 throw ae; 138 } 139 140 if (!smtpAuth) { 141 session = Session.getInstance(properties); 142 } 143 else { 144 session = Session.getInstance(properties, new JavaMailAuthenticator(smtpUser, smtpPassword)); 145 } 146 147 alertEvents = new HashSet<SLAEvent.EventStatus>(); 148 String alertEventsStr = conf.get(SLAService.CONF_ALERT_EVENTS); 149 if (alertEventsStr != null) { 150 String[] alertEvt = alertEventsStr.split(",", -1); 151 for (String evt : alertEvt) { 152 alertEvents.add(SLAEvent.EventStatus.valueOf(evt)); 153 } 154 } 155 } 156 157 @Override 158 public void destroy() { 159 } 160 161 private void sendSLAEmail(SLAEvent event) throws Exception { 162 Message message = new MimeMessage(session); 163 setMessageHeader(message, event); 164 setMessageBody(message, event); 165 sendEmail(message); 166 } 167 168 @Override 169 public void onStartMiss(SLAEvent event) { 170 boolean flag = false; 171 if (event.getAlertEvents() == null) { 172 flag = alertEvents.contains(SLAEvent.EventStatus.START_MISS); 173 } 174 else if (event.getAlertEvents().contains(SLAEvent.EventStatus.START_MISS.name())) { 175 flag = true; 176 } 177 178 if (flag) { 179 try { 180 sendSLAEmail(event); 181 } 182 catch (Exception e) { 183 LOG.error("Failed to send StartMiss alert email", e); 184 } 185 } 186 } 187 188 @Override 189 public void onEndMiss(SLAEvent event) { 190 boolean flag = false; 191 if (event.getAlertEvents() == null) { 192 flag = alertEvents.contains(SLAEvent.EventStatus.END_MISS); 193 } 194 else if (event.getAlertEvents().contains(SLAEvent.EventStatus.END_MISS.name())) { 195 flag = true; 196 } 197 198 if (flag) { 199 try { 200 sendSLAEmail(event); 201 } 202 catch (Exception e) { 203 LOG.error("Failed to send EndMiss alert email", e); 204 } 205 } 206 } 207 208 @Override 209 public void onDurationMiss(SLAEvent event) { 210 boolean flag = false; 211 if (event.getAlertEvents() == null) { 212 flag = alertEvents.contains(SLAEvent.EventStatus.DURATION_MISS); 213 } 214 else if (event.getAlertEvents().contains(SLAEvent.EventStatus.DURATION_MISS.name())) { 215 flag = true; 216 } 217 218 if (flag) { 219 try { 220 sendSLAEmail(event); 221 } 222 catch (Exception e) { 223 LOG.error("Failed to send DurationMiss alert email", e); 224 } 225 } 226 } 227 228 private Address[] parseAddress(String str) { 229 Address[] addrs = null; 230 List<InternetAddress> addrList = new ArrayList<InternetAddress>(); 231 String[] emails = str.split(ADDRESS_SEPARATOR, -1); 232 233 for (String email : emails) { 234 boolean isBlackListed = false; 235 AtomicInteger val = blackList.getIfPresent(email); 236 if(val != null){ 237 isBlackListed = ( val.get() >= blacklistFailCount ); 238 } 239 if (!isBlackListed) { 240 try { 241 // turn on strict syntax check by setting 2nd argument true 242 addrList.add(new InternetAddress(email, true)); 243 } 244 catch (AddressException ae) { 245 // simply skip bad address but do not throw exception 246 LOG.error("Skipping bad destination address: " + email, ae); 247 } 248 } 249 } 250 251 if (addrList.size() > 0) 252 addrs = (Address[]) addrList.toArray(new InternetAddress[addrList.size()]); 253 254 return addrs; 255 } 256 257 private void setMessageHeader(Message msg, SLAEvent event) throws MessagingException { 258 Address[] from = new InternetAddress[] { fromAddr }; 259 Address[] to; 260 StringBuilder subject = new StringBuilder(); 261 262 to = parseAddress(event.getAlertContact()); 263 if (to == null) { 264 LOG.error("Destination address is null or invalid, stop sending SLA alert email"); 265 throw new IllegalArgumentException("Destination address is not specified properly"); 266 } 267 subject.append("OOZIE - SLA "); 268 subject.append(event.getEventStatus().name()); 269 subject.append(" (AppName="); 270 subject.append(event.getAppName()); 271 subject.append(", JobID="); 272 subject.append(event.getId()); 273 subject.append(")"); 274 275 try { 276 msg.addFrom(from); 277 msg.addRecipients(RecipientType.TO, to); 278 msg.setSubject(subject.toString()); 279 } 280 catch (MessagingException me) { 281 LOG.error("Message Exception in setting message header of SLA alert email", me); 282 throw me; 283 } 284 } 285 286 private void setMessageBody(Message msg, SLAEvent event) throws MessagingException { 287 StringBuilder body = new StringBuilder(); 288 printHeading(body, "Status"); 289 printField(body, EmailField.EVENT_STATUS.toString(), event.getEventStatus()); 290 printField(body, EmailField.JOB_STATUS.toString(), event.getJobStatus()); 291 printField(body, EmailField.NOTIFICATION_MESSAGE.toString(), event.getNotificationMsg()); 292 293 printHeading(body, "Job Details"); 294 printField(body, EmailField.APP_NAME.toString(), event.getAppName()); 295 printField(body, EmailField.APP_TYPE.toString(), event.getAppType()); 296 printField(body, EmailField.USER.toString(), event.getUser()); 297 printField(body, EmailField.JOBID.toString(), event.getId()); 298 printField(body, EmailField.JOB_URL.toString(), getJobLink(event.getId())); 299 printField(body, EmailField.PARENT_JOBID.toString(), event.getParentId() != null ? event.getParentId() : "N/A"); 300 printField(body, EmailField.PARENT_JOB_URL.toString(), 301 event.getParentId() != null ? getJobLink(event.getParentId()) : "N/A"); 302 printField(body, EmailField.UPSTREAM_APPS.toString(), event.getUpstreamApps()); 303 304 printHeading(body, "SLA Details"); 305 printField(body, EmailField.NOMINAL_TIME.toString(), event.getNominalTime()); 306 printField(body, EmailField.EXPECTED_START_TIME.toString(), event.getExpectedStart()); 307 printField(body, EmailField.ACTUAL_START_TIME.toString(), event.getActualStart()); 308 printField(body, EmailField.EXPECTED_END_TIME.toString(), event.getExpectedEnd()); 309 printField(body, EmailField.ACTUAL_END_TIME.toString(), event.getActualEnd()); 310 printField(body, EmailField.EXPECTED_DURATION.toString(), getDurationInMins(event.getExpectedDuration())); 311 printField(body, EmailField.ACTUAL_DURATION.toString(), getDurationInMins(event.getActualDuration())); 312 313 try { 314 msg.setText(body.toString()); 315 } 316 catch (MessagingException me) { 317 LOG.error("Message Exception in setting message body of SLA alert email", me); 318 throw me; 319 } 320 } 321 322 private long getDurationInMins(long duration) { 323 if (duration < 0) { 324 return duration; 325 } 326 return duration / 60000; //Convert millis to minutes 327 } 328 329 private String getJobLink(String jobId) { 330 StringBuffer url = new StringBuffer(); 331 String param = "/?job="; 332 url.append(oozieBaseUrl); 333 url.append(param); 334 url.append(jobId); 335 return url.toString(); 336 } 337 338 private void printField(StringBuilder st, String name, Object value) { 339 String lineFeed = "\n"; 340 if (value != null) { 341 st.append(EMAIL_BODY_FIELD_INDENT); 342 st.append(name); 343 st.append(EMAIL_BODY_FIELD_SEPARATER); 344 st.append(value); 345 st.append(lineFeed); 346 } 347 } 348 349 private void printHeading(StringBuilder st, String header) { 350 st.append(header); 351 st.append(EMAIL_BODY_HEADER_SEPARATER); 352 st.append("\n"); 353 } 354 355 private void sendEmail(Message message) throws MessagingException { 356 try { 357 Transport.send(message); 358 } 359 catch (NoSuchProviderException se) { 360 LOG.error("Could not find an SMTP transport provider to email", se); 361 throw se; 362 } 363 catch (MessagingException me) { 364 LOG.error("Message Exception in transporting SLA alert email", me); 365 if (me instanceof SendFailedException) { 366 Address[] invalidAddrs = ((SendFailedException) me).getInvalidAddresses(); 367 if (invalidAddrs != null && invalidAddrs.length > 0) { 368 for (Address addr : invalidAddrs) { 369 try { 370 // 'get' method loads key into cache when it doesn't exist 371 AtomicInteger val = blackList.get(addr.toString()); 372 val.incrementAndGet(); 373 } 374 catch (Exception e) { 375 LOG.debug("blacklist loading throwed exception"); 376 } 377 } 378 } 379 } 380 throw me; 381 } 382 } 383 384 @VisibleForTesting 385 public void addBlackList(String email) throws Exception { 386 // this is for testing 387 if(email == null || email.equals("")){ 388 return; 389 } 390 AtomicInteger val = blackList.get(email); 391 val.set(blacklistFailCount); 392 } 393 394 @Override 395 public void onStartMet(SLAEvent work) { 396 } 397 398 @Override 399 public void onEndMet(SLAEvent work) { 400 } 401 402 @Override 403 public void onDurationMet(SLAEvent work) { 404 } 405 406 }