File: //home/arjun/projects/buyercall/buyercall/blueprints/email/utils/create_configuration.py
import uuid
from buyercall.extensions import ses_client, sns_client
class EmailConfiguration:
def __init__(self, endpoint, bucket, prefix=None, ) -> None:
self.prefix = prefix or uuid.uuid4()
self.config_set_name = f'{prefix}-config-set'
self.subscription_endpoint = endpoint
self.receiver_bucket = bucket
def configure(self):
response = {}
try:
# create a configuration set
response['config_set'] = ses_client.create_configuration_set(self.config_set_name)
# create an sns topic
topic = sns_client.create_topic(f'{self.prefix}-topic')
if topic.get('status', False):
topic_arn = topic.get('data')['TopicArn']
# Create an event destination
if topic_arn:
destination_config = {
'Name': f"{self.prefix}-event-destination",
'Enabled': True,
'MatchingEventTypes': [
'send', 'reject', 'bounce', 'complaint', 'delivery', 'open', 'click', 'renderingFailure'
],
'SNSDestination': {
'TopicARN': topic_arn
},
}
response['event_destination'] = ses_client.create_event_destination(self.config_set_name, destination_config)
# topic_arn = "arn:aws:sns:us-east-1:279566895929:1001e251-6d26-40bd-8523-670f611238af-topic"
# Subscribe to topic
response['subscription'] = sns_client.subscribe(topic_arn, self.subscription_endpoint, 'http')
# Create email receiving ruleset
response['rule_set'] = ses_client.create_receipt_rule_set(f'{self.prefix}-rule-set')
# Create receipt rule
response['receipt_rule'] = ses_client.create_receipt_rule(f"{self.prefix}-rule-set", topic_arn, f"{self.prefix}-rule", self.receiver_bucket)
response['success'] = True
except Exception as e:
response['success'] = False
return response