001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.sla.listener;
020
021import java.util.ArrayList;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Properties;
025import java.util.Set;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicInteger;
028
029import javax.mail.Address;
030import javax.mail.Message;
031import javax.mail.MessagingException;
032import javax.mail.NoSuchProviderException;
033import javax.mail.SendFailedException;
034import javax.mail.Session;
035import javax.mail.Transport;
036import javax.mail.internet.AddressException;
037import javax.mail.internet.InternetAddress;
038import javax.mail.internet.MimeMessage;
039import javax.mail.internet.MimeMessage.RecipientType;
040
041import org.apache.hadoop.conf.Configuration;
042import org.apache.oozie.action.email.EmailActionExecutor;
043import org.apache.oozie.action.email.EmailActionExecutor.JavaMailAuthenticator;
044import org.apache.oozie.client.event.SLAEvent;
045import org.apache.oozie.sla.listener.SLAEventListener;
046import org.apache.oozie.sla.service.SLAService;
047import org.apache.oozie.util.XLog;
048
049import com.google.common.annotations.VisibleForTesting;
050import com.google.common.cache.CacheBuilder;
051import com.google.common.cache.CacheLoader;
052import com.google.common.cache.LoadingCache;
053
054public class SLAEmailEventListener extends SLAEventListener {
055
056    public static final String SMTP_CONNECTION_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "smtp.connectiontimeout";
057    public static final String SMTP_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "smtp.timeout";
058    public static final String BLACKLIST_CACHE_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "blacklist.cachetimeout";
059    public static final String BLACKLIST_FAIL_COUNT = EmailActionExecutor.CONF_PREFIX + "blacklist.failcount";
060    public static final String OOZIE_BASE_URL = "oozie.base.url";
061    private Session session;
062    private String oozieBaseUrl;
063    private InternetAddress fromAddr;
064    private String ADDRESS_SEPARATOR = ",";
065    private LoadingCache<String, AtomicInteger> blackList;
066    private int blacklistFailCount;
067    private final String BLACKLIST_CACHE_TIMEOUT_DEFAULT = "1800"; // in sec. default to 30 min
068    private final String BLACKLIST_FAIL_COUNT_DEFAULT = "2"; // stop sending when fail count equals or exceeds
069    private final String SMTP_HOST_DEFAULT = "localhost";
070    private final String SMTP_PORT_DEFAULT = "25";
071    private final boolean SMTP_AUTH_DEFAULT = false;
072    private final String SMTP_SOURCE_DEFAULT = "oozie@localhost";
073    private final String SMTP_CONNECTION_TIMEOUT_DEFAULT = "5000";
074    private final String SMTP_TIMEOUT_DEFAULT = "5000";
075    private static XLog LOG = XLog.getLog(SLAEmailEventListener.class);
076    private Set<SLAEvent.EventStatus> alertEvents;
077    public static String EMAIL_BODY_FIELD_SEPARATER = " - ";
078    public static String EMAIL_BODY_FIELD_INDENT = "  ";
079    public static String EMAIL_BODY_HEADER_SEPARATER = ":";
080
081    public enum EmailField {
082        EVENT_STATUS("SLA Status"), APP_TYPE("App Type"), APP_NAME("App Name"), USER("User"), JOBID("Job ID"), PARENT_JOBID(
083                "Parent Job ID"), JOB_URL("Job URL"), PARENT_JOB_URL("Parent Job URL"), NOMINAL_TIME("Nominal Time"),
084                EXPECTED_START_TIME("Expected Start Time"), ACTUAL_START_TIME("Actual Start Time"),
085                EXPECTED_END_TIME("Expected End Time"), ACTUAL_END_TIME("Actual End Time"), EXPECTED_DURATION("Expected Duration (in mins)"),
086                ACTUAL_DURATION("Actual Duration (in mins)"), NOTIFICATION_MESSAGE("Notification Message"), UPSTREAM_APPS("Upstream Apps"),
087                JOB_STATUS("Job Status");
088        private String name;
089
090        private EmailField(String name) {
091            this.name = name;
092        }
093
094        public String toString() {
095            return name;
096        }
097    };
098
099    @Override
100    public void init(Configuration conf) throws Exception {
101
102        oozieBaseUrl = conf.get(OOZIE_BASE_URL);
103        // Get SMTP properties from the configuration used in Email Action
104        String smtpHost = conf.get(EmailActionExecutor.EMAIL_SMTP_HOST, SMTP_HOST_DEFAULT);
105        String smtpPort = conf.get(EmailActionExecutor.EMAIL_SMTP_PORT, SMTP_PORT_DEFAULT);
106        Boolean smtpAuth = conf.getBoolean(EmailActionExecutor.EMAIL_SMTP_AUTH, SMTP_AUTH_DEFAULT);
107        String smtpUser = conf.get(EmailActionExecutor.EMAIL_SMTP_USER, "");
108        String smtpPassword = conf.get(EmailActionExecutor.EMAIL_SMTP_PASS, "");
109        String smtpConnectTimeout = conf.get(SMTP_CONNECTION_TIMEOUT, SMTP_CONNECTION_TIMEOUT_DEFAULT);
110        String smtpTimeout = conf.get(SMTP_TIMEOUT, SMTP_TIMEOUT_DEFAULT);
111
112        int blacklistTimeOut = Integer.valueOf(conf.get(BLACKLIST_CACHE_TIMEOUT, BLACKLIST_CACHE_TIMEOUT_DEFAULT));
113        blacklistFailCount = Integer.valueOf(conf.get(BLACKLIST_FAIL_COUNT, BLACKLIST_FAIL_COUNT_DEFAULT));
114
115        // blacklist email addresses causing SendFailedException with cache timeout
116        blackList = CacheBuilder.newBuilder()
117                .expireAfterWrite(blacklistTimeOut, TimeUnit.SECONDS)
118                .build(new CacheLoader<String, AtomicInteger>() {
119                    public AtomicInteger load(String key) throws Exception {
120                        return new AtomicInteger();
121                    }
122                });
123
124        // Set SMTP properties
125        Properties properties = new Properties();
126        properties.setProperty("mail.smtp.host", smtpHost);
127        properties.setProperty("mail.smtp.port", smtpPort);
128        properties.setProperty("mail.smtp.auth", smtpAuth.toString());
129        properties.setProperty("mail.smtp.connectiontimeout", smtpConnectTimeout);
130        properties.setProperty("mail.smtp.timeout", smtpTimeout);
131
132        try {
133            fromAddr = new InternetAddress(conf.get("oozie.email.from.address", SMTP_SOURCE_DEFAULT));
134        }
135        catch (AddressException ae) {
136            LOG.error("Bad Source Address specified in oozie.email.from.address", ae);
137            throw ae;
138        }
139
140        if (!smtpAuth) {
141            session = Session.getInstance(properties);
142        }
143        else {
144            session = Session.getInstance(properties, new JavaMailAuthenticator(smtpUser, smtpPassword));
145        }
146
147        alertEvents = new HashSet<SLAEvent.EventStatus>();
148        String alertEventsStr = conf.get(SLAService.CONF_ALERT_EVENTS);
149        if (alertEventsStr != null) {
150            String[] alertEvt = alertEventsStr.split(",", -1);
151            for (String evt : alertEvt) {
152                alertEvents.add(SLAEvent.EventStatus.valueOf(evt));
153            }
154        }
155    }
156
157    @Override
158    public void destroy() {
159    }
160
161    private void sendSLAEmail(SLAEvent event) throws Exception {
162        // If no address is provided, the user did not want to send an email so simply log it and do nothing
163        if (event.getAlertContact() == null || event.getAlertContact().trim().length() == 0) {
164            LOG.info("No destination address provided; an SLA alert email will not be sent");
165        } else {
166            // Create and send an email
167            Message message = new MimeMessage(session);
168            setMessageHeader(message, event);
169            setMessageBody(message, event);
170            sendEmail(message);
171        }
172    }
173
174    @Override
175    public void onStartMiss(SLAEvent event) {
176        boolean flag = false;
177        if (event.getAlertEvents() == null) {
178            flag = alertEvents.contains(SLAEvent.EventStatus.START_MISS);
179        }
180        else if (event.getAlertEvents().contains(SLAEvent.EventStatus.START_MISS.name())) {
181            flag = true;
182        }
183
184        if (flag) {
185            try {
186                sendSLAEmail(event);
187            }
188            catch (Exception e) {
189                LOG.error("Failed to send StartMiss alert email", e);
190            }
191        }
192    }
193
194    @Override
195    public void onEndMiss(SLAEvent event) {
196        boolean flag = false;
197        if (event.getAlertEvents() == null) {
198            flag = alertEvents.contains(SLAEvent.EventStatus.END_MISS);
199        }
200        else if (event.getAlertEvents().contains(SLAEvent.EventStatus.END_MISS.name())) {
201            flag = true;
202        }
203
204        if (flag) {
205            try {
206                sendSLAEmail(event);
207            }
208            catch (Exception e) {
209                LOG.error("Failed to send EndMiss alert email", e);
210            }
211        }
212    }
213
214    @Override
215    public void onDurationMiss(SLAEvent event) {
216        boolean flag = false;
217        if (event.getAlertEvents() == null) {
218            flag = alertEvents.contains(SLAEvent.EventStatus.DURATION_MISS);
219        }
220        else if (event.getAlertEvents().contains(SLAEvent.EventStatus.DURATION_MISS.name())) {
221            flag = true;
222        }
223
224        if (flag) {
225            try {
226                sendSLAEmail(event);
227            }
228            catch (Exception e) {
229                LOG.error("Failed to send DurationMiss alert email", e);
230            }
231        }
232    }
233
234    private Address[] parseAddress(String str) {
235        Address[] addrs = null;
236        List<InternetAddress> addrList = new ArrayList<InternetAddress>();
237        String[] emails = str.split(ADDRESS_SEPARATOR, -1);
238
239        for (String email : emails) {
240            boolean isBlackListed = false;
241            AtomicInteger val = blackList.getIfPresent(email);
242            if(val != null){
243                isBlackListed = ( val.get() >= blacklistFailCount );
244            }
245            if (!isBlackListed) {
246                try {
247                    // turn on strict syntax check by setting 2nd argument true
248                    addrList.add(new InternetAddress(email, true));
249                }
250                catch (AddressException ae) {
251                    // simply skip bad address but do not throw exception
252                    LOG.error("Skipping bad destination address: " + email, ae);
253                }
254            }
255        }
256
257        if (addrList.size() > 0)
258            addrs = (Address[]) addrList.toArray(new InternetAddress[addrList.size()]);
259
260        return addrs;
261    }
262
263    private void setMessageHeader(Message msg, SLAEvent event) throws MessagingException {
264        Address[] from = new InternetAddress[] { fromAddr };
265        Address[] to;
266        StringBuilder subject = new StringBuilder();
267
268        to = parseAddress(event.getAlertContact());
269        if (to == null) {
270            LOG.error("Destination address is null or invalid, stop sending SLA alert email");
271            throw new IllegalArgumentException("Destination address is not specified properly");
272        }
273        subject.append("OOZIE - SLA ");
274        subject.append(event.getEventStatus().name());
275        subject.append(" (AppName=");
276        subject.append(event.getAppName());
277        subject.append(", JobID=");
278        subject.append(event.getId());
279        subject.append(")");
280
281        try {
282            msg.addFrom(from);
283            msg.addRecipients(RecipientType.TO, to);
284            msg.setSubject(subject.toString());
285        }
286        catch (MessagingException me) {
287            LOG.error("Message Exception in setting message header of SLA alert email", me);
288            throw me;
289        }
290    }
291
292    private void setMessageBody(Message msg, SLAEvent event) throws MessagingException {
293        StringBuilder body = new StringBuilder();
294        printHeading(body, "Status");
295        printField(body, EmailField.EVENT_STATUS.toString(), event.getEventStatus());
296        printField(body, EmailField.JOB_STATUS.toString(), event.getJobStatus());
297        printField(body, EmailField.NOTIFICATION_MESSAGE.toString(), event.getNotificationMsg());
298
299        printHeading(body, "Job Details");
300        printField(body, EmailField.APP_NAME.toString(), event.getAppName());
301        printField(body, EmailField.APP_TYPE.toString(), event.getAppType());
302        printField(body, EmailField.USER.toString(), event.getUser());
303        printField(body, EmailField.JOBID.toString(), event.getId());
304        printField(body, EmailField.JOB_URL.toString(), getJobLink(event.getId()));
305        printField(body, EmailField.PARENT_JOBID.toString(), event.getParentId() != null ? event.getParentId() : "N/A");
306        printField(body, EmailField.PARENT_JOB_URL.toString(),
307                event.getParentId() != null ? getJobLink(event.getParentId()) : "N/A");
308        printField(body, EmailField.UPSTREAM_APPS.toString(), event.getUpstreamApps());
309
310        printHeading(body, "SLA Details");
311        printField(body, EmailField.NOMINAL_TIME.toString(), event.getNominalTime());
312        printField(body, EmailField.EXPECTED_START_TIME.toString(), event.getExpectedStart());
313        printField(body, EmailField.ACTUAL_START_TIME.toString(), event.getActualStart());
314        printField(body, EmailField.EXPECTED_END_TIME.toString(), event.getExpectedEnd());
315        printField(body, EmailField.ACTUAL_END_TIME.toString(), event.getActualEnd());
316        printField(body, EmailField.EXPECTED_DURATION.toString(), getDurationInMins(event.getExpectedDuration()));
317        printField(body, EmailField.ACTUAL_DURATION.toString(), getDurationInMins(event.getActualDuration()));
318
319        try {
320            msg.setText(body.toString());
321        }
322        catch (MessagingException me) {
323            LOG.error("Message Exception in setting message body of SLA alert email", me);
324            throw me;
325        }
326    }
327
328    private long getDurationInMins(long duration) {
329        if (duration < 0) {
330            return duration;
331        }
332        return duration / 60000; //Convert millis to minutes
333    }
334
335    private String getJobLink(String jobId) {
336        StringBuffer url = new StringBuffer();
337        String param = "/?job=";
338        url.append(oozieBaseUrl);
339        url.append(param);
340        url.append(jobId);
341        return url.toString();
342    }
343
344    private void printField(StringBuilder st, String name, Object value) {
345        String lineFeed = "\n";
346        if (value != null) {
347            st.append(EMAIL_BODY_FIELD_INDENT);
348            st.append(name);
349            st.append(EMAIL_BODY_FIELD_SEPARATER);
350            st.append(value);
351            st.append(lineFeed);
352        }
353    }
354
355    private void printHeading(StringBuilder st, String header) {
356        st.append(header);
357        st.append(EMAIL_BODY_HEADER_SEPARATER);
358        st.append("\n");
359    }
360
361    private void sendEmail(Message message) throws MessagingException {
362        try {
363            Transport.send(message);
364        }
365        catch (NoSuchProviderException se) {
366            LOG.error("Could not find an SMTP transport provider to email", se);
367            throw se;
368        }
369        catch (MessagingException me) {
370            LOG.error("Message Exception in transporting SLA alert email", me);
371            if (me instanceof SendFailedException) {
372                Address[] invalidAddrs = ((SendFailedException) me).getInvalidAddresses();
373                if (invalidAddrs != null && invalidAddrs.length > 0) {
374                    for (Address addr : invalidAddrs) {
375                        try {
376                            // 'get' method loads key into cache when it doesn't exist
377                            AtomicInteger val = blackList.get(addr.toString());
378                            val.incrementAndGet();
379                        }
380                        catch (Exception e) {
381                            LOG.debug("blacklist loading throwed exception");
382                        }
383                    }
384                }
385            }
386            throw me;
387        }
388    }
389
390    @VisibleForTesting
391    public void addBlackList(String email) throws Exception {
392        // this is for testing
393        if(email == null || email.equals("")){
394            return;
395        }
396        AtomicInteger val = blackList.get(email);
397        val.set(blacklistFailCount);
398    }
399
400    @Override
401    public void onStartMet(SLAEvent work) {
402    }
403
404    @Override
405    public void onEndMet(SLAEvent work) {
406    }
407
408    @Override
409    public void onDurationMet(SLAEvent work) {
410    }
411
412}