File: //proc/thread-self/root/home/arjun/projects/buyercall/buyercall/blueprints/channels/endpoints.py
import logging as log
from email.utils import parseaddr
from flask import Blueprint, request, current_app
from flask.views import MethodView
from flask_login import current_user
from sqlalchemy import func
from buyercall.blueprints.channels.models import Channel, ChannelType
from buyercall.blueprints.channels.serializers import ChannelOutMiniSchema, ChannelTypeInSchema, ChannelTypeOutSchema
from buyercall.blueprints.email.models import EmailIdentity
from buyercall.blueprints.email.utils.add_email_identity import AddEmailIdentity
from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount
from buyercall.blueprints.phonenumbers.models import Phone
from buyercall.blueprints.sources.models import Source
from buyercall.blueprints.user.decorators import api_role_required
from buyercall.blueprints.user.models import User
from buyercall.blueprints.widgets.models import Widget
from buyercall.extensions import ses_client
from buyercall.lib.util_rest import BlueprintMixin
from buyercall.lib.util_rest import api_jsonify
# Blueprint for the channel endpoints; registered with the URL prefix /api/channels.
channels_api = Blueprint('channelsapi', __name__, url_prefix='/api/channels')
# Module-level logger named after this module.
logger = log.getLogger(__name__)
class ChannelTypeApi(MethodView, BlueprintMixin):
    """Method-based view for listing, fetching, creating and updating channel types.

    ``serializers`` maps an HTTP method name to the schema used to load the
    request body ('in') and to dump the response ('out'). Schema instances
    are obtained through ``self.get_serializer_class`` (not defined in this
    class; presumably supplied by ``BlueprintMixin``).
    """

    serializers = {
        "get": {
            'out': ChannelTypeOutSchema
        },
        'post': {
            'in': ChannelTypeInSchema,
            'out': ChannelTypeOutSchema
        },
        'put': {
            'in': ChannelTypeInSchema,
            'out': ChannelTypeOutSchema
        },
        'patch': {
            'in': ChannelTypeInSchema,
            'out': ChannelTypeOutSchema
        }
    }

    # NOTE(review): presumably read by BlueprintMixin to apply per-method
    # permission checks; GET is configured with an empty list - confirm.
    permission_classes = {
        "get": []
    }

    def get(self, channel_type_id=None):
        """Return every channel type, or the one whose ``sid`` matches.

        Args:
            channel_type_id: Optional ``sid`` of a single channel type.

        Returns:
            The ``api_jsonify`` response wrapping the serialized row(s).
        """
        if not channel_type_id:
            queryset = ChannelType.query.all()
            serializer = self.get_serializer_class(many=True, key='out')
            message = "Channel types fetched successfully."
        else:
            queryset = ChannelType.query.filter(ChannelType.sid == channel_type_id).first()
            serializer = self.get_serializer_class(key='out')
            message = "Channel type fetched successfully."
        return api_jsonify(data=serializer.dump(queryset), message=message)

    def post(self):
        """Create a channel type from the JSON request body."""
        serializer = self.get_serializer_class(key='in')
        data = serializer.load(request.get_json())
        ChannelType.create(**data)
        message = "Channel type created successfully."
        # NOTE(review): 302 is a redirect status; 201 would be the conventional
        # reply for a creation. Left unchanged because clients may rely on it.
        status_code = 302
        return api_jsonify(data=[], message=message, status_code=status_code)

    def put(self, channel_type_id):
        """Update the channel type identified by ``channel_type_id``.

        Args:
            channel_type_id: ``sid`` of the channel type to update.

        Returns:
            The ``api_jsonify`` response: the serialized updated row on
            success, a 404 when no such channel type exists, or a 500 when
            the model reports that the update failed.
        """
        serializer_in = self.get_serializer_class(key='in')
        data = serializer_in.load(request.get_json())
        channel_type = ChannelType.get_by_sid(channel_type_id)
        if not channel_type:
            # Guard against an unknown sid instead of failing with an
            # AttributeError on the ``put`` call below.
            return api_jsonify(data={}, message="Channel type not found.", status_code=404, success=False)

        updated_ctype = channel_type.put(return_object=True, **data)
        if updated_ctype:
            message = "Channel type updated successfully."
            serializer_out = self.get_serializer_class(key='out')
            response = serializer_out.dump(updated_ctype)
            status_code = 200
            success = True
        else:
            message = "Channel type update failure."
            response = {}
            status_code = 500
            success = False
        return api_jsonify(data=response, message=message, status_code=status_code, success=success)
@api_role_required('admin', 'partner')
def is_email_valid(email_prefix):
    """Report whether ``email_prefix`` yields a usable, not-yet-registered address.

    The address is ``<email_prefix>@<domain>``. The domain is the configured
    ``TEST_SERVER_DOMAIN`` when that setting is present and ``DEBUG`` is on,
    otherwise the ``partner_url`` of the current user's partnership (falling
    back to the partnership with primary key 1).

    Args:
        email_prefix: Local part of the address to check.

    Returns:
        The ``api_jsonify`` response whose data is ``{"isValid": <bool>}``.
    """
    owner = Partnership.query.filter(Partnership.id == current_user.partnership_id).first()
    if not owner:
        owner = Partnership.query.get(1)
    partner_domain = owner.partner_url

    debug_domain = current_app.config.get('TEST_SERVER_DOMAIN', None)
    if debug_domain and current_app.config.get('DEBUG', False):
        candidate = f"{email_prefix}@{debug_domain}"
    else:
        candidate = f"{email_prefix}@{partner_domain}"

    # parseaddr returns (realname, address); a parsable address keeps its '@'.
    well_formed = '@' in parseaddr(candidate)[1]
    registered = ses_client.list_email_identities()

    if not well_formed:
        message = "Invalid email format"
        available = False
    elif candidate in registered['data']:
        message = "Email already exists"
        available = False
    else:
        message = "Email address is valid"
        available = True
    return api_jsonify(message=message, data={"isValid": available})
# class ChannelApi(MethodView, BlueprintMixin):
# serializers = {
# "get": {
# 'out': ChannelOutSchema
# },
# 'post': {
# 'in': ChannelInSchema,
# 'out': ChannelOutSchema
# },
# 'put': {
# 'in': ChannelInSchema,
# 'out': ChannelOutSchema
# },
# 'patch': {
# 'in': ChannelInSchema,
# 'out': ChannelOutSchema
# }
# }
# permission_classes = {
# "get": []
# }
# def get(self, channel=None):
# if not channel:
# queryset = Channel.query.all()
# serializer = self.get_serializer_class(many=True, key='out')
# message = "Channels fetched successfully."
# else:
# queryset = Channel.query.filter(Channel.sid == channel).first()
# serializer = self.get_serializer_class(key='out')
# message = "Channel fetched successfully."
# return api_jsonify(data=serializer.dump(queryset), message=message)
# def post(self):
# serializer = self.get_serializer_class(key='in')
# data = serializer.load(request.get_json())
# # Add partnership and account details
# data['partnership_id'] = current_user.partnership_id
# data['partnership_account_id'] = current_user.partnership_account_id
# data['created_by'] = current_user.id
# Channel.create(**data)
# message="Channel created successfully."
# status_code = 302
# return api_jsonify(data=[], message=message, status_code=status_code)
# def put(self, source_id):
# serializer_in = self.get_serializer_class(key='in')
# data = serializer_in.load(request.get_json())
# source = Channel.update(source_id, **data)
# message="Channel updated successfully."
# serializer_out = self.get_serializer_class(key='out')
# return api_jsonify(data=serializer_out.dump(source), message=message)
@api_role_required('admin', 'partner')
def get_channel_by_id(cid):
    """Return the details of the channel whose ``sid`` equals ``cid``.

    The payload carries audit fields, partnership and source identifiers,
    the guids of the attached widgets and, depending on the channel type
    name, the provisioned phone number ('phone-number') or the email
    identity ('email') referenced by ``channel.related_id``.

    Args:
        cid: ``sid`` of the channel to fetch.

    Returns:
        The ``api_jsonify`` response: the channel dict with status 200, or
        an empty dict with status 404 when no channel matches.
    """
    # NOTE(review): the lookup is not scoped to the caller's partnership or
    # account - confirm that cross-account reads are intended.
    channel = Channel.query.filter(Channel.sid == cid).first()
    if not channel:
        # A missing channel is a lookup miss rather than a server fault;
        # update_channel already answers 404 for the same condition.
        return api_jsonify({}, message="Channel does not exist", status_code=404, success=False)

    channel_type = ChannelType.get_name_by_id(channel.type)
    created_by = User.get_by_id(channel.created_by)
    updated_by = User.get_by_id(channel.updated_by)

    _ch = {
        'id': channel.sid,
        'updated_at': channel.updated_on,
        'created_at': channel.created_on,
        'created_by': f"{created_by.firstname} {created_by.lastname}" if created_by else None,
        'updated_by': f"{updated_by.firstname} {updated_by.lastname}" if updated_by else None,
        'created_by_agent_id': created_by.sid if created_by else None,
        'updated_by_agent_id': updated_by.sid if updated_by else None,
        'partnership_id': Partnership.get_sid_from_id(channel.partnership_id) if channel.partnership_id else None,
        'partnership_account_id': PartnershipAccount.get_sid_from_id(
            channel.partnership_account_id) if channel.partnership_account_id else None,
        'partnershipName': Partnership.get_name(channel.partnership_id) if channel.partnership_id else None,
        'partnershipAccountname': PartnershipAccount.get_name(
            channel.partnership_account_id) if channel.partnership_account_id else None,
        'sourceId': Source.get_sid_from_id(channel.source) if channel.source else None,
        'name': channel.name,
        'description': channel.description,
        'widget': [widg.guid for widg in channel.widgets],
    }

    # Type-specific extras come from the record referenced by related_id.
    if channel_type == 'phone-number':
        _phone = Phone.get_by_id(channel.related_id)
        if _phone:
            _ch['numberType'] = "Local" if _phone.local else "Toll Free"
            _ch['provisionedPhoneNumber'] = _phone.phonenumber if _phone.phonenumber else None
            _ch['provisionedPhoneNumberId'] = _phone.sid
    elif channel_type == 'email':
        _email = EmailIdentity.get_by_id(channel.related_id)
        if _email:
            _ch['partnerDomain'] = _email.domain
            _ch['emailprefix'] = _email.username

    return api_jsonify(_ch, message="Channel fetched successfully ", status_code=200, success=True)
@api_role_required('admin', 'partner')
def update_channel(cid):
    """Apply a partial update to the channel whose ``sid`` equals ``cid``.

    Recognised JSON keys: ``name``, ``description``, ``content``,
    ``is_active``, ``source`` (a source sid) and ``widget`` (one widget guid
    or a list of them). Only keys present in the payload are applied:

    * ``name`` present but empty, or ``source`` present but unknown, is a
      validation error and nothing is saved.
    * ``is_active`` is applied whenever it is supplied and not null, so a
      channel can be deactivated with ``false``.
    * ``widget`` replaces the channel's widgets with the guids that resolve
      to existing widgets; when the key is absent the current widgets are
      left untouched.

    Args:
        cid: ``sid`` of the channel to update.

    Returns:
        The ``api_jsonify`` response: success, 404 when the channel does not
        exist, or 401 with the validation message(s).
    """
    # An empty or null body is treated as "no changes" instead of crashing.
    received = request.get_json() or {}

    channel = Channel.query.filter(Channel.sid == cid).first()
    if not channel:
        return api_jsonify(message="Channel not found", status_code=404, success=False)

    errors = []

    # Resolve the source once; the id is reused when applying the update.
    source_id = None
    if 'source' in received:
        source_id = Source.get_id_from_sid(received.get('source', None))
        if not source_id:
            errors.append("Source does not exist")

    name = received.get('name', None)
    if 'name' in received and not name:
        errors.append("Channel name cannot be null")

    if errors:
        # NOTE(review): 401 signals an authentication problem; 400 would be
        # the conventional status for a validation error. Kept because
        # create_channels uses the same status and clients may rely on it.
        return api_jsonify(message=", ".join(errors), status_code=401, success=False)

    description = received.get('description', None)
    content = received.get('content', None)
    is_active = received.get('is_active', None)

    if name:
        channel.name = name
    if description:
        channel.description = description
    if content:
        channel.content = content
    if is_active is not None:
        channel.is_active = is_active
    channel.updated_by = current_user.id

    if 'widget' in received:
        widget_ids = received.get('widget', [])
        if not isinstance(widget_ids, list):
            widget_ids = [widget_ids]
        filtered_widgets = []
        # Widgets are only resolved when the channel's type row exists.
        if widget_ids and ChannelType.query.get(channel.type):
            for widget_id in widget_ids:
                widget = Widget.query.filter(Widget.guid == widget_id).first()
                if widget:
                    filtered_widgets.append(widget)
        channel.widgets = filtered_widgets

    if source_id:
        channel.source = source_id

    channel.save()
    return api_jsonify(message="Channel updated successfully")
@api_role_required('admin', 'partner')
def get_channels_by_type(channel_type):
    """List the current account's channels of the given channel type.

    Channels are filtered on the current user's partnership account
    (falling back to account id 1) and ordered by ``updated_on`` then
    ``created_on``, both descending. Each entry carries audit fields,
    partnership and source identifiers and, for the 'phone-number' and
    'email' types, details of the record referenced by ``related_id``.

    Args:
        channel_type: Name of the channel type, e.g. 'phone-number' or 'email'.

    Returns:
        The ``api_jsonify`` response: the channel list with status 200, an
        empty list with status 404 for an unknown type, or an empty list
        with status 500 when fetching fails.
    """
    success = True
    message = "Channels fetched successfully!"
    status_code = 200
    channels_list = []
    try:
        # Defined up front so an empty or unknown type yields an empty
        # result without relying on an undefined name further down.
        channels = []
        if channel_type:
            _channel_type = ChannelType.get_by_name(channel_type)
            if _channel_type:
                partnership_account_id = current_user.partnership_account_id or 1
                channels = Channel.query.filter(
                    Channel.partnership_account_id == partnership_account_id,
                    Channel.type == _channel_type.id,
                ).order_by(Channel.updated_on.desc(), Channel.created_on.desc())
            else:
                success = False
                message = "Invalid channel type"
                status_code = 404

        for channel in channels:
            created_by = User.get_by_id(channel.created_by)
            updated_by = User.get_by_id(channel.updated_by)
            _ch = {
                'id': channel.sid,
                'updated_at': channel.updated_on,
                'created_at': channel.created_on,
                'created_by': f"{created_by.firstname} {created_by.lastname}" if created_by else None,
                'updated_by': f"{updated_by.firstname} {updated_by.lastname}" if updated_by else None,
                'created_by_agent_id': created_by.sid if created_by else None,
                'updated_by_agent_id': updated_by.sid if updated_by else None,
                'partnership_id': Partnership.get_sid_from_id(
                    channel.partnership_id) if channel.partnership_id else None,
                'partnership_account_id': PartnershipAccount.get_sid_from_id(
                    channel.partnership_account_id) if channel.partnership_account_id else None,
                'partnershipName': Partnership.get_name(channel.partnership_id) if channel.partnership_id else None,
                'partnershipAccountname': PartnershipAccount.get_name(
                    channel.partnership_account_id) if channel.partnership_account_id else None,
                'sourceId': Source.get_sid_from_id(channel.source) if channel.source else None,
                'name': channel.name,
                'description': channel.description,
            }
            if channel_type == 'phone-number':
                _phone = Phone.get_by_id(channel.related_id)
                if _phone:
                    _ch['numberType'] = "Local" if _phone.local else "Toll Free"
                    _ch['provisionedPhoneNumber'] = _phone.phonenumber if _phone.phonenumber else None
                    _ch['provisionedPhoneNumberId'] = _phone.sid
            elif channel_type == 'email':
                _email = EmailIdentity.get_by_id(channel.related_id)
                if _email:
                    _ch['partnerDomain'] = _email.domain
                    _ch['emailprefix'] = _email.username
            channels_list.append(_ch)
    except Exception:
        # Report the failure instead of returning an empty "successful"
        # list; the traceback goes to the module logger.
        logger.exception("Failed to fetch channels of type %s", channel_type)
        channels_list = []
        success = False
        message = "Failed to fetch channels"
        status_code = 500
    return api_jsonify(channels_list, message=message, success=success, status_code=status_code)
@api_role_required('admin', 'partner')
def get_channels():
    """List every channel of the current user's partnership account.

    The account id falls back to 1 when the current user has none. Rows
    are serialized with ``ChannelOutMiniSchema``.

    Returns:
        The ``api_jsonify`` response: the serialized channels on success,
        or a 500 carrying the exception text when fetching fails.
    """
    message = "Account channels fetched successfully!"
    partnership_account_id = current_user.partnership_account_id or 1
    try:
        channels = Channel.query.filter(
            Channel.partnership_account_id == partnership_account_id).all()
        serializer = ChannelOutMiniSchema(many=True)
    except Exception as e:
        logger.exception("Failed to fetch account channels")
        return api_jsonify(message=str(e), status_code=500, success=False)
    # Pass ``message`` by keyword: other call sites in this module disagree
    # about which parameter occupies the second positional slot.
    return api_jsonify(serializer.dump(channels), message=message)
@api_role_required('admin', 'partner')
def get_channel_types():
    """List every channel type, serialized with ``ChannelTypeOutSchema``.

    Returns:
        The ``api_jsonify`` response wrapping the serialized channel types.
    """
    queryset = ChannelType.query.all()
    serializer = ChannelTypeOutSchema(many=True)
    # Keyword arguments are used because positional calls in this module
    # disagree on the parameter order after ``data``.
    return api_jsonify(
        serializer.dump(queryset),
        status_code=200,
        message="Channel types fetched successfully!",
        success=True,
    )
@api_role_required('admin', 'partner')
def get_channel_type_count():
    """Count the current user's channels per channel type name.

    Channels are filtered on the current user's partnership and partnership
    account, joined to their channel type and grouped by the type name.

    Returns:
        The ``api_jsonify`` response whose data maps each channel type name
        to its channel count; on failure the data is empty, the message is
        the exception text and the status is 500.
    """
    counts = {}
    outcome = {
        'message': "Channel meta details fetched successfully!",
        'success': True,
        'status_code': 200,
    }
    try:
        per_type = (
            Channel.query
            .join(Channel.channel_type)
            .with_entities(ChannelType.name, func.count(Channel.type))
            .filter(Channel.partnership_id == current_user.partnership_id,
                    Channel.partnership_account_id == current_user.partnership_account_id)
            .group_by(ChannelType.name)
            .all()
        )
        # Each row is a (type name, channel count) pair.
        for type_name, total in per_type:
            counts[type_name] = total
    except Exception as exc:
        outcome = {'message': str(exc), 'success': False, 'status_code': 500}
    return api_jsonify(counts, **outcome)
@api_role_required('admin', 'partner')
def create_channels():
    """Create a channel for the current user's partnership account.

    Recognised JSON keys: ``name``, ``type`` (channel type name), ``source``
    (source sid), ``widget`` (one widget guid or a list of them),
    ``description``, ``content`` and, for the 'email' type, ``emailPrefix``.

    ``name``, a known ``type`` and a known ``source`` are required. Widgets
    that already have a channel of the requested type are not attached.
    For an 'email' channel with an ``emailPrefix``, an email identity is
    registered first and linked to the channel through ``related_id``.

    Returns:
        The ``api_jsonify`` response: 200 on success, 500 with the
        provider's message when the email identity cannot be added, or 401
        listing the missing fields (and any widget conflict).
    """
    # An empty or null body is validated as "everything missing" below
    # instead of raising on ``.get``.
    received = request.get_json() or {}
    name = received.get('name', None)
    type_name = received.get('type', None)

    # ``widget`` is optional and may be a single guid; normalise to a list.
    widget_ids = received.get('widget', None) or []
    if not isinstance(widget_ids, list):
        widget_ids = [widget_ids]

    type_ = ChannelType.get_by_name(type_name)
    source_id = Source.get_id_from_sid(received.get('source', None))

    widget_conflict = False
    filtered_widgets = []
    for widget_id in widget_ids:
        if not widget_id:
            continue
        if type_ and Channel.is_widget_channel_type_exist(type_.id, widget_id):
            # Remember any conflict, not only the one of the last widget.
            widget_conflict = True
            continue
        widget = Widget.query.filter(Widget.guid == widget_id).first()
        if widget:
            filtered_widgets.append(widget)

    if type_ and name and source_id:
        _email = None
        if type_name == 'email':
            username = received.get('emailPrefix', '')
            if username:
                partnership = Partnership.query.filter(Partnership.id == current_user.partnership_id).first()
                if not partnership:
                    partnership = Partnership.query.get(1)
                resp = AddEmailIdentity().add(partnership, username)
                if not resp.get('status', None):
                    return api_jsonify({}, status_code=500, success=False, message=resp.get('message', ''))

                if current_app.config.get('TEST_SERVER_DOMAIN', None) and current_app.config.get('DEBUG', False):
                    domain = current_app.config.get('TEST_SERVER_DOMAIN', None)
                else:
                    domain = partnership.partner_url
                email_data = {
                    'username': username,
                    'domain': domain,
                    'partnership_account_id': current_user.partnership_account_id or 1,
                    'partnership_id': current_user.partnership_id or 1
                }
                _email = EmailIdentity.create(**email_data)

        data = {"name": name, "description": received.get('description', ""),
                "type": type_.id, "content": received.get('content', {}),
                "source": source_id, "partnership_id": current_user.partnership_id or 1,
                "partnership_account_id": current_user.partnership_account_id or 1,
                "created_by": current_user.id, "widgets": filtered_widgets}
        if _email:
            data["related_id"] = _email.id
        Channel.create(**data)
        status_code = 200
        success = True
        message = "Channel added successfully."
    else:
        # NOTE(review): 401 signals an authentication problem; 400 would be
        # the conventional status for a validation error. Kept because
        # update_channel uses the same status and clients may rely on it.
        status_code = 401
        success = False
        message = ""
        keyword = ""
        if not type_:
            keyword = "Channel type"
        if not name:
            keyword = f"Channel name, {keyword}" if keyword else "Channel name"
        if not source_id:
            keyword = f"{keyword} and Source" if keyword else "Source"
        if keyword:
            message = f"{keyword} is required"
        if widget_conflict:
            conflict_note = f"Widget with the {type_name} channel type already exists."
            message = f"{message}. {conflict_note}" if message else conflict_note
    return api_jsonify({}, status_code=status_code, success=success, message=message)