HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //proc/1233/root/home/arjun/projects/buyercall/buyercall/blueprints/email/backup/webhook.py
import json
from flask import request
from buyercall.extensions import sns_client, ses_client



def aws_ses_webhook():
    raw_data = request.data
    print('raw_data ', raw_data)
    data = json.loads(raw_data) or {}
    try:
        if data.get('Type', None):
            event_type = data.get('Type', None)
            if event_type == 'SubscriptionConfirmation':
                token = data.get('Token')
                topic_arn = data.get('TopicArn')
                response = sns_client.confirm_subscription(topic_arn, token)

            elif event_type == 'Notification':
                email_type, message = get_email_type(data)
                partnership_account = None
                email = None
                source = None

                if email_type == "Send":
                    mail_content = message['mail']
                    message_id = mail_content['messageId']
                    from_email = mail_content.get('source')
                    recipients = mail_content.get('destination')
                    partnership_account = PartnershipAccount.get_by_email(recipients[0])
                    subject = None
                    for h in mail_content['headers']:
                        if h['name'] == 'Subject':
                            subject = h['value']
                    body = message.get('content', None)
                    email = Email.create_from_raw(from_email=from_email, recipients=recipients, subject=subject, body=body,\
                         partnership_account=partnership_account, status=email_type, message_id=message_id, is_inbound=False)

                elif email_type == "Bounce" or email_type == "Open" or email_type == "Click":
                    mail_content = message['mail']
                    message_id = mail_content['messageId']
                    from_email = mail_content.get('source')
                    recipients = mail_content.get('destination')
                    partnership_account = PartnershipAccount.get_by_email(recipients[0])
                    subject = None
                    for h in mail_content['headers']:
                        if h['name'] == 'Subject':
                            subject = h['value']
                    body = message.get('content', None)
                    email = Email.get_by_message(message_id)
                    print('existing email : ', email)
                    email = email.update_status(email_type)

                elif email_type == "Received":
                    mail_content = message['mail']
                    message_id = mail_content['messageId']
                    from_email = mail_content.get('source')
                    recipients = mail_content.get('destination')
                    partnership_account = PartnershipAccount.get_by_email(recipients[0])
                    subject = None
                    for h in mail_content['headers']:
                        if h['name'] == 'Subject':
                            subject = h['value']
                    body = message.get('content', None)
                    email = Email.create_from_raw(from_email, recipients, subject, body, partnership_account, email_type, message_id, body)
                else:
                    pass

                # Find contact
                if email:
                    contact = Contact.find(email.email)
                    if not contact:
                        contact = Contact.create_from_mail(from_email)

                # Get the source
                if partnership_account:
                    source = Source.get_by_partnership_account(partnership_account.id)

                # create contact_channel_tie
                if source:
                    contact_channel_tie = ContactChannelTie()
                    contact_channel_tie.contact = contact
                    contact_channel_tie.email = email
                    contact_channel_tie.source = source.id
                    contact_channel_tie.save()

            else:
                print('Event type not found')
        else:
            print(data)
    except Exception as e:
        print(e)
    return jsonify({'success': True})