001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.sla.listener; 020 021import java.util.ArrayList; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Properties; 025import java.util.Set; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicInteger; 028 029import javax.mail.Address; 030import javax.mail.Message; 031import javax.mail.MessagingException; 032import javax.mail.NoSuchProviderException; 033import javax.mail.SendFailedException; 034import javax.mail.Session; 035import javax.mail.Transport; 036import javax.mail.internet.AddressException; 037import javax.mail.internet.InternetAddress; 038import javax.mail.internet.MimeMessage; 039import javax.mail.internet.MimeMessage.RecipientType; 040 041import org.apache.hadoop.conf.Configuration; 042import org.apache.oozie.action.email.EmailActionExecutor; 043import org.apache.oozie.action.email.EmailActionExecutor.JavaMailAuthenticator; 044import org.apache.oozie.client.event.SLAEvent; 045import org.apache.oozie.sla.listener.SLAEventListener; 046import org.apache.oozie.sla.service.SLAService; 047import org.apache.oozie.util.XLog; 048 049import com.google.common.annotations.VisibleForTesting; 050import com.google.common.cache.CacheBuilder; 051import com.google.common.cache.CacheLoader; 052import com.google.common.cache.LoadingCache; 053 054public class SLAEmailEventListener extends SLAEventListener { 055 056 public static final String SMTP_CONNECTION_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "smtp.connectiontimeout"; 057 public static final String SMTP_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "smtp.timeout"; 058 public static final String BLACKLIST_CACHE_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "blacklist.cachetimeout"; 059 public static final String BLACKLIST_FAIL_COUNT = EmailActionExecutor.CONF_PREFIX + "blacklist.failcount"; 060 public static final String OOZIE_BASE_URL = "oozie.base.url"; 061 private Session session; 062 private String oozieBaseUrl; 063 private InternetAddress fromAddr; 064 private String ADDRESS_SEPARATOR = ","; 065 private LoadingCache<String, AtomicInteger> blackList; 066 private int blacklistFailCount; 067 private final String BLACKLIST_CACHE_TIMEOUT_DEFAULT = "1800"; // in sec. default to 30 min 068 private final String BLACKLIST_FAIL_COUNT_DEFAULT = "2"; // stop sending when fail count equals or exceeds 069 private final String SMTP_HOST_DEFAULT = "localhost"; 070 private final String SMTP_PORT_DEFAULT = "25"; 071 private final boolean SMTP_AUTH_DEFAULT = false; 072 private final String SMTP_SOURCE_DEFAULT = "oozie@localhost"; 073 private final String SMTP_CONNECTION_TIMEOUT_DEFAULT = "5000"; 074 private final String SMTP_TIMEOUT_DEFAULT = "5000"; 075 private static XLog LOG = XLog.getLog(SLAEmailEventListener.class); 076 private Set<SLAEvent.EventStatus> alertEvents; 077 public static String EMAIL_BODY_FIELD_SEPARATER = " - "; 078 public static String EMAIL_BODY_FIELD_INDENT = " "; 079 public static String EMAIL_BODY_HEADER_SEPARATER = ":"; 080 081 public enum EmailField { 082 EVENT_STATUS("SLA Status"), APP_TYPE("App Type"), APP_NAME("App Name"), USER("User"), JOBID("Job ID"), PARENT_JOBID( 083 "Parent Job ID"), JOB_URL("Job URL"), PARENT_JOB_URL("Parent Job URL"), NOMINAL_TIME("Nominal Time"), 084 EXPECTED_START_TIME("Expected Start Time"), ACTUAL_START_TIME("Actual Start Time"), 085 EXPECTED_END_TIME("Expected End Time"), ACTUAL_END_TIME("Actual End Time"), EXPECTED_DURATION("Expected Duration (in mins)"), 086 ACTUAL_DURATION("Actual Duration (in mins)"), NOTIFICATION_MESSAGE("Notification Message"), UPSTREAM_APPS("Upstream Apps"), 087 JOB_STATUS("Job Status"); 088 private String name; 089 090 private EmailField(String name) { 091 this.name = name; 092 } 093 094 public String toString() { 095 return name; 096 } 097 }; 098 099 @Override 100 public void init(Configuration conf) throws Exception { 101 102 oozieBaseUrl = conf.get(OOZIE_BASE_URL); 103 // Get SMTP properties from the configuration used in Email Action 104 String smtpHost = conf.get(EmailActionExecutor.EMAIL_SMTP_HOST, SMTP_HOST_DEFAULT); 105 String smtpPort = conf.get(EmailActionExecutor.EMAIL_SMTP_PORT, SMTP_PORT_DEFAULT); 106 Boolean smtpAuth = conf.getBoolean(EmailActionExecutor.EMAIL_SMTP_AUTH, SMTP_AUTH_DEFAULT); 107 String smtpUser = conf.get(EmailActionExecutor.EMAIL_SMTP_USER, ""); 108 String smtpPassword = conf.get(EmailActionExecutor.EMAIL_SMTP_PASS, ""); 109 String smtpConnectTimeout = conf.get(SMTP_CONNECTION_TIMEOUT, SMTP_CONNECTION_TIMEOUT_DEFAULT); 110 String smtpTimeout = conf.get(SMTP_TIMEOUT, SMTP_TIMEOUT_DEFAULT); 111 112 int blacklistTimeOut = Integer.valueOf(conf.get(BLACKLIST_CACHE_TIMEOUT, BLACKLIST_CACHE_TIMEOUT_DEFAULT)); 113 blacklistFailCount = Integer.valueOf(conf.get(BLACKLIST_FAIL_COUNT, BLACKLIST_FAIL_COUNT_DEFAULT)); 114 115 // blacklist email addresses causing SendFailedException with cache timeout 116 blackList = CacheBuilder.newBuilder() 117 .expireAfterWrite(blacklistTimeOut, TimeUnit.SECONDS) 118 .build(new CacheLoader<String, AtomicInteger>() { 119 public AtomicInteger load(String key) throws Exception { 120 return new AtomicInteger(); 121 } 122 }); 123 124 // Set SMTP properties 125 Properties properties = new Properties(); 126 properties.setProperty("mail.smtp.host", smtpHost); 127 properties.setProperty("mail.smtp.port", smtpPort); 128 properties.setProperty("mail.smtp.auth", smtpAuth.toString()); 129 properties.setProperty("mail.smtp.connectiontimeout", smtpConnectTimeout); 130 properties.setProperty("mail.smtp.timeout", smtpTimeout); 131 132 try { 133 fromAddr = new InternetAddress(conf.get("oozie.email.from.address", SMTP_SOURCE_DEFAULT)); 134 } 135 catch (AddressException ae) { 136 LOG.error("Bad Source Address specified in oozie.email.from.address", ae); 137 throw ae; 138 } 139 140 if (!smtpAuth) { 141 session = Session.getInstance(properties); 142 } 143 else { 144 session = Session.getInstance(properties, new JavaMailAuthenticator(smtpUser, smtpPassword)); 145 } 146 147 alertEvents = new HashSet<SLAEvent.EventStatus>(); 148 String alertEventsStr = conf.get(SLAService.CONF_ALERT_EVENTS); 149 if (alertEventsStr != null) { 150 String[] alertEvt = alertEventsStr.split(",", -1); 151 for (String evt : alertEvt) { 152 alertEvents.add(SLAEvent.EventStatus.valueOf(evt)); 153 } 154 } 155 } 156 157 @Override 158 public void destroy() { 159 } 160 161 private void sendSLAEmail(SLAEvent event) throws Exception { 162 // If no address is provided, the user did not want to send an email so simply log it and do nothing 163 if (event.getAlertContact() == null || event.getAlertContact().trim().length() == 0) { 164 LOG.info("No destination address provided; an SLA alert email will not be sent"); 165 } else { 166 // Create and send an email 167 Message message = new MimeMessage(session); 168 setMessageHeader(message, event); 169 setMessageBody(message, event); 170 sendEmail(message); 171 } 172 } 173 174 @Override 175 public void onStartMiss(SLAEvent event) { 176 boolean flag = false; 177 if (event.getAlertEvents() == null) { 178 flag = alertEvents.contains(SLAEvent.EventStatus.START_MISS); 179 } 180 else if (event.getAlertEvents().contains(SLAEvent.EventStatus.START_MISS.name())) { 181 flag = true; 182 } 183 184 if (flag) { 185 try { 186 sendSLAEmail(event); 187 } 188 catch (Exception e) { 189 LOG.error("Failed to send StartMiss alert email", e); 190 } 191 } 192 } 193 194 @Override 195 public void onEndMiss(SLAEvent event) { 196 boolean flag = false; 197 if (event.getAlertEvents() == null) { 198 flag = alertEvents.contains(SLAEvent.EventStatus.END_MISS); 199 } 200 else if (event.getAlertEvents().contains(SLAEvent.EventStatus.END_MISS.name())) { 201 flag = true; 202 } 203 204 if (flag) { 205 try { 206 sendSLAEmail(event); 207 } 208 catch (Exception e) { 209 LOG.error("Failed to send EndMiss alert email", e); 210 } 211 } 212 } 213 214 @Override 215 public void onDurationMiss(SLAEvent event) { 216 boolean flag = false; 217 if (event.getAlertEvents() == null) { 218 flag = alertEvents.contains(SLAEvent.EventStatus.DURATION_MISS); 219 } 220 else if (event.getAlertEvents().contains(SLAEvent.EventStatus.DURATION_MISS.name())) { 221 flag = true; 222 } 223 224 if (flag) { 225 try { 226 sendSLAEmail(event); 227 } 228 catch (Exception e) { 229 LOG.error("Failed to send DurationMiss alert email", e); 230 } 231 } 232 } 233 234 private Address[] parseAddress(String str) { 235 Address[] addrs = null; 236 List<InternetAddress> addrList = new ArrayList<InternetAddress>(); 237 String[] emails = str.split(ADDRESS_SEPARATOR, -1); 238 239 for (String email : emails) { 240 boolean isBlackListed = false; 241 AtomicInteger val = blackList.getIfPresent(email); 242 if(val != null){ 243 isBlackListed = ( val.get() >= blacklistFailCount ); 244 } 245 if (!isBlackListed) { 246 try { 247 // turn on strict syntax check by setting 2nd argument true 248 addrList.add(new InternetAddress(email, true)); 249 } 250 catch (AddressException ae) { 251 // simply skip bad address but do not throw exception 252 LOG.error("Skipping bad destination address: " + email, ae); 253 } 254 } 255 } 256 257 if (addrList.size() > 0) 258 addrs = (Address[]) addrList.toArray(new InternetAddress[addrList.size()]); 259 260 return addrs; 261 } 262 263 private void setMessageHeader(Message msg, SLAEvent event) throws MessagingException { 264 Address[] from = new InternetAddress[] { fromAddr }; 265 Address[] to; 266 StringBuilder subject = new StringBuilder(); 267 268 to = parseAddress(event.getAlertContact()); 269 if (to == null) { 270 LOG.error("Destination address is null or invalid, stop sending SLA alert email"); 271 throw new IllegalArgumentException("Destination address is not specified properly"); 272 } 273 subject.append("OOZIE - SLA "); 274 subject.append(event.getEventStatus().name()); 275 subject.append(" (AppName="); 276 subject.append(event.getAppName()); 277 subject.append(", JobID="); 278 subject.append(event.getId()); 279 subject.append(")"); 280 281 try { 282 msg.addFrom(from); 283 msg.addRecipients(RecipientType.TO, to); 284 msg.setSubject(subject.toString()); 285 } 286 catch (MessagingException me) { 287 LOG.error("Message Exception in setting message header of SLA alert email", me); 288 throw me; 289 } 290 } 291 292 private void setMessageBody(Message msg, SLAEvent event) throws MessagingException { 293 StringBuilder body = new StringBuilder(); 294 printHeading(body, "Status"); 295 printField(body, EmailField.EVENT_STATUS.toString(), event.getEventStatus()); 296 printField(body, EmailField.JOB_STATUS.toString(), event.getJobStatus()); 297 printField(body, EmailField.NOTIFICATION_MESSAGE.toString(), event.getNotificationMsg()); 298 299 printHeading(body, "Job Details"); 300 printField(body, EmailField.APP_NAME.toString(), event.getAppName()); 301 printField(body, EmailField.APP_TYPE.toString(), event.getAppType()); 302 printField(body, EmailField.USER.toString(), event.getUser()); 303 printField(body, EmailField.JOBID.toString(), event.getId()); 304 printField(body, EmailField.JOB_URL.toString(), getJobLink(event.getId())); 305 printField(body, EmailField.PARENT_JOBID.toString(), event.getParentId() != null ? event.getParentId() : "N/A"); 306 printField(body, EmailField.PARENT_JOB_URL.toString(), 307 event.getParentId() != null ? getJobLink(event.getParentId()) : "N/A"); 308 printField(body, EmailField.UPSTREAM_APPS.toString(), event.getUpstreamApps()); 309 310 printHeading(body, "SLA Details"); 311 printField(body, EmailField.NOMINAL_TIME.toString(), event.getNominalTime()); 312 printField(body, EmailField.EXPECTED_START_TIME.toString(), event.getExpectedStart()); 313 printField(body, EmailField.ACTUAL_START_TIME.toString(), event.getActualStart()); 314 printField(body, EmailField.EXPECTED_END_TIME.toString(), event.getExpectedEnd()); 315 printField(body, EmailField.ACTUAL_END_TIME.toString(), event.getActualEnd()); 316 printField(body, EmailField.EXPECTED_DURATION.toString(), getDurationInMins(event.getExpectedDuration())); 317 printField(body, EmailField.ACTUAL_DURATION.toString(), getDurationInMins(event.getActualDuration())); 318 319 try { 320 msg.setText(body.toString()); 321 } 322 catch (MessagingException me) { 323 LOG.error("Message Exception in setting message body of SLA alert email", me); 324 throw me; 325 } 326 } 327 328 private long getDurationInMins(long duration) { 329 if (duration < 0) { 330 return duration; 331 } 332 return duration / 60000; //Convert millis to minutes 333 } 334 335 private String getJobLink(String jobId) { 336 StringBuffer url = new StringBuffer(); 337 String param = "/?job="; 338 url.append(oozieBaseUrl); 339 url.append(param); 340 url.append(jobId); 341 return url.toString(); 342 } 343 344 private void printField(StringBuilder st, String name, Object value) { 345 String lineFeed = "\n"; 346 if (value != null) { 347 st.append(EMAIL_BODY_FIELD_INDENT); 348 st.append(name); 349 st.append(EMAIL_BODY_FIELD_SEPARATER); 350 st.append(value); 351 st.append(lineFeed); 352 } 353 } 354 355 private void printHeading(StringBuilder st, String header) { 356 st.append(header); 357 st.append(EMAIL_BODY_HEADER_SEPARATER); 358 st.append("\n"); 359 } 360 361 private void sendEmail(Message message) throws MessagingException { 362 try { 363 Transport.send(message); 364 } 365 catch (NoSuchProviderException se) { 366 LOG.error("Could not find an SMTP transport provider to email", se); 367 throw se; 368 } 369 catch (MessagingException me) { 370 LOG.error("Message Exception in transporting SLA alert email", me); 371 if (me instanceof SendFailedException) { 372 Address[] invalidAddrs = ((SendFailedException) me).getInvalidAddresses(); 373 if (invalidAddrs != null && invalidAddrs.length > 0) { 374 for (Address addr : invalidAddrs) { 375 try { 376 // 'get' method loads key into cache when it doesn't exist 377 AtomicInteger val = blackList.get(addr.toString()); 378 val.incrementAndGet(); 379 } 380 catch (Exception e) { 381 LOG.debug("blacklist loading throwed exception"); 382 } 383 } 384 } 385 } 386 throw me; 387 } 388 } 389 390 @VisibleForTesting 391 public void addBlackList(String email) throws Exception { 392 // this is for testing 393 if(email == null || email.equals("")){ 394 return; 395 } 396 AtomicInteger val = blackList.get(email); 397 val.set(blacklistFailCount); 398 } 399 400 @Override 401 public void onStartMet(SLAEvent work) { 402 } 403 404 @Override 405 public void onEndMet(SLAEvent work) { 406 } 407 408 @Override 409 public void onDurationMet(SLAEvent work) { 410 } 411 412}