File: //home/arjun/projects/buyercall/buyercall/lib/util_boto3_ses.py
import logging
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import boto3
from botocore.exceptions import ClientError
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class AmazonSimpleEmailService:
    """Wrapper around the boto3 ``ses`` (v1) client.

    The client is created by :meth:`init_app` from an application object's
    ``config`` mapping; until then ``ses_client`` is ``None``.

    Apart from the ``send_*`` methods and :meth:`create_multipart_message`,
    every method returns a dict with ``status`` (bool), ``message`` (str) and
    usually ``data`` keys instead of raising.
    """

    # boto3 SES client; populated by init_app().
    ses_client = None
    # Attachment extensions (lower-case, no dot) that are never attached.
    exclude_file_types = ['deb', 'sh', ]

    def __init__(self):
        self.default_sender = None

    def init_app(self, app):
        """Create the SES client and default sender from ``app.config``.

        Args:
            app: Object exposing a ``config`` mapping. The keys read are
                ``SES_REGION_NAME``, ``AMAZON_ACCESS_KEY``,
                ``AMAZON_SECRET_KEY`` and ``SES_EMAIL_SOURCE``; each falls
                back to an empty string when missing.
        """
        self.ses_client = boto3.client(
            'ses',
            region_name=app.config.get('SES_REGION_NAME', ''),
            aws_access_key_id=app.config.get('AMAZON_ACCESS_KEY', ''),
            aws_secret_access_key=app.config.get('AMAZON_SECRET_KEY', '')
        )
        self.default_sender = app.config.get('SES_EMAIL_SOURCE', '')

    @staticmethod
    def _as_list(value):
        """Return ``value`` as a list, wrapping a single item."""
        if isinstance(value, (list, tuple, set, frozenset)):
            return list(value)
        return [value]

    def send_email(self, recipients, config_set_name='', sender=None, subject='New mail', text='', html=''):
        """Send a simple (non-raw) email.

        Args:
            recipients: One address or a collection of addresses.
            config_set_name (str, optional): Configuration set to send with.
                Omitted from the request when empty.
            sender (str, optional): Source address. Defaults to the sender
                configured in :meth:`init_app`.
            subject (str, optional): Subject line.
            text (str, optional): Plain-text body.
            html (str, optional): HTML body.

        Returns:
            The SES response on success, ``False`` if anything went wrong.
        """
        try:
            params = {
                'Source': sender or self.default_sender,
                'Destination': {'ToAddresses': self._as_list(recipients)},
                'Message': {
                    'Subject': {'Data': subject},
                    'Body': {
                        'Text': {'Data': text},
                        'Html': {'Data': html}
                    }
                },
            }
            # Only name a configuration set when the caller supplied one.
            if config_set_name:
                params['ConfigurationSetName'] = config_set_name
            resp = self.ses_client.send_email(**params)
            log.debug('SES send_email response: %s', resp)
            return resp
        except Exception:
            log.exception('SES send_email failed')
            return False

    def send_raw_mail(self, sender, recipients, config_set, content):
        """Send a pre-built raw MIME message.

        Args:
            sender (str): Source address; falls back to the default sender
                when falsy.
            recipients: One address or a collection of addresses.
            config_set (str): Configuration set to send with. Omitted from
                the request when falsy.
            content: The raw message data passed as ``RawMessage['Data']``.

        Returns:
            dict: The SES response, or an empty dict if anything went wrong.
        """
        try:
            params = {
                'Source': sender or self.default_sender,
                'Destinations': self._as_list(recipients),
                'RawMessage': {'Data': content},
            }
            if config_set:
                params['ConfigurationSetName'] = config_set
            return self.ses_client.send_raw_email(**params)
        except Exception:
            log.exception('SES send_raw_email failed')
            return {}

    def create_multipart_message(self,
                                 sender: str, recipients: list, title: str, text: str = None, html: str = None,
                                 attachments: list = None) -> MIMEMultipart:
        """Create a MIME multipart message using only the ``email`` stdlib.

        Sender and recipients can be a bare address or use the
        ``'The Name <the_email@host.com>'`` form.

        Args:
            sender: The sender.
            recipients: List of recipients (a list even for one recipient).
            title: The subject of the email.
            text: The plain-text version of the body (optional).
            html: The HTML version of the body (optional).
            attachments: Objects exposing ``filename`` and ``read()``. Files
                whose extension is in ``exclude_file_types`` are skipped, and
                an attachment that fails to load is logged and skipped.

        Returns:
            MIMEMultipart: The message, ready to be serialised and sent.
        """
        has_both_bodies = bool(text and html)
        if has_both_bodies and attachments:
            # Attachments must not live inside multipart/alternative (clients
            # render only one alternative), so nest the bodies one level down.
            msg = MIMEMultipart('mixed')
            body = MIMEMultipart('alternative')
            msg.attach(body)
        else:
            msg = MIMEMultipart('alternative' if has_both_bodies else 'mixed')
            body = msg

        msg['Subject'] = title
        msg['From'] = sender
        msg['To'] = ', '.join(recipients)

        if text:
            body.attach(MIMEText(text, 'plain'))
        if html:
            body.attach(MIMEText(html, 'html'))

        for attachment in attachments or ():
            try:
                # Compare case-insensitively so e.g. "run.SH" is also excluded.
                extension = attachment.filename.rsplit('.', 1)[-1].lower()
                if extension in self.exclude_file_types:
                    continue
                part = MIMEBase('application', 'octet-stream')
                part.set_payload(attachment.read())
                encoders.encode_base64(part)
                # Let the email package quote/encode the filename parameter.
                part.add_header('Content-Disposition', 'attachment', filename=attachment.filename)
                msg.attach(part)
            except Exception:
                # Best effort: one bad attachment must not drop the email.
                log.exception('Could not attach %r', getattr(attachment, 'filename', attachment))
        return msg

    def verify_domain(self, domain, is_dkim=True):
        """Start verification of a domain identity.

        Args:
            domain (str): Domain to verify.
            is_dkim (bool, optional): Also request DKIM tokens. Defaults to
                True.

        Returns:
            dict: ``data`` holds ``domain_identity`` and, when requested,
            ``domain_dkim`` responses. ``status`` is False for an empty
            domain or when a call raised.
        """
        response = {'status': True, 'message': 'Success', 'data': {}}
        try:
            if domain:
                id_response = self.ses_client.verify_domain_identity(
                    Domain=domain
                )
                if 'VerificationToken' in id_response:
                    response['data']['domain_identity'] = id_response
                if is_dkim:
                    dkim_response = self.ses_client.verify_domain_dkim(
                        Domain=domain
                    )
                    if 'DkimTokens' in dkim_response:
                        response['data']['domain_dkim'] = dkim_response
            else:
                response['status'] = False
                response['message'] = 'Invalid domain'
                response['data'] = {}
        except Exception as ex:
            response['status'] = False
            # Report the failure instead of leaving the message as 'Success'.
            response['message'] = str(ex)
        return response

    def verify_email_identity(self, email):
        """Start verification of an email address identity.

        Args:
            email (str): Email address to verify.

        Returns:
            dict: ``status``/``message`` describing success or failure.
        """
        response = {'status': True, 'message': 'Success'}
        try:
            if email:
                ve_response = self.ses_client.verify_email_identity(
                    EmailAddress=email
                )
                if 'ResponseMetadata' not in ve_response:
                    response['status'] = False
                    response['message'] = 'API error'
            else:
                response['status'] = False
                response['message'] = 'Invalid email'
        except Exception as ex:
            response['status'] = False
            response['message'] = str(ex)
        return response

    def check_identity_verify_status(self, identities):
        """Fetch the verification attributes of previously added identities.

        Args:
            identities: One identity or a collection of identities.

        Returns:
            dict: ``data`` holds the SES response (or ``[]`` when it is
            empty); ``status`` is False for empty input or on error.
        """
        response = {'status': True, 'message': 'Success', 'data': []}
        try:
            if identities:
                added_identities = self.ses_client.get_identity_verification_attributes(
                    Identities=self._as_list(identities)
                )
                response['data'] = added_identities or []
            else:
                response['status'] = False
                response['message'] = 'Invalid domain/email'
        except Exception as ex:
            response['status'] = False
            response['message'] = str(ex)
        return response

    def create_configuration_set(self, name):
        """Create a configuration set.

        Args:
            name (str): The name of the configuration set.
                Name requirements:
                    Contain only letters (a-z, A-Z), numbers (0-9),
                    underscores (_), or dashes (-).
                    Contain 64 characters or fewer.

        Returns:
            dict: ``data`` holds the SES response; ``status`` is False for an
            empty name or on error.
        """
        response = {'status': True, 'message': 'Success', 'data': {}}
        try:
            if name:
                response['data'] = self.ses_client.create_configuration_set(
                    ConfigurationSet={'Name': name}
                )
            else:
                response['status'] = False
                response['message'] = 'Invalid configuration name'
        except Exception as ex:
            response['status'] = False
            response['message'] = str(ex)
        return response

    def create_event_destination(self, configset_name, destination):
        """Create an event destination on a configuration set.

        Args:
            configset_name (str): Configuration set name.
            destination (dict): Event destination definition, passed as
                ``EventDestination``.

        Returns:
            dict: ``data`` holds the SES response; ``status`` is False when
            either argument is empty or on error.
        """
        response = {'status': True, 'message': 'Success', 'data': {}}
        try:
            if configset_name and destination:
                response['data'] = self.ses_client.create_configuration_set_event_destination(
                    ConfigurationSetName=configset_name,
                    EventDestination=destination
                )
            else:
                response['status'] = False
                response['message'] = 'Invalid configuration name and destination'
        except Exception as ex:
            response['status'] = False
            response['message'] = str(ex)
        return response

    def create_tracking_options(self, configset_name, tracking_options):
        """Associate a configuration set with a custom open/click tracking domain.

        Args:
            configset_name (str): The configuration set the tracking options
                should be associated with.
            tracking_options (dict): Contains ``CustomRedirectDomain``, a
                custom subdomain used to redirect recipients to the Amazon
                SES event tracking domain.

        Returns:
            dict: ``data`` holds the SES response; ``status`` is False when
            either argument is empty or on error.
        """
        response = {'status': True, 'message': 'Success', 'data': {}}
        try:
            if configset_name and tracking_options:
                response['data'] = self.ses_client.create_configuration_set_tracking_options(
                    ConfigurationSetName=configset_name,
                    TrackingOptions=tracking_options
                )
            else:
                response['status'] = False
                response['message'] = 'Invalid configuration name and tracking options'
        except Exception as ex:
            response['status'] = False
            response['message'] = str(ex)
        return response

    def create_receipt_filter(self, filter_name, ip_address_or_range, allow):
        """Create a filter that allows or blocks incoming mail from an IP range.

        Args:
            filter_name (str): The name to give the filter.
            ip_address_or_range (str): The IP address or range to block or
                allow.
            allow (bool): When True, incoming mail is allowed from the
                specified IP address or range; otherwise, it is blocked.

        Returns:
            dict: ``data`` holds the SES response; ``status`` is False when
            an input is empty or SES returned a client error.
        """
        response = {'status': True, 'message': 'Success', 'data': {}}
        try:
            if filter_name and ip_address_or_range:
                response['data'] = self.ses_client.create_receipt_filter(
                    Filter={
                        'Name': filter_name,
                        'IpFilter': {
                            'Cidr': ip_address_or_range,
                            'Policy': 'Allow' if allow else 'Block',
                        },
                    }
                )
            else:
                response['status'] = False
                response['message'] = 'Invalid inputs'
        except ClientError as ex:
            response['status'] = False
            response['message'] = str(ex)
        return response

    def create_receipt_rule_set(self, name):
        """Create an empty receipt rule set.

        Args:
            name (str): The name of the rule set to create.

        Returns:
            dict: ``data`` holds the SES response; ``status`` is False for an
            empty name or on error.
        """
        response = {'status': True, 'message': 'Success', 'data': {}}
        try:
            if name:
                response['data'] = self.ses_client.create_receipt_rule_set(
                    RuleSetName=name
                )
            else:
                response['status'] = False
                response['message'] = 'Invalid inputs'
        except Exception as ex:
            response['status'] = False
            response['message'] = str(ex)
        return response

    def create_receipt_rule(self, rule_set_name, topic_arn, rule_name, bucket_name, recipients=None,
                            key_prefix='emails/'):
        """Create an enabled receipt rule with a single S3 action.

        Args:
            rule_set_name (str): Rule set the rule is added to.
            topic_arn (str): ARN of the Amazon SNS topic to notify.
            rule_name (str): The name of the receipt rule.
            bucket_name (str): S3 bucket name used by the S3 action.
            recipients (list, optional): Recipients the rule applies to; the
                ``Recipients`` key is left out when empty.
            key_prefix (str, optional): Object key prefix inside the bucket.

        Returns:
            dict: ``data`` holds the SES response; ``status`` is False when
            ``rule_set_name``, ``topic_arn`` or ``rule_name`` is empty, or on
            error.
        """
        response = {'status': True, 'message': 'Success', 'data': {}}
        try:
            if rule_set_name and topic_arn and rule_name:
                rule = {
                    'Actions': [
                        {
                            'S3Action': {
                                'TopicArn': topic_arn,
                                'BucketName': bucket_name,
                                'ObjectKeyPrefix': key_prefix,
                            },
                        },
                    ],
                    'Enabled': True,
                    'Name': rule_name,
                    'ScanEnabled': True,
                    'TlsPolicy': 'Optional'
                }
                # Add recipients if available
                if recipients:
                    rule['Recipients'] = recipients
                response['data'] = self.ses_client.create_receipt_rule(
                    RuleSetName=rule_set_name,
                    Rule=rule
                )
            else:
                response['status'] = False
                response['message'] = 'Invalid inputs'
        except Exception as ex:
            response['status'] = False
            response['message'] = str(ex)
        return response

    def _list_identities(self, **filters):
        """Call ``list_identities`` with ``filters`` and wrap the result."""
        response = {'status': True, 'message': 'Success', 'data': []}
        try:
            identities = self.ses_client.list_identities(**filters)
            response['data'] = identities['Identities']
        except Exception as ex:
            log.error(str(ex))
            response['status'] = False
            response['message'] = str(ex)
        return response

    def list_all_identities(self):
        """Return the identities from an unfiltered ``list_identities`` call.

        Returns:
            dict: ``data`` holds the ``Identities`` list of the response.
        """
        return self._list_identities()

    def list_email_identities(self):
        """Return only the email-address identities.

        Returns:
            dict: ``data`` holds the ``Identities`` list of the response.
        """
        return self._list_identities(IdentityType='EmailAddress')

    def list_domain_identities(self):
        """Return only the domain identities.

        Returns:
            dict: ``data`` holds the ``Identities`` list of the response.
        """
        return self._list_identities(IdentityType='Domain')
class AmazonSimpleEmailServiceV2:
    """Wrapper around the boto3 ``sesv2`` client.

    Unlike the v1 wrapper, the client is created directly in ``__init__``
    from a configuration mapping.
    """

    ses_client = None

    def __init__(self, ses_config):
        """Create the SESv2 client.

        Args:
            ses_config: Mapping that must contain ``SES_REGION_NAME``,
                ``AMAZON_ACCESS_KEY``, ``AMAZON_SECRET_KEY`` and
                ``SES_EMAIL_SOURCE`` (a missing key raises ``KeyError``).
        """
        self.ses_client = boto3.client(
            'sesv2',
            region_name=ses_config['SES_REGION_NAME'],
            aws_access_key_id=ses_config['AMAZON_ACCESS_KEY'],
            aws_secret_access_key=ses_config['AMAZON_SECRET_KEY']
        )
        self.default_sender = ses_config['SES_EMAIL_SOURCE']

    @staticmethod
    def _as_list(value):
        """Return ``value`` as a list, wrapping a single item."""
        if isinstance(value, (list, tuple, set, frozenset)):
            return list(value)
        return [value]

    def get_email_identity(self, identity):
        """Get an email identity that was already added.

        Args:
            identity (str): A domain or email address.

        Returns:
            dict: ``data`` holds the SES response; ``status`` is False for an
            empty identity or on error.
        """
        response = {'status': True, 'message': 'Success', 'data': {}}
        try:
            if identity:
                # The SESv2 parameter is ``EmailIdentity``; ``Identities`` is
                # not a valid keyword for this operation.
                response['data'] = self.ses_client.get_email_identity(
                    EmailIdentity=identity
                )
            else:
                response['status'] = False
                response['message'] = 'Invalid domain/email'
        except Exception as ex:
            log.error(str(ex))
            response['status'] = False
            response['message'] = str(ex)
        return response

    def send_email(self, recipients, sender=None, config_name=None, subject='Test mail', text='', html=''):
        """Send a simple email with UTF-8 subject, text and HTML parts.

        Args:
            recipients: One address or a collection of addresses.
            sender (str, optional): From address. Defaults to the configured
                default sender.
            config_name (str, optional): Configuration set to send with.
                Omitted from the request when not given.
            subject (str, optional): Subject line.
            text (str, optional): Plain-text body.
            html (str, optional): HTML body.

        Returns:
            bool: True when SES returned a non-empty response, False on an
            empty response or any error.
        """
        try:
            params = {
                'FromEmailAddress': sender or self.default_sender,
                'Destination': {'ToAddresses': self._as_list(recipients)},
                'Content': {
                    'Simple': {
                        'Subject': {
                            'Data': subject,
                            'Charset': 'UTF-8'
                        },
                        'Body': {
                            'Text': {
                                'Data': text,
                                'Charset': 'UTF-8'
                            },
                            'Html': {
                                'Data': html,
                                'Charset': 'UTF-8'
                            }
                        }
                    },
                },
            }
            # boto3 rejects None for string parameters, so the default
            # config_name=None must not be forwarded.
            if config_name:
                params['ConfigurationSetName'] = config_name
            resp = self.ses_client.send_email(**params)
            return bool(resp)
        except Exception as e:
            log.error(f"Error : {str(e)}")
            return False

    def verify_identity(self, identity, config_set):
        """Start verification of an email address or domain.

        Args:
            identity (str): Email or domain to verify.
            config_set (str): Configuration set name passed as
                ``ConfigurationSetName``; omitted from the request when falsy.

        Returns:
            dict: ``status``/``message`` describing success or failure.
        """
        response = {'status': True, 'message': 'Success'}
        try:
            if identity:
                params = {'EmailIdentity': identity}
                if config_set:
                    params['ConfigurationSetName'] = config_set
                ve_response = self.ses_client.create_email_identity(**params)
                log.debug('SES create_email_identity response: %s', ve_response)
                if 'ResponseMetadata' not in ve_response:
                    response['status'] = False
                    response['message'] = 'API error'
            else:
                response['status'] = False
                response['message'] = 'Invalid email'
        except Exception as ex:
            log.error(str(ex))
            response['status'] = False
            response['message'] = str(ex)
        return response