This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.oozie.sla.listener;
020
021 import java.util.ArrayList;
022 import java.util.HashSet;
023 import java.util.List;
024 import java.util.Properties;
025 import java.util.Set;
026 import java.util.concurrent.TimeUnit;
027 import java.util.concurrent.atomic.AtomicInteger;
028
029 import javax.mail.Address;
030 import javax.mail.Message;
031 import javax.mail.MessagingException;
032 import javax.mail.NoSuchProviderException;
033 import javax.mail.SendFailedException;
034 import javax.mail.Session;
035 import javax.mail.Transport;
036 import javax.mail.internet.AddressException;
037 import javax.mail.internet.InternetAddress;
038 import javax.mail.internet.MimeMessage;
039 import javax.mail.internet.MimeMessage.RecipientType;
040
041 import org.apache.hadoop.conf.Configuration;
042 import org.apache.oozie.action.email.EmailActionExecutor;
043 import org.apache.oozie.action.email.EmailActionExecutor.JavaMailAuthenticator;
044 import org.apache.oozie.client.event.SLAEvent;
045 import org.apache.oozie.sla.listener.SLAEventListener;
046 import org.apache.oozie.sla.service.SLAService;
047 import org.apache.oozie.util.XLog;
048
049 import com.google.common.annotations.VisibleForTesting;
050 import com.google.common.cache.CacheBuilder;
051 import com.google.common.cache.CacheLoader;
052 import com.google.common.cache.LoadingCache;
053
054 public class SLAEmailEventListener extends SLAEventListener {
055
056 public static final String SMTP_CONNECTION_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "smtp.connectiontimeout";
057 public static final String SMTP_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "smtp.timeout";
058 public static final String BLACKLIST_CACHE_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "blacklist.cachetimeout";
059 public static final String BLACKLIST_FAIL_COUNT = EmailActionExecutor.CONF_PREFIX + "blacklist.failcount";
060 public static final String OOZIE_BASE_URL = "oozie.base.url";
061 private Session session;
062 private String oozieBaseUrl;
063 private InternetAddress fromAddr;
064 private String ADDRESS_SEPARATOR = ",";
065 private LoadingCache<String, AtomicInteger> blackList;
066 private int blacklistFailCount;
067 private final String BLACKLIST_CACHE_TIMEOUT_DEFAULT = "1800"; // in sec. default to 30 min
068 private final String BLACKLIST_FAIL_COUNT_DEFAULT = "2"; // stop sending when fail count equals or exceeds
069 private final String SMTP_HOST_DEFAULT = "localhost";
070 private final String SMTP_PORT_DEFAULT = "25";
071 private final boolean SMTP_AUTH_DEFAULT = false;
072 private final String SMTP_SOURCE_DEFAULT = "oozie@localhost";
073 private final String SMTP_CONNECTION_TIMEOUT_DEFAULT = "5000";
074 private final String SMTP_TIMEOUT_DEFAULT = "5000";
075 private static XLog LOG = XLog.getLog(SLAEmailEventListener.class);
076 private Set<SLAEvent.EventStatus> alertEvents;
077 public static String EMAIL_BODY_FIELD_SEPARATER = " - ";
078 public static String EMAIL_BODY_FIELD_INDENT = " ";
079 public static String EMAIL_BODY_HEADER_SEPARATER = ":";
080
081 public enum EmailField {
082 EVENT_STATUS("SLA Status"), APP_TYPE("App Type"), APP_NAME("App Name"), USER("User"), JOBID("Job ID"), PARENT_JOBID(
083 "Parent Job ID"), JOB_URL("Job URL"), PARENT_JOB_URL("Parent Job URL"), NOMINAL_TIME("Nominal Time"),
084 EXPECTED_START_TIME("Expected Start Time"), ACTUAL_START_TIME("Actual Start Time"),
085 EXPECTED_END_TIME("Expected End Time"), ACTUAL_END_TIME("Actual End Time"), EXPECTED_DURATION("Expected Duration (in mins)"),
086 ACTUAL_DURATION("Actual Duration (in mins)"), NOTIFICATION_MESSAGE("Notification Message"), UPSTREAM_APPS("Upstream Apps"),
087 JOB_STATUS("Job Status");
088 private String name;
089
090 private EmailField(String name) {
091 this.name = name;
092 }
093
094 public String toString() {
095 return name;
096 }
097 };
098
099 @Override
100 public void init(Configuration conf) throws Exception {
101
102 oozieBaseUrl = conf.get(OOZIE_BASE_URL);
103 // Get SMTP properties from the configuration used in Email Action
104 String smtpHost = conf.get(EmailActionExecutor.EMAIL_SMTP_HOST, SMTP_HOST_DEFAULT);
105 String smtpPort = conf.get(EmailActionExecutor.EMAIL_SMTP_PORT, SMTP_PORT_DEFAULT);
106 Boolean smtpAuth = conf.getBoolean(EmailActionExecutor.EMAIL_SMTP_AUTH, SMTP_AUTH_DEFAULT);
107 String smtpUser = conf.get(EmailActionExecutor.EMAIL_SMTP_USER, "");
108 String smtpPassword = conf.get(EmailActionExecutor.EMAIL_SMTP_PASS, "");
109 String smtpConnectTimeout = conf.get(SMTP_CONNECTION_TIMEOUT, SMTP_CONNECTION_TIMEOUT_DEFAULT);
110 String smtpTimeout = conf.get(SMTP_TIMEOUT, SMTP_TIMEOUT_DEFAULT);
111
112 int blacklistTimeOut = Integer.valueOf(conf.get(BLACKLIST_CACHE_TIMEOUT, BLACKLIST_CACHE_TIMEOUT_DEFAULT));
113 blacklistFailCount = Integer.valueOf(conf.get(BLACKLIST_FAIL_COUNT, BLACKLIST_FAIL_COUNT_DEFAULT));
114
115 // blacklist email addresses causing SendFailedException with cache timeout
116 blackList = CacheBuilder.newBuilder()
117 .expireAfterWrite(blacklistTimeOut, TimeUnit.SECONDS)
118 .build(new CacheLoader<String, AtomicInteger>() {
119 public AtomicInteger load(String key) throws Exception {
120 return new AtomicInteger();
121 }
122 });
123
124 // Set SMTP properties
125 Properties properties = new Properties();
126 properties.setProperty("mail.smtp.host", smtpHost);
127 properties.setProperty("mail.smtp.port", smtpPort);
128 properties.setProperty("mail.smtp.auth", smtpAuth.toString());
129 properties.setProperty("mail.smtp.connectiontimeout", smtpConnectTimeout);
130 properties.setProperty("mail.smtp.timeout", smtpTimeout);
131
132 try {
133 fromAddr = new InternetAddress(conf.get("oozie.email.from.address", SMTP_SOURCE_DEFAULT));
134 }
135 catch (AddressException ae) {
136 LOG.error("Bad Source Address specified in oozie.email.from.address", ae);
137 throw ae;
138 }
139
140 if (!smtpAuth) {
141 session = Session.getInstance(properties);
142 }
143 else {
144 session = Session.getInstance(properties, new JavaMailAuthenticator(smtpUser, smtpPassword));
145 }
146
147 alertEvents = new HashSet<SLAEvent.EventStatus>();
148 String alertEventsStr = conf.get(SLAService.CONF_ALERT_EVENTS);
149 if (alertEventsStr != null) {
150 String[] alertEvt = alertEventsStr.split(",", -1);
151 for (String evt : alertEvt) {
152 alertEvents.add(SLAEvent.EventStatus.valueOf(evt));
153 }
154 }
155 }
156
157 @Override
158 public void destroy() {
159 }
160
161 private void sendSLAEmail(SLAEvent event) throws Exception {
162 Message message = new MimeMessage(session);
163 setMessageHeader(message, event);
164 setMessageBody(message, event);
165 sendEmail(message);
166 }
167
168 @Override
169 public void onStartMiss(SLAEvent event) {
170 boolean flag = false;
171 if (event.getAlertEvents() == null) {
172 flag = alertEvents.contains(SLAEvent.EventStatus.START_MISS);
173 }
174 else if (event.getAlertEvents().contains(SLAEvent.EventStatus.START_MISS.name())) {
175 flag = true;
176 }
177
178 if (flag) {
179 try {
180 sendSLAEmail(event);
181 }
182 catch (Exception e) {
183 LOG.error("Failed to send StartMiss alert email", e);
184 }
185 }
186 }
187
188 @Override
189 public void onEndMiss(SLAEvent event) {
190 boolean flag = false;
191 if (event.getAlertEvents() == null) {
192 flag = alertEvents.contains(SLAEvent.EventStatus.END_MISS);
193 }
194 else if (event.getAlertEvents().contains(SLAEvent.EventStatus.END_MISS.name())) {
195 flag = true;
196 }
197
198 if (flag) {
199 try {
200 sendSLAEmail(event);
201 }
202 catch (Exception e) {
203 LOG.error("Failed to send EndMiss alert email", e);
204 }
205 }
206 }
207
208 @Override
209 public void onDurationMiss(SLAEvent event) {
210 boolean flag = false;
211 if (event.getAlertEvents() == null) {
212 flag = alertEvents.contains(SLAEvent.EventStatus.DURATION_MISS);
213 }
214 else if (event.getAlertEvents().contains(SLAEvent.EventStatus.DURATION_MISS.name())) {
215 flag = true;
216 }
217
218 if (flag) {
219 try {
220 sendSLAEmail(event);
221 }
222 catch (Exception e) {
223 LOG.error("Failed to send DurationMiss alert email", e);
224 }
225 }
226 }
227
228 private Address[] parseAddress(String str) {
229 Address[] addrs = null;
230 List<InternetAddress> addrList = new ArrayList<InternetAddress>();
231 String[] emails = str.split(ADDRESS_SEPARATOR, -1);
232
233 for (String email : emails) {
234 boolean isBlackListed = false;
235 AtomicInteger val = blackList.getIfPresent(email);
236 if(val != null){
237 isBlackListed = ( val.get() >= blacklistFailCount );
238 }
239 if (!isBlackListed) {
240 try {
241 // turn on strict syntax check by setting 2nd argument true
242 addrList.add(new InternetAddress(email, true));
243 }
244 catch (AddressException ae) {
245 // simply skip bad address but do not throw exception
246 LOG.error("Skipping bad destination address: " + email, ae);
247 }
248 }
249 }
250
251 if (addrList.size() > 0)
252 addrs = (Address[]) addrList.toArray(new InternetAddress[addrList.size()]);
253
254 return addrs;
255 }
256
257 private void setMessageHeader(Message msg, SLAEvent event) throws MessagingException {
258 Address[] from = new InternetAddress[] { fromAddr };
259 Address[] to;
260 StringBuilder subject = new StringBuilder();
261
262 to = parseAddress(event.getAlertContact());
263 if (to == null) {
264 LOG.error("Destination address is null or invalid, stop sending SLA alert email");
265 throw new IllegalArgumentException("Destination address is not specified properly");
266 }
267 subject.append("OOZIE - SLA ");
268 subject.append(event.getEventStatus().name());
269 subject.append(" (AppName=");
270 subject.append(event.getAppName());
271 subject.append(", JobID=");
272 subject.append(event.getId());
273 subject.append(")");
274
275 try {
276 msg.addFrom(from);
277 msg.addRecipients(RecipientType.TO, to);
278 msg.setSubject(subject.toString());
279 }
280 catch (MessagingException me) {
281 LOG.error("Message Exception in setting message header of SLA alert email", me);
282 throw me;
283 }
284 }
285
286 private void setMessageBody(Message msg, SLAEvent event) throws MessagingException {
287 StringBuilder body = new StringBuilder();
288 printHeading(body, "Status");
289 printField(body, EmailField.EVENT_STATUS.toString(), event.getEventStatus());
290 printField(body, EmailField.JOB_STATUS.toString(), event.getJobStatus());
291 printField(body, EmailField.NOTIFICATION_MESSAGE.toString(), event.getNotificationMsg());
292
293 printHeading(body, "Job Details");
294 printField(body, EmailField.APP_NAME.toString(), event.getAppName());
295 printField(body, EmailField.APP_TYPE.toString(), event.getAppType());
296 printField(body, EmailField.USER.toString(), event.getUser());
297 printField(body, EmailField.JOBID.toString(), event.getId());
298 printField(body, EmailField.JOB_URL.toString(), getJobLink(event.getId()));
299 printField(body, EmailField.PARENT_JOBID.toString(), event.getParentId() != null ? event.getParentId() : "N/A");
300 printField(body, EmailField.PARENT_JOB_URL.toString(),
301 event.getParentId() != null ? getJobLink(event.getParentId()) : "N/A");
302 printField(body, EmailField.UPSTREAM_APPS.toString(), event.getUpstreamApps());
303
304 printHeading(body, "SLA Details");
305 printField(body, EmailField.NOMINAL_TIME.toString(), event.getNominalTime());
306 printField(body, EmailField.EXPECTED_START_TIME.toString(), event.getExpectedStart());
307 printField(body, EmailField.ACTUAL_START_TIME.toString(), event.getActualStart());
308 printField(body, EmailField.EXPECTED_END_TIME.toString(), event.getExpectedEnd());
309 printField(body, EmailField.ACTUAL_END_TIME.toString(), event.getActualEnd());
310 printField(body, EmailField.EXPECTED_DURATION.toString(), getDurationInMins(event.getExpectedDuration()));
311 printField(body, EmailField.ACTUAL_DURATION.toString(), getDurationInMins(event.getActualDuration()));
312
313 try {
314 msg.setText(body.toString());
315 }
316 catch (MessagingException me) {
317 LOG.error("Message Exception in setting message body of SLA alert email", me);
318 throw me;
319 }
320 }
321
322 private long getDurationInMins(long duration) {
323 if (duration < 0) {
324 return duration;
325 }
326 return duration / 60000; //Convert millis to minutes
327 }
328
329 private String getJobLink(String jobId) {
330 StringBuffer url = new StringBuffer();
331 String param = "/?job=";
332 url.append(oozieBaseUrl);
333 url.append(param);
334 url.append(jobId);
335 return url.toString();
336 }
337
338 private void printField(StringBuilder st, String name, Object value) {
339 String lineFeed = "\n";
340 if (value != null) {
341 st.append(EMAIL_BODY_FIELD_INDENT);
342 st.append(name);
343 st.append(EMAIL_BODY_FIELD_SEPARATER);
344 st.append(value);
345 st.append(lineFeed);
346 }
347 }
348
349 private void printHeading(StringBuilder st, String header) {
350 st.append(header);
351 st.append(EMAIL_BODY_HEADER_SEPARATER);
352 st.append("\n");
353 }
354
355 private void sendEmail(Message message) throws MessagingException {
356 try {
357 Transport.send(message);
358 }
359 catch (NoSuchProviderException se) {
360 LOG.error("Could not find an SMTP transport provider to email", se);
361 throw se;
362 }
363 catch (MessagingException me) {
364 LOG.error("Message Exception in transporting SLA alert email", me);
365 if (me instanceof SendFailedException) {
366 Address[] invalidAddrs = ((SendFailedException) me).getInvalidAddresses();
367 if (invalidAddrs != null && invalidAddrs.length > 0) {
368 for (Address addr : invalidAddrs) {
369 try {
370 // 'get' method loads key into cache when it doesn't exist
371 AtomicInteger val = blackList.get(addr.toString());
372 val.incrementAndGet();
373 }
374 catch (Exception e) {
375 LOG.debug("blacklist loading throwed exception");
376 }
377 }
378 }
379 }
380 throw me;
381 }
382 }
383
384 @VisibleForTesting
385 public void addBlackList(String email) throws Exception {
386 // this is for testing
387 if(email == null || email.equals("")){
388 return;
389 }
390 AtomicInteger val = blackList.get(email);
391 val.set(blacklistFailCount);
392 }
393
394 @Override
395 public void onStartMet(SLAEvent work) {
396 }
397
398 @Override
399 public void onEndMet(SLAEvent work) {
400 }
401
402 @Override
403 public void onDurationMet(SLAEvent work) {
404 }
405
406 }