File: //proc/self/root/home/arjun/projects/buyercall/buyercall/blueprints/email/endpoints.py
import json
import logging as log
from flask import Blueprint, request, current_app
from flask_login import current_user
from sqlalchemy import and_
from buyercall.blueprints.channels.models import Channel, ChannelType
from buyercall.blueprints.contacts.models import Contact, ContactChannelTie
from buyercall.blueprints.email.models import EmailTemplate
from buyercall.blueprints.email.serializers import EmailTemplateSchema
from buyercall.blueprints.email.utils.validate import EmailValidator
from buyercall.blueprints.partnership.models import PartnershipAccount, Partnership
from buyercall.blueprints.sources.models import Source
from buyercall.blueprints.user.decorators import api_role_required
from buyercall.extensions import ses_client, sns_client
from buyercall.lib.util_boto3_s3 import create_bucket, upload_file_object, download_file
from buyercall.lib.util_rest import api_jsonify, jsonify
from .models import Email, EmailIdentity
from .utils.request_dispatcher import RequestDispatcher
from .utils.urls_collector_from_string import UrlCollector
from ..widgets.utils.interaction_manager import InteractionManager
# Flask blueprint for the email API; registered with the '/api/email' URL prefix.
email_api = Blueprint('email_api', __name__, url_prefix='/api/email')
# Module-level logger named after this module.
logger = log.getLogger(__name__)
@api_role_required('admin')
def create_email_channel():
    """Create an email identity and an email channel for a partnership account.

    Reads a JSON body containing ``partnership_id``, ``partnership_account_id``
    and ``source`` (all required, all sids) plus the optional ``username``,
    ``domain`` and ``description`` keys. The address is built as
    ``username@domain``; the domain comes from ``TEST_SERVER_DOMAIN`` when that
    setting is present and ``DEBUG`` is on, otherwise from the request or the
    partnership's ``partner_url``.

    Returns:
        The ``api_jsonify`` response. Status codes used here:
        401 for a missing/invalid body or missing required keys,
        404 for an unknown partnership, unknown partnership account, invalid
        address or failed existence check, 500 when SES identity verification
        reports a failure, 200 otherwise.
    """
    data = {}

    def _failure(status_code, message):
        # Every error path shares the same empty payload and success=False.
        return api_jsonify(data, status_code, message, False)

    received = request.get_json()
    # A non-dict JSON body (e.g. a list) would break the .get() calls below.
    if not received or not isinstance(received, dict):
        return _failure(401, "Invalid parameters!")

    partnership_sid = received.get('partnership_id', None)
    partnership_account_sid = received.get('partnership_account_id', None)
    source_sid = received.get('source', None)
    if not (partnership_sid and partnership_account_sid and source_sid):
        return _failure(401, "Invalid parameters!")

    partnership = Partnership.query.filter(Partnership.sid == partnership_sid).first()
    if not partnership:
        # Previously an unknown partnership sid crashed with AttributeError
        # on ``partnership.id`` below.
        return _failure(404, "Partnership not found!")

    partnership_account = PartnershipAccount \
        .query \
        .filter(and_(PartnershipAccount.sid == partnership_account_sid,
                     PartnershipAccount.partnership_id == partnership.id)) \
        .first()

    username = received.get('username', '')
    test_server_domain = current_app.config.get('TEST_SERVER_DOMAIN', None)
    if test_server_domain and current_app.config.get('DEBUG', False):
        domain = test_server_domain
    else:
        domain = received.get('domain', None) or partnership.partner_url
    email_ = f"{username}@{domain}"
    is_email_valid = EmailValidator(email_).validate()
    is_email_exists = EmailIdentity.is_exists(email_)

    if not partnership_account:
        return _failure(404, "Partnership account not found!")
    if not is_email_valid:
        return _failure(404, "Invalid email address")
    # NOTE(review): the identity is created only when ``is_exists`` is truthy
    # and "already exists" is reported when it is falsy. That matches the
    # original control flow but reads inverted relative to the helper's name;
    # confirm the semantics of ``EmailIdentity.is_exists``.
    if not is_email_exists:
        return _failure(404, "Email address already exists")

    email = EmailIdentity.create(
        username=username,
        domain=domain,
        partnership_id=partnership.id,
        partnership_account_id=partnership_account.id,
    )
    channel_type = ChannelType.get_by_name('email')
    if channel_type:
        email_add_resp = ses_client.verify_email_identity(email)
        if not email_add_resp.get('status'):
            return _failure(500, email_add_resp.get('message', 'Something went wrong'))
        Channel.create(
            name=received.get('username', 'Email channel name'),
            description=received.get('description', 'Email channel description'),
            type=channel_type.id,
            related_id=email.sid,
            source=Source.get_id_from_sid(source_sid),
            partnership_id=partnership.id,
            partnership_account_id=partnership_account.id,
            created_by=current_user.id
        )
    # As before, a missing 'email' channel type still yields the success response.
    return api_jsonify(data, 200, "Email Channel added successfully!", True)
# Webhook handler for SNS-style callbacks about SES email events.
# Handles 'SubscriptionConfirmation' and 'Notification' payload types; for
# notifications it records sent mail, updates the status of stored mail, or
# ingests received mail. Exceptions raised inside the try block are printed
# and swallowed, and the handler then answers {'success': True}.
def aws_ses_webhook():
raw_data = request.data
# NOTE(review): the body is parsed outside the try block below, so a
# malformed JSON body raises before the handler's own error handling runs.
data = json.loads(raw_data) or {}
print(data)
try:
if data.get('Type', None):
event_type = data.get('Type', None)
if event_type == 'SubscriptionConfirmation':
# Confirm the subscription using the topic ARN and token from the payload.
token = data.get('Token')
topic_arn = data.get('TopicArn')
sns_client.confirm_subscription(topic_arn, token)
elif event_type == 'Notification':
# get_email_type decodes the JSON in data['Message'] and returns
# (notificationType or eventType, decoded message).
email_type, message = get_email_type(data)
mail_content = message['mail']
message_id = mail_content['messageId']
from_email = mail_content.get('source')
recipients = mail_content.get('destination')
if email_type == "Send":
# Outbound mail: store an Email row with is_inbound=False.
if from_email:
domain = from_email.split('@')[1]
partnership = Partnership.get_by_partner_url(domain)
partnership_account = PartnershipAccount.get_by_email(from_email)
# Pick the Subject header value (the last one wins if several are present).
subject = None
for h in mail_content['headers']:
if h['name'] == 'Subject':
subject = h['value']
body = message.get('content', None)
# NOTE(review): `partnership` is dereferenced without a None check; a
# failed lookup would raise here and be swallowed by the except below.
email_identity = EmailIdentity.get_by_username(from_email, partnership_id=partnership.id)
channel = None
source_id = None
if email_identity:
channel = Channel.get_by_related_id(email_identity.id, partnership.id)
source_id = channel.source if channel else 1
else:
source_id = 1
# NOTE(review): the literal 1 is used as the fallback source/channel --
# presumably a default record id; confirm. In this branch `channel` is passed
# on as either the Channel object or 1, while the "Received" branch passes
# channel.id; verify what Email.create_from_raw expects.
if not channel:
channel = 1
email = Email.create_from_raw(from_email=from_email, recipients=recipients, subject=subject,
body=body, source=source_id, channel=channel,
partnership_account=partnership_account, status=email_type,
message_id=message_id, is_inbound=False)
elif email_type in ["Bounce", "Open", "Click", "Delivery"]:
# Status events: update the stored email that has this message id, if any.
email = Email.get_by_message(message_id)
if email:
email = email.update_status(email_type)
else:
print('No email with message id : ', message_id)
elif email_type == "Received":
# Inbound mail. `is_valid_mail` is assigned but not read anywhere in this function.
is_valid_mail = True
if recipients:
# Only the first recipient is used to resolve the email identity.
email_identity = EmailIdentity.get_by_email(recipients[0])
if not email_identity:
print('No email identity')
return jsonify({'success': True})
partnership = Partnership.get_by_id(email_identity.partnership_id)
if not partnership:
print('No partnership')
return jsonify({'success': True})
partnership_account = PartnershipAccount.get_by_id(email_identity.partnership_account_id)
# Fall back to the account of the channel tied to this identity.
if not partnership_account:
email_channel = Channel.get_by_email_identity(email_identity.id)
partnership_account = PartnershipAccount.get_by_id(email_channel.partnership_account_id) if email_channel else None
if not partnership_account:
print('Email is not regitered in any channel')
return jsonify({'success': True})
# Get email from s3 bucket
from buyercall.blueprints.email.utils.email_parser import EmailParser
email_key = f"emails/{partnership.sid}/{message_id}"
bucket = current_app.config.get('EMAIL_BUCKET')
email_file = download_file(email_key, bucket)
if not email_file:
raise Exception('Email file download failed')
# From here on, from_email is taken from the parsed file rather than the notification.
parsed_data = EmailParser().parse(email_file)
subject = parsed_data.get('subject', None)
plain_text = parsed_data.get('plain', '')
html = parsed_data.get('html', '')
from_email = parsed_data.get('from', '')
channel = None
# NOTE(review): email_identity appears to be always truthy here, because the
# handler returned above when it was missing, so the else branch looks unreachable.
if email_identity:
channel = Channel.get_by_related_id(email_identity.id, partnership.id)
source_id = channel.source if channel else 1
# For a not-yet-verified identity, run EmailVerifier on this stored message
# and mark the identity verified when it returns a truthy result.
if not email_identity.is_verified:
from buyercall.blueprints.email.utils.verify_email_identity import EmailVerifier
email_bucket = current_app.config.get('EMAIL_BUCKET', None)
email_key = f'emails/{partnership.sid}/{message_id}'
if EmailVerifier.verify(email_bucket, email_key):
email_identity.is_verified = True
email_identity.save()
print('Email verified successfully - ', recipients[0])
else:
print('Email verification failed - ', recipients[0])
else:
source_id = 1
if not channel:
channel_id = 1
else:
channel_id = channel.id
# The plain-text part is preferred as the body; HTML is used when it is empty.
email = Email.create_from_raw(from_email=from_email, recipients=recipients, subject=subject,
body=plain_text or html, source=source_id, channel=channel_id,
partnership_account=partnership_account, status=email_type,
message_id=message_id, is_inbound=True)
# Find contact
if email:
is_existing_contact = True
contact = Contact.find(from_email)
# An existing contact has its status set back to 'new'.
if contact:
contact.status = 'new'
contact.save()
if not contact:
is_existing_contact = False
contact = Contact.create_from_mail(from_email, partnership_account.id)
# create contact_channel_tie
if source_id:
contact_channel_tie = ContactChannelTie()
contact_channel_tie.channel_id = channel_id
contact_channel_tie.contact = contact.id
contact_channel_tie.email = email.id
contact_channel_tie.source = source_id
contact_channel_tie.save()
# Send email to lead
# NOTE(review): the task is queued with `recipients`, which is the inbound
# message's destination list rather than from_email; confirm the intended addressee.
from buyercall.blueprints.user.tasks import send_generic_mail
config_set_name = f"{str(partnership.sid)}-config_set"
send_generic_mail.delay(subject=subject, recipients=recipients,
config_set_name=config_set_name,
text=plain_text, html=html,
sender=current_app.config.get('SES_EMAIL_SOURCE'))
# The result is stored in assigned_agents_ids but not used further in this function.
assigned_agents_ids = InteractionManager().run(
'email', contact, channel, is_existing_contact, {'message': plain_text}, partnership_account.id)
else:
print('Invalid email')
else:
print('Unidentified email type : ', email_type)
else:
print('Event type not found')
else:
print(data)
# Any failure above is printed only; the response below is still success=True.
except Exception as e:
print(e)
return jsonify({'success': True})
def test_send_mail():
    """Send a fixed test email through ``ses_client`` and store it as an Email.

    Uses a hard-coded recipient, subject, bodies and ``pid``; the stored
    record is attached to the partnership account with primary key 1.

    Returns:
        A JSON response whose ``success`` field is the ``send_email`` response.
    """
    to_addresses = ['help@baloud.com']
    text_part = "This is a test mail"
    html_part = "Test mail"

    account = PartnershipAccount.query.get(1)
    ses_response = ses_client.send_email(
        recipients=to_addresses,
        pid="66e3c8d6-b164-4054-a431-cad6a00a3f35",
        text=text_part,
        html=html_part,
    )

    # Record the message that was just sent, with status 'Send'.
    Email.create_from_raw(
        from_email=current_app.config.get('SES_EMAIL_SOURCE', None),
        recipients=to_addresses,
        subject="Test Subject",
        body=text_part,
        partnership_account=account,
        status='Send',
        message_id=ses_response.get('MessageId'),
        raw=html_part,
    )
    return jsonify({'success': ses_response})
def parse_email():
    """Download one hard-coded stored email, parse it and follow its verification link.

    The object is fetched from the ``EMAIL_BUCKET`` bucket (default
    ``'ses-buyercall'``) and parsed with ``EmailParser``. If the plain-text
    part contains a URL with ``'email-verification'`` in it, the first such
    URL is requested through ``RequestDispatcher.request_retry``.

    Returns:
        The ``api_jsonify`` response wrapping the parsed data; this is ``{}``
        when a failure happened before parsing finished. Errors are printed,
        not raised.
    """
    parsed_data = {}
    try:
        bucket = current_app.config.get('EMAIL_BUCKET', 'ses-buyercall')
        from buyercall.lib.util_boto3_s3 import download_file
        from buyercall.blueprints.email.utils.email_parser import EmailParser

        raw_email = download_file('30d44pov8ujfcsc92n8q97a1ihqsk822iopa17g1', bucket)
        parsed_data = EmailParser().parse(raw_email)

        found_links = UrlCollector.collect_urls(parsed_data.get('plain', ''))
        verification_links = [link for link in found_links if 'email-verification' in link]
        if verification_links:
            outcome = RequestDispatcher.request_retry(url=verification_links[0])
            if outcome.get('success', False):
                print('Email verified')
    except Exception as err:
        print("Error: ", err)
    return api_jsonify(parsed_data)
def test_send_raw_mail():
    """Send a fixed multipart test email, with any uploaded attachments, as a raw message.

    Attachments are read from the ``attachments`` files of the request. The
    sent message is not persisted and the attachments are not uploaded
    anywhere by this function.

    Returns:
        A JSON response whose ``success`` field is the ``send_raw_mail``
        response, or ``None`` when something failed before it was obtained.
        Errors are printed, not raised.
    """
    send_result = None
    try:
        configuration_set = f"{current_app.config['MAIL_CONFIG_PREFIX']}-config-set"
        uploaded_files = request.files.getlist('attachments')
        from_address = current_app.config.get('SES_EMAIL_SOURCE', None)
        to_addresses = ['help@baloud.com']
        # The literal below is kept flush-left so its runtime value is unchanged.
        html_part = """
<!DOCTYPE html>
<html>
<head></head>
<body>
<h1>HTML body</h1>
<p>Hello there</p>
</body>
</html>
"""
        multipart = ses_client.create_multipart_message(
            from_address, to_addresses, "Test Subject", "This is a test mail", html_part, uploaded_files)
        print('multiplart msg : ', multipart)
        send_result = ses_client.send_raw_mail(
            sender=from_address,
            recipients=to_addresses,
            config_set=configuration_set,
            content=multipart.as_string(),
        )
        # Kept from the original: the lookup also surfaces a response without
        # .get() as a printed error.
        sent_message_id = send_result.get('MessageId')
    except Exception as err:
        print('Error : ', err)
    return jsonify({'success': send_result})
def create_s3_bucket():
    """Call ``create_bucket`` and return its result in an API response with message "success"."""
    return api_jsonify(data=create_bucket(), message="success")
def get_email_type(data):
    """Decode the ``Message`` field of a notification payload and return its type.

    Args:
        data: Mapping for the decoded notification body; its ``Message`` entry
            is expected to hold a JSON-encoded object.

    Returns:
        A ``(email_type, message)`` tuple. ``email_type`` is the message's
        ``notificationType``, falling back to ``eventType``, or ``None`` when
        neither is present. ``message`` is the decoded object, or ``{}`` when
        ``Message`` is missing, null, not valid JSON, or not a JSON object.
    """
    raw_message = data.get('Message') or '{}'
    try:
        message = json.loads(raw_message)
    except (TypeError, ValueError):
        # Not every delivered Message is JSON (plain-text messages are
        # possible); treat those as "no email event" instead of raising.
        log.getLogger(__name__).warning("Notification 'Message' is not valid JSON")
        message = {}
    if not isinstance(message, dict):
        # Valid JSON that is not an object has no type fields to read.
        message = {}
    return message.get('notificationType', message.get('eventType', None)), message
def get_email_message_id(data):
    """Return the ``MessageId`` entry of *data*, or ``None`` when it is absent."""
    message_id = data.get('MessageId')
    return message_id
@api_role_required('admin', 'partner')
def create_email_template():
    """Create an email template from multipart form data.

    The form field ``data`` carries a JSON object with ``title`` and
    ``subject`` (both required) and optional ``description``, ``content``,
    ``isActive`` (default True) and ``isPlainEmail`` (default False). An
    optional ``image`` file is uploaded to ``EMAIL_TEMPLATE_BUCKET`` and its
    ``"<bucket>::<key>"`` reference is stored on the template.

    Returns:
        The ``api_jsonify`` response: status 200 on success, 400 when the
        payload is missing, is not a JSON object, or lacks a title or subject.
    """
    template_image = request.files.get('image', None)
    raw_data = request.form.get('data', None)

    try:
        template_data = json.loads(raw_data) if raw_data else {}
    except (TypeError, ValueError):
        # Malformed JSON is handled as an empty payload (-> 400 below).
        template_data = {}
    if not isinstance(template_data, dict):
        # A JSON list/string/number used to crash on .get(); reject it like
        # any other invalid payload.
        template_data = {}

    title = template_data.get('title', '')
    subject = template_data.get('subject', '')
    if not (title and subject):
        return api_jsonify(message="Invalid request. Email template create request failed.",
                           success=False, status_code=400)

    template = EmailTemplate.create(
        title=title,
        subject=subject,
        description=template_data.get('description', ''),
        content=template_data.get('content', {}),
        is_active=template_data.get('isActive', True),
        is_plaintext=template_data.get('isPlainEmail', False),
        partnership_id=current_user.partnership_id,
        partnership_account_id=current_user.partnership_account_id,
    )

    if template and template_image:
        # The S3 key is derived from the template sid and the upload's extension.
        image_ext = template_image.filename.split('.')[-1]
        key = f'email/template_images/{template.sid}.{image_ext}'
        bucket = current_app.config['EMAIL_TEMPLATE_BUCKET']
        if upload_file_object(template_image, bucket, key):
            template.image = f"{bucket}::{key}"
            template.save()

    return api_jsonify(message="Email template created successfully!", success=True, status_code=200)
@api_role_required('admin', 'partner')
def get_email_templates():
    """Return all email templates of the current user's partnership account.

    Returns:
        The ``api_jsonify`` response. ``data`` is the serialized template list,
        or ``{}`` when there are none. If the query or serialization raises,
        the error is logged and a 500 failure response is returned instead of
        a success message.
    """
    data = {}
    try:
        templates = EmailTemplate.query.filter(
            EmailTemplate.partnership_account_id == current_user.partnership_account_id).all()
        if templates:
            data = EmailTemplateSchema(many=True).dump(templates)
    except Exception:
        # Previously the error was printed and "fetched successfully" was
        # still returned, hiding the failure from the caller.
        logger.exception("Failed to fetch email templates")
        return api_jsonify(data={}, message="Failed to fetch email templates.",
                           status_code=500, success=False)
    return api_jsonify(data=data, message="Email templates fetched successfully!")
@api_role_required('admin', 'partner')
def update_email_template(template_id):
    """Update an existing email template from multipart form data.

    The form field ``data`` carries a JSON object. Non-empty ``title``,
    ``subject``, ``description`` and ``content`` values overwrite the stored
    ones. ``isActive`` and ``isPlainEmail`` are applied only when present, so
    a partial update no longer resets those flags. An optional ``image`` file
    is uploaded to ``EMAIL_TEMPLATE_BUCKET`` and referenced on the template.

    Args:
        template_id: The ``sid`` of the template to update.

    Returns:
        The ``api_jsonify`` response: 200 on success, 401 when the payload is
        missing, malformed or not a JSON object, 404 when no template has this
        sid, 500 when an unexpected error occurs.
    """
    # NOTE(review): the template is looked up by sid only, without scoping to
    # the current user's partnership/account; confirm that is intended.
    data = {}

    def _respond(message, status_code, success):
        return api_jsonify(data=data, message=message, status_code=status_code, success=success)

    try:
        template_image = request.files.get('image', None)
        raw_data = request.form.get('data', None)
        try:
            template_data = json.loads(raw_data) if raw_data else {}
        except (TypeError, ValueError):
            template_data = {}
        if not template_data or not isinstance(template_data, dict):
            return _respond("Invalid parameters!", 401, False)

        template = EmailTemplate.query.filter(EmailTemplate.sid == template_id).first()
        if not template:
            return _respond("Email template doesn't exists!", 404, False)

        # Only non-empty values overwrite the stored text fields.
        for field in ('title', 'subject', 'description', 'content'):
            value = template_data.get(field, '')
            if value:
                setattr(template, field, value)
        # Flags change only when the client sent them; the old code forced both
        # to True whenever they were omitted.
        if 'isActive' in template_data:
            template.is_active = template_data['isActive']
        if 'isPlainEmail' in template_data:
            template.is_plaintext = template_data['isPlainEmail']
        # Saved before the upload so field changes persist even if it fails.
        template.save()

        if template_image:
            image_ext = template_image.filename.split('.')[-1]
            key = f'email/template_images/{template.sid}.{image_ext}'
            bucket = current_app.config['EMAIL_TEMPLATE_BUCKET']
            if upload_file_object(template_image, bucket, key):
                template.image = f"{bucket}::{key}"
                template.save()
    except Exception:
        # Previously the error was printed and "updated successfully" returned.
        logger.exception("Failed to update email template %s", template_id)
        return _respond("Email template update failed.", 500, False)
    return _respond("Email template updated successfully!", 200, True)
@api_role_required('admin', 'partner')
def get_email_template_by_id(template_id):
    """Return the serialized email template whose ``sid`` equals *template_id*.

    Returns:
        The ``api_jsonify`` response: status 200 with the serialized template,
        or status 404 with empty data when no template matches.
    """
    found = EmailTemplate.query.filter(EmailTemplate.sid == template_id).first()
    if found:
        return api_jsonify(
            data=EmailTemplateSchema().dump(found),
            message="Email template fetched successfully!",
            status_code=200,
            success=True,
        )
    return api_jsonify(
        data={},
        message="Email template doesn't exists!",
        status_code=404,
        success=False,
    )
@api_role_required('admin', 'partner')
def delete_email_template(template_id):
    """Delete the email template whose ``sid`` equals *template_id*.

    Returns:
        The ``api_jsonify`` response: status 204 after deleting, or status 404
        when no template matches.
    """
    target = EmailTemplate.query.filter(EmailTemplate.sid == template_id).first()
    if not target:
        return api_jsonify(message="Email template doesn't exists!", status_code=404, success=False)
    # The value returned by delete() is not used.
    target.delete()
    return api_jsonify(message="Email template deleted successfully!", status_code=204, success=True)
@api_role_required('admin', 'partner', 'sysadmin')
def list_all_identites():
    """Return the result of ``ses_client.list_all_identities`` as an API response.

    Status is 200 when the client result's ``status`` is truthy; otherwise 500
    with the client's own ``message``.
    """
    result = ses_client.list_all_identities()
    succeeded = result['status']
    if succeeded:
        code, note = 200, "Successfully fetched the identites"
    else:
        code, note = 500, result['message']
    return api_jsonify(data=result['data'], status_code=code, success=succeeded, message=note)
@api_role_required('admin', 'partner', 'sysadmin')
def list_email_identities():
    """Return the result of ``ses_client.list_email_identities`` as an API response.

    Status is 200 when the client result's ``status`` is truthy; otherwise 500
    with the client's own ``message``.
    """
    result = ses_client.list_email_identities()
    succeeded = result['status']
    if succeeded:
        code, note = 200, "Successfully fetched the email identites"
    else:
        code, note = 500, result['message']
    return api_jsonify(data=result['data'], status_code=code, success=succeeded, message=note)
@api_role_required('admin', 'partner', 'sysadmin')
def list_domain_identities():
    """Return the result of ``ses_client.list_domain_identities`` as an API response.

    Status is 200 when the client result's ``status`` is truthy; otherwise 500
    with the client's own ``message``.
    """
    result = ses_client.list_domain_identities()
    succeeded = result['status']
    if succeeded:
        code, note = 200, "Successfully fetched the domain identites"
    else:
        code, note = 500, result['message']
    return api_jsonify(data=result['data'], status_code=code, success=succeeded, message=note)