File: //proc/1233/root/home/arjun/projects/buyercall/buyercall/blueprints/email/models.py
import json
import uuid
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.hybrid import hybrid_property
from buyercall.extensions import db
from buyercall.lib.util_sqlalchemy import ResourceMixin
class Email(ResourceMixin, db.Model):
__tablename__ = 'email'
id = db.Column(db.Integer, primary_key=True)
sid = db.Column(UUID(as_uuid=True), unique=True, default=uuid.uuid4, index=True)
first_name = db.Column(db.String(200))
last_name = db.Column(db.String(200), nullable=True)
email = db.Column(db.String(120), index=True)
source = db.Column(
db.Integer,
db.ForeignKey('source.id', onupdate='CASCADE', ondelete='CASCADE'),
index=True,
nullable=True
)
channel = db.Column(
db.Integer,
db.ForeignKey('channel.id', onupdate='CASCADE', ondelete='CASCADE'),
index=True,
nullable=True
)
is_inbound = db.Column(db.Boolean(), nullable=False, server_default='true')
is_forward = db.Column(db.Boolean(), nullable=False, server_default='false')
contact_id = db.Column(
db.Integer(),
db.ForeignKey('contacts.id', onupdate="CASCADE", ondelete='SET_NULL'),
index=True,
nullable=True
)
message_id = db.Column(db.String(256), nullable=True)
meta_data = db.Column(postgresql.JSON, nullable=False, server_default='{}')
partnership_account_id = db.Column(db.Integer, db.ForeignKey(
'partnership_accounts.id', onupdate='CASCADE', ondelete='CASCADE'), index=True, nullable=False)
recipients = db.Column(postgresql.ARRAY(db.String(320)))
def __str__(self):
return self.email
@classmethod
def get_by_message(cls, message_id):
return cls.query.filter(cls.message_id == message_id).first()
def update_status(self, status):
mdata = json.loads(self.meta_data)
mdata['status'] = status
self.meta_data = json.dumps(mdata)
self.save()
return self
@classmethod
def create_from_raw(cls, from_email, recipients, subject, body, partnership_account, status, channel,
source, raw=None, message_id=None, is_inbound=True):
email = Email()
email.first_name = from_email.split('@')[0]
email.source = source
email.channel = channel
email.email = from_email
email.message_id = message_id
email.is_inbound = is_inbound
email.partnership_account_id = partnership_account.id if partnership_account else 1
email.recipients = recipients
email.meta_data = json.dumps({
'subject': subject,
'plain': body,
'html': body,
'status': status,
'message_id': message_id,
'raw': raw
})
email.save()
return email
class EmailIdentity(ResourceMixin, db.Model):
__tablename__ = 'email_identity'
id = db.Column(db.Integer, primary_key=True)
sid = db.Column(UUID(as_uuid=True), unique=True, default=uuid.uuid4, index=True)
username = db.Column(db.String(128), index=True)
domain = db.Column(db.String(255), index=True)
partnership_account_id = db.Column(db.Integer, db.ForeignKey(
'partnership_accounts.id', onupdate='CASCADE', ondelete='CASCADE'), index=True, nullable=True)
partnership_id = db.Column(db.Integer, db.ForeignKey(
'partnerships.id', onupdate='CASCADE', ondelete='CASCADE'), index=True, nullable=False)
is_verified = db.Column('is_verified', db.Boolean(), nullable=False, server_default='0')
def __str__(self):
return self.username
@hybrid_property
def email(self):
return f"{self.username}@{self.domain}"
@classmethod
def is_exists(cls, email):
if cls.query.filter(cls.email == email).first():
return True
return False
@classmethod
def get_by_email(cls, email: str):
try:
username, domain = email.split('@')
return cls.query.filter(cls.username == username, cls.domain == domain).first()
except Exception:
return None
@classmethod
def get_by_username(cls, username, partnership_id):
return cls.query.filter(cls.username == username, cls.partnership_id == partnership_id).first()
class EmailTemplate(ResourceMixin, db.Model):
__tablename__ = 'email_template'
id = db.Column(db.Integer, primary_key=True)
sid = db.Column(UUID(as_uuid=True), unique=True, default=uuid.uuid4, index=True)
title = db.Column(db.String(128), index=True)
subject = db.Column(db.String(128), index=True)
description = db.Column(db.String(255), nullable=True)
content = db.Column(postgresql.JSON, nullable=False, server_default='{}')
is_active = db.Column(db.Boolean(), nullable=False, server_default='true')
is_plaintext = db.Column(db.Boolean(), nullable=False, server_default='true')
image = db.Column(db.String(128), nullable=True)
partnership_account_id = db.Column(db.Integer, db.ForeignKey(
'partnership_accounts.id', onupdate='CASCADE', ondelete='CASCADE'), index=True, nullable=True)
partnership_id = db.Column(db.Integer, db.ForeignKey(
'partnerships.id', onupdate='CASCADE', ondelete='CASCADE'), index=True, nullable=False)
def __str__(self):
return self.title