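# NOTE(review): the lines below appear to be residue from a web file-viewer
# page rather than part of this module; kept as comments so the file parses.
# HEX
# Server: Apache/2.4.52 (Ubuntu)
# System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
# User: arjun (1000)
# PHP: 8.1.2-1ubuntu2.20
# Disabled: NONE
# Upload Files
# File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/lib/util_twilio.py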
import json
import logging
import time

import redis
from flask import current_app as app
from twilio.rest import Client

from buyercall.lib.util_crypto import AESCipher
from buyercall.extensions import db

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Durations in seconds; used below as multipliers for Redis key expiry times.
HOURS = 3600  # The length of an hour in seconds
DAYS = 86400  # The length of a day in seconds


def account_client(partnership_id):
    """ Build a Twilio REST client for a partnership's main account.

    The partnership's stored Twilio account id and API token are decrypted
    with the application's ``CRYPTO_SECRET_KEY`` and passed to
    ``twilio.rest.Client``.

    :param partnership_id: passed through to
        ``PartnershipCpaasProviders.partnership_twilio_credentials``
    :return: a ``twilio.rest.Client`` instance
    """
    # Imported locally, as before; presumably this avoids a circular import
    # with the partnership models -- confirm before moving to module level.
    from buyercall.blueprints.partnership.models import PartnershipCpaasProviders

    # Get the encryption key to decrypt credentials
    encrypt_key = app.config['CRYPTO_SECRET_KEY']
    # Retrieve the twilio credentials
    partner_cpaas = PartnershipCpaasProviders.partnership_twilio_credentials(partnership_id)
    cipher = AESCipher(encrypt_key)

    # Decrypt the credentials
    decrypted_user_id = cipher.decrypt(partner_cpaas.cpaas_account_id)
    decrypted_token = cipher.decrypt(partner_cpaas.cpaas_api_token)

    # The former ``USE_TEST_API`` check built the client with exactly the same
    # arguments in both branches, so a single construction is equivalent.
    return Client(decrypted_user_id, decrypted_token)


def subaccount_client(subaccount_sid, partnership_id):
    """ Build a Twilio REST client that acts on a subaccount.

    Authentication uses the partnership's decrypted Twilio account id and
    API token; ``subaccount_sid`` is passed to the client as ``account_sid``.

    :param subaccount_sid: the Twilio subaccount SID to operate on
    :param partnership_id: passed through to
        ``PartnershipCpaasProviders.partnership_twilio_credentials``
    :return: a ``twilio.rest.Client`` instance
    """
    from buyercall.blueprints.partnership.models import PartnershipCpaasProviders

    # The key used to decrypt the stored credentials
    secret_key = app.config['CRYPTO_SECRET_KEY']
    # The partnership's encrypted twilio credentials
    credentials = PartnershipCpaasProviders.partnership_twilio_credentials(partnership_id)
    decrypt = AESCipher(secret_key).decrypt

    account_id = decrypt(credentials.cpaas_account_id)
    api_token = decrypt(credentials.cpaas_api_token)

    return Client(account_id, api_token, account_sid=subaccount_sid)


def bw_client(partnership_id):
    """ Build a Bandwidth client (``buyercall.lib.bandwidth``) for a partnership.

    The partnership's stored API token, API secret and user id are decrypted
    with the application's ``CRYPTO_SECRET_KEY`` and passed, in that order,
    to ``Bandwidth``.

    :param partnership_id: passed through to
        ``PartnershipCpaasProviders.partnership_bandwidth_credentials``
    :return: a ``Bandwidth`` client instance
    """
    from buyercall.lib.bandwidth import Bandwidth
    from buyercall.blueprints.partnership.models import PartnershipCpaasProviders

    # The key used to decrypt the stored credentials
    secret_key = app.config['CRYPTO_SECRET_KEY']
    # The partnership's encrypted Bandwidth credentials
    credentials = PartnershipCpaasProviders.partnership_bandwidth_credentials(partnership_id)
    decrypt = AESCipher(secret_key).decrypt

    # Arguments are evaluated left to right: token, secret, user id
    return Bandwidth(
        decrypt(credentials.cpaas_api_token),
        decrypt(credentials.cpaas_api_secret),
        decrypt(credentials.cpaas_user_id),
    )


def bw_dashboard_client(partnership_id):
    """ Build a Bandwidth client (``buyercall.lib.bandwidth_api_v2``) for a partnership.

    The account id, API username and API password are decrypted with the
    application's ``CRYPTO_SECRET_KEY``; the site id and location id are
    used as stored. The client receives, in order: username, password,
    account id, site id, location id.

    :param partnership_id: passed through to
        ``PartnershipCpaasProviders.partnership_bandwidth_credentials``
    :return: a ``Bandwidth`` client instance
    """
    from buyercall.blueprints.partnership.models import PartnershipCpaasProviders
    from buyercall.lib.bandwidth_api_v2 import Bandwidth

    # The key used to decrypt the stored credentials
    secret_key = app.config['CRYPTO_SECRET_KEY']
    # The partnership's encrypted Bandwidth credentials
    credentials = PartnershipCpaasProviders.partnership_bandwidth_credentials(partnership_id)
    cipher = AESCipher(secret_key)

    # Only these three fields are stored encrypted
    account_id = cipher.decrypt(credentials.cpaas_account_id)
    api_username = cipher.decrypt(credentials.cpaas_api_username)
    api_password = cipher.decrypt(credentials.cpaas_api_password)

    return Bandwidth(
        api_username,
        api_password,
        account_id,
        credentials.cpaas_site_id,
        credentials.cpaas_location_id,
    )


def select_voice(language):
    """ Pick the voice settings for a language code.

    :param language: ``'es'`` selects the Spanish voice; any other value
        selects the English one
    :return: a ``(gender, locale, voice)`` tuple; gender is always
        ``'female'``
    """
    is_spanish = language == 'es'
    locale = 'es_mx' if is_spanish else 'en_US'
    voice = 'esperanza' if is_spanish else 'susan'
    return 'female', locale, voice


def update_response_time(call_status, agent_id, lead_id, redis_db):
    """ Record how long it took an agent call to go from ringing to answered.

    On ``'ringing'`` the current time is stored in Redis under a per
    lead/agent key (only if the key is not already set) and the key's expiry
    is set to two days. On ``'in-progress'`` the stored time is subtracted
    from the current time and written to the lead's
    ``response_time_seconds``. Any other status does nothing.

    :param call_status: the call status string
    :param agent_id: used to build the Redis key
    :param lead_id: used to build the Redis key and to select the lead row
    :param redis_db: a Redis client
    """
    from buyercall.blueprints.leads.models import Lead

    time_key = 'TIME_{}_{}'.format(lead_id, agent_id)

    if call_status == 'ringing':
        # setnx keeps the first ringing timestamp if several events arrive
        redis_db.setnx(time_key, time.time())
        redis_db.expire(time_key, 2 * DAYS)
        return

    if call_status != 'in-progress':
        return

    answered_at = time.time()
    rang_at = float(redis_db.get(time_key) or 0)
    if not rang_at:
        # No ringing timestamp recorded (or it expired): nothing to compute
        return

    Lead.query.filter(Lead.id == lead_id).update({
        Lead.response_time_seconds: answered_at - rang_at
    })
    db.session.commit()


def split_name(name):
    """ Split a name into a (first name, last name) tuple.

    The split happens at the first space only; everything after it is the
    last name. A falsy ``name`` is treated as ``'Unknown'``, and a name
    without a space yields an empty last name.
    """
    first_name, _, last_name = (name or 'Unknown').partition(' ')
    return first_name, last_name


class BusyAgents(object):
    """ Track, in Redis, which agents are currently busy in a call.

    An agent is busy while the key ``BUSY_<agent_id>`` exists.
    """

    @staticmethod
    def _connect():
        """ Open a Redis client using the app's configured host and port."""
        return redis.StrictRedis(
            host=app.config['REDIS_CONFIG_URL'],
            port=app.config['REDIS_CONFIG_PORT'],
        )

    @staticmethod
    def _key(agent_id):
        """ Return the Redis key that marks `agent_id` as busy."""
        return 'BUSY_{}'.format(agent_id)

    @staticmethod
    def filter_from(all_agents):
        """ Return only those agents from `all_agents` who are not busy.

        :param all_agents: a list of agent ids
        """
        redis_db = BusyAgents._connect()
        available = []
        for agent_id in all_agents:
            if redis_db.get(BusyAgents._key(agent_id)):
                continue
            available.append(agent_id)
        return available

    @staticmethod
    def add(agent_id):
        """ Mark an agent as busy."""
        redis_db = BusyAgents._connect()
        # The flag expires after 10 minutes (600 seconds). This guards against
        # receiving the `in-progress` and `completed` events for a call in
        # the opposite order, which would otherwise leave the agent busy
        # forever.
        redis_db.setex(BusyAgents._key(agent_id), 600, 1)

    @staticmethod
    def remove(agent_id):
        """ Mark an agent as available."""
        redis_db = BusyAgents._connect()
        redis_db.delete(BusyAgents._key(agent_id))


class RedisAttr(object):
    """ Descriptor that stores an attribute as one field of a Redis hash.

    The owning instance must provide ``redis_db`` (a Redis client) and
    ``_lead_key`` (the name of the hash). The hash field name is given when
    the descriptor is created.
    """

    def __init__(self, name):
        """
        :param name: the hash field this attribute reads and writes
        """
        self.__name = name

    def __set__(self, instance, value):
        """ Write `value` to the hash field; ``None`` deletes the field."""
        # Lazy %-style arguments: the message is only formatted when debug
        # logging is enabled.
        log.debug("%s -> %s", self.__name, value)
        if value is None:
            instance.redis_db.hdel(instance._lead_key, self.__name)
        else:
            instance.redis_db.hset(instance._lead_key, self.__name, value)

    def __get__(self, instance, owner):
        """ Return the raw value of the hash field, as returned by ``hget``.

        Accessing the attribute on the class itself (``instance`` is None)
        returns the descriptor instead of failing on ``None.redis_db``.
        """
        if instance is None:
            return self
        return instance.redis_db.hget(instance._lead_key, self.__name)


class BoolAttr(RedisAttr):
    """ A ``RedisAttr`` holding a boolean.

    True is stored as ``'x'`` and False as the empty string; reading
    returns the truthiness of the stored value, so a missing field reads as
    False.
    """

    def __set__(self, instance, value):
        """ Store ``'x'`` for a truthy `value`, ``''`` otherwise."""
        super(BoolAttr, self).__set__(instance, 'x' if value else '')

    def __get__(self, instance, owner):
        """ Return True if the stored value is non-empty.

        Accessing the attribute on the class itself (``instance`` is None)
        returns the descriptor rather than a meaningless boolean.
        """
        if instance is None:
            return self
        return bool(super(BoolAttr, self).__get__(instance, owner))


class JsonAttr(RedisAttr):
    """ A ``RedisAttr`` whose value is stored JSON-encoded.

    The constructor is inherited unchanged from ``RedisAttr``.
    """

    def __set__(self, instance, dict_):
        """ JSON-encode `dict_` and store it.

        ``None`` is encoded as ``'null'`` and stored like any other value;
        it does not delete the field.
        """
        super(JsonAttr, self).__set__(instance, json.dumps(dict_))

    def __get__(self, instance, owner):
        """ Return the decoded value, or None if nothing is stored.

        Accessing the attribute on the class itself (``instance`` is None)
        returns the descriptor instead of trying to decode it.
        """
        if instance is None:
            return self
        value = super(JsonAttr, self).__get__(instance, owner)
        if not value:
            return None
        # The Redis client may return bytes; decode before parsing
        if isinstance(value, bytes):
            value = value.decode()
        return json.loads(value)


class CallStorage(object):
    """ A wrapper over a Redis database for storing call-related information.

    Per-lead attributes live in the hash ``LEAD:<lead_id>`` and are exposed
    through the descriptors below. Agent queues, in-progress agent calls,
    counters and the lock use separate keys derived from the lead id.
    """
    state = JsonAttr('state')

    # The agents call order
    call_order = RedisAttr('call_order')

    # Indicates if it's an outbound call initiated by agent
    agent_outbound_call = JsonAttr('agent_outbound_call')

    # The Twilio/BW call ID of the agent who took the call
    agent_call_id = RedisAttr('agent_call_id')

    # The Twilio/BW lead call ID
    call_sid = JsonAttr('call_sid')

    # Bridge call leg A call id
    bridge_call_sid_a = JsonAttr('bridge_call_sid_a')

    # Bridge call leg B call id
    bridge_call_sid_b = JsonAttr('bridge_call_sid_b')

    caller_name = JsonAttr('caller_name')

    caller_number = JsonAttr('caller_number')

    # The Twilio subaccount SID of the user
    subaccount_sid = JsonAttr('subaccount_sid')

    prompt_task = RedisAttr('prompt_task')

    # The ID of the agent who took the call
    agent_id = JsonAttr('agent_id')

    manual_call = JsonAttr('manual_call')

    # Send only one webhook for agents end call if no agents are available
    agent_disconnect_webhook_sent = JsonAttr('agent_disconnect_webhook_sent')

    # Just the current routing configuration (agents assigned, call order)
    routing_config = JsonAttr('routing_config')

    # The entire phone configuration
    routing = JsonAttr('routing')

    # The Bandwidth conference ID
    bw_conf_id = RedisAttr('bw_conf_id')

    # The Bandwidth conference member ID (inferred from the name -- confirm)
    bw_conf_member_id = RedisAttr('bw_conf_member_id')

    # The Bandwidth phone number
    inbound = RedisAttr('inbound')

    # The timestamp when a Bandwidth call starts
    start_time = JsonAttr('start_time')

    # The timestamp when an agent picks up
    connect_time = JsonAttr('connect_time')

    # The id of agent or group that receives first call in routing
    first_call_agent_id = JsonAttr('first_call_agent_id')

    # The cause why a call ended
    call_cause = JsonAttr('call_cause')

    # A description of why the call ended (inferred from the name -- confirm)
    cause_description = JsonAttr('cause_description')

    # A list of agent ids and phone numbers that's associated with the inbound call
    agent_list = JsonAttr('agent_list')

    # This is used to save the digit that was pressed for digit routing
    pressed_digit = RedisAttr('pressed_digit')

    # Whether we are currently calling a group
    is_group = BoolAttr('is_group')

    # Indicates if the call is a retry call or still the initial call
    is_call_retry = BoolAttr('is_call_retry')

    # The sip endpoint id
    sip_endpoint_id = JsonAttr('sip_endpoint_id')

    # The sip call direction (inferred from the name -- confirm)
    sip_call_direction = JsonAttr('sip_call_direction')

    def __init__(self, redis_db, lead_id):
        """
        :param redis_db: a Redis client
        :param lead_id: the lead whose call data this object wraps
        """
        self.redis_db = redis_db
        self.lead_id = lead_id

        self._lead_key = 'LEAD:{}'.format(self.lead_id)
        self._callback_cnt_key = 'CALLBACK_CNT_{}'.format(self.lead_id)
        self._retry_key = 'RETRY_{}'.format(self.lead_id)
        self._lock_key = 'LOCK_{}'.format(self.lead_id)
        self._agents_key = 'AGENTS_{}'.format(self.lead_id)
        self._agent_calls_key = 'CALLS_{}'.format(self.lead_id)

    @staticmethod
    def _parse_agent_call(member):
        """ Turn a stored ``<agent_id>_<call_sid>`` set member into an
        ``(int agent_id, str call_sid)`` tuple.

        Members may come back from Redis as bytes, so they are decoded first.
        Only the first underscore separates the two parts, which keeps a call
        id that itself contains an underscore intact.
        """
        if isinstance(member, bytes):
            member = member.decode()
        agent_id, call_sid = member.split('_', 1)
        return int(agent_id), call_sid

    def init(self):
        """ Create the lead hash and set its expiry to four days."""
        self.redis_db.hset(self._lead_key, 'lead_id', self.lead_id)
        self.redis_db.expire(self._lead_key, 4 * DAYS)

    @property
    def agents_to_call(self):
        """ The queued agent ids, as a list of ints, in queue order."""
        return [int(x) for x in self.redis_db.lrange(self._agents_key, 0, -1)]

    @property
    def current_agent_calls(self):
        """ The in-progress agent call attempts, as a list of
        (agent_id, call_sid) pairs.
        """
        return [
            self._parse_agent_call(member)
            for member in self.redis_db.smembers(self._agent_calls_key)
        ]

    def set_agents_to_call(self, agent_ids):
        """ Push `agent_ids` onto the head of the agent queue.

        The ids are pushed in reverse so that the first id in `agent_ids`
        ends up at the head of the list. An empty `agent_ids` does nothing.
        """
        if agent_ids:
            self.redis_db.lpush(self._agents_key, *reversed(agent_ids))

    def next_agent_to_call(self):
        """ Pop and return the next agent id (int), or None if there is none."""
        value = self.redis_db.lpop(self._agents_key)
        return int(value) if value else None

    def push_agent_call(self, agent_id, call_sid):
        """ Record an agent call attempt and set the set's expiry to four days."""
        value = '{}_{}'.format(agent_id, call_sid)
        self.redis_db.sadd(self._agent_calls_key, value)
        self.redis_db.expire(self._agent_calls_key, 4 * DAYS)

    def remove_agent_call(self, agent_id, call_sid):
        """ Remove an agent call attempt, under the lead lock.

        :return: the number of agent call attempts still recorded
        """
        value = '{}_{}'.format(agent_id, call_sid)
        with self.lock():
            self.redis_db.srem(self._agent_calls_key, value)
            return self.redis_db.scard(self._agent_calls_key)

    def clear_agent_calls(self, predicate=None):
        """ Delete from Redis, and return a list of (agent_id, call_sid) pairs,
        representing the in-progress agent call attempts for the current lead.

        :param predicate: optional callable ``predicate(agent_id, call_sid)``;
            when given, only the pairs for which it returns a truthy value
            are deleted and returned
        """
        with self.lock():
            agent_sids = []
            matched_members = []
            for member in self.redis_db.smembers(self._agent_calls_key):
                agent_id, call_sid = self._parse_agent_call(member)
                if not predicate or predicate(agent_id, call_sid):
                    agent_sids.append((agent_id, call_sid))
                    matched_members.append(member)
            if matched_members:
                # Remove the members exactly as stored, rather than strings
                # rebuilt from the parsed pair.
                self.redis_db.srem(self._agent_calls_key, *matched_members)

        return agent_sids

    def clear_callback_cnt(self):
        """ Reset the callback counter to 0, with a four-day expiry."""
        self.redis_db.setex(self._callback_cnt_key, 4 * DAYS, 0)

    @property
    def callback_cnt(self):
        """ The callback counter as an int; 0 when the key does not exist."""
        # ``get`` returns None for a missing or expired key; treat that as 0
        # instead of letting ``int(None)`` raise.
        return int(self.redis_db.get(self._callback_cnt_key) or 0)

    def inc_callback_cnt(self):
        """ Increment the callback counter and return its new value."""
        return self.redis_db.incr(self._callback_cnt_key)

    def clear_retry_cnt(self):
        """ Reset the retry counter to 0, with a two-hour expiry."""
        self.redis_db.setex(self._retry_key, 2 * HOURS, 0)

    def inc_retry_cnt(self):
        """ Increment the retry counter and return its new value."""
        return self.redis_db.incr(self._retry_key)

    def lock(self):
        """ Return a Redis lock for this lead, created with ``timeout=10``.

        Intended to be used as a context manager (``with storage.lock():``).
        """
        return self.redis_db.lock(self._lock_key, timeout=10)


class InboundCallState(object):
    """ Named string constants for inbound call states.

    Each constant's value is identical to its name.
    """
    NEW = 'NEW'
    CALLBACK = 'CALLBACK'
    LEAD_ONHOLD = 'LEAD_ONHOLD'
    AGENT_ONHOLD = 'AGENT_ONHOLD'
    CALLBACK_PROMPT = 'CALLBACK_PROMPT'
    ANSWERED = 'ANSWERED'
    ONGOING = 'ONGOING'
    MISSED = 'MISSED'
    BLOCKED = 'BLOCKED'
    ERROR = 'ERROR'
    MACHINE = 'MACHINE'
    CAPTURED = 'CAPTURED'
    CALL_ME_BACK = 'CALL_ME_BACK'
    DIGIT_MISSED = 'DIGIT_MISSED'