HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //proc/self/root/home/arjun/projects/buyercall/buyercall/blueprints/email/endpoints.py
import json
import logging as log

from flask import Blueprint, request, current_app
from flask_login import current_user
from sqlalchemy import and_

from buyercall.blueprints.channels.models import Channel, ChannelType
from buyercall.blueprints.contacts.models import Contact, ContactChannelTie
from buyercall.blueprints.email.models import EmailTemplate
from buyercall.blueprints.email.serializers import EmailTemplateSchema
from buyercall.blueprints.email.utils.validate import EmailValidator
from buyercall.blueprints.partnership.models import PartnershipAccount, Partnership
from buyercall.blueprints.sources.models import Source
from buyercall.blueprints.user.decorators import api_role_required
from buyercall.extensions import ses_client, sns_client
from buyercall.lib.util_boto3_s3 import create_bucket, upload_file_object, download_file
from buyercall.lib.util_rest import api_jsonify, jsonify
from .models import Email, EmailIdentity
from .utils.request_dispatcher import RequestDispatcher
from .utils.urls_collector_from_string import UrlCollector
from ..widgets.utils.interaction_manager import InteractionManager

email_api = Blueprint('email_api', __name__, url_prefix='/api/email')
logger = log.getLogger(__name__)


@api_role_required('admin')
def create_email_channel():
    received = request.get_json()
    success = True
    message = "Email Channel added successfully!"
    status_code = 200
    data = {}
    if received:
        partnership_sid = received.get('partnership_id', None)
        partnership_account_sid = received.get('partnership_account_id', None)
        source_sid = received.get('source', None)
        if partnership_sid and partnership_account_sid and source_sid:
            partnership = Partnership.query.filter(Partnership.sid == partnership_sid).first()
            partnership_account = PartnershipAccount \
                .query \
                .filter(and_(PartnershipAccount.sid == partnership_account_sid,
                             PartnershipAccount.partnership_id == partnership.id)) \
                .first()
            username = received.get('username', '')
            test_server_domain = current_app.config.get('TEST_SERVER_DOMAIN', None)
            if test_server_domain and current_app.config.get('DEBUG', False):
                domain = test_server_domain
            else:
                domain = received.get('domain', None) or partnership.partner_url
            email_ = f"{username}@{domain}"
            is_email_valid = EmailValidator(email_).validate()
            is_email_exists = EmailIdentity.is_exists(email_)

            if partnership_account:
                if is_email_valid:
                    if is_email_exists:
                        params = {
                            'username': username,
                            'domain': domain,
                            'partnership_id': partnership.id,
                            'partnership_account_id': partnership_account.id,
                        }
                        email = EmailIdentity.create(**params)

                        channel_type = ChannelType.get_by_name('email')
                        if channel_type:
                            email_add_resp = ses_client.verify_email_identity(email)
                            if not email_add_resp.get('status'):
                                message = email_add_resp.get('message', 'Something went wrong')
                                status_code = 500
                                success = False
                            else:
                                channel = Channel.create(
                                    name=received.get('username', 'Email channel name'),
                                    description=received.get('description', 'Email channel description'),
                                    type=channel_type.id,
                                    related_id=email.sid,
                                    source=Source.get_id_from_sid(source_sid),
                                    partnership_id=partnership.id,
                                    partnership_account_id=partnership_account.id,
                                    created_by=current_user.id
                                )
                    else:
                        status_code = 404
                        message = "Email address already exists"
                        success = False
                else:
                    status_code = 404
                    message = "Invalid email address"
                    success = False
            else:
                status_code = 404
                message = "Partnership account not found!"
                success = False

        else:
            status_code = 401
            message = "Invalid parameters!"
            success = False

    else:
        status_code = 401
        message = "Invalid parameters!"
        success = False
    return api_jsonify(data, status_code, message, success)


def aws_ses_webhook():
    raw_data = request.data
    data = json.loads(raw_data) or {}
    print(data)
    try:
        if data.get('Type', None):
            event_type = data.get('Type', None)
            if event_type == 'SubscriptionConfirmation':
                token = data.get('Token')
                topic_arn = data.get('TopicArn')
                sns_client.confirm_subscription(topic_arn, token)

            elif event_type == 'Notification':
                email_type, message = get_email_type(data)
                mail_content = message['mail']
                message_id = mail_content['messageId']
                from_email = mail_content.get('source')
                recipients = mail_content.get('destination')
                if email_type == "Send":
                    if from_email:
                        domain = from_email.split('@')[1]
                        partnership = Partnership.get_by_partner_url(domain)
                        partnership_account = PartnershipAccount.get_by_email(from_email)
                        subject = None
                        for h in mail_content['headers']:
                            if h['name'] == 'Subject':
                                subject = h['value']
                        body = message.get('content', None)
                        email_identity = EmailIdentity.get_by_username(from_email, partnership_id=partnership.id)
                        channel = None
                        source_id = None
                        if email_identity:
                            channel = Channel.get_by_related_id(email_identity.id, partnership.id)
                            source_id = channel.source if channel else 1
                        else:
                            source_id = 1
                        if not channel:
                            channel = 1

                        email = Email.create_from_raw(from_email=from_email, recipients=recipients, subject=subject,
                                                      body=body, source=source_id, channel=channel,
                                                      partnership_account=partnership_account, status=email_type,
                                                      message_id=message_id, is_inbound=False)

                elif email_type in ["Bounce", "Open", "Click", "Delivery"]:
                    email = Email.get_by_message(message_id)

                    if email:
                        email = email.update_status(email_type)
                    else:
                        print('No email with message id : ', message_id)
                elif email_type == "Received":
                    is_valid_mail = True
                    if recipients:

                        email_identity = EmailIdentity.get_by_email(recipients[0])
                        if not email_identity:
                            print('No email identity')
                            return jsonify({'success': True})
                        partnership = Partnership.get_by_id(email_identity.partnership_id)
                        if not partnership:
                            print('No partnership')
                            return jsonify({'success': True})
                        partnership_account = PartnershipAccount.get_by_id(email_identity.partnership_account_id)
                        if not partnership_account:
                            email_channel = Channel.get_by_email_identity(email_identity.id)
                            partnership_account = PartnershipAccount.get_by_id(email_channel.partnership_account_id) if email_channel else None
                            if not partnership_account:
                                print('Email is not regitered in any channel')
                                return jsonify({'success': True})

                        # Get email from s3 bucket
                        from buyercall.blueprints.email.utils.email_parser import EmailParser
                        email_key = f"emails/{partnership.sid}/{message_id}"
                        bucket = current_app.config.get('EMAIL_BUCKET')
                        email_file = download_file(email_key, bucket)
                        if not email_file:
                            raise Exception('Email file download failed')
                        parsed_data = EmailParser().parse(email_file)

                        subject = parsed_data.get('subject', None)
                        plain_text = parsed_data.get('plain', '')
                        html = parsed_data.get('html', '')
                        from_email = parsed_data.get('from', '')

                        channel = None

                        if email_identity:
                            channel = Channel.get_by_related_id(email_identity.id, partnership.id)
                            source_id = channel.source if channel else 1
                            if not email_identity.is_verified:
                                from buyercall.blueprints.email.utils.verify_email_identity import EmailVerifier
                                email_bucket = current_app.config.get('EMAIL_BUCKET', None)
                                email_key = f'emails/{partnership.sid}/{message_id}'
                                if EmailVerifier.verify(email_bucket, email_key):
                                    email_identity.is_verified = True
                                    email_identity.save()
                                    print('Email verified successfully - ', recipients[0])
                                else:
                                    print('Email verification failed - ', recipients[0])
                        else:
                            source_id = 1
                        if not channel:
                            channel_id = 1
                        else:
                            channel_id = channel.id

                        email = Email.create_from_raw(from_email=from_email, recipients=recipients, subject=subject,
                                                      body=plain_text or html, source=source_id, channel=channel_id,
                                                      partnership_account=partnership_account, status=email_type,
                                                      message_id=message_id, is_inbound=True)

                        # Find contact
                        if email:
                            is_existing_contact = True
                            contact = Contact.find(from_email)
                            if contact:
                                contact.status = 'new'
                                contact.save()
                            if not contact:
                                is_existing_contact = False
                                contact = Contact.create_from_mail(from_email, partnership_account.id)

                            # create contact_channel_tie
                            if source_id:
                                contact_channel_tie = ContactChannelTie()
                                contact_channel_tie.channel_id = channel_id
                                contact_channel_tie.contact = contact.id
                                contact_channel_tie.email = email.id
                                contact_channel_tie.source = source_id
                                contact_channel_tie.save()

                                # Send email to lead
                                from buyercall.blueprints.user.tasks import send_generic_mail
                                config_set_name = f"{str(partnership.sid)}-config_set"
                                send_generic_mail.delay(subject=subject, recipients=recipients,
                                                        config_set_name=config_set_name,
                                                        text=plain_text, html=html,
                                                        sender=current_app.config.get('SES_EMAIL_SOURCE'))

                                assigned_agents_ids = InteractionManager().run(
                                    'email', contact, channel, is_existing_contact, {'message': plain_text}, partnership_account.id)

                        else:
                            print('Invalid email')
                else:
                    print('Unidentified email type : ', email_type)
            else:
                print('Event type not found')
        else:
            print(data)
    except Exception as e:
        print(e)
    return jsonify({'success': True})


def test_send_mail():
    # Send test mail
    recipients = ['help@baloud.com', ]
    # recipients = ['hello@botfather.in',]
    plain_body = "This is a test mail"
    html_body = "Test mail"
    pid = "66e3c8d6-b164-4054-a431-cad6a00a3f35"
    subject = "Test Subject"
    partnership_account = PartnershipAccount.query.get(1)
    email_type = 'Send'
    mail_resp = ses_client.send_email(recipients=recipients, pid=pid, text=plain_body, html=html_body)
    from_email = current_app.config.get('SES_EMAIL_SOURCE', None)
    message_id = mail_resp.get('MessageId')
    email = Email.create_from_raw(from_email=from_email, recipients=recipients, subject=subject, body=plain_body,
                                  partnership_account=partnership_account,
                                  status=email_type, message_id=message_id, raw=html_body)

    return jsonify({'success': mail_resp})


def parse_email():
    parsed_data = {}
    try:
        bucket_name = current_app.config.get('EMAIL_BUCKET', 'ses-buyercall')
        email_key = '30d44pov8ujfcsc92n8q97a1ihqsk822iopa17g1'
        from buyercall.lib.util_boto3_s3 import download_file
        from buyercall.blueprints.email.utils.email_parser import EmailParser
        email_file = download_file(email_key, bucket_name)
        parsed_data = EmailParser().parse(email_file)
        urls = UrlCollector.collect_urls(parsed_data.get('plain', ''))
        verification_url = [url for url in urls if 'email-verification' in url]
        if verification_url:
            response = RequestDispatcher.request_retry(url=verification_url[0])
            if response.get('success', False):
                print('Email verified')

    except Exception as e:
        print("Error: ", e)
    return api_jsonify(parsed_data)


def test_send_raw_mail():
    mail_resp = None
    try:
        config_set = f"{current_app.config['MAIL_CONFIG_PREFIX']}-config-set"
        attachments = request.files.getlist('attachments')
        sender = current_app.config.get('SES_EMAIL_SOURCE', None)
        recipients = ['help@baloud.com', ]
        title = "Test Subject"
        text = "This is a test mail"
        html = """
            <!DOCTYPE html>
            <html>
                <head></head>
                <body>
                    <h1>HTML body</h1>
                    <p>Hello there</p>
                </body>
            </html>
        """
        msg = ses_client.create_multipart_message(sender, recipients, title, text, html, attachments)
        print('multiplart msg : ', msg)
        mail_resp = ses_client.send_raw_mail(sender=sender, recipients=recipients,
                                             config_set=config_set, content=msg.as_string())
        message_id = mail_resp.get('MessageId')
        # email = Email.create_from_raw(from_email=sender, recipients=recipients, subject=title, body=text,
        # partnership_account=partnership_account, \ status='Send', message_id=message_id, raw=html) print('create in
        # db :', email) if attachments: for attachment in attachments: attachment_ext = attachment.filename.split(
        # '.')[-1] key = f'email/account_{pid}/{message_id}.{attachment_ext}' resp = upload_file_object(attachment,
        # current_app.config['EMAIL_BUCKET'], key) print('s3 resp : ', resp)
    except Exception as e:
        print('Error : ', e)
    return jsonify({'success': mail_resp})


def create_s3_bucket():
    response = create_bucket()
    return api_jsonify(data=response, message="success")


def get_email_type(data):
    message = json.loads(data.get('Message', '{}'))
    return message.get('notificationType', message.get('eventType', None)), message


def get_email_message_id(data):
    return data.get('MessageId', None)


@api_role_required('admin', 'partner')
def create_email_template():
    message = "Email template created successfully!"
    status_code = 200
    success = True
    template_image = request.files.get('image', None)
    template_data = request.form.get('data', {})
    template = None

    if template_data:
        try:
            template_data = json.loads(template_data)
        except:
            template_data = {}
    title = template_data.get('title', '')
    subject = template_data.get('subject', '')
    if title and subject:
        data = {
            'title': template_data.get('title', ''),
            'subject': template_data.get('subject', ''),
            'description': template_data.get('description', ''),
            'content': template_data.get('content', {}),
            'is_active': template_data.get('isActive', True),
            'is_plaintext': template_data.get('isPlainEmail', False),
            'partnership_id': current_user.partnership_id,
            'partnership_account_id': current_user.partnership_account_id
        }
        template = EmailTemplate.create(**data)
    else:
        success = False
        message = "Invalid request. Email template create request failed."
        status_code = 400

    if template and template_image:
        image_ext = template_image.filename.split('.')[-1]
        key = f'email/template_images/{template.sid}.{image_ext}'
        resp = upload_file_object(template_image, current_app.config['EMAIL_TEMPLATE_BUCKET'], key)
        if resp:
            image_key = f"{current_app.config['EMAIL_TEMPLATE_BUCKET']}::{key}"
            template.image = image_key
            template.save()
    return api_jsonify(message=message, success=success, status_code=status_code)


@api_role_required('admin', 'partner')
def get_email_templates():
    try:
        message = "Email templates fetched successfully!"
        data = {}
        templates = EmailTemplate.query.filter(
            EmailTemplate.partnership_account_id == current_user.partnership_account_id).all()
        if templates:
            data = EmailTemplateSchema(many=True).dump(templates)
    except Exception as e:
        print(e)
    return api_jsonify(data=data, message=message)


@api_role_required('admin', 'partner')
def update_email_template(template_id):
    status_code = 200
    success = True
    data = {}
    message = "Email template updated successfully!"
    try:
        template_image = request.files.get('image', None)
        template_data = request.form.get('data', None)
        if template_data:
            try:
                template_data = json.loads(template_data)
            except:
                template_data = {}

        if template_data:
            template = EmailTemplate.query.filter(EmailTemplate.sid == template_id).first()
            if template:
                received_data = {
                    "title": template_data.get('title', ''),
                    "subject": template_data.get('subject', ''),
                    "description": template_data.get('description', ''),
                    "content": template_data.get('content', ''),
                }
                for key, value in received_data.items():
                    if value:
                        setattr(template, key, received_data[key])
                template.is_active = template_data.get('isActive', True)
                template.is_plaintext = template_data.get('isPlainEmail', True)

                template.save()
                if template_image:
                    image_ext = template_image.filename.split('.')[-1]
                    key = f'email/template_images/{template.sid}.{image_ext}'
                    resp = upload_file_object(template_image, current_app.config['EMAIL_TEMPLATE_BUCKET'], key)
                    if resp:
                        image_key = f"{current_app.config['EMAIL_TEMPLATE_BUCKET']}::{key}"
                        template.image = image_key
                        template.save()
            else:
                message = "Email template doesn't exists!"
                status_code = 404
                success = False

        else:
            message = "Invalid parameters!"
            status_code = 401
            success = False
    except Exception as e:
        print(e)
    return api_jsonify(data=data, message=message, status_code=status_code, success=success)


@api_role_required('admin', 'partner')
def get_email_template_by_id(template_id):
    message = "Email template fetched successfully!"
    status_code = 200
    success = True
    data = {}
    template = EmailTemplate.query.filter(EmailTemplate.sid == template_id).first()
    if not template:
        message = "Email template doesn't exists!"
        status_code = 404
        success = False
    else:
        data = EmailTemplateSchema().dump(template)
    return api_jsonify(data=data, message=message, status_code=status_code, success=success)


@api_role_required('admin', 'partner')
def delete_email_template(template_id):
    message = "Email template deleted successfully!"
    status_code = 204
    success = True
    template = EmailTemplate.query.filter(EmailTemplate.sid == template_id).first()
    if not template:
        message = "Email template doesn't exists!"
        status_code = 404
        success = False
    else:
        data = template.delete()
    return api_jsonify(message=message, status_code=status_code, success=success)


@api_role_required('admin', 'partner', 'sysadmin')
def list_all_identites():
    identities = ses_client.list_all_identities()
    if identities['status']:
        status_code = 200
        message = "Successfully fetched the identites"
    else:
        status_code = 500
        message = identities['message']
    return api_jsonify(data=identities['data'], status_code=status_code, success=identities['status'], message=message)


@api_role_required('admin', 'partner', 'sysadmin')
def list_email_identities():
    identities = ses_client.list_email_identities()
    if identities['status']:
        status_code = 200
        message = "Successfully fetched the email identites"
    else:
        status_code = 500
        message = identities['message']
    return api_jsonify(data=identities['data'], status_code=status_code, success=identities['status'], message=message)


@api_role_required('admin', 'partner', 'sysadmin')
def list_domain_identities():
    identities = ses_client.list_domain_identities()
    if identities['status']:
        status_code = 200
        message = "Successfully fetched the domain identites"
    else:
        status_code = 500
        message = identities['message']
    return api_jsonify(data=identities['data'], status_code=status_code, success=identities['status'], message=message)