001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.action.email; 020 021import java.io.File; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025import java.net.URI; 026import java.net.URISyntaxException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.Properties; 030 031import javax.activation.DataHandler; 032import javax.activation.DataSource; 033import javax.mail.Authenticator; 034import javax.mail.Message; 035import javax.mail.Message.RecipientType; 036import javax.mail.MessagingException; 037import javax.mail.Multipart; 038import javax.mail.NoSuchProviderException; 039import javax.mail.PasswordAuthentication; 040import javax.mail.Session; 041import javax.mail.Transport; 042import javax.mail.internet.AddressException; 043import javax.mail.internet.InternetAddress; 044import javax.mail.internet.MimeBodyPart; 045import javax.mail.internet.MimeMessage; 046import javax.mail.internet.MimeMultipart; 047 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.Path; 051import org.apache.oozie.action.ActionExecutor; 052import org.apache.oozie.action.ActionExecutorException; 053import org.apache.oozie.action.ActionExecutorException.ErrorType; 054import org.apache.oozie.client.WorkflowAction; 055import org.apache.oozie.service.ConfigurationService; 056import org.apache.oozie.service.HadoopAccessorException; 057import org.apache.oozie.service.Services; 058import org.apache.oozie.service.HadoopAccessorService; 059import org.apache.oozie.util.XLog; 060import org.apache.oozie.util.XmlUtils; 061import org.jdom.Element; 062import org.jdom.Namespace; 063 064/** 065 * Email action executor. It takes to, cc, bcc addresses along with a subject and body and sends 066 * out an email. 067 */ 068public class EmailActionExecutor extends ActionExecutor { 069 070 public static final String CONF_PREFIX = "oozie.email."; 071 public static final String EMAIL_SMTP_HOST = CONF_PREFIX + "smtp.host"; 072 public static final String EMAIL_SMTP_PORT = CONF_PREFIX + "smtp.port"; 073 public static final String EMAIL_SMTP_AUTH = CONF_PREFIX + "smtp.auth"; 074 public static final String EMAIL_SMTP_USER = CONF_PREFIX + "smtp.username"; 075 public static final String EMAIL_SMTP_PASS = CONF_PREFIX + "smtp.password"; 076 public static final String EMAIL_SMTP_FROM = CONF_PREFIX + "from.address"; 077 public static final String EMAIL_SMTP_SOCKET_TIMEOUT_MS = CONF_PREFIX + "smtp.socket.timeout.ms"; 078 public static final String EMAIL_ATTACHMENT_ENABLED = CONF_PREFIX + "attachment.enabled"; 079 080 private final static String TO = "to"; 081 private final static String CC = "cc"; 082 private final static String BCC = "bcc"; 083 private final static String SUB = "subject"; 084 private final static String BOD = "body"; 085 private final static String ATTACHMENT = "attachment"; 086 private final static String COMMA = ","; 087 private final static String CONTENT_TYPE = "content_type"; 088 089 private final static String DEFAULT_CONTENT_TYPE = "text/plain"; 090 private XLog LOG = XLog.getLog(getClass()); 091 public static final String EMAIL_ATTACHMENT_ERROR_MSG = 092 "\n Note: This email is missing configured email attachments " 093 + "as sending attachments in email action is disabled in the Oozie server. " 094 + "It could be for security compliance with data protection or other reasons"; 095 096 public EmailActionExecutor() { 097 super("email"); 098 } 099 100 @Override 101 public void initActionType() { 102 super.initActionType(); 103 } 104 105 @Override 106 public void start(Context context, WorkflowAction action) throws ActionExecutorException { 107 try { 108 context.setStartData("-", "-", "-"); 109 Element actionXml = XmlUtils.parseXml(action.getConf()); 110 validateAndMail(context, actionXml); 111 context.setExecutionData("OK", null); 112 } 113 catch (Exception ex) { 114 throw convertException(ex); 115 } 116 } 117 118 @SuppressWarnings("unchecked") 119 protected void validateAndMail(Context context, Element element) throws ActionExecutorException { 120 // The XSD does the min/max occurrence validation for us. 121 Namespace ns = element.getNamespace(); 122 String tos[] = new String[0]; 123 String ccs[] = new String[0]; 124 String bccs[] ; 125 String subject = ""; 126 String body = ""; 127 String attachments[] = new String[0]; 128 String contentType; 129 Element child = null; 130 131 // <to> - One ought to exist. 132 String text = element.getChildTextTrim(TO, ns); 133 if (text.isEmpty()) { 134 throw new ActionExecutorException(ErrorType.ERROR, "EM001", "No recipients were specified in the to-address field."); 135 } 136 tos = text.split(COMMA); 137 138 // <cc> - Optional, but only one ought to exist. 139 try { 140 ccs = element.getChildTextTrim(CC, ns).split(COMMA); 141 } catch (Exception e) { 142 // It is alright for cc to be given empty or not be present. 143 ccs = new String[0]; 144 } 145 146 // <bcc> - Optional, but only one ought to exist. 147 try { 148 bccs = element.getChildTextTrim(BCC, ns).split(COMMA); 149 } catch (Exception e) { 150 // It is alright for bcc to be given empty or not be present. 151 bccs = new String[0]; 152 } 153 // <subject> - One ought to exist. 154 subject = element.getChildTextTrim(SUB, ns); 155 156 // <body> - One ought to exist. 157 body = element.getChildTextTrim(BOD, ns); 158 159 // <attachment> - Optional 160 String attachment = element.getChildTextTrim(ATTACHMENT, ns); 161 if(attachment != null) { 162 attachments = attachment.split(COMMA); 163 } 164 165 contentType = element.getChildTextTrim(CONTENT_TYPE, ns); 166 if (contentType == null || contentType.isEmpty()) { 167 contentType = DEFAULT_CONTENT_TYPE; 168 } 169 170 // All good - lets try to mail! 171 email(tos, ccs, bccs, subject, body, attachments, contentType, context.getWorkflow().getUser()); 172 } 173 174 public void email(String[] to, String[] cc, String subject, String body, String[] attachments, 175 String contentType, String user) throws ActionExecutorException { 176 email(to, cc, new String[0], subject, body, attachments, contentType, user); 177 } 178 179 public void email(String[] to, String[] cc, String[] bcc, String subject, String body, String[] attachments, 180 String contentType, String user) throws ActionExecutorException { 181 // Get mailing server details. 182 String smtpHost = ConfigurationService.get(EMAIL_SMTP_HOST); 183 Integer smtpPortInt = ConfigurationService.getInt(EMAIL_SMTP_PORT); 184 Boolean smtpAuthBool = ConfigurationService.getBoolean(EMAIL_SMTP_AUTH); 185 String smtpUser = ConfigurationService.get(EMAIL_SMTP_USER); 186 String smtpPassword = ConfigurationService.getPassword(EMAIL_SMTP_PASS, ""); 187 String fromAddr = ConfigurationService.get(EMAIL_SMTP_FROM); 188 Integer timeoutMillisInt = ConfigurationService.getInt(EMAIL_SMTP_SOCKET_TIMEOUT_MS); 189 190 Properties properties = new Properties(); 191 properties.setProperty("mail.smtp.host", smtpHost); 192 properties.setProperty("mail.smtp.port", smtpPortInt.toString()); 193 properties.setProperty("mail.smtp.auth", smtpAuthBool.toString()); 194 195 // Apply sensible timeouts, as defaults are infinite. See https://s.apache.org/javax-mail-timeouts 196 properties.setProperty("mail.smtp.connectiontimeout", timeoutMillisInt.toString()); 197 properties.setProperty("mail.smtp.timeout", timeoutMillisInt.toString()); 198 properties.setProperty("mail.smtp.writetimeout", timeoutMillisInt.toString()); 199 200 Session session; 201 // Do not use default instance (i.e. Session.getDefaultInstance) 202 // (cause it may lead to issues when used second time). 203 if (!smtpAuthBool) { 204 session = Session.getInstance(properties); 205 } else { 206 session = Session.getInstance(properties, new JavaMailAuthenticator(smtpUser, smtpPassword)); 207 } 208 209 Message message = new MimeMessage(session); 210 InternetAddress from; 211 List<InternetAddress> toAddrs = new ArrayList<InternetAddress>(to.length); 212 List<InternetAddress> ccAddrs = new ArrayList<InternetAddress>(cc.length); 213 List<InternetAddress> bccAddrs = new ArrayList<InternetAddress>(bcc.length); 214 215 try { 216 from = new InternetAddress(fromAddr); 217 message.setFrom(from); 218 } catch (AddressException e) { 219 throw new ActionExecutorException(ErrorType.ERROR, "EM002", 220 "Bad from address specified in ${oozie.email.from.address}.", e); 221 } catch (MessagingException e) { 222 throw new ActionExecutorException(ErrorType.ERROR, "EM003", 223 "Error setting a from address in the message.", e); 224 } 225 226 try { 227 // Add all <to> 228 for (String toStr : to) { 229 toAddrs.add(new InternetAddress(toStr.trim())); 230 } 231 message.addRecipients(RecipientType.TO, toAddrs.toArray(new InternetAddress[0])); 232 233 // Add all <cc> 234 for (String ccStr : cc) { 235 ccAddrs.add(new InternetAddress(ccStr.trim())); 236 } 237 message.addRecipients(RecipientType.CC, ccAddrs.toArray(new InternetAddress[0])); 238 239 // Add all <bcc> 240 for (String bccStr : bcc) { 241 bccAddrs.add(new InternetAddress(bccStr.trim())); 242 } 243 message.addRecipients(RecipientType.BCC, bccAddrs.toArray(new InternetAddress[0])); 244 245 // Set subject 246 message.setSubject(subject); 247 248 // when there is attachment 249 if (attachments != null && attachments.length > 0 && ConfigurationService.getBoolean(EMAIL_ATTACHMENT_ENABLED)) { 250 Multipart multipart = new MimeMultipart(); 251 252 // Set body text 253 MimeBodyPart bodyTextPart = new MimeBodyPart(); 254 bodyTextPart.setText(body); 255 multipart.addBodyPart(bodyTextPart); 256 257 for (String attachment : attachments) { 258 URI attachUri = new URI(attachment); 259 if (attachUri.getScheme() != null && attachUri.getScheme().equals("file")) { 260 throw new ActionExecutorException(ErrorType.ERROR, "EM008", 261 "Encountered an error when attaching a file. A local file cannot be attached:" 262 + attachment); 263 } 264 MimeBodyPart messageBodyPart = new MimeBodyPart(); 265 DataSource source = new URIDataSource(attachUri, user); 266 messageBodyPart.setDataHandler(new DataHandler(source)); 267 messageBodyPart.setFileName(new File(attachment).getName()); 268 multipart.addBodyPart(messageBodyPart); 269 } 270 message.setContent(multipart); 271 } 272 else { 273 if (attachments != null && attachments.length > 0 && !ConfigurationService.getBoolean(EMAIL_ATTACHMENT_ENABLED)) { 274 body = body + EMAIL_ATTACHMENT_ERROR_MSG; 275 } 276 message.setContent(body, contentType); 277 } 278 } 279 catch (AddressException e) { 280 throw new ActionExecutorException(ErrorType.ERROR, "EM004", "Bad address format in <to> or <cc> or <bcc>.", e); 281 } 282 catch (MessagingException e) { 283 throw new ActionExecutorException(ErrorType.ERROR, "EM005", "An error occurred while adding recipients.", e); 284 } 285 catch (URISyntaxException e) { 286 throw new ActionExecutorException(ErrorType.ERROR, "EM008", "Encountered an error when attaching a file", e); 287 } 288 catch (HadoopAccessorException e) { 289 throw new ActionExecutorException(ErrorType.ERROR, "EM008", "Encountered an error when attaching a file", e); 290 } 291 292 try { 293 // Send over SMTP Transport 294 // (Session+Message has adequate details.) 295 Transport.send(message); 296 } catch (NoSuchProviderException e) { 297 throw new ActionExecutorException(ErrorType.ERROR, "EM006", 298 "Could not find an SMTP transport provider to email.", e); 299 } catch (MessagingException e) { 300 throw new ActionExecutorException(ErrorType.ERROR, "EM007", 301 "Encountered an error while sending the email message over SMTP.", e); 302 } 303 } 304 305 @Override 306 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 307 String externalStatus = action.getExternalStatus(); 308 WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK : 309 WorkflowAction.Status.ERROR; 310 context.setEndData(status, getActionSignal(status)); 311 } 312 313 @Override 314 public void check(Context context, WorkflowAction action) 315 throws ActionExecutorException { 316 317 } 318 319 @Override 320 public void kill(Context context, WorkflowAction action) 321 throws ActionExecutorException { 322 323 } 324 325 @Override 326 public boolean isCompleted(String externalStatus) { 327 return true; 328 } 329 330 public static class JavaMailAuthenticator extends Authenticator { 331 332 String user; 333 String password; 334 335 public JavaMailAuthenticator(String user, String password) { 336 this.user = user; 337 this.password = password; 338 } 339 340 @Override 341 protected PasswordAuthentication getPasswordAuthentication() { 342 return new PasswordAuthentication(user, password); 343 } 344 } 345 346 class URIDataSource implements DataSource{ 347 348 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 349 FileSystem fs; 350 URI uri; 351 public URIDataSource(URI uri, String user) throws HadoopAccessorException { 352 this.uri = uri; 353 Configuration fsConf = has.createConfiguration(uri.getAuthority()); 354 fs = has.createFileSystem(user, uri, fsConf); 355 } 356 357 @Override 358 public InputStream getInputStream() throws IOException { 359 return fs.open(new Path(uri)); 360 } 361 362 @Override 363 public OutputStream getOutputStream() throws IOException { 364 return fs.create(new Path(uri)); 365 } 366 367 @Override 368 public String getContentType() { 369 return "application/octet-stream"; 370 } 371 372 @Override 373 public String getName() { 374 return uri.getPath(); 375 } 376 } 377}