File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/lib/util_ses_email.py
import boto3
from logging.handlers import SMTPHandler
from botocore.exceptions import ClientError
from flask import current_app
from buyercall.blueprints.partnership.models import Partnership
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime
def send_ses_email(recipients, p_id, subject='', text='', html='', sender='', account_name=''):
ses = boto3.client(
'ses',
region_name=current_app.config['SES_REGION_NAME'],
aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY']
)
if not sender:
# Set default sender info
sender_email = current_app.config['SES_EMAIL_SOURCE']
sender = 'BuyerCall <' + sender_email + '>'
# lookup partner and set sender to partner info if exist
partner = Partnership.query.filter(Partnership.id == p_id).first()
if partner is not None:
if partner.email_sender:
sender_email = partner.email_sender
sender = partner.name + ' <' + sender_email + '>'
else:
sender_email = sender
if account_name:
sender = account_name + ' ' + '<' + sender_email + '>'
try:
if text:
response = ses.send_email(
Source=sender,
Destination={'ToAddresses': recipients},
Message={
'Subject': {'Data': subject},
'Body': {
'Text': {'Data': text}
}
}
)
else:
response = ses.send_email(
Source=sender,
Destination={'ToAddresses': recipients},
Message={
'Subject': {'Data': subject},
'Body': {
'Html': {'Data': html}
}
}
)
except ClientError as e:
print(e.response['Error']['Message'])
else:
print("Email sent using {}! Message ID: {}".format(sender_email, response['MessageId']))
def send_ses_email_with_attachment(recipients, p_id, subject='', text='', html='', sender='', account_name='',
file=None):
ses = boto3.client(
'ses',
region_name=current_app.config['SES_REGION_NAME'],
aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY']
)
if not sender:
# Set default sender info
sender_email = current_app.config['SES_EMAIL_SOURCE']
sender = 'BuyerCall <' + sender_email + '>'
# lookup partner and set sender to partner info if exist
partner = Partnership.query.filter(Partnership.id == p_id).first()
if partner is not None:
if partner.email_sender:
sender_email = partner.email_sender
sender = partner.name + ' <' + sender_email + '>'
else:
sender_email = sender
if account_name:
sender = account_name + ' ' + '<' + sender_email + '>'
multipart_content_subtype = 'alternative' if text and html else 'mixed'
msg = MIMEMultipart(multipart_content_subtype)
msg["Subject"] = subject
msg["From"] = sender
msg["To"] = recipients
# Add text to email body
if text:
body = MIMEText(text, 'plain')
msg.attach(body)
else:
body = MIMEText(html, 'html')
msg.attach(body)
# Assign the file to attach to email
file_path = file
with open(file_path, "rb") as attachment:
part = MIMEApplication(attachment.read())
part.add_header("Content-Disposition",
"attachment",
filename=account_name + ' - ' + str(datetime.today().strftime('%Y-%m-%d')))
msg.attach(part)
# Convert message to string and send
try:
response = ses.send_raw_email(
Source=sender,
Destinations=recipients,
RawMessage={"Data": msg.as_string()}
)
print("Email sent using {}! Message ID: {}".format(sender_email, response['MessageId']))
except ClientError as e:
print(e.response['Error']['Message'])
class SESHandler(SMTPHandler):
def emit(self, record):
ses = boto3.client(
'ses',
region_name=current_app.config['SES_REGION_NAME'],
aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY']
)
message = self.format(record)
to_addresses = self.toaddrs
from_address = self.fromaddr
subject = self.subject
try:
response = ses.send_email(
Source=from_address,
Destination={'ToAddresses': to_addresses},
Message={
'Subject': {'Data': subject},
'Body': {
'Text': {'Charset': 'UTF-8',
'Data': message}
}
}
)
except ClientError as e:
print(e.response['Error']['Message'])
else:
print("Email sent Message ID: {} - {}".format(response['MessageId'], response))
def send_ses_notification(recipients, subject='', text='', html='', sender='', account_name=''):
ses = boto3.client(
'ses',
region_name=current_app.config['SES_REGION_NAME'],
aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY']
)
sender_email = sender
if account_name:
sender = account_name + ' ' + '<' + sender_email + '>'
try:
_email_type, _data = ('Text', text) if text else ('Html', html)
response = ses.send_email(
Source=sender,
Destination={'ToAddresses': recipients},
Message={
'Subject': {'Data': subject},
'Body': {
_email_type: {'Data': _data}
}
}
)
except ClientError as e:
print(e.response['Error']['Message'])
else:
print("Email sent using {}! Message ID: {}".format(sender_email, response['MessageId']))