001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 020package org.apache.oozie.sla.listener; 021 022import java.util.ArrayList; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Properties; 026import java.util.Set; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicInteger; 029 030import javax.mail.Address; 031import javax.mail.Message; 032import javax.mail.MessagingException; 033import javax.mail.NoSuchProviderException; 034import javax.mail.SendFailedException; 035import javax.mail.Session; 036import javax.mail.Transport; 037import javax.mail.internet.AddressException; 038import javax.mail.internet.InternetAddress; 039import javax.mail.internet.MimeMessage; 040import javax.mail.internet.MimeMessage.RecipientType; 041 042import org.apache.hadoop.conf.Configuration; 043import org.apache.oozie.action.email.EmailActionExecutor; 044import org.apache.oozie.action.email.EmailActionExecutor.JavaMailAuthenticator; 045import org.apache.oozie.client.event.SLAEvent; 046import org.apache.oozie.service.ConfigurationService; 047import org.apache.oozie.sla.listener.SLAEventListener; 048import org.apache.oozie.sla.service.SLAService; 049import org.apache.oozie.util.XLog; 050 051import com.google.common.annotations.VisibleForTesting; 052import com.google.common.cache.CacheBuilder; 053import com.google.common.cache.CacheLoader; 054import com.google.common.cache.LoadingCache; 055 056public class SLAEmailEventListener extends SLAEventListener { 057 058 public static final String SMTP_CONNECTION_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "smtp.connectiontimeout"; 059 public static final String SMTP_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "smtp.timeout"; 060 public static final String BLACKLIST_CACHE_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "blacklist.cachetimeout"; 061 public static final String BLACKLIST_FAIL_COUNT = EmailActionExecutor.CONF_PREFIX + "blacklist.failcount"; 062 public static final String OOZIE_BASE_URL = "oozie.base.url"; 063 private Session session; 064 private String oozieBaseUrl; 065 private InternetAddress fromAddr; 066 private String ADDRESS_SEPARATOR = ","; 067 private LoadingCache<String, AtomicInteger> blackList; 068 private int blacklistFailCount; 069 private final String BLACKLIST_CACHE_TIMEOUT_DEFAULT = "1800"; // in sec. default to 30 min 070 private final String BLACKLIST_FAIL_COUNT_DEFAULT = "2"; // stop sending when fail count equals or exceeds 071 private final String SMTP_HOST_DEFAULT = "localhost"; 072 private final String SMTP_PORT_DEFAULT = "25"; 073 private final boolean SMTP_AUTH_DEFAULT = false; 074 private final String SMTP_SOURCE_DEFAULT = "oozie@localhost"; 075 private final String SMTP_CONNECTION_TIMEOUT_DEFAULT = "5000"; 076 private final String SMTP_TIMEOUT_DEFAULT = "5000"; 077 private static XLog LOG = XLog.getLog(SLAEmailEventListener.class); 078 private Set<SLAEvent.EventStatus> alertEvents; 079 public static String EMAIL_BODY_FIELD_SEPARATER = " - "; 080 public static String EMAIL_BODY_FIELD_INDENT = " "; 081 public static String EMAIL_BODY_HEADER_SEPARATER = ":"; 082 083 public enum EmailField { 084 EVENT_STATUS("SLA Status"), APP_TYPE("App Type"), APP_NAME("App Name"), USER("User"), JOBID("Job ID"), PARENT_JOBID( 085 "Parent Job ID"), JOB_URL("Job URL"), PARENT_JOB_URL("Parent Job URL"), NOMINAL_TIME("Nominal Time"), 086 EXPECTED_START_TIME("Expected Start Time"), ACTUAL_START_TIME("Actual Start Time"), 087 EXPECTED_END_TIME("Expected End Time"), ACTUAL_END_TIME("Actual End Time"), EXPECTED_DURATION("Expected Duration (in mins)"), 088 ACTUAL_DURATION("Actual Duration (in mins)"), NOTIFICATION_MESSAGE("Notification Message"), UPSTREAM_APPS("Upstream Apps"), 089 JOB_STATUS("Job Status"); 090 private String name; 091 092 private EmailField(String name) { 093 this.name = name; 094 } 095 096 @Override 097 public String toString() { 098 return name; 099 } 100 }; 101 102 @Override 103 public void init(Configuration conf) throws Exception { 104 105 oozieBaseUrl = ConfigurationService.get(conf, OOZIE_BASE_URL); 106 // Get SMTP properties from the configuration used in Email Action 107 String smtpHost = conf.get(EmailActionExecutor.EMAIL_SMTP_HOST, SMTP_HOST_DEFAULT); 108 String smtpPort = conf.get(EmailActionExecutor.EMAIL_SMTP_PORT, SMTP_PORT_DEFAULT); 109 Boolean smtpAuth = conf.getBoolean(EmailActionExecutor.EMAIL_SMTP_AUTH, SMTP_AUTH_DEFAULT); 110 String smtpUser = conf.get(EmailActionExecutor.EMAIL_SMTP_USER, ""); 111 String smtpPassword = ConfigurationService.getPassword(EmailActionExecutor.EMAIL_SMTP_PASS, ""); 112 String smtpConnectTimeout = conf.get(SMTP_CONNECTION_TIMEOUT, SMTP_CONNECTION_TIMEOUT_DEFAULT); 113 String smtpTimeout = conf.get(SMTP_TIMEOUT, SMTP_TIMEOUT_DEFAULT); 114 115 int blacklistTimeOut = Integer.valueOf(conf.get(BLACKLIST_CACHE_TIMEOUT, BLACKLIST_CACHE_TIMEOUT_DEFAULT)); 116 blacklistFailCount = Integer.valueOf(conf.get(BLACKLIST_FAIL_COUNT, BLACKLIST_FAIL_COUNT_DEFAULT)); 117 118 // blacklist email addresses causing SendFailedException with cache timeout 119 blackList = CacheBuilder.newBuilder() 120 .expireAfterWrite(blacklistTimeOut, TimeUnit.SECONDS) 121 .build(new CacheLoader<String, AtomicInteger>() { 122 @Override 123 public AtomicInteger load(String key) throws Exception { 124 return new AtomicInteger(); 125 } 126 }); 127 128 // Set SMTP properties 129 Properties properties = new Properties(); 130 properties.setProperty("mail.smtp.host", smtpHost); 131 properties.setProperty("mail.smtp.port", smtpPort); 132 properties.setProperty("mail.smtp.auth", smtpAuth.toString()); 133 properties.setProperty("mail.smtp.connectiontimeout", smtpConnectTimeout); 134 properties.setProperty("mail.smtp.timeout", smtpTimeout); 135 136 try { 137 fromAddr = new InternetAddress(conf.get("oozie.email.from.address", SMTP_SOURCE_DEFAULT)); 138 } 139 catch (AddressException ae) { 140 LOG.error("Bad Source Address specified in oozie.email.from.address", ae); 141 throw ae; 142 } 143 144 if (!smtpAuth) { 145 session = Session.getInstance(properties); 146 } 147 else { 148 session = Session.getInstance(properties, new JavaMailAuthenticator(smtpUser, smtpPassword)); 149 } 150 151 alertEvents = new HashSet<SLAEvent.EventStatus>(); 152 String alertEventsStr = ConfigurationService.get(conf, SLAService.CONF_ALERT_EVENTS); 153 if (alertEventsStr != null) { 154 String[] alertEvt = alertEventsStr.split(",", -1); 155 for (String evt : alertEvt) { 156 alertEvents.add(SLAEvent.EventStatus.valueOf(evt)); 157 } 158 } 159 } 160 161 @Override 162 public void destroy() { 163 } 164 165 private void sendSLAEmail(SLAEvent event) throws Exception { 166 // If no address is provided, the user did not want to send an email so simply log it and do nothing 167 if (event.getAlertContact() == null || event.getAlertContact().trim().length() == 0) { 168 LOG.info("No destination address provided; an SLA alert email will not be sent"); 169 } else { 170 // Create and send an email 171 Message message = new MimeMessage(session); 172 setMessageHeader(message, event); 173 setMessageBody(message, event); 174 sendEmail(message); 175 } 176 } 177 178 @Override 179 public void onStartMiss(SLAEvent event) { 180 boolean flag = false; 181 if (event.getAlertEvents() == null) { 182 flag = alertEvents.contains(SLAEvent.EventStatus.START_MISS); 183 } 184 else if (event.getAlertEvents().contains(SLAEvent.EventStatus.START_MISS.name())) { 185 flag = true; 186 } 187 188 if (flag) { 189 try { 190 sendSLAEmail(event); 191 } 192 catch (Exception e) { 193 LOG.error("Failed to send StartMiss alert email", e); 194 } 195 } 196 } 197 198 @Override 199 public void onEndMiss(SLAEvent event) { 200 boolean flag = false; 201 if (event.getAlertEvents() == null) { 202 flag = alertEvents.contains(SLAEvent.EventStatus.END_MISS); 203 } 204 else if (event.getAlertEvents().contains(SLAEvent.EventStatus.END_MISS.name())) { 205 flag = true; 206 } 207 208 if (flag) { 209 try { 210 sendSLAEmail(event); 211 } 212 catch (Exception e) { 213 LOG.error("Failed to send EndMiss alert email", e); 214 } 215 } 216 } 217 218 @Override 219 public void onDurationMiss(SLAEvent event) { 220 boolean flag = false; 221 if (event.getAlertEvents() == null) { 222 flag = alertEvents.contains(SLAEvent.EventStatus.DURATION_MISS); 223 } 224 else if (event.getAlertEvents().contains(SLAEvent.EventStatus.DURATION_MISS.name())) { 225 flag = true; 226 } 227 228 if (flag) { 229 try { 230 sendSLAEmail(event); 231 } 232 catch (Exception e) { 233 LOG.error("Failed to send DurationMiss alert email", e); 234 } 235 } 236 } 237 238 private Address[] parseAddress(String str) { 239 Address[] addrs = null; 240 List<InternetAddress> addrList = new ArrayList<InternetAddress>(); 241 String[] emails = str.split(ADDRESS_SEPARATOR, -1); 242 243 for (String email : emails) { 244 boolean isBlackListed = false; 245 AtomicInteger val = blackList.getIfPresent(email); 246 if(val != null){ 247 isBlackListed = ( val.get() >= blacklistFailCount ); 248 } 249 if (!isBlackListed) { 250 try { 251 // turn on strict syntax check by setting 2nd argument true 252 addrList.add(new InternetAddress(email, true)); 253 } 254 catch (AddressException ae) { 255 // simply skip bad address but do not throw exception 256 LOG.error("Skipping bad destination address: " + email, ae); 257 } 258 } 259 } 260 261 if (addrList.size() > 0) { 262 addrs = (Address[]) addrList.toArray(new InternetAddress[addrList.size()]); 263 } 264 265 return addrs; 266 } 267 268 private void setMessageHeader(Message msg, SLAEvent event) throws MessagingException { 269 Address[] from = new InternetAddress[] { fromAddr }; 270 Address[] to; 271 StringBuilder subject = new StringBuilder(); 272 273 to = parseAddress(event.getAlertContact()); 274 if (to == null) { 275 LOG.error("Destination address is null or invalid, stop sending SLA alert email"); 276 throw new IllegalArgumentException("Destination address is not specified properly"); 277 } 278 subject.append("OOZIE - SLA "); 279 subject.append(event.getEventStatus().name()); 280 subject.append(" (AppName="); 281 subject.append(event.getAppName()); 282 subject.append(", JobID="); 283 subject.append(event.getId()); 284 subject.append(")"); 285 286 try { 287 msg.addFrom(from); 288 msg.addRecipients(RecipientType.TO, to); 289 msg.setSubject(subject.toString()); 290 } 291 catch (MessagingException me) { 292 LOG.error("Message Exception in setting message header of SLA alert email", me); 293 throw me; 294 } 295 } 296 297 private void setMessageBody(Message msg, SLAEvent event) throws MessagingException { 298 StringBuilder body = new StringBuilder(); 299 printHeading(body, "Status"); 300 printField(body, EmailField.EVENT_STATUS.toString(), event.getEventStatus()); 301 printField(body, EmailField.JOB_STATUS.toString(), event.getJobStatus()); 302 printField(body, EmailField.NOTIFICATION_MESSAGE.toString(), event.getNotificationMsg()); 303 304 printHeading(body, "Job Details"); 305 printField(body, EmailField.APP_NAME.toString(), event.getAppName()); 306 printField(body, EmailField.APP_TYPE.toString(), event.getAppType()); 307 printField(body, EmailField.USER.toString(), event.getUser()); 308 printField(body, EmailField.JOBID.toString(), event.getId()); 309 printField(body, EmailField.JOB_URL.toString(), getJobLink(event.getId())); 310 printField(body, EmailField.PARENT_JOBID.toString(), event.getParentId() != null ? event.getParentId() : "N/A"); 311 printField(body, EmailField.PARENT_JOB_URL.toString(), 312 event.getParentId() != null ? getJobLink(event.getParentId()) : "N/A"); 313 printField(body, EmailField.UPSTREAM_APPS.toString(), event.getUpstreamApps()); 314 315 printHeading(body, "SLA Details"); 316 printField(body, EmailField.NOMINAL_TIME.toString(), event.getNominalTime()); 317 printField(body, EmailField.EXPECTED_START_TIME.toString(), event.getExpectedStart()); 318 printField(body, EmailField.ACTUAL_START_TIME.toString(), event.getActualStart()); 319 printField(body, EmailField.EXPECTED_END_TIME.toString(), event.getExpectedEnd()); 320 printField(body, EmailField.ACTUAL_END_TIME.toString(), event.getActualEnd()); 321 printField(body, EmailField.EXPECTED_DURATION.toString(), getDurationInMins(event.getExpectedDuration())); 322 printField(body, EmailField.ACTUAL_DURATION.toString(), getDurationInMins(event.getActualDuration())); 323 324 try { 325 msg.setText(body.toString()); 326 } 327 catch (MessagingException me) { 328 LOG.error("Message Exception in setting message body of SLA alert email", me); 329 throw me; 330 } 331 } 332 333 private long getDurationInMins(long duration) { 334 if (duration < 0) { 335 return duration; 336 } 337 return duration / 60000; //Convert millis to minutes 338 } 339 340 private String getJobLink(String jobId) { 341 StringBuffer url = new StringBuffer(); 342 String param = "/?job="; 343 url.append(oozieBaseUrl); 344 url.append(param); 345 url.append(jobId); 346 return url.toString(); 347 } 348 349 private void printField(StringBuilder st, String name, Object value) { 350 String lineFeed = "\n"; 351 if (value != null) { 352 st.append(EMAIL_BODY_FIELD_INDENT); 353 st.append(name); 354 st.append(EMAIL_BODY_FIELD_SEPARATER); 355 st.append(value); 356 st.append(lineFeed); 357 } 358 } 359 360 private void printHeading(StringBuilder st, String header) { 361 st.append(header); 362 st.append(EMAIL_BODY_HEADER_SEPARATER); 363 st.append("\n"); 364 } 365 366 private void sendEmail(Message message) throws MessagingException { 367 try { 368 Transport.send(message); 369 } 370 catch (NoSuchProviderException se) { 371 LOG.error("Could not find an SMTP transport provider to email", se); 372 throw se; 373 } 374 catch (MessagingException me) { 375 LOG.error("Message Exception in transporting SLA alert email", me); 376 if (me instanceof SendFailedException) { 377 Address[] invalidAddrs = ((SendFailedException) me).getInvalidAddresses(); 378 if (invalidAddrs != null && invalidAddrs.length > 0) { 379 for (Address addr : invalidAddrs) { 380 try { 381 // 'get' method loads key into cache when it doesn't exist 382 AtomicInteger val = blackList.get(addr.toString()); 383 val.incrementAndGet(); 384 } 385 catch (Exception e) { 386 LOG.debug("blacklist loading throwed exception"); 387 } 388 } 389 } 390 } 391 throw me; 392 } 393 } 394 395 @VisibleForTesting 396 public void addBlackList(String email) throws Exception { 397 // this is for testing 398 if(email == null || email.equals("")){ 399 return; 400 } 401 AtomicInteger val = blackList.get(email); 402 val.set(blacklistFailCount); 403 } 404 405 @Override 406 public void onStartMet(SLAEvent work) { 407 } 408 409 @Override 410 public void onEndMet(SLAEvent work) { 411 } 412 413 @Override 414 public void onDurationMet(SLAEvent work) { 415 } 416 417}