HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/lib/util_ses_email.py
import boto3
from logging.handlers import SMTPHandler
from botocore.exceptions import ClientError
from flask import current_app
from buyercall.blueprints.partnership.models import Partnership
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime


def send_ses_email(recipients, p_id, subject='', text='', html='', sender='', account_name=''):
    """Send a single-body email through Amazon SES.

    The SES client is built from the Flask app config keys SES_REGION_NAME,
    AMAZON_ACCESS_KEY and AMAZON_SECRET_KEY.

    Sender resolution:
      * ``sender`` given: it is used as the address; when ``account_name`` is
        also given the source becomes ``"<account_name> <sender>"``.
      * ``sender`` empty: the address defaults to config SES_EMAIL_SOURCE with
        the display name "BuyerCall"; if the partnership with id ``p_id``
        exists and has an ``email_sender``, that address and the partnership
        name are used instead.

    :param recipients: value passed to SES as ``ToAddresses``.
    :param p_id: partnership id used for the default-sender lookup.
    :param subject: subject line.
    :param text: plain-text body; when non-empty it is sent and ``html`` is ignored.
    :param html: HTML body, used only when ``text`` is empty.
    :param sender: explicit sender address (optional).
    :param account_name: display name paired with an explicit ``sender``.
    :return: None. The outcome is printed; a ClientError is printed, not raised.
    """
    config = current_app.config
    client = boto3.client(
        'ses',
        region_name=config['SES_REGION_NAME'],
        aws_access_key_id=config['AMAZON_ACCESS_KEY'],
        aws_secret_access_key=config['AMAZON_SECRET_KEY']
    )

    if sender:
        # Caller supplied the address; optionally decorate it with a display name.
        from_address = sender
        if account_name:
            sender = account_name + ' <' + from_address + '>'
    else:
        # Fall back to the application default ...
        from_address = config['SES_EMAIL_SOURCE']
        sender = 'BuyerCall <' + from_address + '>'
        # ... unless the partnership defines its own sender address.
        partner = Partnership.query.filter(Partnership.id == p_id).first()
        if partner is not None and partner.email_sender:
            from_address = partner.email_sender
            sender = partner.name + ' <' + from_address + '>'

    # Plain text takes precedence over HTML when both are provided.
    if text:
        body = {'Text': {'Data': text}}
    else:
        body = {'Html': {'Data': html}}

    try:
        response = client.send_email(
            Source=sender,
            Destination={'ToAddresses': recipients},
            Message={
                'Subject': {'Data': subject},
                'Body': body
            }
        )
    except ClientError as err:
        print(err.response['Error']['Message'])
    else:
        print("Email sent using {}! Message ID: {}".format(from_address, response['MessageId']))


def send_ses_email_with_attachment(recipients, p_id, subject='', text='', html='', sender='', account_name='',
                                   file=None):
    """Send an email through Amazon SES as a raw MIME message, with an optional file attachment.

    The SES client is built from the Flask app config keys SES_REGION_NAME,
    AMAZON_ACCESS_KEY and AMAZON_SECRET_KEY.

    Sender resolution:
      * ``sender`` given: it is used as the address; when ``account_name`` is
        also given the source becomes ``"<account_name> <sender>"``.
      * ``sender`` empty: the address defaults to config SES_EMAIL_SOURCE with
        the display name "BuyerCall"; if the partnership with id ``p_id``
        exists and has an ``email_sender``, that address and the partnership
        name are used instead.

    :param recipients: a list of addresses, or a single address string.
    :param p_id: partnership id used for the default-sender lookup.
    :param subject: subject line.
    :param text: plain-text body (optional).
    :param html: HTML body (optional). When both ``text`` and ``html`` are
        given they are sent together as a ``multipart/alternative`` body.
    :param sender: explicit sender address (optional).
    :param account_name: display name paired with an explicit ``sender``; also
        used as the prefix of the attachment file name.
    :param file: path of the file to attach. When omitted, the message is sent
        without an attachment instead of failing on ``open(None)``.
    :return: None. The outcome is printed; a ClientError is printed, not raised.
    """
    ses = boto3.client(
        'ses',
        region_name=current_app.config['SES_REGION_NAME'],
        aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
        aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY']
    )
    if not sender:
        # Set default sender info
        sender_email = current_app.config['SES_EMAIL_SOURCE']
        sender = 'BuyerCall <' + sender_email + '>'
        # lookup partner and set sender to partner info if exist
        partner = Partnership.query.filter(Partnership.id == p_id).first()
        if partner is not None and partner.email_sender:
            sender_email = partner.email_sender
            sender = partner.name + ' <' + sender_email + '>'
    else:
        sender_email = sender
        if account_name:
            sender = account_name + ' ' + '<' + sender_email + '>'

    # SES needs a list of addresses, while a MIME header value must be one
    # string; normalise once so both uses below are valid.
    if isinstance(recipients, str):
        recipients = [recipients]
    else:
        recipients = list(recipients)

    # The root is always 'mixed' so the attachment is a sibling of the body
    # rather than an "alternative" rendering of it.
    msg = MIMEMultipart('mixed')
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = ', '.join(recipients)

    # Build the body: both renderings go into a nested 'alternative' part,
    # otherwise the single available rendering is attached directly.
    if text and html:
        body = MIMEMultipart('alternative')
        body.attach(MIMEText(text, 'plain'))
        body.attach(MIMEText(html, 'html'))
    elif text:
        body = MIMEText(text, 'plain')
    else:
        body = MIMEText(html, 'html')
    msg.attach(body)

    # Attach the file, if one was supplied
    if file:
        with open(file, "rb") as attachment:
            part = MIMEApplication(attachment.read())
        # NOTE(review): the attachment name carries no file extension; kept as-is.
        part.add_header("Content-Disposition",
                        "attachment",
                        filename=account_name + ' - ' + datetime.today().strftime('%Y-%m-%d'))
        msg.attach(part)

    # Convert message to string and send
    try:
        response = ses.send_raw_email(
            Source=sender,
            Destinations=recipients,
            RawMessage={"Data": msg.as_string()}
        )
    except ClientError as e:
        print(e.response['Error']['Message'])
    else:
        print("Email sent using {}! Message ID: {}".format(sender_email, response['MessageId']))


class SESHandler(SMTPHandler):
    """Logging handler that emails formatted log records through Amazon SES.

    It inherits from SMTPHandler to reuse its ``fromaddr``, ``toaddrs`` and
    subject attributes; only ``emit`` is replaced, so delivery goes through
    the SES API instead of an SMTP connection.
    """

    def emit(self, record):
        """Format ``record`` and send it as a plain-text email via SES.

        The SES client is built from the Flask app config keys SES_REGION_NAME,
        AMAZON_ACCESS_KEY and AMAZON_SECRET_KEY. An SES ClientError is printed.
        Any other failure (for example a missing config key or no active
        application context) is passed to ``handleError`` so that a logging
        call never raises into the code that issued it.

        :param record: the ``logging.LogRecord`` to deliver.
        """
        try:
            ses = boto3.client(
                'ses',
                region_name=current_app.config['SES_REGION_NAME'],
                aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
                aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY']
            )

            message = self.format(record)
            to_addresses = self.toaddrs
            from_address = self.fromaddr
            # Use the SMTPHandler hook so record-dependent subjects defined by
            # subclasses are honoured; by default it returns self.subject.
            subject = self.getSubject(record)

            try:
                response = ses.send_email(
                    Source=from_address,
                    Destination={'ToAddresses': to_addresses},
                    Message={
                        'Subject': {'Data': subject},
                        'Body': {
                            'Text': {'Charset': 'UTF-8',
                                     'Data': message}
                        }
                    }
                )
            except ClientError as e:
                print(e.response['Error']['Message'])
            else:
                print("Email sent Message ID: {} - {}".format(response['MessageId'], response))
        except Exception:
            # Standard logging contract: report the problem through the
            # handler's error hook instead of propagating it to the caller.
            self.handleError(record)


def send_ses_notification(recipients, subject='', text='', html='', sender='', account_name=''):
    """Send a notification email through Amazon SES from an explicitly given sender.

    The SES client is built from the Flask app config keys SES_REGION_NAME,
    AMAZON_ACCESS_KEY and AMAZON_SECRET_KEY. No partnership lookup or default
    sender is applied here: ``sender`` is used as passed.

    :param recipients: value passed to SES as ``ToAddresses``.
    :param subject: subject line.
    :param text: plain-text body; when non-empty it is sent and ``html`` is ignored.
    :param html: HTML body, used only when ``text`` is empty.
    :param sender: sender address.
    :param account_name: optional display name; when given the source becomes
        ``"<account_name> <sender>"``.
    :return: None. The outcome is printed; a ClientError is printed, not raised.
    """
    config = current_app.config
    client = boto3.client(
        'ses',
        region_name=config['SES_REGION_NAME'],
        aws_access_key_id=config['AMAZON_ACCESS_KEY'],
        aws_secret_access_key=config['AMAZON_SECRET_KEY']
    )

    # Keep the bare address for the log line; decorate the source if a name is given.
    from_address = sender
    if account_name:
        sender = account_name + ' <' + from_address + '>'

    # Plain text takes precedence over HTML when both are provided.
    if text:
        body = {'Text': {'Data': text}}
    else:
        body = {'Html': {'Data': html}}

    try:
        response = client.send_email(
            Source=sender,
            Destination={'ToAddresses': recipients},
            Message={
                'Subject': {'Data': subject},
                'Body': body
            }
        )
    except ClientError as err:
        print(err.response['Error']['Message'])
    else:
        print("Email sent using {}! Message ID: {}".format(from_address, response['MessageId']))