HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall/buyercall/lib/util_boto3_ses.py
import logging
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import boto3
from botocore.exceptions import ClientError

log = logging.getLogger(__name__)


class AmazonSimpleEmailService:
    ses_client = None
    exclude_file_types = ['deb', 'sh', ]

    def __init__(self):
        """Create an unconfigured wrapper; `init_app` supplies the client and sender."""
        # Fallback "From" address used by the send methods when no sender is given.
        self.default_sender = None

    def init_app(self, app):
        """Configure this wrapper from an application object.

        Creates the boto3 ``ses`` client and stores the default sender address.

        Args:
            app: Object exposing a dict-like ``config``. The keys read are
                ``SES_REGION_NAME``, ``AMAZON_ACCESS_KEY``, ``AMAZON_SECRET_KEY``
                and ``SES_EMAIL_SOURCE``; each falls back to ``''`` when absent.
        """
        client_options = {
            'region_name': app.config.get('SES_REGION_NAME', ''),
            'aws_access_key_id': app.config.get('AMAZON_ACCESS_KEY', ''),
            'aws_secret_access_key': app.config.get('AMAZON_SECRET_KEY', ''),
        }
        self.ses_client = boto3.client('ses', **client_options)
        # Only set once the client was created successfully.
        self.default_sender = app.config.get('SES_EMAIL_SOURCE', '')

    def send_email(self, recipients, config_set_name='', sender=None, subject='New mail', text='', html=''):
        try:
            if not sender:
                sender = self.default_sender

            if not isinstance(recipients, list):
                recipients = [recipients]

            resp = self.ses_client.send_email(
                Source=sender,
                Destination={'ToAddresses': recipients},
                Message={
                    'Subject': {'Data': subject},
                    'Body': {
                        'Text': {'Data': text},
                        'Html': {'Data': html}
                    }
                },
                ConfigurationSetName=config_set_name
            )
            print('resp : ', resp)
            return resp
        except Exception as e:
            print('Error : ', e)
            return False

    def send_raw_mail(self, sender, recipients, config_set, content):
        try:
            if not sender:
                sender = self.default_sender

            if not isinstance(recipients, list):
                recipients = [recipients]

            response = self.ses_client.send_raw_email(
                Source=sender,
                Destinations=recipients,
                RawMessage={
                    'Data': content
                },
                ConfigurationSetName=config_set
            )
            return response
        except Exception as e:
            print(e)
            return {}

    def create_multipart_message(self,
                                 sender: str, recipients: list, title: str, text: str = None, html: str = None,
                                 attachments: list = []) -> MIMEMultipart:
        """
        Creates a MIME multipart message object.
        Uses only the Python `email` standard library.
        Emails, both sender and recipients, can be just the email string or have the format 'The Name <the_email@host.com>'.
        :param sender: The sender.
        :param recipients: List of recipients. Needs to be a list, even if only one recipient.
        :param title: The title of the email.
        :param text: The text version of the email body (optional).
        :param html: The html version of the email body (optional).
        :param attachments: List of files to attach in the email.
        :return: A `MIMEMultipart` to be used to send the email.
        """
        multipart_content_subtype = 'alternative' if text and html else 'mixed'
        msg = MIMEMultipart(multipart_content_subtype)
        msg['Subject'] = title
        msg['From'] = sender
        msg['To'] = ', '.join(recipients)
        if text:
            part = MIMEText(text, 'plain')
            msg.attach(part)
        if html:
            part = MIMEText(html, 'html')
            msg.attach(part)
        # Attachments
        if attachments:
            for attachment in attachments:
                try:
                    extension = attachment.filename.split(".")[-1]
                    if extension not in self.exclude_file_types:
                        part = MIMEBase('application', "octet-stream")
                        part.set_payload(attachment.read())
                        encoders.encode_base64(part)
                        part.add_header('Content-Disposition', 'attachment; filename="{}"'.format(attachment.filename))
                        msg.attach(part)
                except Exception as e:
                    print('Error: ', e)
        return msg

    def verify_domain(self, domain, is_dkim=True):
        """Verify a domain
        Args:
            domain (str): Domain to verify
            is_dkim (bool, optional): Need DKIM enabled. Defaults to False.
        Returns:
            dict: On success, it will return dkim response
        """
        response = {'status': True, 'message': 'Success', 'data': {}}
        try:
            if domain:
                id_response = self.ses_client.verify_domain_identity(
                    Domain=domain
                )
                if 'VerificationToken' in id_response:
                    response['data']['domain_identity'] = id_response

                if is_dkim:
                    dkim_response = self.ses_client.verify_domain_dkim(
                        Domain=domain
                    )
                    if 'DkimTokens' in dkim_response:
                        response['data']['domain_dkim'] = dkim_response

            else:
                response['status'] = False
                response['message'] = 'Invalid domain'
                response['data'] = {}

        except Exception as ex:
            response['status'] = False

        return response

    def verify_email_identity(self, email):
        response = {'status': True, 'message': 'Success'}
        try:
            if email:
                ve_response = self.ses_client.verify_email_identity(
                    EmailAddress=email
                )
                if 'ResponseMetadata' not in ve_response:
                    response['status'] = False
                    response['message'] = 'API error'
            else:
                response['status'] = False
                response['message'] = 'Invalid email'
        except Exception as ex:
            response['status'] = False
            response['message'] = str(ex)

        return response

    def check_identity_verify_status(self, identities):
        """Check the status of the identities that are previously added/
        Args:
            identities (list): idnetites list
        Returns:
            dict: identities if verified
        """
        response = {'status': True, 'message': 'Success', 'data': []}
        try:
            if identities:
                if not isinstance(identities, list):
                    identities = [identities]

                added_identities = self.ses_client.get_identity_verification_attributes(
                    Identities=identities
                )
                response['data'] = added_identities or []
            else:
                response['status'] = False
                response['message'] = 'Invalid domain/email'

        except Exception as ex:
            response['status'] = False
            response['message'] = str(ex)

        return response

    def create_configuration_set(self, name):
        """Creates a configuration set.

        Args:
            name (str): The name of the configuration set.
                        Name requirements:
                        Contain only letters (a-z, A-Z), numbers (0-9), underscores (_), or dashes (-).
                        Contain 64 characters or fewer.
        Returns:
            dict: Returns a dict contains name of the configuration set that created.
        """
        response = {'status': True, 'message': 'Success', 'data': {}}
        try:
            if name:
                config_set_response = self.ses_client.create_configuration_set(
                    ConfigurationSet={
                        'Name': name
                    }
                )
                response['data'] = config_set_response
            else:
                response['status'] = False
                response['message'] = 'Invalid configuration name'

        except Exception as ex:
            response['status'] = False
            response['message'] = str(ex)

        return response

    def create_event_destination(self, configset_name, destination):
        """Creates a configuration set event destination.

        Args:
            configset_name (str): Coonfiguration set name
            destination (dict, optional): Destination configuration details.
        Returns:
            dict: Returns a dict contains name of the configuration set that created.

        """
        response = {'status': True, 'message': 'Success', 'data': {}}
        try:
            if configset_name and destination:
                config_set_evt_dest_response = self.ses_client.create_configuration_set_event_destination(
                    ConfigurationSetName=configset_name,
                    EventDestination=destination
                )
                response['data'] = config_set_evt_dest_response
            else:
                response['status'] = False
                response['message'] = 'Invalid configuration name and destination'

        except Exception as ex:
            response['status'] = False
            response['message'] = str(ex)

        return response

    def create_tracking_options(self, configset_name, tracking_options):
        """Creates an association between a configuration set and a custom domain for open and click event tracking.

        Args:
            configset_name (str): The name of the configuration set that the tracking options should be associated with.
            tracking_options (dict, optional): contains CustomRedirectDomain. A custom subdomain that will be used to redirect email
            recipients to the Amazon SES event tracking domain.
        Returns:
            dict: Returns a dict contains success or failiure
        """
        response = {'status': True, 'message': 'Success', 'data': {}}
        try:
            if configset_name and tracking_options:
                config_set_tracking_response = self.ses_client.create_configuration_set_tracking_options(
                    ConfigurationSetName=configset_name,
                    TrackingOptions=tracking_options
                )
                response['data'] = config_set_tracking_response
            else:
                response['status'] = False
                response['message'] = 'Invalid configuration name and destination'

        except Exception as ex:
            response['status'] = False
            response['message'] = str(ex)

        return response

    def create_receipt_filter(self, filter_name, ip_address_or_range, allow):
        """
        Creates a filter that allows or blocks incoming mail from an IP address or
        range.
        Args:
            filter_name(str): The name to give the filter.
            ip_address_or_range(str): The IP address or range to block or allow.
            allow(bool): When True, incoming mail is allowed from the specified IP
                        address or range; otherwise, it is blocked.
        Returns:
            dict: Returns a dict contains success or failiure
        """
        response = {'status': True, 'message': 'Success', 'data': {}}
        try:
            policy = 'Allow' if allow else 'Block'
            self.ses_client.create_receipt_filter(
                Filter={
                    'Name': filter_name,
                    'IpFilter': {
                        'Cidr': ip_address_or_range,
                        'Policy': policy}})
        except ClientError as ex:
            response['status'] = False
            response['message'] = str(ex)

    def create_receipt_rule_set(self, name):
        """Creates an empty receipt rule set.
        Args:
            name (str): The name of the rule set to create
        Returns:
            dict: Returns a dict contains success or failure
        """
        response = {'status': True, 'message': 'Success', 'data': {}}
        try:
            if name:
                ruleset_resp = self.ses_client.create_receipt_rule_set(
                    RuleSetName=name
                )
                response['data'] = ruleset_resp
            else:
                response['status'] = False
                response['message'] = 'Invalid inputs'

        except Exception as ex:
            response['status'] = False
            response['message'] = str(ex)

        return response

    def create_receipt_rule(self, rule_set_name, topic_arn, rule_name, bucket_name, recipients=None,
                            key_prefix='emails/'):
        """Creates a receipt rule.
        Args:
            param topic_arn (str):  The Amazon Resource Name (ARN) of the Amazon SNS topic to notify.
            param rule_name (str): The name of the receipt rule.
            param bucket_name: S3 bucket name
        Returns:
            dict: Returns a dict contains success or failure

        """
        if recipients is None:
            recipients = []
        response = {'status': True, 'message': 'Success', 'data': {}}
        try:
            if rule_set_name and topic_arn and rule_name:
                rule = {
                    'Actions': [
                        {
                            'S3Action': {
                                'TopicArn': topic_arn,
                                'BucketName': bucket_name,
                                'ObjectKeyPrefix': key_prefix,
                            },
                        },
                    ],
                    'Enabled': True,
                    'Name': rule_name,
                    'ScanEnabled': True,
                    'TlsPolicy': 'Optional'
                }
                # Add recipients if available
                if recipients:
                    rule['Recipients'] = recipients

                receipt_resp = self.ses_client.create_receipt_rule(
                    RuleSetName=rule_set_name,
                    Rule=rule
                )
                response['data'] = receipt_resp
            else:
                response['status'] = False
                response['message'] = 'Invalid inputs'

        except Exception as ex:
            response['status'] = False
            response['message'] = str(ex)

        return response

    def list_all_identities(self):
        response = {'status': True, 'message': 'Success', 'data': []}
        try:
            identities = self.ses_client.list_identities()
            response['data'] = identities['Identities']

        except Exception as ex:
            log.error(str(ex))
            response['status'] = False
            response['message'] = str(ex)

        return response

    def list_email_identities(self):
        response = {'status': True, 'message': 'Success', 'data': []}
        try:
            identities = self.ses_client.list_identities(IdentityType='EmailAddress')
            response['data'] = identities['Identities']

        except Exception as ex:
            log.error(str(ex))
            response['status'] = False
            response['message'] = str(ex)

        return response

    def list_domain_identities(self):
        response = {'status': True, 'message': 'Success', 'data': []}
        try:
            identities = self.ses_client.list_identities(IdentityType='Domain')
            response['data'] = identities['Identities']

        except Exception as ex:
            log.error(str(ex))
            response['status'] = False
            response['message'] = str(ex)

        return response


class AmazonSimpleEmailServiceV2():
    ses_client = None

    def __init__(self, ses_config):
        """Build the boto3 ``sesv2`` client and remember the default sender.

        Args:
            ses_config (dict): Must contain ``SES_REGION_NAME``, ``AMAZON_ACCESS_KEY``,
                ``AMAZON_SECRET_KEY`` and ``SES_EMAIL_SOURCE``; a missing key raises
                ``KeyError``.
        """
        region = ses_config['SES_REGION_NAME']
        access_key = ses_config['AMAZON_ACCESS_KEY']
        secret_key = ses_config['AMAZON_SECRET_KEY']
        self.ses_client = boto3.client(
            'sesv2',
            region_name=region,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )
        self.default_sender = ses_config['SES_EMAIL_SOURCE']

    def get_email_identity(self, identity):
        """Get the email identity that already added.
        Args:
            identity (str): A domain or email address
        Returns:
            dict: Success or failed response
        """
        response = {'status': True, 'message': 'Success', 'data': {}}
        try:
            if identity:
                added_identity = self.ses_client.get_email_identity(
                    Identities=identity
                )
                response['data'] = added_identity
            else:
                response['status'] = False
                response['message'] = 'Invalid domain/email'

        except Exception as ex:
            log.error(str(ex))
            response['status'] = False
            response['message'] = str(ex)

        return response

    def send_email(self, recipients, sender=None, config_name=None, subject='Test mail', text='', html=''):
        try:
            if not sender:
                sender = self.default_sender

            if not isinstance(recipients, list):
                recipients = [recipients]

            resp = self.ses_client.send_email(
                FromEmailAddress=sender,
                Destination={'ToAddresses': recipients},
                Content={
                    'Simple': {
                        'Subject': {
                            'Data': subject,
                            'Charset': 'UTF-8'
                        },
                        'Body': {
                            'Text': {
                                'Data': text,
                                'Charset': 'UTF-8'
                            },
                            'Html': {
                                'Data': html,
                                'Charset': 'UTF-8'
                            }
                        }
                    },
                },
                ConfigurationSetName=config_name
            )
            return True if resp else False
        except Exception as e:
            log.error(f"Error : {str(e)}")
            return False

    def verify_identity(self, identity, config_set):
        """Verify an email or domain
        Args:
            identity (str): Email or domain to verify
            config_set (str): Configuration name to create
        Returns:
            dict: Success or failed response
        """
        response = {'status': True, 'message': 'Success'}
        try:
            if identity:
                ve_response = self.ses_client.create_email_identity(
                    EmailIdentity=identity,
                    ConfigurationSetName=config_set
                )
                print(ve_response)
                if not 'ResponseMetadata' in ve_response:
                    response['status'] = False
                    response['message'] = 'API error'
            else:
                response['status'] = False
                response['message'] = 'Invalid email'
        except Exception as ex:
            log.error(str(ex))
            response['status'] = False
            response['message'] = str(ex)

        return response