File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/widgets/bw_tasks.py
import logging as log
import random
import traceback
import redis
from flask import url_for, current_app
from buyercall.app import create_celery_app
from buyercall.extensions import db
from buyercall.lib.util_twilio import (
CallStorage,
InboundCallState as State,
select_voice,
)
from buyercall.lib.util_bandwidth import bw_client
from buyercall.lib.bandwidth import (
BandwidthException,
)
from buyercall.blueprints.leads.models import Lead
from buyercall.blueprints.agents.models import Agent
# Used below as a multiplier when setting expiry times on Redis keys.
DAYS = 86400 # The length of a day in seconds
# Audio file that bw_call_lead loops on the agent's call while the lead is dialed.
DIAL_TONE_URL = "https://s3.amazonaws.com/buyercall-static-sounds/dialtone.mp3"
# Sentence that bw_call_lead has spoken on the agent's call when calling the
# lead fails.
# NOTE(review): 'leed' looks like a misspelling of 'lead', but it may be a
# deliberate phonetic spelling for the text-to-speech voice - confirm before
# changing it.
MSG_LEAD_CALL_ERROR = 'We\'re sorry, there was an error calling the leed.'
# Celery application created from the current Flask app; the tasks in this
# module are registered on it through the @celery.task decorators.
celery = create_celery_app(current_app)
@celery.task()
def call_again(lead_id, **kwargs):
    """ Try to call a lead again by routing a call to its widget's agents.

    The task returns without calling anyone when:
      * no lead exists with the given id,
      * the subscription of the lead's partnership account (or, failing
        that, of its partnership) reports usage over the limit,
      * the lead's status is already 'ringing' or 'completed',
      * none of the widget's assigned agents is available right now.

    Otherwise the lead is marked 'ringing', its call count is incremented
    and the change is committed. The call is then handed to call_parallel
    when the widget option 'routeSimultaneously' is set, or routed
    sequentially (shuffled first when 'routeRandomly' is set). If the
    sequential routing raises, the lead is marked 'missed' and
    send_notify_email is called.

    :param lead_id: id of the Lead to call again.
    :param kwargs: accepted but not used by this task.
    """
    # Redis connection backing the per-lead call storage
    redis_conn = redis.StrictRedis(
        host=celery.conf['REDIS_CONFIG_URL'],
        port=celery.conf['REDIS_CONFIG_PORT'],
    )
    from .routing import (
        BandwidthRouting as Routing, send_notify_email
    )

    log.info('Trying to call lead again...')

    lead = Lead.query.filter(Lead.id == lead_id).first()
    if not lead:
        log.warning('Lead {} not available.'.format(lead_id))
        return

    # The partnership id of this account is needed to build the voice client
    from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount
    account_row = PartnershipAccount.query.filter(
        PartnershipAccount.id == lead.partnership_account_id
    ).first()

    # Use the account's own subscription, else the one of its partnership
    owner = lead.partnership_account
    subscription = owner.subscription or owner.partnership.subscription
    if subscription.usage_over_limit:
        log.warning('Partnership account {} ({}) has exceeded their quota.'.format(
            owner.name,
            lead.partnership_account_id
        ))
        return

    if lead.status in ('ringing', 'completed'):
        log.debug('Lead is currently in another call')
        return

    storage = CallStorage(redis_conn, lead.id)
    storage.inc_callback_cnt()
    storage.agent_id = None

    # Candidate agents: widget assignments in their configured order,
    # restricted to agents that are available at this moment
    ordered_assignments = sorted(
        lead.widget.assignments, key=lambda item: item.order
    )
    available_ids = [
        item.agent_id
        for item in ordered_assignments
        if item.agent.available_now
    ]
    if not available_ids:
        log.warning('No agents were configured for this widget.')
        return

    lead.status = 'ringing'
    lead.call_count += 1
    db.session.commit()

    options = lead.widget.options
    if options['routeSimultaneously']:
        # Runs the parallel routing inline, in this same task
        call_parallel(lead.id)
        return

    if options['routeRandomly']:
        random.shuffle(available_ids)

    router = Routing(bw_client(account_row.partnership_id, 'voice'))
    try:
        router.route_sequential(available_ids, lead.widget, lead)
    except Exception:
        log.error(traceback.format_exc())
        lead.status = 'missed'
        db.session.commit()
        send_notify_email(lead, lead.widget)
@celery.task
def call_parallel(lead_id, **kwargs):
    """ Call the available agents of the lead's widget one after another
    without waiting for an answer, recording each placed call in Redis.

    Redis keys written for the lead (each with a two day expiry):
      * 'SUCC<lead_id>'    - initialised to '0' (1 if the lead was captured),
      * 'CONNECT<lead_id>' - initialised to '0', set to '-1' when no call
        to any agent could be placed,
      * 'LIST<lead_id>'    - list of '<agent id>_<call>' entries, one per
        agent call that was placed.

    When no call could be placed the lead is marked 'missed'.

    :param lead_id: id of the Lead being called back; the query uses
        .one(), so a missing lead raises.
    :param kwargs: accepted but not used by this task.
    """
    # Redis connection backing the per-lead call storage and the keys above
    redis_db = redis.StrictRedis(
        host=celery.conf['REDIS_CONFIG_URL'],
        port=celery.conf['REDIS_CONFIG_PORT'],
    )
    from .routing import (
        BandwidthRouting as Routing,
        send_notify_email,
        get_available_agent_ids
    )

    log.info('Parallel call task started.')

    lead = Lead.query.join(Lead.widget).filter(Lead.id == lead_id).one()
    widget = lead.widget

    # import partnership information to get partnership id
    from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount
    partner_account = PartnershipAccount.query \
        .filter(PartnershipAccount.id == lead.partnership_account_id).first()

    agent_ids = get_available_agent_ids(widget)
    # Logged instead of printed so it goes through the same logger as the
    # rest of this task's diagnostics.
    log.info('Agent ids: {}'.format(agent_ids))

    if not agent_ids:
        # TODO: Notify lead
        log.warning('No agents are available for widget {}.'.format(widget.id))
        lead.status = 'missed'
        db.session.commit()
        send_notify_email(lead, lead.widget)
        # NOTE(review): there is no early return here. Execution continues,
        # presumably finds no agents to call, and ends up writing '-1' to
        # the connect key below. Confirm this fall-through is intended
        # before adding a return.

    storage = CallStorage(redis_db, lead_id)
    storage.state = State.NEW
    storage.clear_retry_cnt()

    called_agents_key = 'LIST{}'.format(lead_id)
    lead_capture_key = 'SUCC{}'.format(lead_id)  # 1 if the lead was captured
    connect_key = 'CONNECT{}'.format(lead_id)

    redis_db.setex(lead_capture_key, 2 * DAYS, '0')
    redis_db.setex(connect_key, 2 * DAYS, '0')

    agents = Agent.query.filter(Agent.id.in_(agent_ids)).all()
    log.info('Calling {} agents...'.format(len(agents)))

    # Stays None only if no call_agent_parallel invocation ever returned
    call = None
    client = bw_client(partner_account.partnership_id, 'voice')
    r = Routing(client)

    for agent in agents:
        try:
            # Diagnostics only: the busy flag and the calls placed so far
            agent_busy = redis_db.get('BUSY_{}'.format(agent.id))
            log.info('agent busy: {}'.format(agent_busy))
            calls = redis_db.lrange(called_agents_key, 0, -1)
            log.info('The calls are: {}'.format(calls))

            call = r.call_agent_parallel(lead, agent, widget)

            # Did any agent pick up?
            with storage.lock():
                # NOTE(review): bw_call_lead decodes storage.agent_call_id
                # from bytes; if storage.agent_id is bytes as well, this
                # comparison with a str is always unequal - confirm.
                if storage.agent_id and storage.agent_id != str(agent.id):
                    # Another agent already has the lead: drop the call
                    # just placed and stop calling further agents.
                    call.hangup()
                    break
                redis_db.lpush(
                    called_agents_key, '{}_{}'.format(agent.id, call)
                )
        except Exception:
            # Best effort: a failure for one agent must not stop the others
            log.warning(traceback.format_exc())
            log.warning('Error calling agent {}...'.format(
                agent.id
            ))

    if call is None:
        # None of the calls got through
        redis_db.setex(connect_key, 2 * DAYS, '-1')
        lead.status = 'missed'
        db.session.commit()

    redis_db.expire(called_agents_key, 2 * DAYS)
@celery.task
def bw_call_lead(lead_id, agent_id, **kwargs):
    """ Play dial tone on the agent's side and call the lead.

    The agent's call id is read from the lead's call storage. A looping
    dial tone is started on that call (a failure to do so is only logged),
    then the lead is called from the widget's inbound number into the
    stored conference. On success the id of the new call is saved in the
    call storage and on the lead. If calling the lead fails, an error
    sentence is spoken on the agent's call instead.

    :param lead_id: id of the Lead to call; the query uses .one(), so a
        missing lead raises.
    :param agent_id: id of the agent, passed on to the status callback URL.
    :param kwargs: accepted but not used by this task.
    """
    # Redis connection backing the per-lead call storage
    redis_db = redis.StrictRedis(
        host=celery.conf['REDIS_CONFIG_URL'],
        port=celery.conf['REDIS_CONFIG_PORT'],
    )
    storage = CallStorage(redis_db, lead_id)

    lead = Lead.query.join(Lead.widget).filter(Lead.id == lead_id).one()
    widget = lead.widget

    # import partnership information to get partnership id
    from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount
    partner_account = PartnershipAccount.query \
        .filter(PartnershipAccount.id == lead.partnership_account_id).first()
    partner = Partnership.query.filter(Partnership.id == partner_account.partnership_id).first()

    client = bw_client(partner.id)

    # Decode the stored call id once and use the same value for every
    # lookup of the agent's call below, including the error path.
    agent_call_id_decoded = storage.agent_call_id.decode()

    try:
        client.calls[agent_call_id_decoded].audio(
            file_url=DIAL_TONE_URL,
            loop_enabled='true'
        )
    except BandwidthException:
        # A missing dial tone is not fatal; still try to reach the lead
        log.error(traceback.format_exc())

    try:
        call_lead = client.call(
            from_=widget.inbound.phonenumber,
            to=lead.phonenumber,
            callback_url=url_for(
                'bw_outbound.lead_status_callback',
                lead_id=lead.id,
                agent_id=agent_id,
                _external=True,
                _scheme='https'
            ),
            callback_http_method='POST',
            conference_id=storage.bw_conf_id
        )
        # Save the id of the lead's call in Redis storage and on the lead
        storage.call_sid = call_lead.id
        lead.call_sid = call_lead.id
        db.session.commit()
    except BandwidthException:
        log.error(traceback.format_exc())
        language = lead.widget.options.get('language', 'en')
        gender, locale, voice = select_voice(language)
        # Presumably an empty file_url stops the looping dial tone before
        # the error sentence is spoken - confirm against the Bandwidth API.
        client.calls[agent_call_id_decoded].audio(file_url='')
        client.calls[agent_call_id_decoded].audio(
            sentence=MSG_LEAD_CALL_ERROR,
            tag='lead_call_error',
            gender=gender,
            locale=locale,
            voice=voice,
        )