File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/lib/util_twilio.py
import json
import logging
import time
import redis
from flask import current_app as app
from twilio.rest import Client
from buyercall.lib.util_crypto import AESCipher
from buyercall.extensions import db
log = logging.getLogger(__name__)
HOURS = 3600
DAYS = 86400 # The length of a day in seconds
def account_client(partnership_id):
from buyercall.blueprints.partnership.models import PartnershipCpaasProviders
partner = partnership_id
# Get the encryption key to decrypt credentials
encrypt_key = app.config['CRYPTO_SECRET_KEY']
# Retrieve the twilio credentials
partner_cpaas = PartnershipCpaasProviders.partnership_twilio_credentials(partner)
cipher = AESCipher(encrypt_key)
# Decrypt the credentials
decrypted_user_id = cipher.decrypt(partner_cpaas.cpaas_account_id)
decrypted_token = cipher.decrypt(partner_cpaas.cpaas_api_token)
if app.config.get('USE_TEST_API', False):
twilio_client = Client(
decrypted_user_id,
decrypted_token,
)
else:
twilio_client = Client(
decrypted_user_id,
decrypted_token
)
return twilio_client
def subaccount_client(subaccount_sid, partnership_id):
from buyercall.blueprints.partnership.models import PartnershipCpaasProviders
partner = partnership_id
# Get the encryption key to decrypt credentials
encrypt_key = app.config['CRYPTO_SECRET_KEY']
# Retrieve the twilio credentials
partner_cpaas = PartnershipCpaasProviders.partnership_twilio_credentials(partner)
cipher = AESCipher(encrypt_key)
# Decrypt the credentials
decrypted_user_id = cipher.decrypt(partner_cpaas.cpaas_account_id)
decrypted_token = cipher.decrypt(partner_cpaas.cpaas_api_token)
twilio_client = Client(
decrypted_user_id,
decrypted_token,
account_sid=subaccount_sid
)
return twilio_client
def bw_client(partnership_id):
from buyercall.lib.bandwidth import Bandwidth
from buyercall.blueprints.partnership.models import PartnershipCpaasProviders
# Get the encryption key to decrypt credentials
encrypt_key = app.config['CRYPTO_SECRET_KEY']
partnership = partnership_id
# Retrieve the Bandwidth credentials
partner_cpaas = PartnershipCpaasProviders.partnership_bandwidth_credentials(partnership)
cipher = AESCipher(encrypt_key)
# Decrypt the credentials
decrypted_token = cipher.decrypt(partner_cpaas.cpaas_api_token)
decrypted_secret = cipher.decrypt(partner_cpaas.cpaas_api_secret)
decrypted_user_id = cipher.decrypt(partner_cpaas.cpaas_user_id)
client = Bandwidth(
decrypted_token,
decrypted_secret,
decrypted_user_id,
)
return client
def bw_dashboard_client(partnership_id):
partnership = partnership_id
from buyercall.blueprints.partnership.models import PartnershipCpaasProviders
from buyercall.lib.bandwidth_api_v2 import Bandwidth
# Get the encryption key to decrypt credentials
encrypt_key = app.config['CRYPTO_SECRET_KEY']
# Retrieve the Bandwidth credentials
partner_cpaas = PartnershipCpaasProviders.partnership_bandwidth_credentials(partnership)
cipher = AESCipher(encrypt_key)
# Decrypt the credentials
decrypted_account_id = cipher.decrypt(partner_cpaas.cpaas_account_id)
decrypted_api_username = cipher.decrypt(partner_cpaas.cpaas_api_username)
decrypted_api_password = cipher.decrypt(partner_cpaas.cpaas_api_password)
site_id = partner_cpaas.cpaas_site_id
location_id = partner_cpaas.cpaas_location_id
client = Bandwidth(
decrypted_api_username,
decrypted_api_password,
decrypted_account_id,
site_id,
location_id
)
return client
def select_voice(language):
gender = 'female'
if language == 'es':
locale, voice = 'es_mx', 'esperanza'
else:
locale, voice = 'en_US', 'susan'
return gender, locale, voice
def update_response_time(call_status, agent_id, lead_id, redis_db):
from buyercall.blueprints.leads.models import Lead
time_key = 'TIME_{}_{}'.format(lead_id, agent_id)
if call_status == 'ringing':
timestamp_now = time.time()
redis_db.setnx(time_key, timestamp_now)
redis_db.expire(time_key, 2 * DAYS)
elif call_status == 'in-progress':
timestamp_now = time.time()
timestamp_then = float(redis_db.get(time_key) or 0)
if timestamp_then:
Lead.query.filter(Lead.id == lead_id).update({
Lead.response_time_seconds: timestamp_now - timestamp_then
})
db.session.commit()
def split_name(name):
""" Split a name into (first name, last name) tuple.
"""
full_name = name or 'Unknown'
first_name, last_name = full_name, ''
if ' ' in full_name:
first_name, last_name = full_name.split(' ', 1)
return first_name, last_name
class BusyAgents(object):
""" Track the agents that are currently busy in a call with a user.
"""
@staticmethod
def filter_from(all_agents):
""" Return only those agents from `all_agents` who are not busy.
:param all_agents: a list of agent ids
"""
# Declare redis url
redis_db = redis.StrictRedis(host=app.config['REDIS_CONFIG_URL'], port=app.config['REDIS_CONFIG_PORT'])
return [
agent_id for agent_id in all_agents
if not redis_db.get('BUSY_{}'.format(agent_id))
]
@staticmethod
def add(agent_id):
""" Mark an agent as busy."""
# Declare redis url
redis_db = redis.StrictRedis(host=app.config['REDIS_CONFIG_URL'], port=app.config['REDIS_CONFIG_PORT'])
# The 10-minute expiry time is there to ensure that we don't
# accidentally receive the `in-progress`, `completed` events for a call
# in the opposite order, so the agent stays busy forever.
redis_db.setex('BUSY_{}'.format(agent_id), 600, 1)
@staticmethod
def remove(agent_id):
""" Mark an agent as available."""
# Declare redis url
redis_db = redis.StrictRedis(host=app.config['REDIS_CONFIG_URL'], port=app.config['REDIS_CONFIG_PORT'])
redis_db.delete('BUSY_{}'.format(agent_id))
class RedisAttr(object):
def __init__(self, name):
self.__name = name
def __set__(self, instance, value):
log.debug("{} -> {}".format(self.__name, value))
if value is None:
instance.redis_db.hdel(instance._lead_key, self.__name)
else:
instance.redis_db.hset(instance._lead_key, self.__name, value)
def __get__(self, instance, owner):
return instance.redis_db.hget(instance._lead_key, self.__name)
class BoolAttr(RedisAttr):
def __set__(self, instance, value):
redis_value = 'x' if bool(value) else ''
super(BoolAttr, self).__set__(instance, redis_value)
def __get__(self, instance, owner):
redis_value = super(BoolAttr, self).__get__(instance, owner)
return bool(redis_value)
class JsonAttr(RedisAttr):
def __init__(self, name):
super(JsonAttr, self).__init__(name)
def __set__(self, instance, dict_):
value = json.dumps(dict_)
super(JsonAttr, self).__set__(instance, value)
def __get__(self, instance, owner):
value = super(JsonAttr, self).__get__(instance, owner)
if not value:
return None
if isinstance(value, bytes):
value = value.decode()
return json.loads(value)
class CallStorage(object):
""" A wrapper over a Redis database for storing call-related information.
"""
state = JsonAttr('state')
# The agents call order
call_order = RedisAttr('call_order')
# Indicates if it's an outbound call initiated by agent
agent_outbound_call = JsonAttr('agent_outbound_call')
# The Twilio/BW call ID of the agent who took the call
agent_call_id = RedisAttr('agent_call_id')
# The Twilio/BW lead call ID
call_sid = JsonAttr('call_sid')
# Bridge call leg A call id
bridge_call_sid_a = JsonAttr('bridge_call_sid_a')
# Bridge call leg B call id
bridge_call_sid_b = JsonAttr('bridge_call_sid_b')
caller_name = JsonAttr('caller_name')
caller_number = JsonAttr('caller_number')
# The Twilio subaccount SID of the user
subaccount_sid = JsonAttr('subaccount_sid')
prompt_task = RedisAttr('prompt_task')
# The ID of the agent who took the call
agent_id = JsonAttr('agent_id')
manual_call = JsonAttr('manual_call')
# Send only one webhook for agents end call if no agents are available
agent_disconnect_webhook_sent = JsonAttr('agent_disconnect_webhook_sent')
# Just the current routing configuration (agents assigned, call order)
routing_config = JsonAttr('routing_config')
# The entire phone configuration
routing = JsonAttr('routing')
# The Bandwidth conference ID
bw_conf_id = RedisAttr('bw_conf_id')
# The Bandwidth conference ID
bw_conf_member_id = RedisAttr('bw_conf_member_id')
# The Bandwidth phone number
inbound = RedisAttr('inbound')
# The timestamp when a Bandwidth call starts
start_time = JsonAttr('start_time')
# The timestamp when an agent picks up
connect_time = JsonAttr('connect_time')
# The id of agent or group that receives first call in routing
first_call_agent_id = JsonAttr('first_call_agent_id')
# The cause why a call ended
call_cause = JsonAttr('call_cause')
# The cause why a call ended
cause_description = JsonAttr('cause_description')
# A list of agent ids and phone numbers that's associated with the inbound call
agent_list = JsonAttr('agent_list')
# This is used to save the digit that was pressed for digit routing
pressed_digit = RedisAttr('pressed_digit')
# Whether we are currently calling a group
is_group = BoolAttr('is_group')
# Indicates if the call is a retry call or still the initial call
is_call_retry = BoolAttr('is_call_retry')
# The sip endpoint id
sip_endpoint_id = JsonAttr('sip_endpoint_id')
# The sip endpoint id
sip_call_direction = JsonAttr('sip_call_direction')
def __init__(self, redis_db, lead_id):
self.redis_db = redis_db
self.lead_id = lead_id
self._lead_key = 'LEAD:{}'.format(self.lead_id)
self._callback_cnt_key = 'CALLBACK_CNT_{}'.format(self.lead_id)
self._retry_key = 'RETRY_{}'.format(self.lead_id)
self._lock_key = 'LOCK_{}'.format(self.lead_id)
self._agents_key = 'AGENTS_{}'.format(self.lead_id)
self._agent_calls_key = 'CALLS_{}'.format(self.lead_id)
def init(self):
self.redis_db.hset(self._lead_key, 'lead_id', self.lead_id)
self.redis_db.expire(self._lead_key, 4 * DAYS)
@property
def agents_to_call(self):
return [int(x) for x in self.redis_db.lrange(self._agents_key, 0, -1)]
@property
def current_agent_calls(self):
def to_agent_call_tuple(item):
tpl = item.split('_')
return int(tpl[0]), tpl[1]
return [
to_agent_call_tuple(x)
for x in self.redis_db.smembers(self._agent_calls_key)
]
def set_agents_to_call(self, agent_ids):
if agent_ids:
self.redis_db.lpush(self._agents_key, *reversed(agent_ids))
def next_agent_to_call(self):
value = self.redis_db.lpop(self._agents_key)
return int(value) if value else None
def push_agent_call(self, agent_id, call_sid):
value = '{}_{}'.format(agent_id, call_sid)
self.redis_db.sadd(self._agent_calls_key, value)
self.redis_db.expire(self._agent_calls_key, 4 * DAYS)
def remove_agent_call(self, agent_id, call_sid):
value = '{}_{}'.format(agent_id, call_sid)
with self.lock():
self.redis_db.srem(self._agent_calls_key, value)
return self.redis_db.scard(self._agent_calls_key)
def clear_agent_calls(self, predicate=None):
""" Delete from Redis, and return a list of (agent_id, call_sid) pairs,
representing the in-progress agent call attempts for the current lead.
"""
with self.lock():
values = self.redis_db.smembers(self._agent_calls_key)
values = (value.decode() if isinstance(value, bytes) else value for value in values)
agent_sids = [
(int(lst[0]), lst[1])
for lst in (x.split('_') for x in values)
if not predicate or predicate(int(lst[0]), lst[1])
]
if agent_sids:
self.redis_db.srem(self._agent_calls_key, *[
'{}_{}'.format(agent_id, call_sid)
for agent_id, call_sid in agent_sids
])
return agent_sids
def clear_callback_cnt(self):
self.redis_db.setex(self._callback_cnt_key, 4 * DAYS, 0)
@property
def callback_cnt(self):
return int(self.redis_db.get(self._callback_cnt_key))
def inc_callback_cnt(self):
return self.redis_db.incr(self._callback_cnt_key)
def clear_retry_cnt(self):
self.redis_db.setex(self._retry_key, 2 * HOURS, 0)
def inc_retry_cnt(self):
return self.redis_db.incr(self._retry_key)
def lock(self):
return self.redis_db.lock(self._lock_key, timeout=10)
class InboundCallState(object):
NEW = 'NEW'
CALLBACK = 'CALLBACK'
LEAD_ONHOLD = 'LEAD_ONHOLD'
AGENT_ONHOLD = 'AGENT_ONHOLD'
CALLBACK_PROMPT = 'CALLBACK_PROMPT'
ANSWERED = 'ANSWERED'
ONGOING = 'ONGOING'
MISSED = 'MISSED'
BLOCKED = 'BLOCKED'
ERROR = 'ERROR'
MACHINE = 'MACHINE'
CAPTURED = 'CAPTURED'
CALL_ME_BACK = 'CALL_ME_BACK'
DIGIT_MISSED = 'DIGIT_MISSED'