File: //home/arjun/projects/buyercall_new/buyercall/buyercall/blueprints/agents/views.py
import csv
import json
import datetime
import logging as log
import traceback
import random
import string
from contextlib import closing
from io import StringIO
from functools import reduce
from datetime import date
from datetime import datetime
from html import parser
from flask import (
Blueprint,
request,
flash,
url_for,
jsonify,
redirect,
make_response,
render_template)
from flask_login import login_required, current_user, login_user
from buyercall.blueprints.agents.forms import AgentForm, GroupForm
from flask_wtf import FlaskForm
from buyercall.blueprints.agents.models import AgentSchedule
from buyercall.blueprints.agents.models import Agent
from buyercall.blueprints.leads.models import Lead
from flask_babel import gettext as _
from sqlalchemy import func, or_, and_, extract, text, desc, cast, String
from sqlalchemy.orm import contains_eager, load_only
from buyercall.blueprints.user.decorators import anonymous_required
from buyercall.blueprints.billing.decorators import subscription_required
from buyercall.blueprints.user.decorators import role_required
from buyercall.extensions import csrf
from buyercall.extensions import db
agents = Blueprint('agents', __name__, template_folder='templates')
log = log.getLogger(__name__)
def getboolean(param):
if param == 'true' or param == 'True':
return True
else:
return False
def getintkey(param):
if param == 'None' or param == 'none' or param == 'null' or param == 'Null' or param == '-1' or param == -1:
return 0
else:
return int(param)
# Agents on boarding page
@agents.route('/agents_onboarding', methods=['GET', 'POST'])
@csrf.exempt
@login_required
def onboarding_agents():
if request.method == 'POST':
current_user.agents_onboard = True
db.session.commit()
if current_user.agents_onboard:
flash(_(
'Great, you are ready to get started with the My Agents page. '
'Remember to checkout the support section or FAQ if you have any '
'additional agents questions.'
), 'success')
return redirect(url_for('agents.call_agents'))
return render_template('agents/agents_onboarding.jinja2')
@agents.route('/agents')
@login_required
@role_required('admin', 'agent')
def call_agents():
partnership_account_id = current_user.partnership_account_id
is_admin_in_group = current_user.is_admin_user_with_groups
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if is_admin_in_group and viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
elif not viewing_partnership_account and is_admin_in_group:
return redirect(url_for('partnership.company_accounts'))
# Check if on boarding was excepted
if current_user.agents_onboard is False:
return redirect(url_for('agents.onboarding_agents'))
form = FlaskForm()
# Monthly calls per agent bar chart
agent_id = current_user.agent.id if current_user.agent else None
if viewing_partnership_account:
filter_by = and_(Lead.partnership_account_id == partnership_account_id, current_user.role == 'admin')
else:
filter_by = (agent_id == Lead.agent_id) | and_(
Lead.partnership_account_id == partnership_account_id, current_user.role == 'admin')
# Monthly total call count
month = datetime.now().strftime("%m")
monthly_calls = Lead.query.filter(extract('month', Lead.created_on) == month)\
.filter(or_(filter_by)).first()
# Total agents for current subscriber
if viewing_partnership_account:
filter_by_user = and_(Agent.partnership_account_id == partnership_account_id,
or_(current_user.role == 'admin', current_user.role == 'partner'))
else:
filter_by_user = (current_user.id == Agent.user_id) | and_(
Agent.partnership_account_id == partnership_account_id,
or_(current_user.role == 'admin', current_user.role == 'partner'))
# defining the initial query depending on your purpose
account_agents = Agent.query.filter(filter_by_user).filter(Agent.is_deactivated.is_(False))
# Return values for the horizontal bar chart agent labels
agent_labels = list()
# Dictionaries containing agent lead data for horizontal bar chart
agent_leads = list()
response_time = list()
# Create list with dict contain lead count and time responds per agent
leads_by_agent = []
leads_by_pa = Lead.query.options(
load_only('id', 'agent_id', 'response_time_seconds')
).filter(Lead.partnership_account_id == partnership_account_id).all()
for lpa in leads_by_pa:
lead_pa_dict = dict(
Lead_id=lpa.id,
agent_id=lpa.agent_id,
response_time=lpa.response_time_seconds
)
leads_by_agent.append(lead_pa_dict)
# Add first names from the result query to labels dict above
for row in account_agents:
agent_name = "{} {}".format(row.firstname, row.lastname)
agent_labels.append(agent_name)
l_a_c = 0
for lead in leads_by_agent:
if row.id == lead['agent_id']:
l_a_c = l_a_c + 1
if lead['response_time']:
response_time.append(int(lead['response_time']))
agent_leads.append(int(l_a_c))
# Current Available Agents
available_agents = (Agent.query.filter(Agent.partnership_account_id == partnership_account_id).filter(Agent.available_now == 'Yes').filter(Agent.is_deactivated == '0').count())
# Total Agents
total_agents = Agent.query\
.filter(filter_by_user)\
.count()
# Average calls per Agent
if agent_leads:
avg_calls_agent = round(reduce(lambda x, y: x + y, agent_leads) / len(agent_leads), 2)
else:
avg_calls_agent = '0'
if response_time:
avg_response_agent = reduce(lambda x, y: x + y, response_time) / len(response_time)
avg_response_agent_min = int(avg_response_agent // 60)
avg_response_agent_sec = int(round(avg_response_agent % 60))
if avg_response_agent_min < 10:
avg_response_agent_min = f'0{avg_response_agent_min}'
if avg_response_agent_sec < 10:
avg_response_agent_sec = f'0{avg_response_agent_sec}'
else:
avg_response_agent = 0
avg_response_agent_min = '00'
avg_response_agent_sec = '00'
return render_template('agents/agents.jinja2',
form=form,
monthly_calls=monthly_calls,
available_agents=available_agents,
agent_labels=agent_labels,
agent_leads=agent_leads,
avg_calls_agent=avg_calls_agent,
total_agents=total_agents,
avg_response_agent=avg_response_agent,
account_agents=account_agents,
avg_response_agent_min=avg_response_agent_min,
avg_response_agent_sec=avg_response_agent_sec,
)
# return agent data into jquery data tables
@agents.route('/agents/agent_data')
@login_required
@role_required('admin', 'agent')
def data():
partnership_account_id = current_user.partnership_account_id
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
# TODO: if agent has user account load email, phonenumber, extension, name from User table rather then Agent
search = request.args.get('search[value]', '')
order = int(request.args.get('order[0][column]', '-1'))
direction = request.args.get('order[0][dir]', 'asc')
offset = int(request.args.get('start', 0))
limit = int(request.args.get('length', 99))
data = list()
"""Return server side data."""
# defining columns
columns = [
Agent.id, Agent.full_name, Agent.description, Agent.phonenumber, Agent.type
]
if viewing_partnership_account:
filter_by = and_(Agent.partnership_account_id == partnership_account_id,
current_user.role == 'admin', Agent.is_deactivated.is_(False))
else:
filter_by = (current_user.id == Agent.user_id) | and_(
Agent.partnership_account_id == partnership_account_id,
current_user.role == 'admin', Agent.is_deactivated.is_(False)
)
# defining the initial query depending on your purpose
total = Agent.query.filter(filter_by).filter(Agent.is_deactivated.is_(False))
filtered = total
if search:
pattern = '%{}%'.format(search)
filtered = total.filter(or_(
Agent.firstname.ilike(pattern),
Agent.lastname.ilike(pattern),
Agent.title.ilike(pattern),
Agent.email.ilike(pattern),
Agent.phonenumber.ilike(pattern),
Agent.mobile.ilike(pattern),
cast(Agent.extension, String).ilike(pattern),
Agent.department.ilike(pattern)
))
# filtered = filtered.with_entities(*columns)
sorted_ = filtered
if order in range(len(columns)):
order_pred = columns[order]
if direction == 'desc':
order_pred = desc(order_pred)
sorted_ = sorted_.order_by(order_pred)
sorted_ = sorted_.offset(offset).limit(limit)
for row in sorted_:
data_row = {
0: row.id,
1: row.full_name,
2: row.description,
3: row.phonenumber,
4: row.type,
5: row.available_now
}
data.append(data_row)
# returns what is needed by DataTable
return jsonify(data=data,
draw=request.args['draw'],
recordsFiltered=filtered.count(),
recordsTotal=filtered.count()
)
# export data to csv
@agents.route('/agents/csv')
@login_required
def data_csv():
"""Return server side data."""
header = [
'No', 'First Name', 'Last Name', 'Title', 'Email',
'Phone Number', 'Mobile', 'Extension', 'Department',
'Unique Inbound Calls', 'Unique Outbound Calls',
'Unique Inbound Messages', 'Unique Outbound Messages'
]
inbound = 'inbound'
outbound = 'outbound'
partnership_account_id = current_user.partnership_account_id
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
# Date filtering
date_from = str(request.args.get('df'))
date_to = str(request.args.get('dt'))
converted_date_from = datetime.strptime(date_from, "%m%d%Y").date()
converted_date_to = datetime.strptime(date_to, "%m%d%Y").date()
if current_user.role == 'agent':
agent_query = Agent.query.filter(current_user.id == Agent.user_id)
else:
agent_query = Agent.query.filter(partnership_account_id == Agent.partnership_account_id)
agent_query = agent_query.filter(Agent.is_deactivated.is_(False))
# Retrieve agent details
search = request.args.get('search[value]', '')
if search:
pattern = '%{}%'.format(search)
agent_query = agent_query.filter(or_(
Agent.firstname.ilike(pattern),
Agent.lastname.ilike(pattern),
Agent.title.ilike(pattern),
Agent.email.ilike(pattern),
Agent.phonenumber.ilike(pattern),
Agent.mobile.ilike(pattern),
cast(Agent.extension, String).ilike(pattern),
Agent.department.ilike(pattern)
))
agent_query_result = agent_query.order_by(Agent.id).all()
agent_ids = [r.id for r in agent_query_result]
# Retrieve lead details
filter_lead_by_date = and_(
func.date(Lead.created_on) >= converted_date_from,
func.date(Lead.created_on) <= converted_date_to
)
lead_query = Lead.query.filter(filter_lead_by_date)
if current_user.role == 'agent':
agent_user_query = Agent.query.filter(current_user.id == Agent.user_id).first()
if agent_user_query:
lead_query = lead_query\
.filter(Lead.agent_id == agent_user_query.id)
else:
lead_query = lead_query\
.filter(Lead.partnership_account_id == partnership_account_id,
Lead.agent_id.in_(agent_ids))
lead_query_result = [l for l in lead_query.all()]
# Retrieve message details
outbound_message_stmt = "select agents.id, count(distinct agents_with_messages.to) from agents " \
"inner join (select messages.to, direction, created_on, agent_id_msg " \
"from messages, lateral json_array_elements(agent_id) as agent_id_msg) agents_with_messages " \
"on agents.id = agents_with_messages.agent_id_msg::text::int " \
"where agents_with_messages.direction = 'outbound' " \
"and agents.partnership_account_id = '" + str(partnership_account_id) + "'" \
"and agents_with_messages.created_on >= '" + str(converted_date_from) + " 00:00:00' " \
"and agents_with_messages.created_on <= '" + str(converted_date_to) + " 23:59:59' " \
"group by agents.id"
inbound_message_stmt = "select agents.id, count(distinct agents_with_messages.from_) from agents " \
"inner join (select messages.from_, direction, created_on, agent_id_msg " \
"from messages, lateral json_array_elements(agent_id) as agent_id_msg) agents_with_messages " \
"on agents.id = agents_with_messages.agent_id_msg::text::int " \
"where agents_with_messages.direction = 'inbound' " \
"and agents.partnership_account_id = '" + str(partnership_account_id) + "'" \
"and agents_with_messages.created_on >= '" + str(converted_date_from) + " 00:00:00' " \
"and agents_with_messages.created_on <= '" + str(converted_date_to) + " 23:59:59' " \
"group by agents.id"
outbound_message_result = db.session.execute(outbound_message_stmt)
inbound_message_result = db.session.execute(inbound_message_stmt)
outbound_dict = {}
inbound_dict = {}
for om in outbound_message_result:
outbound_dict[om[0]] = om[1]
for im in inbound_message_result:
inbound_dict[im[0]] = im[1]
# Build the CSV
row_no = 0
with closing(StringIO()) as out:
writer = csv.writer(out)
writer.writerow(header)
for a in agent_query_result:
row_no += 1
unique_inbound_calls = []
unique_outbound_calls = []
unique_inbound_messages = 0
unique_outbound_messages = 0
unused_leads = []
for lead in lead_query_result:
if lead.agent_id == a.id:
if lead.call_type == inbound and lead.phonenumber not in unique_inbound_calls:
unique_inbound_calls.append(lead.phonenumber)
if lead.call_type == outbound and lead.phonenumber not in unique_outbound_calls:
unique_outbound_calls.append(lead.phonenumber)
else:
unused_leads.append(lead)
lead_query_result = unused_leads
if a.id in outbound_dict:
unique_outbound_messages = outbound_dict[a.id]
if a.id in inbound_dict:
unique_inbound_messages = inbound_dict[a.id]
csv_row = [
row_no, a.firstname, a.lastname, a.title, a.email,
a.phonenumber, a.mobile, a.extension, a.department,
len(unique_inbound_calls), len(unique_outbound_calls),
unique_inbound_messages, unique_outbound_messages
]
writer.writerow(csv_row)
filename = 'Buyercall Agents - {}.csv'.format(
date.today().strftime('%Y-%m-%d')
)
resp = make_response(out.getvalue())
resp.headers['Content-Type'] = 'text/csv'
resp.headers['Content-Disposition'] = \
'attachment; filename="{}"'.format(filename)
return resp
# return agent data for charts
@agents.route('/agents/data_chart')
@login_required
@role_required('admin', 'agent')
def data_chart():
date_from = str(request.args.get('df'))
date_to = str(request.args.get('dt'))
friendly_name = str(request.args.get('fn'))
phone_number = str(request.args.get('pn'))
filter_by_type = text('1=1')
converted_dateFrom = datetime.strptime(date_from, "%m%d%Y").date()
converted_dateTo = datetime.strptime(date_to, "%m%d%Y").date()
filter_by_date = and_(func.date(Lead.created_on) >= converted_dateFrom,
func.date(Lead.created_on) <= converted_dateTo)
partnership_account_id = current_user.partnership_account_id
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
# Total agents for current subscriber
if viewing_partnership_account:
filter_by_user = and_(Agent.partnership_account_id == partnership_account_id, current_user.role == 'admin')
else:
filter_by_user = (current_user.id == Agent.user_id) | and_(
Agent.partnership_account_id == partnership_account_id, current_user.role == 'admin')
# defining the initial query depending on your purpose
account_agents = Agent.query\
.filter(filter_by_user)\
.filter(Agent.is_deactivated.is_(False))
# Current Available Agents
total_available_agents = 0
for agent in account_agents:
if agent.available_now:
total_available_agents = total_available_agents + 1
# Total Agents
total_agents = account_agents.count()
# Create the filter for the type drop downs
if (friendly_name != 'null' and friendly_name is not None) and (phone_number == 'null' or phone_number is None):
if '-' in friendly_name:
filter_by_type = text("leads.widget_guid = '{}'".format(friendly_name))
else:
filter_by_type = text('leads.inbound_id = {}'.format(phone_number))
elif (friendly_name == 'null' or friendly_name is None) and (phone_number != 'null' and phone_number is not None):
filter_by_type = text('leads.inbound_id = {}'.format(phone_number))
elif (friendly_name != 'null' and friendly_name is not None) and \
(phone_number != 'null' and phone_number is not None):
if '-' in friendly_name:
filter_by_type = and_(
"leads.widget_guid = '{}'".format(friendly_name),
'leads.inbound_id = {}'.format(phone_number)
)
else:
filter_by_type = text('leads.inbound_id = {}'.format(phone_number))
# Return values for the horizontal bar chart agent labels
agent_labels = list()
# Dictionaries containing agent lead data for horizontal bar chart
agent_leads = list()
response_time = list()
# Create list with dict contain lead count and time responds per agent
leads_by_agent = []
leads_by_pa = Lead.query.options(
load_only('id', 'agent_id', 'response_time_seconds')
).filter(Lead.partnership_account_id == partnership_account_id).filter(filter_by_date).filter(filter_by_type).all()
for lpa in leads_by_pa:
lead_pa_dict = dict(
Lead_id=lpa.id,
agent_id=lpa.agent_id,
response_time=lpa.response_time_seconds
)
leads_by_agent.append(lead_pa_dict)
# Add first names from the result query to labels dict above
for row in account_agents:
agent_name = "{} {}".format(row.firstname, row.lastname)
agent_labels.append(agent_name)
l_a_c = 0
for lead in leads_by_agent:
if row.id == lead['agent_id']:
l_a_c = l_a_c + 1
if lead['response_time']:
response_time.append(int(lead['response_time']))
agent_leads.append(int(l_a_c))
# Average calls per Agent
if agent_leads:
avg_calls_agent = round(reduce(lambda x, y: x + y, agent_leads) / len(agent_leads), 2)
else:
avg_calls_agent = '0'
if response_time:
avg_response_agent = reduce(lambda x, y: x + y, response_time) / len(response_time)
avg_response_agent_min = int(avg_response_agent // 60)
avg_response_agent_sec = int(round(avg_response_agent % 60))
if avg_response_agent_min < 10:
avg_response_agent_min = f'0{avg_response_agent_min}'
if avg_response_agent_sec < 10:
avg_response_agent_sec = f'0{avg_response_agent_sec}'
else:
avg_response_agent = 0
avg_response_agent_min = '00'
avg_response_agent_sec = '00'
return jsonify(
agent_labels=agent_labels,
agent_leads=agent_leads,
available=total_available_agents,
average_calls=avg_calls_agent,
avg_response_agent=avg_response_agent,
avg_response_agent_min=avg_response_agent_min,
avg_response_agent_sec=avg_response_agent_sec,
total_agents=total_agents
)
# return data for the filter options
@agents.route('/agents/filteroptions')
@csrf.exempt
@login_required
def filteroptions():
agent_id = current_user.agent.id if current_user.agent else None
partnership_account_id = current_user.partnership_account_id
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
if viewing_partnership_account:
filter_by = and_(Lead.partnership_account_id == partnership_account_id, current_user.role == 'admin')
else:
filter_by = (agent_id == Lead.agent_id) | and_(
Lead.partnership_account_id == partnership_account_id, current_user.role == 'admin')
# retrieve the inbound friendly names from phone numbers
inbound_data = Lead.query.outerjoin(Lead.inbound).options(
contains_eager(Lead.inbound)
).with_entities('phonenumbers.id', 'phonenumbers.friendly_name', 'phonenumbers.phonenumber').filter(
filter_by)\
.distinct()
# retrieve the outbound friendly names from widgets
outbound_data = Lead.query.outerjoin(Lead.widget).options(
contains_eager(Lead.widget)
).with_entities('widgets.guid', 'widgets.name')\
.filter(filter_by)\
.distinct()
tool_name_result = {}
phonenumber_result = {}
# populate result with the phonenumbers
for i in inbound_data:
if i is not None and i is not '':
if i[0] is not None and i[2] is not None:
id = str(i[0])
name = str(i[2])
phonenumber_result[id] = name
# populate result with the inbound friendly names
for i in inbound_data:
if i is not None and i is not '':
if i[0] is not None and i[1] is not None:
id = str(i[0])
name = 'Inbound - {}'.format(i[1])
tool_name_result[id] = name
# populate result with outbound friendly names
for i in outbound_data:
if i is not None and i is not '':
if i[0] is not None and i[1] is not None:
id = str(i[0])
name = 'Outbound - {}'.format(i[1])
tool_name_result[id] = name
return jsonify(
tool_name_data=tool_name_result,
phonenumber_data=phonenumber_result
)
# This function is used to create new agents
@agents.route('/agents/new', methods=['GET', 'POST'])
@subscription_required
@login_required
@role_required('admin')
def agent_new():
agent = Agent()
sunday = AgentSchedule(0, '08:00 AM', '17:00 PM', False, '-1', '-1')
monday = AgentSchedule(1, '08:00 AM', '17:00 PM', True, '-1', '-1')
tuesday = AgentSchedule(2, '08:00 AM', '17:00 PM', True, '-1', '-1')
wednesday = AgentSchedule(3, '08:00 AM', '17:00 PM', True, '-1', '-1')
thursday = AgentSchedule(4, '08:00 AM', '17:00 PM', True, '-1', '-1')
friday = AgentSchedule(5, '08:00 AM', '17:00 PM', True, '-1', '-1')
saturday = AgentSchedule(6, '08:00 AM', '17:00 PM', False, '-1', '-1')
agent.all_hours = True
agent.schedules.append(sunday)
agent.schedules.append(monday)
agent.schedules.append(tuesday)
agent.schedules.append(wednesday)
agent.schedules.append(thursday)
agent.schedules.append(friday)
agent.schedules.append(saturday)
total_agents = Agent.query.filter(current_user.id == Agent.user_id).count()
partnership_account_id = current_user.partnership_account_id
is_admin_in_group = current_user.is_admin_user_with_groups
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if is_admin_in_group and viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
elif not viewing_partnership_account and is_admin_in_group:
return redirect(url_for('partnership.company_accounts'))
form = AgentForm(obj=agent)
if form.validate_on_submit():
if total_agents >= current_user.subscription.agent_limit:
flash(_(
'You need to <strong>upgrade your account</strong> to add more agents. '
'<a href='"/subscription/update"' style='"color:#ffffff"'><strong>Change your plan</strong></a>'),
'danger')
return redirect(url_for('agents.agent_new'))
else:
agent.firstname = form.firstname.data
agent.user_id = None
agent.lastname = form.lastname.data
agent.title = form.title.data
agent.email = form.email.data.lower()
agent.phonenumber = form.phonenumber.data
agent.mobile = form.mobile.data
try:
agent.extension = int(form.extension.data) if form.extension.data else None
except ValueError:
agent.extension = None
agent.description = form.description.data
agent.department = 'none' # form.department.data
agent.timezone = form.timezone.data
agent.all_hours = getboolean(form.allhours_str.data)
load_groups(agent, form.groups_str.data)
params = {
'user_id': None,
'firstname': agent.firstname,
'lastname': agent.lastname,
'title': agent.title,
'email': agent.email,
'phonenumber': agent.phonenumber,
'mobile': agent.mobile,
'extension': agent.extension,
'description': agent.description,
'department': agent.department,
'timezone': agent.timezone,
'partnership_account_id': partnership_account_id,
'all_hours': agent.all_hours
}
result = Agent.create(params)
if result > 0:
h = parser
schedulequery = str(form.schedules_str.data)
day_result = 0
if len(schedulequery) > 0:
scheduleobj = json.loads(h.unescape(schedulequery))
day1obj = scheduleobj[0]
day1param = {
'day': int(day1obj["id"]),
'available_from': str(day1obj["start"]),
'available_to': str(day1obj["stop"]),
'is_active': getboolean(str(day1obj["active"])),
'partnership_account_id': int(partnership_account_id),
'agent_id': int(result)
}
day2obj = scheduleobj[1]
day2param = {
'day': int(day2obj["id"]),
'available_from': str(day2obj["start"]),
'available_to': str(day2obj["stop"]),
'is_active': getboolean(str(day2obj["active"])),
'partnership_account_id': int(partnership_account_id),
'agent_id': int(result)
}
day3obj = scheduleobj[2]
day3param = {
'day': int(day3obj["id"]),
'available_from': str(day3obj["start"]),
'available_to': str(day3obj["stop"]),
'is_active': getboolean(str(day3obj["active"])),
'partnership_account_id': int(partnership_account_id),
'agent_id': int(result)
}
day4obj = scheduleobj[3]
day4param = {
'day': int(day4obj["id"]),
'available_from': str(day4obj["start"]),
'available_to': str(day4obj["stop"]),
'is_active': getboolean(str(day4obj["active"])),
'partnership_account_id': int(partnership_account_id),
'agent_id': int(result)
}
day5obj = scheduleobj[4]
day5param = {
'day': int(day5obj["id"]),
'available_from': str(day5obj["start"]),
'available_to': str(day5obj["stop"]),
'is_active': getboolean(str(day5obj["active"])),
'partnership_account_id': int(partnership_account_id),
'agent_id': int(result)
}
day6obj = scheduleobj[5]
day6param = {
'day': int(day6obj["id"]),
'available_from': str(day6obj["start"]),
'available_to': str(day6obj["stop"]),
'is_active': getboolean(str(day6obj["active"])),
'partnership_account_id': int(partnership_account_id),
'agent_id': int(result)
}
day7obj = scheduleobj[6]
day7param = {
'day': int(day7obj["id"]),
'available_from': str(day7obj["start"]),
'available_to': str(day7obj["stop"]),
'is_active': getboolean(str(day7obj["active"])),
'partnership_account_id': int(partnership_account_id),
'agent_id': int(result)
}
day_result = AgentSchedule.create(
day1param, day2param, day3param, day4param, day5param, day6param, day7param
)
else:
day_result = 1
if day_result > 0:
flash(_('Your agent has been created successfully.'), 'success')
return redirect(url_for('agents.call_agents'))
form.groups_str.data = ",".join(str(g.id) for g in agent.groups)
groups = Agent.query.filter(
Agent.partnership_account_id == partnership_account_id,
Agent.is_group == True,
).all()
return render_template(
'agents/new.jinja2', form=form, agent=agent, total_agents=total_agents, groups=groups
)
@agents.route('/agents/delete/<int:id>', methods=['DELETE'])
@subscription_required
@login_required
@role_required('admin')
def agent_delete(id):
partnership_account_id = current_user.partnership_account_id
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
agent = Agent.query.get(id)
Agent.deactivate(id, partnership_account_id)
schedules_exist = AgentSchedule.query.filter(agent.id == AgentSchedule.agent_id).count()
if schedules_exist > 0:
agentscheduleslist = AgentSchedule.query.filter(AgentSchedule.agent_id == id)
agentscheduleslist.delete()
return jsonify(success=True)
# Display lead details and Add notes to a lead on edit page
@agents.route('/agents/edit/<int:id>', methods=['GET', 'POST'])
@role_required('admin', 'agent')
@subscription_required
@login_required
def agent_edit(id):
partnership_account_id = current_user.partnership_account_id
is_admin_in_group = current_user.is_admin_user_with_groups
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if is_admin_in_group and viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
elif not viewing_partnership_account and is_admin_in_group:
return redirect(url_for('partnership.company_accounts'))
agent = Agent.query.filter(
# TODO: fix this as it can't find the record
Agent.id == id, Agent.partnership_account_id == partnership_account_id
).first()
if not agent:
flash('Agent with ID {} not found.'.format(id))
return redirect(url_for('agents.call_agents'))
elif agent.is_deactivated is True:
flash(_('Agent with ID {} has been deactivated and no longer available.'.format(id)), 'danger')
return redirect(url_for('agents.call_agents'))
# Lookup the subscriber id for lead from the Lead table
filter_by_agent = agent.id == Lead.agent_id
# Lead count for current agent
lead_count_agent = Lead.query.filter(agent.id == Lead.agent_id).count()
# Return the total duration call time for the agent
agent_call_duration = db.session.query(func.sum(Lead.duration)).filter(filter_by_agent).scalar()
# Return the total response call time for the agent
agent_call_response = db.session.query(func.sum(Lead.response_time_seconds)).filter(filter_by_agent).scalar()
# Check for None values in the duration column and set them to 0
if agent_call_duration is None:
agent_call_duration = 0
else:
pass
# Check for None values in the call response column and set them to 0
if agent_call_response is None:
agent_call_response = 0
else:
pass
if agent_call_duration == 0:
average_call_duration = 0
average_call_duration_sec = 0
else:
# Average Duration of calls for current subscriber by min
average_call_duration = int((float(agent_call_duration) / float(lead_count_agent)) / 60)
# Average Duration of calls for current subscriber by sec
average_call_duration_sec = int((float(agent_call_duration) / float(lead_count_agent)) % 60)
if agent_call_response == 0:
average_call_response = 0
average_call_response_sec = 0
else:
# Average Response time for current subscriber by min
average_call_response = int((float(agent_call_response) / float(lead_count_agent)) / 60)
# Average Response time for current subscriber by sec
average_call_response_sec = int((float(agent_call_response) / float(lead_count_agent)) % 60)
schedules_edit_exist = AgentSchedule.query.filter(agent.id == AgentSchedule.agent_id).count()
if schedules_edit_exist == 0 or agent.all_hours is None:
agent.all_hours = True
form = AgentForm(obj=agent)
if form.validate_on_submit():
try:
agent.firstname = form.firstname.data
agent.lastname = form.lastname.data
agent.title = form.title.data
agent.email = form.email.data.lower()
agent.phonenumber = form.phonenumber.data
agent.mobile = form.mobile.data
try:
agent.extension = int(form.extension.data) if form.extension.data else None
except ValueError:
agent.extension = None
agent.description = form.description.data
agent.department = 'none'
agent.timezone = form.timezone.data
agent.all_hours = getboolean(form.allhours_str.data)
load_groups(agent, form.groups_str.data)
if agent.user_id:
agent.user.email = agent.email.lower()
agent.user.phonenumber = agent.phonenumber
agent.user.extension = agent.extension
schedules_exist = AgentSchedule.query.filter(agent.id == AgentSchedule.agent_id).count()
schedulequery = str(form.schedules_str.data)
scheduleobj = list()
if len(schedulequery) > 0:
h = parser
scheduleobj = json.loads(h.unescape(schedulequery))
if schedules_exist > 0:
for day_obj in scheduleobj:
AgentSchedule.update(
int(day_obj["scheid"]),
str(day_obj["start"]),
str(day_obj["stop"]),
getboolean(str(day_obj["active"])),
int(partnership_account_id)
)
flash(_('The agent has been updated successfully.'), 'success')
return redirect(url_for('agents.call_agents'))
elif scheduleobj:
day_params = [{
'day': int(dayobj['id']),
'available_from': str(dayobj['start']),
'available_to': str(dayobj['stop']),
'is_active': getboolean(str(dayobj["active"])),
'partnership_account_id': int(partnership_account_id),
'agent_id': int(agent.id)
} for dayobj in scheduleobj]
AgentSchedule.query.filter(AgentSchedule.agent_id == agent.id).delete()
day_result = AgentSchedule.create(*day_params)
if day_result:
flash(_('The agent has been updated successfully.'), 'success')
return redirect(url_for('agents.call_agents'))
else:
agent.all_hours = True
agent.save()
db.session.commit()
flash(_('The agent has been updated successfully.'), 'success')
return redirect(url_for('agents.call_agents'))
except Exception as e:
log.error(traceback.format_exc())
db.session.rollback()
flash(_("Error saving agent. The email address is used by another user. Try a different email address."), 'danger')
form.groups_str.data = ",".join(str(g.id) for g in agent.groups)
groups = Agent.query.filter(
Agent.partnership_account_id == partnership_account_id,
Agent.is_group == True, # noqa
).all()
return render_template('agents/edit.jinja2',
agent=agent,
form=form,
groups=groups,
lead_count_agent=lead_count_agent,
average_call_duration=average_call_duration,
average_call_duration_sec=average_call_duration_sec,
average_call_response=average_call_response,
average_call_response_sec=average_call_response_sec)
@agents.route('/agents/convert', methods=['POST'])
@subscription_required
@login_required
@role_required('admin', 'partner')
def convert_to_user():
agent = Agent.query.get(request.form.get('a_to_u_agent_id'))
partnership_account_id = current_user.partnership_account_id
is_admin_in_group = current_user.is_admin_user_with_groups
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if is_admin_in_group and viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
elif not viewing_partnership_account and is_admin_in_group:
return redirect(url_for('partnership.company_accounts'))
if agent is None:
flash(_('Such Account does not exist'))
return redirect(url_for('agents.call_agents'))
if agent.partnership_account_id != partnership_account_id:
flash(_("You don't have access to this account"))
return redirect(url_for('agents.call_agents'))
from buyercall.blueprints.user.models import User
# Set an agent terms of service as true based on BuyerCall TOS
tos = True
if agent.user_id is None:
random_string = ''.join(random.choice(string.ascii_letters + string.digits + string.punctuation) for _ in range(8))
try:
params = {
'role': 'agent',
'firstname': agent.firstname,
'lastname': agent.lastname,
'email': agent.email,
'password': random_string,
'company': current_user.company,
'title': agent.title or '',
'department': agent.department or '',
'phonenumber': agent.phonenumber,
'extension': agent.extension,
'partnership_id': current_user.partnership_id,
'partnership_account_id': partnership_account_id,
'tos_agreement': tos
}
user = User(**params)
user.save()
agent.user_id = user.id
db.session.add(agent)
db.session.commit()
reset_token = user.serialize_token()
from buyercall.blueprints.user.tasks import send_agent_invitation_email
send_agent_invitation_email.delay(user.id, reset_token, user.partnership.name, user.partnership.logo, user.partnership.partner_url)
flash(_('An email has been sent to %(email)s.',
email=user.email), 'success')
except Exception as e:
log.error(traceback.format_exc())
db.session.rollback()
flash(_("Error converting the agent to an user. The email address is used by another user. "
"Try a different email address."), 'danger')
else:
user = User.query.filter(User.id == agent.user_id, User.is_deactivated == True, func.lower(User.email) == func.lower(agent.email)).first()
if user:
user.partnership_id = current_user.partnership_id
user.partnership_account_id = partnership_account_id
user.company = current_user.company
user.is_deactivated = False
user.role = 'agent'
user.deactivated_on = None
db.session.commit()
reset_token = user.serialize_token()
from ...blueprints.user.tasks import send_agent_invitation_email
send_agent_invitation_email.delay(user.id, reset_token, user.partnership.name, user.partnership.logo, user.partnership.partner_url)
flash(_('An email has been sent to %(email)s.',
email=user.email), 'success')
else:
flash(_('The agent is already an user.'), 'warning')
user = agent.user
return redirect(url_for('agents.call_agents'))
@agents.route('/account/agent_invite', methods=['GET', 'POST'])
@anonymous_required()
def agent_invite(reset_token=None):
from buyercall.blueprints.user.models import User
from buyercall.blueprints.user.forms import PasswordResetForm
form = PasswordResetForm(reset_token=request.args.get('reset_token'))
if form.validate_on_submit():
u = User.deserialize_token(request.form.get('reset_token'))
if u is None:
flash(_('Your reset token has expired or was tampered with.'),
'danger')
return redirect(url_for('user.login'))
form.populate_obj(u)
u.password = User.encrypt_password(request.form.get('password', None))
u.save()
if login_user(u):
flash(_('Your password has been reset.'), 'success')
return redirect(url_for('user.settings'))
return render_template('agents/create_credentials.jinja2', form=form)
# This function is used to create new groups
@agents.route('/groups/new', methods=['GET', 'POST'])
@subscription_required
@login_required
@role_required('admin', 'partner')
def group_new():
form = GroupForm()
partnership_account_id = current_user.partnership_account_id
if form.validate_on_submit():
is_admin_in_group = current_user.is_admin_user_with_groups
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if is_admin_in_group and viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
elif not viewing_partnership_account and is_admin_in_group:
return redirect(url_for('partnership.company_accounts'))
g = Agent()
g.partnership_account_id = partnership_account_id
g.firstname = form.name.data
g.lastname = ''
g.email = ''
g.phonenumber = ''
g.mobile = ''
g.description = form.description.data
g.is_group = True
load_agents(g, form.group_agents.data)
g.save()
return redirect(url_for('agents.call_agents'))
agents = Agent.query.filter(
Agent.partnership_account_id == partnership_account_id,
Agent.is_group == False, # noqa
).all()
return render_template('agents/new_group.jinja2', form=form, agents=agents)
# This function is used to edit groups
@agents.route('/groups/edit/<int:id>', methods=['GET', 'POST'])
@subscription_required
@login_required
def group_edit(id):
form = GroupForm()
partnership_account_id = current_user.partnership_account_id
is_admin_in_group = current_user.is_admin_user_with_groups
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if is_admin_in_group and viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
elif not viewing_partnership_account and is_admin_in_group:
return redirect(url_for('partnership.company_accounts'))
g = Agent.query.filter(
Agent.id == id,
Agent.is_group == True # noqa
).first()
if g is None:
return redirect(url_for('agents.group_new'))
if form.validate_on_submit():
g.firstname = form.name.data
g.description = form.description.data
load_agents(g, form.group_agents.data)
g.save()
return redirect(url_for('agents.call_agents'))
form.name.data = g.firstname
form.description.data = g.description
form.group_agents.data = ','.join(str(a.id) for a in g.agents)
all_agents = Agent.query.options(
load_only('id', 'firstname', 'lastname')
).filter(
Agent.partnership_account_id == partnership_account_id,
Agent.is_group == False # noqa
).all()
available_agents = [x for x in all_agents if x not in g.agents]
return render_template(
'agents/edit_group.jinja2', form=form, group=g,
agents=available_agents, group_agents=g.agents
)
def load_agents(group, agent_string):
""" Parses a comma-separated string, containing the agent IDs, and assigns
them to the group.
"""
partnership_account_id = current_user.partnership_account_id
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
ids = [int(id) for id in agent_string.split(',') if id]
agents = Agent.query.filter(
Agent.partnership_account_id == partnership_account_id,
Agent.is_group == False, # noqa
Agent.id.in_(ids),
).all()
for agent in list(group.agents):
group.agents.remove(agent)
for agent in agents:
group.agents.append(agent)
def load_groups(agent, group_string):
""" Parses a comma-separated string, containing the group IDs, and assigns
them to the agent.
"""
partnership_account_id = current_user.partnership_account_id
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
ids = [int(x) for x in group_string.split(',') if x]
new_groups = Agent.query.filter(
Agent.partnership_account_id == partnership_account_id,
Agent.is_group == True, # noqa
Agent.id.in_(ids)
)
for g in agent.groups:
agent.groups.remove(g)
for g in new_groups:
agent.groups.append(g)