001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.action.email; 020 021import java.io.File; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025import java.net.URI; 026import java.net.URISyntaxException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.Properties; 030 031import javax.activation.DataHandler; 032import javax.activation.DataSource; 033import javax.mail.Authenticator; 034import javax.mail.Message; 035import javax.mail.Message.RecipientType; 036import javax.mail.MessagingException; 037import javax.mail.Multipart; 038import javax.mail.NoSuchProviderException; 039import javax.mail.PasswordAuthentication; 040import javax.mail.Session; 041import javax.mail.Transport; 042import javax.mail.internet.AddressException; 043import javax.mail.internet.InternetAddress; 044import javax.mail.internet.MimeBodyPart; 045import javax.mail.internet.MimeMessage; 046import javax.mail.internet.MimeMultipart; 047 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.Path; 051import org.apache.oozie.action.ActionExecutor; 052import org.apache.oozie.action.ActionExecutorException; 053import org.apache.oozie.action.ActionExecutorException.ErrorType; 054import org.apache.oozie.client.WorkflowAction; 055import org.apache.oozie.service.ConfigurationService; 056import org.apache.oozie.service.HadoopAccessorException; 057import org.apache.oozie.service.Services; 058import org.apache.oozie.service.HadoopAccessorService; 059import org.apache.oozie.util.XLog; 060import org.apache.oozie.util.XmlUtils; 061import org.jdom.Element; 062import org.jdom.Namespace; 063 064/** 065 * Email action executor. It takes to, cc, bcc addresses along with a subject and body and sends 066 * out an email. 067 */ 068public class EmailActionExecutor extends ActionExecutor { 069 070 public static final String CONF_PREFIX = "oozie.email."; 071 public static final String EMAIL_SMTP_HOST = CONF_PREFIX + "smtp.host"; 072 public static final String EMAIL_SMTP_PORT = CONF_PREFIX + "smtp.port"; 073 public static final String EMAIL_SMTP_AUTH = CONF_PREFIX + "smtp.auth"; 074 public static final String EMAIL_SMTP_USER = CONF_PREFIX + "smtp.username"; 075 public static final String EMAIL_SMTP_PASS = CONF_PREFIX + "smtp.password"; 076 public static final String EMAIL_SMTP_FROM = CONF_PREFIX + "from.address"; 077 public static final String EMAIL_SMTP_SOCKET_TIMEOUT_MS = CONF_PREFIX + "smtp.socket.timeout.ms"; 078 public static final String EMAIL_ATTACHMENT_ENABLED = CONF_PREFIX + "attachment.enabled"; 079 080 private final static String TO = "to"; 081 private final static String CC = "cc"; 082 private final static String BCC = "bcc"; 083 private final static String SUB = "subject"; 084 private final static String BOD = "body"; 085 private final static String ATTACHMENT = "attachment"; 086 private final static String COMMA = ","; 087 private final static String CONTENT_TYPE = "content_type"; 088 089 private final static String DEFAULT_CONTENT_TYPE = "text/plain"; 090 private XLog LOG = XLog.getLog(getClass()); 091 public static final String EMAIL_ATTACHMENT_ERROR_MSG = 092 "\n Note: This email is missing configured email attachments " 093 + "as sending attachments in email action is disabled in the Oozie server. " 094 + "It could be for security compliance with data protection or other reasons"; 095 096 public EmailActionExecutor() { 097 super("email"); 098 } 099 100 @Override 101 public void initActionType() { 102 super.initActionType(); 103 } 104 105 @Override 106 public void start(Context context, WorkflowAction action) throws ActionExecutorException { 107 try { 108 context.setStartData("-", "-", "-"); 109 Element actionXml = XmlUtils.parseXml(action.getConf()); 110 validateAndMail(context, actionXml); 111 context.setExecutionData("OK", null); 112 } 113 catch (Exception ex) { 114 throw convertException(ex); 115 } 116 } 117 118 @SuppressWarnings("unchecked") 119 protected void validateAndMail(Context context, Element element) throws ActionExecutorException { 120 // The XSD does the min/max occurrence validation for us. 121 Namespace ns = element.getNamespace(); 122 String tos[] = new String[0]; 123 String ccs[] = new String[0]; 124 String bccs[] ; 125 String subject = ""; 126 String body = ""; 127 String attachments[] = new String[0]; 128 String contentType; 129 Element child = null; 130 131 // <to> - One ought to exist. 132 String text = element.getChildTextTrim(TO, ns); 133 if (text.isEmpty()) { 134 throw new ActionExecutorException(ErrorType.ERROR, "EM001", "No receipents were specified in the to-address field."); 135 } 136 tos = text.split(COMMA); 137 138 // <cc> - Optional, but only one ought to exist. 139 try { 140 ccs = element.getChildTextTrim(CC, ns).split(COMMA); 141 } catch (Exception e) { 142 // It is alright for cc to be given empty or not be present. 143 ccs = new String[0]; 144 } 145 146 // <bcc> - Optional, but only one ought to exist. 147 try { 148 bccs = element.getChildTextTrim(BCC, ns).split(COMMA); 149 } catch (Exception e) { 150 // It is alright for bcc to be given empty or not be present. 151 bccs = new String[0]; 152 } 153 // <subject> - One ought to exist. 154 subject = element.getChildTextTrim(SUB, ns); 155 156 // <body> - One ought to exist. 157 body = element.getChildTextTrim(BOD, ns); 158 159 // <attachment> - Optional 160 String attachment = element.getChildTextTrim(ATTACHMENT, ns); 161 if(attachment != null) { 162 attachments = attachment.split(COMMA); 163 } 164 165 contentType = element.getChildTextTrim(CONTENT_TYPE, ns); 166 if (contentType == null || contentType.isEmpty()) { 167 contentType = DEFAULT_CONTENT_TYPE; 168 } 169 170 // All good - lets try to mail! 171 email(tos, ccs, bccs, subject, body, attachments, contentType, context.getWorkflow().getUser()); 172 } 173 174 public void email(String[] to, String[] cc, String subject, String body, String[] attachments, 175 String contentType, String user) throws ActionExecutorException { 176 email(to, cc, new String[0], subject, body, attachments, contentType, user); 177 } 178 179 public void email(String[] to, String[] cc, String[] bcc, String subject, String body, String[] attachments, 180 String contentType, String user) throws ActionExecutorException { 181 // Get mailing server details. 182 String smtpHost = ConfigurationService.get(EMAIL_SMTP_HOST); 183 Integer smtpPortInt = ConfigurationService.getInt(EMAIL_SMTP_PORT); 184 Boolean smtpAuthBool = ConfigurationService.getBoolean(EMAIL_SMTP_AUTH); 185 String smtpUser = ConfigurationService.get(EMAIL_SMTP_USER); 186 String smtpPassword = ConfigurationService.getPassword(EMAIL_SMTP_PASS, ""); 187 String fromAddr = ConfigurationService.get(EMAIL_SMTP_FROM); 188 Integer timeoutMillisInt = ConfigurationService.getInt(EMAIL_SMTP_SOCKET_TIMEOUT_MS); 189 190 Properties properties = new Properties(); 191 properties.setProperty("mail.smtp.host", smtpHost); 192 properties.setProperty("mail.smtp.port", smtpPortInt.toString()); 193 properties.setProperty("mail.smtp.auth", smtpAuthBool.toString()); 194 195 // Apply sensible timeouts, as defaults are infinite. See https://s.apache.org/javax-mail-timeouts 196 properties.setProperty("mail.smtp.connectiontimeout", timeoutMillisInt.toString()); 197 properties.setProperty("mail.smtp.timeout", timeoutMillisInt.toString()); 198 properties.setProperty("mail.smtp.writetimeout", timeoutMillisInt.toString()); 199 200 Session session; 201 // Do not use default instance (i.e. Session.getDefaultInstance) 202 // (cause it may lead to issues when used second time). 203 if (!smtpAuthBool) { 204 session = Session.getInstance(properties); 205 } else { 206 session = Session.getInstance(properties, new JavaMailAuthenticator(smtpUser, smtpPassword)); 207 } 208 209 Message message = new MimeMessage(session); 210 InternetAddress from; 211 List<InternetAddress> toAddrs = new ArrayList<InternetAddress>(to.length); 212 List<InternetAddress> ccAddrs = new ArrayList<InternetAddress>(cc.length); 213 List<InternetAddress> bccAddrs = new ArrayList<InternetAddress>(bcc.length); 214 215 try { 216 from = new InternetAddress(fromAddr); 217 message.setFrom(from); 218 } catch (AddressException e) { 219 throw new ActionExecutorException(ErrorType.ERROR, "EM002", "Bad from address specified in ${oozie.email.from.address}.", e); 220 } catch (MessagingException e) { 221 throw new ActionExecutorException(ErrorType.ERROR, "EM003", "Error setting a from address in the message.", e); 222 } 223 224 try { 225 // Add all <to> 226 for (String toStr : to) { 227 toAddrs.add(new InternetAddress(toStr.trim())); 228 } 229 message.addRecipients(RecipientType.TO, toAddrs.toArray(new InternetAddress[0])); 230 231 // Add all <cc> 232 for (String ccStr : cc) { 233 ccAddrs.add(new InternetAddress(ccStr.trim())); 234 } 235 message.addRecipients(RecipientType.CC, ccAddrs.toArray(new InternetAddress[0])); 236 237 // Add all <bcc> 238 for (String bccStr : bcc) { 239 bccAddrs.add(new InternetAddress(bccStr.trim())); 240 } 241 message.addRecipients(RecipientType.BCC, bccAddrs.toArray(new InternetAddress[0])); 242 243 // Set subject 244 message.setSubject(subject); 245 246 // when there is attachment 247 if (attachments != null && attachments.length > 0 && ConfigurationService.getBoolean(EMAIL_ATTACHMENT_ENABLED)) { 248 Multipart multipart = new MimeMultipart(); 249 250 // Set body text 251 MimeBodyPart bodyTextPart = new MimeBodyPart(); 252 bodyTextPart.setText(body); 253 multipart.addBodyPart(bodyTextPart); 254 255 for (String attachment : attachments) { 256 URI attachUri = new URI(attachment); 257 if (attachUri.getScheme() != null && attachUri.getScheme().equals("file")) { 258 throw new ActionExecutorException(ErrorType.ERROR, "EM008", 259 "Encountered an error when attaching a file. A local file cannot be attached:" 260 + attachment); 261 } 262 MimeBodyPart messageBodyPart = new MimeBodyPart(); 263 DataSource source = new URIDataSource(attachUri, user); 264 messageBodyPart.setDataHandler(new DataHandler(source)); 265 messageBodyPart.setFileName(new File(attachment).getName()); 266 multipart.addBodyPart(messageBodyPart); 267 } 268 message.setContent(multipart); 269 } 270 else { 271 if (attachments != null && attachments.length > 0 && !ConfigurationService.getBoolean(EMAIL_ATTACHMENT_ENABLED)) { 272 body = body + EMAIL_ATTACHMENT_ERROR_MSG; 273 } 274 message.setContent(body, contentType); 275 } 276 } 277 catch (AddressException e) { 278 throw new ActionExecutorException(ErrorType.ERROR, "EM004", "Bad address format in <to> or <cc> or <bcc>.", e); 279 } 280 catch (MessagingException e) { 281 throw new ActionExecutorException(ErrorType.ERROR, "EM005", "An error occured while adding recipients.", e); 282 } 283 catch (URISyntaxException e) { 284 throw new ActionExecutorException(ErrorType.ERROR, "EM008", "Encountered an error when attaching a file", e); 285 } 286 catch (HadoopAccessorException e) { 287 throw new ActionExecutorException(ErrorType.ERROR, "EM008", "Encountered an error when attaching a file", e); 288 } 289 290 try { 291 // Send over SMTP Transport 292 // (Session+Message has adequate details.) 293 Transport.send(message); 294 } catch (NoSuchProviderException e) { 295 throw new ActionExecutorException(ErrorType.ERROR, "EM006", "Could not find an SMTP transport provider to email.", e); 296 } catch (MessagingException e) { 297 throw new ActionExecutorException(ErrorType.ERROR, "EM007", "Encountered an error while sending the email message over SMTP.", e); 298 } 299 } 300 301 @Override 302 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 303 String externalStatus = action.getExternalStatus(); 304 WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK : 305 WorkflowAction.Status.ERROR; 306 context.setEndData(status, getActionSignal(status)); 307 } 308 309 @Override 310 public void check(Context context, WorkflowAction action) 311 throws ActionExecutorException { 312 313 } 314 315 @Override 316 public void kill(Context context, WorkflowAction action) 317 throws ActionExecutorException { 318 319 } 320 321 @Override 322 public boolean isCompleted(String externalStatus) { 323 return true; 324 } 325 326 public static class JavaMailAuthenticator extends Authenticator { 327 328 String user; 329 String password; 330 331 public JavaMailAuthenticator(String user, String password) { 332 this.user = user; 333 this.password = password; 334 } 335 336 @Override 337 protected PasswordAuthentication getPasswordAuthentication() { 338 return new PasswordAuthentication(user, password); 339 } 340 } 341 342 class URIDataSource implements DataSource{ 343 344 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 345 FileSystem fs; 346 URI uri; 347 public URIDataSource(URI uri, String user) throws HadoopAccessorException { 348 this.uri = uri; 349 Configuration fsConf = has.createJobConf(uri.getAuthority()); 350 fs = has.createFileSystem(user, uri, fsConf); 351 } 352 353 @Override 354 public InputStream getInputStream() throws IOException { 355 return fs.open(new Path(uri)); 356 } 357 358 @Override 359 public OutputStream getOutputStream() throws IOException { 360 return fs.create(new Path(uri)); 361 } 362 363 @Override 364 public String getContentType() { 365 return "application/octet-stream"; 366 } 367 368 @Override 369 public String getName() { 370 return uri.getPath(); 371 } 372 } 373}