001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.oozie.sla.listener;
020    
021    import java.util.ArrayList;
022    import java.util.HashSet;
023    import java.util.List;
024    import java.util.Properties;
025    import java.util.Set;
026    import java.util.concurrent.TimeUnit;
027    import java.util.concurrent.atomic.AtomicInteger;
028    
029    import javax.mail.Address;
030    import javax.mail.Message;
031    import javax.mail.MessagingException;
032    import javax.mail.NoSuchProviderException;
033    import javax.mail.SendFailedException;
034    import javax.mail.Session;
035    import javax.mail.Transport;
036    import javax.mail.internet.AddressException;
037    import javax.mail.internet.InternetAddress;
038    import javax.mail.internet.MimeMessage;
039    import javax.mail.internet.MimeMessage.RecipientType;
040    
041    import org.apache.hadoop.conf.Configuration;
042    import org.apache.oozie.action.email.EmailActionExecutor;
043    import org.apache.oozie.action.email.EmailActionExecutor.JavaMailAuthenticator;
044    import org.apache.oozie.client.event.SLAEvent;
045    import org.apache.oozie.sla.listener.SLAEventListener;
046    import org.apache.oozie.sla.service.SLAService;
047    import org.apache.oozie.util.XLog;
048    
049    import com.google.common.annotations.VisibleForTesting;
050    import com.google.common.cache.CacheBuilder;
051    import com.google.common.cache.CacheLoader;
052    import com.google.common.cache.LoadingCache;
053    
054    public class SLAEmailEventListener extends SLAEventListener {
055    
056        public static final String SMTP_CONNECTION_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "smtp.connectiontimeout";
057        public static final String SMTP_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "smtp.timeout";
058        public static final String BLACKLIST_CACHE_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "blacklist.cachetimeout";
059        public static final String BLACKLIST_FAIL_COUNT = EmailActionExecutor.CONF_PREFIX + "blacklist.failcount";
060        public static final String OOZIE_BASE_URL = "oozie.base.url";
061        private Session session;
062        private String oozieBaseUrl;
063        private InternetAddress fromAddr;
064        private String ADDRESS_SEPARATOR = ",";
065        private LoadingCache<String, AtomicInteger> blackList;
066        private int blacklistFailCount;
067        private final String BLACKLIST_CACHE_TIMEOUT_DEFAULT = "1800"; // in sec. default to 30 min
068        private final String BLACKLIST_FAIL_COUNT_DEFAULT = "2"; // stop sending when fail count equals or exceeds
069        private final String SMTP_HOST_DEFAULT = "localhost";
070        private final String SMTP_PORT_DEFAULT = "25";
071        private final boolean SMTP_AUTH_DEFAULT = false;
072        private final String SMTP_SOURCE_DEFAULT = "oozie@localhost";
073        private final String SMTP_CONNECTION_TIMEOUT_DEFAULT = "5000";
074        private final String SMTP_TIMEOUT_DEFAULT = "5000";
075        private static XLog LOG = XLog.getLog(SLAEmailEventListener.class);
076        private Set<SLAEvent.EventStatus> alertEvents;
077        public static String EMAIL_BODY_FIELD_SEPARATER = " - ";
078        public static String EMAIL_BODY_FIELD_INDENT = "  ";
079        public static String EMAIL_BODY_HEADER_SEPARATER = ":";
080    
081        public enum EmailField {
082            EVENT_STATUS("SLA Status"), APP_TYPE("App Type"), APP_NAME("App Name"), USER("User"), JOBID("Job ID"), PARENT_JOBID(
083                    "Parent Job ID"), JOB_URL("Job URL"), PARENT_JOB_URL("Parent Job URL"), NOMINAL_TIME("Nominal Time"),
084                    EXPECTED_START_TIME("Expected Start Time"), ACTUAL_START_TIME("Actual Start Time"),
085                    EXPECTED_END_TIME("Expected End Time"), ACTUAL_END_TIME("Actual End Time"), EXPECTED_DURATION("Expected Duration (in mins)"),
086                    ACTUAL_DURATION("Actual Duration (in mins)"), NOTIFICATION_MESSAGE("Notification Message"), UPSTREAM_APPS("Upstream Apps"),
087                    JOB_STATUS("Job Status");
088            private String name;
089    
090            private EmailField(String name) {
091                this.name = name;
092            }
093    
094            public String toString() {
095                return name;
096            }
097        };
098    
099        @Override
100        public void init(Configuration conf) throws Exception {
101    
102            oozieBaseUrl = conf.get(OOZIE_BASE_URL);
103            // Get SMTP properties from the configuration used in Email Action
104            String smtpHost = conf.get(EmailActionExecutor.EMAIL_SMTP_HOST, SMTP_HOST_DEFAULT);
105            String smtpPort = conf.get(EmailActionExecutor.EMAIL_SMTP_PORT, SMTP_PORT_DEFAULT);
106            Boolean smtpAuth = conf.getBoolean(EmailActionExecutor.EMAIL_SMTP_AUTH, SMTP_AUTH_DEFAULT);
107            String smtpUser = conf.get(EmailActionExecutor.EMAIL_SMTP_USER, "");
108            String smtpPassword = conf.get(EmailActionExecutor.EMAIL_SMTP_PASS, "");
109            String smtpConnectTimeout = conf.get(SMTP_CONNECTION_TIMEOUT, SMTP_CONNECTION_TIMEOUT_DEFAULT);
110            String smtpTimeout = conf.get(SMTP_TIMEOUT, SMTP_TIMEOUT_DEFAULT);
111    
112            int blacklistTimeOut = Integer.valueOf(conf.get(BLACKLIST_CACHE_TIMEOUT, BLACKLIST_CACHE_TIMEOUT_DEFAULT));
113            blacklistFailCount = Integer.valueOf(conf.get(BLACKLIST_FAIL_COUNT, BLACKLIST_FAIL_COUNT_DEFAULT));
114    
115            // blacklist email addresses causing SendFailedException with cache timeout
116            blackList = CacheBuilder.newBuilder()
117                    .expireAfterWrite(blacklistTimeOut, TimeUnit.SECONDS)
118                    .build(new CacheLoader<String, AtomicInteger>() {
119                        public AtomicInteger load(String key) throws Exception {
120                            return new AtomicInteger();
121                        }
122                    });
123    
124            // Set SMTP properties
125            Properties properties = new Properties();
126            properties.setProperty("mail.smtp.host", smtpHost);
127            properties.setProperty("mail.smtp.port", smtpPort);
128            properties.setProperty("mail.smtp.auth", smtpAuth.toString());
129            properties.setProperty("mail.smtp.connectiontimeout", smtpConnectTimeout);
130            properties.setProperty("mail.smtp.timeout", smtpTimeout);
131    
132            try {
133                fromAddr = new InternetAddress(conf.get("oozie.email.from.address", SMTP_SOURCE_DEFAULT));
134            }
135            catch (AddressException ae) {
136                LOG.error("Bad Source Address specified in oozie.email.from.address", ae);
137                throw ae;
138            }
139    
140            if (!smtpAuth) {
141                session = Session.getInstance(properties);
142            }
143            else {
144                session = Session.getInstance(properties, new JavaMailAuthenticator(smtpUser, smtpPassword));
145            }
146    
147            alertEvents = new HashSet<SLAEvent.EventStatus>();
148            String alertEventsStr = conf.get(SLAService.CONF_ALERT_EVENTS);
149            if (alertEventsStr != null) {
150                String[] alertEvt = alertEventsStr.split(",", -1);
151                for (String evt : alertEvt) {
152                    alertEvents.add(SLAEvent.EventStatus.valueOf(evt));
153                }
154            }
155        }
156    
157        @Override
158        public void destroy() {
159        }
160    
161        private void sendSLAEmail(SLAEvent event) throws Exception {
162            Message message = new MimeMessage(session);
163            setMessageHeader(message, event);
164            setMessageBody(message, event);
165            sendEmail(message);
166        }
167    
168        @Override
169        public void onStartMiss(SLAEvent event) {
170            boolean flag = false;
171            if (event.getAlertEvents() == null) {
172                flag = alertEvents.contains(SLAEvent.EventStatus.START_MISS);
173            }
174            else if (event.getAlertEvents().contains(SLAEvent.EventStatus.START_MISS.name())) {
175                flag = true;
176            }
177    
178            if (flag) {
179                try {
180                    sendSLAEmail(event);
181                }
182                catch (Exception e) {
183                    LOG.error("Failed to send StartMiss alert email", e);
184                }
185            }
186        }
187    
188        @Override
189        public void onEndMiss(SLAEvent event) {
190            boolean flag = false;
191            if (event.getAlertEvents() == null) {
192                flag = alertEvents.contains(SLAEvent.EventStatus.END_MISS);
193            }
194            else if (event.getAlertEvents().contains(SLAEvent.EventStatus.END_MISS.name())) {
195                flag = true;
196            }
197    
198            if (flag) {
199                try {
200                    sendSLAEmail(event);
201                }
202                catch (Exception e) {
203                    LOG.error("Failed to send EndMiss alert email", e);
204                }
205            }
206        }
207    
208        @Override
209        public void onDurationMiss(SLAEvent event) {
210            boolean flag = false;
211            if (event.getAlertEvents() == null) {
212                flag = alertEvents.contains(SLAEvent.EventStatus.DURATION_MISS);
213            }
214            else if (event.getAlertEvents().contains(SLAEvent.EventStatus.DURATION_MISS.name())) {
215                flag = true;
216            }
217    
218            if (flag) {
219                try {
220                    sendSLAEmail(event);
221                }
222                catch (Exception e) {
223                    LOG.error("Failed to send DurationMiss alert email", e);
224                }
225            }
226        }
227    
228        private Address[] parseAddress(String str) {
229            Address[] addrs = null;
230            List<InternetAddress> addrList = new ArrayList<InternetAddress>();
231            String[] emails = str.split(ADDRESS_SEPARATOR, -1);
232    
233            for (String email : emails) {
234                boolean isBlackListed = false;
235                AtomicInteger val = blackList.getIfPresent(email);
236                if(val != null){
237                    isBlackListed = ( val.get() >= blacklistFailCount );
238                }
239                if (!isBlackListed) {
240                    try {
241                        // turn on strict syntax check by setting 2nd argument true
242                        addrList.add(new InternetAddress(email, true));
243                    }
244                    catch (AddressException ae) {
245                        // simply skip bad address but do not throw exception
246                        LOG.error("Skipping bad destination address: " + email, ae);
247                    }
248                }
249            }
250    
251            if (addrList.size() > 0)
252                addrs = (Address[]) addrList.toArray(new InternetAddress[addrList.size()]);
253    
254            return addrs;
255        }
256    
257        private void setMessageHeader(Message msg, SLAEvent event) throws MessagingException {
258            Address[] from = new InternetAddress[] { fromAddr };
259            Address[] to;
260            StringBuilder subject = new StringBuilder();
261    
262            to = parseAddress(event.getAlertContact());
263            if (to == null) {
264                LOG.error("Destination address is null or invalid, stop sending SLA alert email");
265                throw new IllegalArgumentException("Destination address is not specified properly");
266            }
267            subject.append("OOZIE - SLA ");
268            subject.append(event.getEventStatus().name());
269            subject.append(" (AppName=");
270            subject.append(event.getAppName());
271            subject.append(", JobID=");
272            subject.append(event.getId());
273            subject.append(")");
274    
275            try {
276                msg.addFrom(from);
277                msg.addRecipients(RecipientType.TO, to);
278                msg.setSubject(subject.toString());
279            }
280            catch (MessagingException me) {
281                LOG.error("Message Exception in setting message header of SLA alert email", me);
282                throw me;
283            }
284        }
285    
286        private void setMessageBody(Message msg, SLAEvent event) throws MessagingException {
287            StringBuilder body = new StringBuilder();
288            printHeading(body, "Status");
289            printField(body, EmailField.EVENT_STATUS.toString(), event.getEventStatus());
290            printField(body, EmailField.JOB_STATUS.toString(), event.getJobStatus());
291            printField(body, EmailField.NOTIFICATION_MESSAGE.toString(), event.getNotificationMsg());
292    
293            printHeading(body, "Job Details");
294            printField(body, EmailField.APP_NAME.toString(), event.getAppName());
295            printField(body, EmailField.APP_TYPE.toString(), event.getAppType());
296            printField(body, EmailField.USER.toString(), event.getUser());
297            printField(body, EmailField.JOBID.toString(), event.getId());
298            printField(body, EmailField.JOB_URL.toString(), getJobLink(event.getId()));
299            printField(body, EmailField.PARENT_JOBID.toString(), event.getParentId() != null ? event.getParentId() : "N/A");
300            printField(body, EmailField.PARENT_JOB_URL.toString(),
301                    event.getParentId() != null ? getJobLink(event.getParentId()) : "N/A");
302            printField(body, EmailField.UPSTREAM_APPS.toString(), event.getUpstreamApps());
303    
304            printHeading(body, "SLA Details");
305            printField(body, EmailField.NOMINAL_TIME.toString(), event.getNominalTime());
306            printField(body, EmailField.EXPECTED_START_TIME.toString(), event.getExpectedStart());
307            printField(body, EmailField.ACTUAL_START_TIME.toString(), event.getActualStart());
308            printField(body, EmailField.EXPECTED_END_TIME.toString(), event.getExpectedEnd());
309            printField(body, EmailField.ACTUAL_END_TIME.toString(), event.getActualEnd());
310            printField(body, EmailField.EXPECTED_DURATION.toString(), getDurationInMins(event.getExpectedDuration()));
311            printField(body, EmailField.ACTUAL_DURATION.toString(), getDurationInMins(event.getActualDuration()));
312    
313            try {
314                msg.setText(body.toString());
315            }
316            catch (MessagingException me) {
317                LOG.error("Message Exception in setting message body of SLA alert email", me);
318                throw me;
319            }
320        }
321    
322        private long getDurationInMins(long duration) {
323            if (duration < 0) {
324                return duration;
325            }
326            return duration / 60000; //Convert millis to minutes
327        }
328    
329        private String getJobLink(String jobId) {
330            StringBuffer url = new StringBuffer();
331            String param = "/?job=";
332            url.append(oozieBaseUrl);
333            url.append(param);
334            url.append(jobId);
335            return url.toString();
336        }
337    
338        private void printField(StringBuilder st, String name, Object value) {
339            String lineFeed = "\n";
340            if (value != null) {
341                st.append(EMAIL_BODY_FIELD_INDENT);
342                st.append(name);
343                st.append(EMAIL_BODY_FIELD_SEPARATER);
344                st.append(value);
345                st.append(lineFeed);
346            }
347        }
348    
349        private void printHeading(StringBuilder st, String header) {
350            st.append(header);
351            st.append(EMAIL_BODY_HEADER_SEPARATER);
352            st.append("\n");
353        }
354    
355        private void sendEmail(Message message) throws MessagingException {
356            try {
357                Transport.send(message);
358            }
359            catch (NoSuchProviderException se) {
360                LOG.error("Could not find an SMTP transport provider to email", se);
361                throw se;
362            }
363            catch (MessagingException me) {
364                LOG.error("Message Exception in transporting SLA alert email", me);
365                if (me instanceof SendFailedException) {
366                    Address[] invalidAddrs = ((SendFailedException) me).getInvalidAddresses();
367                    if (invalidAddrs != null && invalidAddrs.length > 0) {
368                        for (Address addr : invalidAddrs) {
369                            try {
370                                // 'get' method loads key into cache when it doesn't exist
371                                AtomicInteger val = blackList.get(addr.toString());
372                                val.incrementAndGet();
373                            }
374                            catch (Exception e) {
375                                LOG.debug("blacklist loading throwed exception");
376                            }
377                        }
378                    }
379                }
380                throw me;
381            }
382        }
383    
384        @VisibleForTesting
385        public void addBlackList(String email) throws Exception {
386            // this is for testing
387            if(email == null || email.equals("")){
388                return;
389            }
390            AtomicInteger val = blackList.get(email);
391            val.set(blacklistFailCount);
392        }
393    
394        @Override
395        public void onStartMet(SLAEvent work) {
396        }
397    
398        @Override
399        public void onEndMet(SLAEvent work) {
400        }
401    
402        @Override
403        public void onDurationMet(SLAEvent work) {
404        }
405    
406    }