001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019
020package org.apache.oozie.sla.listener;
021
022import java.util.ArrayList;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Properties;
026import java.util.Set;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicInteger;
029
030import javax.mail.Address;
031import javax.mail.Message;
032import javax.mail.MessagingException;
033import javax.mail.NoSuchProviderException;
034import javax.mail.SendFailedException;
035import javax.mail.Session;
036import javax.mail.Transport;
037import javax.mail.internet.AddressException;
038import javax.mail.internet.InternetAddress;
039import javax.mail.internet.MimeMessage;
040import javax.mail.internet.MimeMessage.RecipientType;
041
042import org.apache.hadoop.conf.Configuration;
043import org.apache.oozie.action.email.EmailActionExecutor;
044import org.apache.oozie.action.email.EmailActionExecutor.JavaMailAuthenticator;
045import org.apache.oozie.client.event.SLAEvent;
046import org.apache.oozie.service.ConfigurationService;
047import org.apache.oozie.sla.listener.SLAEventListener;
048import org.apache.oozie.sla.service.SLAService;
049import org.apache.oozie.util.XLog;
050
051import com.google.common.annotations.VisibleForTesting;
052import com.google.common.cache.CacheBuilder;
053import com.google.common.cache.CacheLoader;
054import com.google.common.cache.LoadingCache;
055
056public class SLAEmailEventListener extends SLAEventListener {
057
058    public static final String SMTP_CONNECTION_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "smtp.connectiontimeout";
059    public static final String SMTP_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "smtp.timeout";
060    public static final String BLACKLIST_CACHE_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "blacklist.cachetimeout";
061    public static final String BLACKLIST_FAIL_COUNT = EmailActionExecutor.CONF_PREFIX + "blacklist.failcount";
062    public static final String OOZIE_BASE_URL = "oozie.base.url";
063    private Session session;
064    private String oozieBaseUrl;
065    private InternetAddress fromAddr;
066    private String ADDRESS_SEPARATOR = ",";
067    private LoadingCache<String, AtomicInteger> blackList;
068    private int blacklistFailCount;
069    private final String BLACKLIST_CACHE_TIMEOUT_DEFAULT = "1800"; // in sec. default to 30 min
070    private final String BLACKLIST_FAIL_COUNT_DEFAULT = "2"; // stop sending when fail count equals or exceeds
071    private final String SMTP_HOST_DEFAULT = "localhost";
072    private final String SMTP_PORT_DEFAULT = "25";
073    private final boolean SMTP_AUTH_DEFAULT = false;
074    private final String SMTP_SOURCE_DEFAULT = "oozie@localhost";
075    private final String SMTP_CONNECTION_TIMEOUT_DEFAULT = "5000";
076    private final String SMTP_TIMEOUT_DEFAULT = "5000";
077    private static XLog LOG = XLog.getLog(SLAEmailEventListener.class);
078    private Set<SLAEvent.EventStatus> alertEvents;
079    public static String EMAIL_BODY_FIELD_SEPARATER = " - ";
080    public static String EMAIL_BODY_FIELD_INDENT = "  ";
081    public static String EMAIL_BODY_HEADER_SEPARATER = ":";
082
083    public enum EmailField {
084        EVENT_STATUS("SLA Status"), APP_TYPE("App Type"), APP_NAME("App Name"), USER("User"), JOBID("Job ID"), PARENT_JOBID(
085                "Parent Job ID"), JOB_URL("Job URL"), PARENT_JOB_URL("Parent Job URL"), NOMINAL_TIME("Nominal Time"),
086                EXPECTED_START_TIME("Expected Start Time"), ACTUAL_START_TIME("Actual Start Time"),
087                EXPECTED_END_TIME("Expected End Time"), ACTUAL_END_TIME("Actual End Time"), EXPECTED_DURATION("Expected Duration (in mins)"),
088                ACTUAL_DURATION("Actual Duration (in mins)"), NOTIFICATION_MESSAGE("Notification Message"), UPSTREAM_APPS("Upstream Apps"),
089                JOB_STATUS("Job Status");
090        private String name;
091
092        private EmailField(String name) {
093            this.name = name;
094        }
095
096        @Override
097        public String toString() {
098            return name;
099        }
100    };
101
102    @Override
103    public void init(Configuration conf) throws Exception {
104
105        oozieBaseUrl = ConfigurationService.get(conf, OOZIE_BASE_URL);
106        // Get SMTP properties from the configuration used in Email Action
107        String smtpHost = conf.get(EmailActionExecutor.EMAIL_SMTP_HOST, SMTP_HOST_DEFAULT);
108        String smtpPort = conf.get(EmailActionExecutor.EMAIL_SMTP_PORT, SMTP_PORT_DEFAULT);
109        Boolean smtpAuth = conf.getBoolean(EmailActionExecutor.EMAIL_SMTP_AUTH, SMTP_AUTH_DEFAULT);
110        String smtpUser = conf.get(EmailActionExecutor.EMAIL_SMTP_USER, "");
111        String smtpPassword = ConfigurationService.getPassword(EmailActionExecutor.EMAIL_SMTP_PASS, "");
112        String smtpConnectTimeout = conf.get(SMTP_CONNECTION_TIMEOUT, SMTP_CONNECTION_TIMEOUT_DEFAULT);
113        String smtpTimeout = conf.get(SMTP_TIMEOUT, SMTP_TIMEOUT_DEFAULT);
114
115        int blacklistTimeOut = Integer.valueOf(conf.get(BLACKLIST_CACHE_TIMEOUT, BLACKLIST_CACHE_TIMEOUT_DEFAULT));
116        blacklistFailCount = Integer.valueOf(conf.get(BLACKLIST_FAIL_COUNT, BLACKLIST_FAIL_COUNT_DEFAULT));
117
118        // blacklist email addresses causing SendFailedException with cache timeout
119        blackList = CacheBuilder.newBuilder()
120                .expireAfterWrite(blacklistTimeOut, TimeUnit.SECONDS)
121                .build(new CacheLoader<String, AtomicInteger>() {
122                    @Override
123                    public AtomicInteger load(String key) throws Exception {
124                        return new AtomicInteger();
125                    }
126                });
127
128        // Set SMTP properties
129        Properties properties = new Properties();
130        properties.setProperty("mail.smtp.host", smtpHost);
131        properties.setProperty("mail.smtp.port", smtpPort);
132        properties.setProperty("mail.smtp.auth", smtpAuth.toString());
133        properties.setProperty("mail.smtp.connectiontimeout", smtpConnectTimeout);
134        properties.setProperty("mail.smtp.timeout", smtpTimeout);
135
136        try {
137            fromAddr = new InternetAddress(conf.get("oozie.email.from.address", SMTP_SOURCE_DEFAULT));
138        }
139        catch (AddressException ae) {
140            LOG.error("Bad Source Address specified in oozie.email.from.address", ae);
141            throw ae;
142        }
143
144        if (!smtpAuth) {
145            session = Session.getInstance(properties);
146        }
147        else {
148            session = Session.getInstance(properties, new JavaMailAuthenticator(smtpUser, smtpPassword));
149        }
150
151        alertEvents = new HashSet<SLAEvent.EventStatus>();
152        String alertEventsStr = ConfigurationService.get(conf, SLAService.CONF_ALERT_EVENTS);
153        if (alertEventsStr != null) {
154            String[] alertEvt = alertEventsStr.split(",", -1);
155            for (String evt : alertEvt) {
156                alertEvents.add(SLAEvent.EventStatus.valueOf(evt));
157            }
158        }
159    }
160
161    @Override
162    public void destroy() {
163    }
164
165    private void sendSLAEmail(SLAEvent event) throws Exception {
166        // If no address is provided, the user did not want to send an email so simply log it and do nothing
167        if (event.getAlertContact() == null || event.getAlertContact().trim().length() == 0) {
168            LOG.info("No destination address provided; an SLA alert email will not be sent");
169        } else {
170            // Create and send an email
171            Message message = new MimeMessage(session);
172            setMessageHeader(message, event);
173            setMessageBody(message, event);
174            sendEmail(message);
175        }
176    }
177
178    @Override
179    public void onStartMiss(SLAEvent event) {
180        boolean flag = false;
181        if (event.getAlertEvents() == null) {
182            flag = alertEvents.contains(SLAEvent.EventStatus.START_MISS);
183        }
184        else if (event.getAlertEvents().contains(SLAEvent.EventStatus.START_MISS.name())) {
185            flag = true;
186        }
187
188        if (flag) {
189            try {
190                sendSLAEmail(event);
191            }
192            catch (Exception e) {
193                LOG.error("Failed to send StartMiss alert email", e);
194            }
195        }
196    }
197
198    @Override
199    public void onEndMiss(SLAEvent event) {
200        boolean flag = false;
201        if (event.getAlertEvents() == null) {
202            flag = alertEvents.contains(SLAEvent.EventStatus.END_MISS);
203        }
204        else if (event.getAlertEvents().contains(SLAEvent.EventStatus.END_MISS.name())) {
205            flag = true;
206        }
207
208        if (flag) {
209            try {
210                sendSLAEmail(event);
211            }
212            catch (Exception e) {
213                LOG.error("Failed to send EndMiss alert email", e);
214            }
215        }
216    }
217
218    @Override
219    public void onDurationMiss(SLAEvent event) {
220        boolean flag = false;
221        if (event.getAlertEvents() == null) {
222            flag = alertEvents.contains(SLAEvent.EventStatus.DURATION_MISS);
223        }
224        else if (event.getAlertEvents().contains(SLAEvent.EventStatus.DURATION_MISS.name())) {
225            flag = true;
226        }
227
228        if (flag) {
229            try {
230                sendSLAEmail(event);
231            }
232            catch (Exception e) {
233                LOG.error("Failed to send DurationMiss alert email", e);
234            }
235        }
236    }
237
238    private Address[] parseAddress(String str) {
239        Address[] addrs = null;
240        List<InternetAddress> addrList = new ArrayList<InternetAddress>();
241        String[] emails = str.split(ADDRESS_SEPARATOR, -1);
242
243        for (String email : emails) {
244            boolean isBlackListed = false;
245            AtomicInteger val = blackList.getIfPresent(email);
246            if(val != null){
247                isBlackListed = ( val.get() >= blacklistFailCount );
248            }
249            if (!isBlackListed) {
250                try {
251                    // turn on strict syntax check by setting 2nd argument true
252                    addrList.add(new InternetAddress(email, true));
253                }
254                catch (AddressException ae) {
255                    // simply skip bad address but do not throw exception
256                    LOG.error("Skipping bad destination address: " + email, ae);
257                }
258            }
259        }
260
261        if (addrList.size() > 0) {
262            addrs = (Address[]) addrList.toArray(new InternetAddress[addrList.size()]);
263        }
264
265        return addrs;
266    }
267
268    private void setMessageHeader(Message msg, SLAEvent event) throws MessagingException {
269        Address[] from = new InternetAddress[] { fromAddr };
270        Address[] to;
271        StringBuilder subject = new StringBuilder();
272
273        to = parseAddress(event.getAlertContact());
274        if (to == null) {
275            LOG.error("Destination address is null or invalid, stop sending SLA alert email");
276            throw new IllegalArgumentException("Destination address is not specified properly");
277        }
278        subject.append("OOZIE - SLA ");
279        subject.append(event.getEventStatus().name());
280        subject.append(" (AppName=");
281        subject.append(event.getAppName());
282        subject.append(", JobID=");
283        subject.append(event.getId());
284        subject.append(")");
285
286        try {
287            msg.addFrom(from);
288            msg.addRecipients(RecipientType.TO, to);
289            msg.setSubject(subject.toString());
290        }
291        catch (MessagingException me) {
292            LOG.error("Message Exception in setting message header of SLA alert email", me);
293            throw me;
294        }
295    }
296
297    private void setMessageBody(Message msg, SLAEvent event) throws MessagingException {
298        StringBuilder body = new StringBuilder();
299        printHeading(body, "Status");
300        printField(body, EmailField.EVENT_STATUS.toString(), event.getEventStatus());
301        printField(body, EmailField.JOB_STATUS.toString(), event.getJobStatus());
302        printField(body, EmailField.NOTIFICATION_MESSAGE.toString(), event.getNotificationMsg());
303
304        printHeading(body, "Job Details");
305        printField(body, EmailField.APP_NAME.toString(), event.getAppName());
306        printField(body, EmailField.APP_TYPE.toString(), event.getAppType());
307        printField(body, EmailField.USER.toString(), event.getUser());
308        printField(body, EmailField.JOBID.toString(), event.getId());
309        printField(body, EmailField.JOB_URL.toString(), getJobLink(event.getId()));
310        printField(body, EmailField.PARENT_JOBID.toString(), event.getParentId() != null ? event.getParentId() : "N/A");
311        printField(body, EmailField.PARENT_JOB_URL.toString(),
312                event.getParentId() != null ? getJobLink(event.getParentId()) : "N/A");
313        printField(body, EmailField.UPSTREAM_APPS.toString(), event.getUpstreamApps());
314
315        printHeading(body, "SLA Details");
316        printField(body, EmailField.NOMINAL_TIME.toString(), event.getNominalTime());
317        printField(body, EmailField.EXPECTED_START_TIME.toString(), event.getExpectedStart());
318        printField(body, EmailField.ACTUAL_START_TIME.toString(), event.getActualStart());
319        printField(body, EmailField.EXPECTED_END_TIME.toString(), event.getExpectedEnd());
320        printField(body, EmailField.ACTUAL_END_TIME.toString(), event.getActualEnd());
321        printField(body, EmailField.EXPECTED_DURATION.toString(), getDurationInMins(event.getExpectedDuration()));
322        printField(body, EmailField.ACTUAL_DURATION.toString(), getDurationInMins(event.getActualDuration()));
323
324        try {
325            msg.setText(body.toString());
326        }
327        catch (MessagingException me) {
328            LOG.error("Message Exception in setting message body of SLA alert email", me);
329            throw me;
330        }
331    }
332
333    private long getDurationInMins(long duration) {
334        if (duration < 0) {
335            return duration;
336        }
337        return duration / 60000; //Convert millis to minutes
338    }
339
340    private String getJobLink(String jobId) {
341        StringBuffer url = new StringBuffer();
342        String param = "/?job=";
343        url.append(oozieBaseUrl);
344        url.append(param);
345        url.append(jobId);
346        return url.toString();
347    }
348
349    private void printField(StringBuilder st, String name, Object value) {
350        String lineFeed = "\n";
351        if (value != null) {
352            st.append(EMAIL_BODY_FIELD_INDENT);
353            st.append(name);
354            st.append(EMAIL_BODY_FIELD_SEPARATER);
355            st.append(value);
356            st.append(lineFeed);
357        }
358    }
359
360    private void printHeading(StringBuilder st, String header) {
361        st.append(header);
362        st.append(EMAIL_BODY_HEADER_SEPARATER);
363        st.append("\n");
364    }
365
366    private void sendEmail(Message message) throws MessagingException {
367        try {
368            Transport.send(message);
369        }
370        catch (NoSuchProviderException se) {
371            LOG.error("Could not find an SMTP transport provider to email", se);
372            throw se;
373        }
374        catch (MessagingException me) {
375            LOG.error("Message Exception in transporting SLA alert email", me);
376            if (me instanceof SendFailedException) {
377                Address[] invalidAddrs = ((SendFailedException) me).getInvalidAddresses();
378                if (invalidAddrs != null && invalidAddrs.length > 0) {
379                    for (Address addr : invalidAddrs) {
380                        try {
381                            // 'get' method loads key into cache when it doesn't exist
382                            AtomicInteger val = blackList.get(addr.toString());
383                            val.incrementAndGet();
384                        }
385                        catch (Exception e) {
386                            LOG.debug("blacklist loading throwed exception");
387                        }
388                    }
389                }
390            }
391            throw me;
392        }
393    }
394
395    @VisibleForTesting
396    public void addBlackList(String email) throws Exception {
397        // this is for testing
398        if(email == null || email.equals("")){
399            return;
400        }
401        AtomicInteger val = blackList.get(email);
402        val.set(blacklistFailCount);
403    }
404
405    @Override
406    public void onStartMet(SLAEvent work) {
407    }
408
409    @Override
410    public void onEndMet(SLAEvent work) {
411    }
412
413    @Override
414    public void onDurationMet(SLAEvent work) {
415    }
416
417}