File: //proc/self/root/home/arjun/projects/buyercall/buyercall/blueprints/agents/endpoints.py
import logging as log
from flask import Blueprint, jsonify, request
from flask_login import current_user, login_required
from sqlalchemy import and_
from buyercall.blueprints.agents.models import AgentSchedule, Agent, Team
from buyercall.blueprints.email.models import Email, EmailIdentity, EmailTemplate
from buyercall.blueprints.sources.models import Source
from buyercall.blueprints.user.decorators import api_role_required, role_required
from buyercall.extensions import db, ses_client
from buyercall.lib.util_rest import api_jsonify
# Blueprint for the agent API; its URL prefix is /api/agents.
# NOTE(review): no route registrations on this blueprint are visible in this view of the file.
agent_api = Blueprint('agentsapi', __name__, url_prefix='/api/agents')
# Module-level logger named after this module.
logger = log.getLogger(__name__)
def getboolean(param):
    """Interpret a loosely-typed flag as a strict boolean.

    Returns ``True`` only for the strings ``'true'`` / ``'True'`` or for the
    boolean ``True`` itself. Every other value (``False``, ``1``, ``'TRUE'``,
    ``None``, ...) yields ``False``.
    """
    if isinstance(param, bool):
        return param
    return param in ('true', 'True')
@role_required('agent', 'admin')
def update_schedule(sid):
    """Update the signed-in agent's schedule rows and "all hours" flag.

    The agent is resolved from ``current_user.agent``; ``sid`` is only used in
    the log line. The JSON body may carry ``allHours`` and ``availableHours``
    (a list of per-day objects). When the agent already has schedule rows,
    every entry with a truthy ``schedule_id`` is updated in place; otherwise
    rows are created from the submitted entries.

    Args:
        sid: Agent identifier from the caller; logged, not used for lookup.

    Returns:
        A JSON response with ``statusCode``, ``success``, ``message`` and
        ``data`` keys. The response object is returned without an explicit
        HTTP status; the outcome is reported in the body.
    """
    status_code = 200
    success = True
    message = "Agent schedule & availability updated successfully!"
    data = []
    try:
        form_data = request.get_json()
        # A user without an agent profile is reported as "not found" rather
        # than failing on the attribute access below.
        profile = current_user.agent
        agent = None
        if profile is not None:
            agent = Agent.query.filter(Agent.sid == profile.sid).first()

        if not agent:
            status_code = 404
            success = False
            message = "Agent not found!"
        elif not isinstance(form_data, dict):
            # Missing or non-object body: reject instead of raising later.
            status_code = 400
            success = False
            message = "Invalid parameters"
        else:
            partnership_account_id = agent.partnership_account_id
            available_hours = form_data.get('availableHours') or []

            if 'allHours' in form_data:
                agent.all_hours = form_data['allHours']
                agent.save()

            schedules_exist = AgentSchedule.query.filter(
                AgentSchedule.agent_id == agent.id).count()
            if schedules_exist > 0:
                for day_obj in available_hours:
                    # Entries without a schedule id cannot be matched to a row.
                    if day_obj.get('schedule_id'):
                        AgentSchedule.update(
                            day_obj["schedule_id"],
                            str(day_obj["start"]),
                            str(day_obj["stop"]),
                            getboolean(str(day_obj["is_active"])),
                            int(partnership_account_id)
                        )
            elif available_hours:
                day_params = [{
                    'day': int(day_obj['day_id']),
                    'available_from': str(day_obj['start']),
                    'available_to': str(day_obj['stop']),
                    'is_active': getboolean(str(day_obj["is_active"])),
                    'partnership_account_id': int(partnership_account_id),
                    'agent_id': int(agent.id)
                } for day_obj in available_hours]
                AgentSchedule.query.filter(AgentSchedule.agent_id == agent.id).delete()
                AgentSchedule.create(*day_params)

            # The schedule is already stored at this point, so a notification
            # failure is logged and does not turn the response into an error.
            try:
                _notify_schedule_edited()
            except Exception:
                logger.exception("Schedule updated but the notification could not be sent")
            logger.info(f'Updated schedule for agent: {sid}')
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Agent schedule update failed")
        status_code = 500
        success = False
        message = f"Agent schedule & availability updation failed! {e}"
        data = []

    return jsonify({
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    })


def _notify_schedule_edited():
    """Send the SCHEDULE_SETTINGS_EDITED notification for the current user."""
    # Imported inside the function, as the rest of this module does for these packages.
    from buyercall.blueprints.partnership.models import Partnership
    from buyercall.blueprints.notification.utilities import send_notifications

    partnership = Partnership.query.filter(
        Partnership.id == current_user.partnership_id).first()
    send_notifications(
        user_id=current_user.sid,
        notify_message_type='SCHEDULE_SETTINGS_EDITED',
        user_related_entities="You've",
        other_user_related_entities=f'{current_user.firstname} {current_user.lastname}',
        hyperlink=f'{partnership.partner_url}/settings',
    )
@login_required
@api_role_required('admin', 'agent')
def get_schedule():
    """Return the current user's agent schedule and "all hours" flag.

    The agent is looked up by ``Agent.user_id == current_user.id``.

    Returns:
        A JSON response with ``statusCode``, ``success``, ``message`` and
        ``data``. On success ``data`` is a dict with ``allHours`` and
        ``availableHours`` (one entry per schedule row); on failure it is an
        empty list.
    """
    status_code = 200
    success = True
    message = "Agent schedule & availability fetched successfully!"
    data = {}
    try:
        agent = Agent.query.filter(Agent.user_id == current_user.id).first()
        if agent:
            schedules = AgentSchedule.query.filter(AgentSchedule.agent_id == agent.id)
            data['allHours'] = agent.all_hours
            data['availableHours'] = [
                {
                    'is_active': schedule.is_active,
                    'day_id': schedule.day,
                    'schedule_id': schedule.sid,
                    'start': schedule.available_from,
                    'stop': schedule.available_to,
                }
                for schedule in schedules
            ]
        else:
            status_code = 404
            success = False
            message = "Agent not found"
            data = []
    except Exception as ex:
        # Log with traceback so the failure is visible server-side too.
        logger.exception("Agent schedule fetch failed")
        status_code = 500
        success = False
        message = f"Agent schedule & availability fetching failed! Error: {ex}"
        data = []

    return jsonify({
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    })
# @api_role_required('sysadmin', 'admin', 'partner')
# NOTE(review): the role check above is commented out, so no access restriction
# is visible on this endpoint from here -- confirm this is intentional.
def create_agent():
    """Create an agent, its default weekly schedule and a matching login user.

    Expects a JSON body with ``partnership_id`` and ``partnership_account_id``
    (both sids) and ``phone_number``; ``all_hours``, ``firstname``,
    ``lastname``, ``title``, ``email``, ``department``, ``description``,
    ``timezone`` and ``mobile_number`` are optional.

    Steps:
        1. Resolve the partnership and the account belonging to it.
        2. Create the agent via ``Agent.create``.
        3. Create seven schedule rows (days 0-6, 08:00 AM - 17:00 PM; days 0
           and 6 inactive).
        4. Best effort: create a ``User`` with role ``agent``, link it to the
           agent and call ``send_agent_invitation_email.delay``. A failure in
           this step is rolled back and logged but does not change the
           response.

    Returns:
        ``api_jsonify`` response: 200 on success, 401 for a missing/invalid
        body, 404 when the partnership account cannot be resolved, 500 when
        ``Agent.create`` does not return a positive id.
    """
    from buyercall.blueprints.partnership.models import PartnershipAccount, Partnership
    from buyercall.blueprints.user.models import User

    data = {}
    received = request.get_json()
    # Validate the body before touching it; a missing phone number used to
    # surface as an unhandled KeyError.
    if not isinstance(received, dict) or 'phone_number' not in received:
        return api_jsonify(data, 401, "Invalid parameters!", False)

    partnership_sid = received.get('partnership_id', None)
    partnership_account_sid = received.get('partnership_account_id', None)
    if not (partnership_sid and partnership_account_sid):
        return api_jsonify(data, 401, "Invalid parameters!", False)

    partnership = Partnership.query.filter(Partnership.sid == partnership_sid).first()
    partnership_account = None
    if partnership:
        partnership_account = PartnershipAccount \
            .query \
            .filter(and_(PartnershipAccount.sid == partnership_account_sid,
                         PartnershipAccount.partnership_id == partnership.id)) \
            .first()
    if not partnership_account:
        return api_jsonify(data, 404, "Partnership account not found!", False)

    agent_id = Agent.create({
        'user_id': None,
        'firstname': received.get('firstname', ''),
        'lastname': received.get('lastname', ''),
        'title': received.get('title', ''),
        'email': received.get('email', ''),
        'phonenumber': received['phone_number'],
        'mobile': received.get('mobile_number', ''),
        'extension': None,
        'department': received.get('department', ''),
        'description': received.get('description', ''),
        'partnership_account_id': partnership_account.id,
        'all_hours': received.get('all_hours', True),
        'timezone': received.get('timezone', 'US/Central'),
    })
    # Agent.create is used here as returning a numeric id (> 0 on success).
    if not agent_id or int(agent_id) <= 0:
        return api_jsonify(data, 500, "Agent creation failed!", False)

    inactive_days = (0, 6)
    schedule_rows = [
        {
            'day': day,
            'available_from': '08:00 AM',
            'available_to': '17:00 PM',
            'is_active': day not in inactive_days,
            'partnership_account_id': partnership_account.id,
            'agent_id': int(agent_id),
        }
        for day in range(7)
    ]
    day_result = AgentSchedule.create(*schedule_rows)

    if day_result:
        try:
            # Load the model instance: the id alone has no firstname/email/...
            agent = Agent.query.filter(Agent.id == int(agent_id)).first()
            user = User(
                role='agent',
                firstname=agent.firstname,
                lastname=agent.lastname,
                email=agent.email,
                company='',
                title=agent.title or '',
                department=agent.department or '',
                phonenumber=agent.phonenumber,
                extension=agent.extension,
                partnership_id=partnership.id,
                partnership_account_id=partnership_account.id,
                # Agent terms of service are set to accepted on creation.
                tos_agreement=True,
            )
            user.save()
            agent.user_id = user.id
            db.session.add(agent)
            db.session.commit()

            reset_token = user.serialize_token()
            from buyercall.blueprints.user.tasks import send_agent_invitation_email
            send_agent_invitation_email.delay(user.id, reset_token, user.partnership.name,
                                              user.partnership.logo, user.partnership.partner_url)
        except Exception:
            db.session.rollback()
            # Kept best-effort, but no longer silent.
            logger.exception("Agent %s created but user setup/invitation failed", agent_id)

    return api_jsonify(data, 200, "Agent added successfully!", True)
@api_role_required('sysadmin', 'admin', 'partner')
def get_all_account_agents():
    """List agents and teams for the current user's account or partnership.

    If the user's partnership account exists, agents and teams are filtered by
    that account. Otherwise they are filtered by the user's partnership,
    falling back to the partnership with primary key 1.

    Returns:
        ``api_jsonify`` response whose data is the serialized agents followed
        by the serialized teams, each team carrying ``isTeam: True``. 404 when
        neither an account nor a partnership can be resolved.
    """
    from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount
    from buyercall.blueprints.agents.serializers import AgentMiniSchema, TeamSchema

    partnership_account = PartnershipAccount.query.filter(
        PartnershipAccount.id == current_user.partnership_account_id).first()

    if partnership_account:
        agent_filter = Agent.partnership_account_id == partnership_account.id
        team_filter = Team.partnership_account_id == partnership_account.id
    else:
        partnership = Partnership.query.filter(Partnership.id == current_user.partnership_id).first()
        if not partnership:
            partnership = Partnership.query.get(1)
        if not partnership:
            return api_jsonify([], 404, "Partnership not found!", False)
        # Filter through the relationship with has(); a related column cannot
        # be reached by plain attribute access on the relationship attribute.
        # Assumes Agent.user is a scalar relationship to the user model -- TODO confirm.
        agent_filter = Agent.user.has(partnership_id=partnership.id)
        team_filter = Team.partnership_id == partnership.id

    pa_agents = Agent.query.filter(agent_filter).all()
    pa_teams = Team.query.filter(team_filter).all()

    data = AgentMiniSchema(many=True).dump(pa_agents)
    for team in TeamSchema(many=True).dump(pa_teams):
        team['isTeam'] = True
        data.append(team)
    return api_jsonify(data, 200, "Agents fetched successfully!", True)
def get_all_agents_only():
    """List the users of the current user's account or partnership.

    Users are filtered by the current user's partnership account when it
    exists; otherwise by the user's partnership, falling back to the
    partnership with primary key 1. Each entry exposes ``sid``, ``firstname``,
    ``lastname``, ``phonenumber``, ``email`` and ``role``.
    """
    from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount
    from buyercall.blueprints.user.models import User

    account = PartnershipAccount.query.filter(
        PartnershipAccount.id == current_user.partnership_account_id).first()

    if account:
        users = User.query.filter(User.partnership_account_id == account.id).all()
    else:
        partnership = Partnership.query.filter(Partnership.id == current_user.partnership_id).first()
        if not partnership:
            partnership = Partnership.query.get(1)
        users = User.query.filter(User.partnership_id == partnership.id).all()

    payload = [
        {
            'sid': user.sid,
            'firstname': user.firstname,
            'lastname': user.lastname,
            'phonenumber': user.phonenumber,
            'email': user.email,
            'role': user.role if user else None,
        }
        for user in users
    ]
    return api_jsonify(payload, 200, "Agents fetched successfully!", True)
@api_role_required('admin')
def get_teams():
    """Return the serialized teams of the current user's agent."""
    from buyercall.blueprints.agents.serializers import TeamSchema

    teams = current_user.agent.teams
    serialized = TeamSchema(many=True).dump(teams)
    return api_jsonify(serialized, message='Teams fetched successfully!')
@api_role_required('admin')
def get_team_by_id(team_id):
    """Return one serialized team looked up by sid, or a 404 payload.

    Args:
        team_id: Team sid passed to ``Team.get_by_sid``.
    """
    from buyercall.blueprints.agents.serializers import TeamSchema, Team

    team = Team.get_by_sid(team_id)
    if not team:
        return api_jsonify({}, message="Team not found", status_code=404, success=False)

    return api_jsonify(TeamSchema().dump(team), message='Team fetched successfully!',
                       status_code=200, success=True)
def get_agents_by_team_id(team_id):
    """Return the serialized agents of a team, or a 404 payload with an empty list.

    Args:
        team_id: Team sid passed to ``Team.get_by_sid``.
    """
    from buyercall.blueprints.agents.serializers import AgentWithoutTeamSchema, Team

    team = Team.get_by_sid(team_id)
    if not team:
        return api_jsonify([], message="Team not found", status_code=404, success=False)

    members = AgentWithoutTeamSchema(many=True).dump(team.agents)
    return api_jsonify(members, message='Successfully fetched team agents!',
                       status_code=200, success=True)
@api_role_required('admin')
def add_agents_to_team(team_id):
    """Set the members of a team from a list of agent sids.

    Expects a JSON body ``{"agents": [<agent sid>, ...]}``. The team's
    ``agents`` collection is replaced with the resolved agents.

    Args:
        team_id: Team sid.

    Returns:
        ``api_jsonify`` response: 201 on success, 401 for a missing body or an
        empty ``agents`` list, 404 when the team or any agent is not found.
    """
    received = request.get_json()
    agent_sids = received.get('agents', []) if isinstance(received, dict) else None
    if not agent_sids:
        # An empty agent list is a bad request, not a missing team.
        return api_jsonify(status_code=401, message="Invalid parameters", success=False)

    team = Team.query.filter(Team.sid == team_id).first()
    if not team:
        return api_jsonify(status_code=404, message="Team not found", success=False)

    # team.agents is serialized elsewhere as Agent objects, so resolve the
    # model instances rather than their integer ids.
    members = [Agent.get_by_sid(agent_sid) for agent_sid in agent_sids]
    if not all(members):
        return api_jsonify(status_code=404, message="Agent not found", success=False)

    team.agents = members
    team.save()
    return api_jsonify(status_code=201, message="Agents added successfully", success=True)
@api_role_required('admin', 'agent')
def get_leads():
    """List leads for the current user's partnership account.

    Reads the optional query-string filters ``contact-type``, ``status`` and
    ``intent-score`` and hands them, together with the contacts referenced by
    the account's channel ties, to ``Contact.get_leads``.

    Returns:
        ``api_jsonify`` response with one dict per lead; ``sort`` is the
        1-based position in the result.
    """
    from buyercall.blueprints.contacts.models import ContactChannelTie, Contact

    query_args = request.args
    contact_type = query_args.get('contact-type', None)
    status = query_args.get('status', None)
    intent_score = query_args.get('intent-score', None)

    ties = ContactChannelTie.get_by_partnership_account(current_user.partnership_account_id)
    contact_ids = [tie.contact for tie in ties if tie.contact is not None]

    # Filtering by contact type / status is delegated to Contact.get_leads.
    contacts = Contact.get_leads(contact_type, contact_ids, status, intent_score)

    leads = [
        {
            "leadId": contact.sid,
            "name": contact.name,
            "primaryTag": contact.bdc_status,
            "secondaryTags": contact.get_secondary_tag_names(),
            "manualStatus": contact.status,
            # NOTE(review): get_intent_state is read as an attribute, not called -- confirm it is a property.
            "intentScore": contact.get_intent_state or 'cold',
            "updatedOn": contact.updated_on,
            "sort": position
        }
        for position, contact in enumerate(contacts, start=1)
    ]
    return api_jsonify(data=leads, message="Leads fetched successfully!")
@api_role_required('admin', 'agent', 'partner', 'sysadmin')
def get_lead_detail(contact_id):
    """Return the serialized contact for a sid, flagging 404 when it is missing.

    Args:
        contact_id: Contact sid.
    """
    from buyercall.blueprints.contacts.models import Contact
    from buyercall.blueprints.contacts.serializers import ContactSchema

    contact = Contact.query.filter(Contact.sid == contact_id).first()
    if contact:
        message, status_code, success = "Lead details fetched successfully!", 200, True
    else:
        message, status_code, success = "Contact not found.", 404, False

    # The schema dump is performed in both cases, including a missing contact.
    return api_jsonify(ContactSchema().dump(contact), message=message, success=success,
                       status_code=status_code)
@api_role_required('admin', 'agent')
def update_lead(contact_id):
    """Update a contact's details from a JSON body.

    Only truthy values are applied. ``contact-type`` and ``status`` are
    applied only when they are one of the allowed values. ``is_subscribed``
    is always written (``False`` when omitted). ``secondaryTags`` (tag sids)
    replaces the contact's tags when it resolves to at least one tag.

    Args:
        contact_id: Contact sid.

    Returns:
        ``api_jsonify`` response with the serialized contact: 200 on success,
        404 when the contact is missing, 401 for a missing/invalid body.
    """
    from buyercall.blueprints.contacts.models import Contact, ContactTags
    from buyercall.blueprints.contacts.serializers import ContactSchema

    allowed_contact_types = ('lead', 'customer', 'prospect', 'returning customer', 'spam')
    allowed_statuses = ('new', 'open', 'completed', 'archive')
    # (model attribute, request key)
    field_map = (
        ('firstname', 'first_name'),
        ('lastname', 'last_name'),
        ('phonenumber_1', 'phone_1'),
        ('phonenumber_2', 'phone_2'),
        ('email', 'email'),
        ('city', 'city'),
        ('state', 'state'),
        ('zip', 'zip'),
        ('address_1', 'address_1'),
        ('address_2', 'address_2'),
        ('bdc_status', 'contact-type'),
        ('status', 'status'),
    )

    message = "Lead updated successfully!"
    success = True
    status_code = 200
    contact = Contact.query.filter(Contact.sid == contact_id).first()
    received = request.get_json()

    if not contact:
        message = "Contact not found."
        status_code = 404
        success = False
    elif not received or not isinstance(received, dict):
        message = "Invalid details given."
        status_code = 401
        success = False
    else:
        for attr, key in field_map:
            value = received.get(key, '')
            if not value:
                continue
            if attr == 'bdc_status' and value not in allowed_contact_types:
                continue
            # The manual status is validated against its own value.
            if attr == 'status' and value not in allowed_statuses:
                continue
            setattr(contact, attr, value)

        # NOTE(review): an omitted is_subscribed is stored as False -- confirm this is intended.
        contact.is_subscribed = received.get('is_subscribed', False)

        secondary_tags = received.get('secondaryTags', '')
        if secondary_tags:
            contact_tags = ContactTags.get_by_sids(secondary_tags)
            if contact_tags:
                contact.contact_tags = contact_tags
        contact.save()

    return api_jsonify(ContactSchema().dump(contact), message=message, success=success,
                       status_code=status_code)
@api_role_required('admin', 'agent')
def update_manual_status():
    """Set the manual status on a batch of leads.

    Expects a JSON body with ``manualStatus`` (one of ``new``, ``open``,
    ``completed``, ``archive``) and a non-empty ``leadIds`` value, which is
    passed to ``Contact.update_manual_status`` as ``contact_sids``.

    Returns:
        ``api_jsonify`` response: 200 on success, 401 with
        "Invalid input data!" for a missing body or invalid values.
    """
    from buyercall.blueprints.contacts.models import Contact

    received = request.get_json()
    # A missing or non-object body is handled like any other invalid input.
    payload = received if isinstance(received, dict) else {}
    new_status = payload.get('manualStatus', None)
    contact_sids = payload.get('leadIds', None)

    if new_status in ('new', 'open', 'completed', 'archive') and contact_sids:
        Contact.update_manual_status(new_status=new_status, contact_sids=contact_sids)
        return api_jsonify(message="Leads manual status updated successfully!",
                           status_code=200, success=True)

    return api_jsonify(message="Invalid input data!", status_code=401, success=False)
@api_role_required('admin', 'agent')
def send_email_to_lead():
    """Record an outbound email from an agent to a lead and send it.

    Expects a JSON body with ``partnership_id``, ``partnership_account_id``,
    ``lead_id``, ``agent_id`` and ``source_id`` (sids) plus a ``payload``
    dict with optional ``subject`` and ``content``. The email row is created
    with ``Email.create`` and the message is sent with
    ``ses_client.send_email`` using the ``<partnership_id>-config_set``
    configuration set.

    Returns:
        ``api_jsonify`` response: "Mail sent successfully" on success, or 400
        "Invalid parameters" when the body, agent, lead, account or
        partnership is missing.
    """
    from buyercall.blueprints.contacts.models import Contact
    from buyercall.blueprints.partnership.models import PartnershipAccount

    received = request.get_json()
    if not isinstance(received, dict):
        return api_jsonify(status_code=400, success=False, message="Invalid parameters")

    partnership_id = received.get('partnership_id', None)
    lead_id = received.get('lead_id', None)
    source_id = received.get('source_id', None)
    account_id = received.get('partnership_account_id', None)
    agent_id = received.get('agent_id', None)
    payload = received.get('payload') or {}

    agent = Agent.get_by_sid(agent_id)
    contact = Contact.get_by_sid(lead_id)

    # partnership_id is required as well: the SES configuration set name is
    # derived from it.
    if not (contact and agent and account_id and partnership_id):
        logger.warning('Invalid parameters')
        return api_jsonify(status_code=400, success=False, message="Invalid parameters")

    Email.create(
        # Agent attributes are firstname/lastname, as used elsewhere in this module.
        first_name=agent.firstname,
        last_name=agent.lastname,
        email=agent.email,
        source=Source.get_id_from_sid(source_id),
        is_inbound=False,
        contact_id=Contact.get_id_from_sid(lead_id),
        meta_data=payload,
        partnership_account_id=PartnershipAccount.get_id_from_sid(account_id),
        recipients=[contact.email],
    )

    subject = payload.get('subject') or f"Email message from {agent.full_name}"
    # Send email to lead
    ses_client.send_email(recipients=[contact.email],
                          config_set_name=f"{partnership_id}-config_set",
                          sender=agent.email,
                          subject=subject,
                          text=payload.get('content', ''),
                          html='')
    return api_jsonify(message="Mail sent successfully")
@api_role_required('agent', 'admin')
def send_email_to_lead_from_inbox():
    """Send an email from the current user to a lead and publish it to the inbox.

    Expects a JSON body with ``leadId``, ``source``, and either ``content``
    (a string, or a dict with ``subject`` / ``body``) and/or ``templateId``
    with ``templateData``. The email is stored with ``Email.create`` (a
    failure there is logged and does not stop the send), handed to
    ``send_generic_mail.delay``, and reported through
    ``InboxConnector.send_interaction``.

    Returns:
        ``api_jsonify`` response: "Mail sent successfully" on success, 400 for
        a missing body, 404 when the lead, partnership or partnership account
        cannot be resolved.
    """
    received = request.get_json()
    if not received or not isinstance(received, dict):
        return api_jsonify(status_code=400, success=False, message="Invalid parameters")

    source_sid = received.get('source')
    contact_sid = received.get('leadId')
    template_sid = received.get('templateId')
    template_data = received.get('templateData')
    email_content = received.get('content')

    from buyercall.blueprints.contacts.models import Contact
    from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount

    from_email = current_user.email
    contact = Contact.get_by_sid(contact_sid)
    partnership_account = PartnershipAccount.get_by_id(current_user.partnership_account_id)
    partnership = Partnership.get_by_id(current_user.partnership_id)
    # All three are dereferenced below; report a clean 404 when one is missing.
    if not (contact and partnership_account and partnership):
        return api_jsonify(status_code=404, success=False,
                           message="Lead or partnership not found")

    email_template_data = EmailTemplate.get_by_sid(template_sid)
    subject = email_template_data.subject if email_template_data else None
    body_content = email_template_data.content if email_template_data else None
    # NOTE(review): templateData is passed to str.format as a single positional
    # argument -- confirm the templates use positional ({0}) placeholders.
    html_payload = body_content.format(template_data) if body_content else ''

    if isinstance(email_content, dict):
        # A template subject wins over the one supplied in the content.
        if not subject:
            subject = email_content.get('subject', f"Email message from {current_user.firstname} {current_user.lastname}")
        plain_body = email_content.get('body', email_content)
    else:
        subject = f"{current_user.firstname} {current_user.lastname} from {partnership.name}"
        plain_body = email_content

    params = {
        "first_name": current_user.firstname,
        "last_name": current_user.lastname,
        "email": from_email,
        "source": Source.get_id_from_sid(source_sid),
        "is_inbound": False,
        'channel': None,
        "contact_id": contact.id,
        "meta_data": {'subject': subject, "body": plain_body or html_payload},
        "partnership_account_id": partnership_account.id,
        "recipients": [contact.email]
    }
    try:
        Email.create(**params)
    except Exception:
        # Storing the record is best-effort; the mail is still sent.
        logger.exception("Could not store outbound email for lead %s", contact_sid)

    # Send email to lead
    from buyercall.blueprints.user.tasks import send_generic_mail
    config_set_name = f"{partnership.sid}-config_set"
    send_generic_mail.delay(subject=subject, recipients=[contact.email], config_set_name=config_set_name,
                            text=email_content, html=html_payload, sender=from_email)

    from buyercall.blueprints.agents.webhooks.inbox_interaction import InboxConnector
    from flask import current_app
    endpoint = current_app.config.get('SOCKET_ENDPOINT')
    InboxConnector(endpoint).send_interaction(contact_id=str(contact.sid),
                                              source_id=source_sid,
                                              interaction_type='EMAIL',
                                              agent_ids=[str(current_user.sid)],
                                              partnership_id=str(partnership.sid),
                                              partnership_account_id=str(partnership_account.sid),
                                              from_lead=False, payload=email_content or html_payload)
    return api_jsonify(message="Mail sent successfully")
@api_role_required('admin')
def get_inbox():
    """Return the inbox payload for the current user's agent.

    The agent fields (``sid``, names, email, phone) and the user's role are
    filled in; the ``contacts`` and ``appointments`` lists each contain a
    single entry whose values are all empty strings.
    """
    agent = current_user.agent

    interaction_stub = dict.fromkeys(
        ("sid", "type", "source", "detail", "created_at", "status", "is_banned", "is_read"), '')

    contact_stub = dict.fromkeys(
        ("sid", "firstname", "lastname", "email", "phone_1", "phone_2",
         "status", "bdc_status", "avatar", "tags"), '')
    contact_stub["interactions"] = [interaction_stub]

    appointment_stub = dict.fromkeys(
        ("sid", "appointment_date", "appointment_type", "slot_start_time", "slot_end_time",
         "is_deactivated", "deactivated_on", "timezone"), '')
    appointment_stub["contact"] = dict.fromkeys(
        ("firstname", "lastname", "email", "phone_1", "phone_2",
         "country", "bdc_status", "status", "tags"), '')

    data = {
        "sid": agent.sid,
        "firstname": agent.firstname,
        "lastname": agent.lastname,
        "email": agent.email,
        "phone_number": agent.phonenumber,
        "appointment_link": '',
        "role": current_user.role,
        "contacts": [contact_stub],
        "appointments": [appointment_stub],
    }
    return api_jsonify(data, 200, "Inbox fetched successfully!", True)