File: //proc/thread-self/root/home/arjun/projects/buyercall/buyercall/blueprints/channels/models.py
import logging
import uuid

from sqlalchemy.dialects.postgresql import UUID, JSON

from buyercall.blueprints.chat.models import Chat
from buyercall.blueprints.email.models import Email, EmailIdentity
from buyercall.blueprints.email.serializers import EmailSchema, EmailIdentitySchema
from buyercall.blueprints.phonenumbers.models import Phone
from buyercall.blueprints.phonenumbers.serializers import PhoneSchema
from buyercall.blueprints.sources.models import Source
from buyercall.extensions import db
from buyercall.lib.util_sqlalchemy import ResourceMixin
class ChannelType(ResourceMixin, db.Model):
    """Named kind of channel, stored in the ``channel_type`` table.

    Types can be nested: a row may point at a parent row through
    ``parent_type_id``, and the children of a row are reachable through the
    ``sub_types`` backref.
    """

    __tablename__ = 'channel_type'

    id = db.Column(db.Integer, primary_key=True)
    # Unique UUID identifier; generated with uuid4 when not supplied.
    sid = db.Column(UUID(as_uuid=True), unique=True, default=uuid.uuid4, index=True)
    name = db.Column(db.String(36), nullable=False)
    # Self-referential foreign key to the parent type (NULL for a root type).
    parent_type_id = db.Column(db.Integer, db.ForeignKey('channel_type.id'), index=True)
    # `remote_side=id` marks this as the many-to-one side of the self-join.
    parent_type = db.relationship('ChannelType', remote_side=id, backref='sub_types')

    def __str__(self):
        """Return the type name, falling back to the sid rendered as text."""
        # `sid` is a uuid.UUID; __str__ must return a str, so convert it.
        return self.name or str(self.sid)

    @classmethod
    def get_by_name(cls, name):
        """Return the first type whose name equals ``name``.

        Returns ``None`` when ``name`` is falsy or no row matches.
        """
        if not name:
            return None
        return cls.query.filter(cls.name == name).first()

    @classmethod
    def get_name_by_sid(cls, sid):
        """Return the name of the type with the given ``sid``.

        Returns ``None`` when ``sid`` is falsy or no row matches.
        """
        if not sid:
            return None
        channel_type = cls.query.filter(cls.sid == sid).first()
        return channel_type.name if channel_type else None

    @classmethod
    def get_name_by_id(cls, _id):
        """Return the name of the type with primary key ``_id``.

        Returns ``None`` when ``_id`` is falsy or no row matches.
        """
        if not _id:
            return None
        channel_type = cls.query.filter(cls.id == _id).first()
        return channel_type.name if channel_type else None
# Many-to-many link table between widgets and channels. Each row pairs one
# widget with one channel; the two foreign keys together form the primary key.
widget_channel_association_table = db.Table(
    'widget_channel_association',
    db.Model.metadata,
    db.Column('widget_id', db.ForeignKey('widgets.id'), primary_key=True),
    db.Column('channel_id', db.ForeignKey('channel.id'), primary_key=True),
)
class Channel(ResourceMixin, db.Model):
    """A communication channel, stored in the ``channel`` table.

    A channel has a type (``type`` -> ``channel_type.id``) and a
    ``related_id`` that identifies a row in another table chosen by that
    type's name; see :meth:`get_related_object` for the mapping.
    """

    __tablename__ = 'channel'

    id = db.Column(db.Integer, primary_key=True)
    # Unique UUID identifier; generated with uuid4 when not supplied.
    sid = db.Column(UUID(as_uuid=True), unique=True, default=uuid.uuid4, index=True)
    name = db.Column(db.String(36), nullable=False)
    description = db.Column(db.String(300), nullable=False)

    # Foreign key to ChannelType; set to NULL if the type row is deleted.
    type = db.Column(db.Integer, db.ForeignKey('channel_type.id', onupdate='CASCADE',
                                               ondelete='SET NULL'), index=True, nullable=True)
    channel_type = db.relationship("ChannelType", backref="channels")

    # Id of the Phone / EmailIdentity / Chat row behind this channel. No
    # foreign-key constraint is declared because the target table depends on
    # the channel type.
    related_id = db.Column(db.Integer, nullable=True)
    content = db.Column(JSON, nullable=False, server_default='{}')
    is_active = db.Column('is_active', db.Boolean(), nullable=False, server_default='1')

    # Foreign key to Source; set to NULL if the source row is deleted.
    source = db.Column(db.Integer, db.ForeignKey('source.id', onupdate='CASCADE',
                                                 ondelete='SET NULL'), index=True, nullable=True)

    partnership_id = db.Column(db.Integer, db.ForeignKey('partnerships.id',
                                                         onupdate='CASCADE',
                                                         ondelete='CASCADE'),
                               index=True, nullable=True)

    partnership_account_id = db.Column(db.Integer, db.ForeignKey('partnership_accounts.id',
                                                                 onupdate='CASCADE',
                                                                 ondelete='CASCADE'),
                                       index=True, nullable=True)

    created_by = db.Column(db.Integer, db.ForeignKey('users.id', onupdate='CASCADE',
                                                     ondelete='CASCADE'), index=True, nullable=True)

    updated_by = db.Column(db.Integer, db.ForeignKey('users.id', onupdate='CASCADE',
                                                     ondelete='CASCADE'), index=True, nullable=True)

    # Many-to-many with Widget through the association table; also exposes
    # `Widget.channels`.
    widgets = db.relationship("Widget", secondary=widget_channel_association_table,
                              backref=db.backref('channels', lazy=True))

    def __str__(self):
        """Return the channel name, falling back to the sid rendered as text."""
        # `sid` is a uuid.UUID; __str__ must return a str, so convert it.
        return self.name or str(self.sid)

    def get_related_object(self):
        """Return the object referenced by ``related_id`` for this channel's type.

        The result depends on the channel type's name:

        * ``'phone-number'`` -> the ``Phone`` row dumped through ``PhoneSchema``
        * ``'email'`` -> the ``EmailIdentity`` row dumped through
          ``EmailIdentitySchema``
        * ``'chat'`` -> the ``Chat`` model instance itself (not serialized)

        Returns ``None`` when the channel has no resolvable type, the type
        name is not one of the above, or any error occurs during the lookup.
        Errors are logged instead of being raised (best-effort lookup).
        """
        try:
            _type = ChannelType.query.filter(ChannelType.id == self.type).first()
            if _type is None:
                # Channel without a type (or a dangling type id).
                return None

            if _type.name == 'phone-number':
                phone = Phone.query.filter(Phone.id == self.related_id).first()
                return PhoneSchema().dump(phone)
            elif _type.name == 'email':
                identity = EmailIdentity.query.filter(EmailIdentity.id == self.related_id).first()
                return EmailIdentitySchema().dump(identity)
            elif _type.name == 'chat':
                return Chat.query.filter(Chat.id == self.related_id).first()
            else:
                return None
        except Exception:
            # Keep the best-effort contract (callers get None), but leave a
            # trace. No instance attributes are read here so the handler
            # itself cannot fail on a broken session.
            logging.getLogger(__name__).exception('Channel.get_related_object failed')
            return None

    @classmethod
    def get_by_email_identity(cls, email_identity_id):
        """Return the first email channel whose ``related_id`` is ``email_identity_id``.

        Returns ``None`` when no ``'email'`` channel type exists or no channel
        matches.
        """
        channel_type = ChannelType.get_by_name('email')
        if channel_type is None:
            return None
        # Compare the FK column `type`, not the `channel_type` relationship:
        # a relationship attribute cannot be compared to a plain integer id.
        return cls.query.filter(cls.related_id == email_identity_id,
                                cls.type == channel_type.id).first()

    def get_source(self):
        """Return the ``Source`` row referenced by ``source``, or ``None``."""
        return Source.query.filter(Source.id == self.source).first()

    @classmethod
    def is_widget_channel_type_exist(cls, type_, widget):
        """Return ``True`` if a channel of type id ``type_`` is attached to a
        widget whose ``guid`` equals ``widget``."""
        channel = cls.query.filter(cls.type == type_, cls.widgets.any(guid=widget)).first()
        return channel is not None

    @classmethod
    def get_all_by_partnership_account(cls, paid):
        """Return all active channels of the partnership account with id ``paid``."""
        return cls.query.filter(cls.partnership_account_id == paid,
                                cls.is_active.is_(True)).all()

    @classmethod
    def get_by_related_id(cls, rid, pid):
        """Return the first channel with ``related_id == rid`` in partnership ``pid``."""
        return cls.query.filter(cls.related_id == rid, cls.partnership_id == pid).first()
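# NOTE(review): the disabled `is_alive` below is stale and cannot be re-enabled
# as written: it compares `self.type` to the string 'EMAIL' although `type` is
# an integer foreign key, it builds `ses_config` without using it, and it
# references `current_app` and `ses_client`, which are not among the imports
# visible at the top of this module.
# def is_alive(self):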
# if self.type == 'EMAIL':
# # Get domain and email(ses identities) verification status
# ses_config = {
# 'SES_REGION_NAME': current_app.config.get('SES_REGION_NAME', None),
# 'AMAZON_ACCESS_KEY': current_app.config.get('AMAZON_ACCESS_KEY', None),
# 'AMAZON_SECRET_KEY': current_app.config.get('AMAZON_SECRET_KEY', None),
# 'SES_EMAIL_SOURCE': current_app.config.get('SES_EMAIL_SOURCE', None)
# }
# ses_identities = [self.content['email'], self.content['partner_url']]
# identity_status = ses_client.check_identity_verify_status(ses_identities)
# if 'VerificationAttributes' in identity_status['data']:
# for idt, val in identity_status['data']['VerificationAttributes'].items():
# if val['VerificationStatus'] == 'Success':
# return True
# return False
# return True