HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/phonenumbers/routing.py
import logging as log
import traceback

from sqlalchemy.orm import (
    load_only,
    subqueryload,
)
from flask import (
    url_for,
)

from .models import (
    HoldMusic,
)
from ..agents.models import Agent
from ..leads.models import Lead
from buyercall.lib.util_twilio import (
    InboundCallState as State, BusyAgents
)

HOURS = 3600
DAYS = 86400  # The length of a day in seconds

DEFAULT_HOLD_MUSIC_URL = "https://s3.amazonaws.com/buyercall-static-sounds/holdmusic.mp3"  # noqa

DEFAULT_DIALTONE_URL = "https://s3.amazonaws.com/buyercall-static-sounds/dialtone.mp3"  # noqa


def get_routing_agents(routing, expand_groups=False):
    """Return a list of available agents for this routing.
    """
    agent_ids = []
    for a in routing['agents']:
        if a['id']:
            try:
                agent_ids.append(int(a['id']))
            except Exception as e:
                log.error('Error retrieving agent id of {} for {} from routing. Error: {}'.format(a['id'],
                                                                                                  a['fullName'],
                                                                                                  traceback.format_exc()))
    agents = get_available_agents(agent_ids, expand_groups)
    log.debug('%d available agents' % len(agents))
    return agents


def get_sms_routing_agents(routing, expand_groups=False):
    """Return a list of routing agents for this sms sending.
        This function is a combination of the get_routing_agents
        and get_available_agents, except it doesn't care if
        the agent is available. It will always send the message
    """
    agent_ids = [int(a['id']) for a in routing['agents']]

    result = []

    if expand_groups:
        group_list = Agent.query.options(
            subqueryload(Agent.agents)
        ).filter(
            Agent.is_group.is_(True),
            Agent.is_deactivated.is_(False),
            Agent.id.in_(agent_ids)
        ).all()
        groups = {g.id: g for g in group_list}

        agent_list = Agent.query.filter(
            Agent.is_group.is_(False),
            Agent.is_deactivated.is_(False),
            Agent.id.in_(agent_ids),
        ).all()
        agents = {a.id: a for a in agent_list}

        # Preserve order
        for id_ in agent_ids:
            if id_ in groups:
                result.extend(groups[id_].agents)
            elif id_ in agents:
                result.append(agents[id_])
    else:
        agents = Agent.query.filter(Agent.id.in_(agent_ids), Agent.is_deactivated.is_(False),).all()
        result = agents

    log.debug('%d sms agent(s) going to receive sms messages' % len(agents))
    return result


def get_available_agents(agent_ids, expand_groups=False):
    """Select the currently available agents from a collection of agent IDs.

    :return: a list of Agent objects
    """
    result = []

    if expand_groups:
        group_list = Agent.query.options(
            subqueryload(Agent.agents)
        ).filter(
            Agent.is_group.is_(True),
            Agent.is_deactivated.is_(False),
            Agent.id.in_(agent_ids)
        ).all()
        groups = {g.id: g for g in group_list}

        agent_list = Agent.query.filter(
            Agent.is_group.is_(False),
            Agent.is_deactivated.is_(False),
            Agent.id.in_(agent_ids),
        ).all()
        agents = {a.id: a for a in agent_list}

        # Preserve order
        for id_ in agent_ids:
            if id_ in groups:
                result.extend(groups[id_].agents)
            elif id_ in agents:
                result.append(agents[id_])
    else:
        agent_list = Agent.query.filter(Agent.id.in_(agent_ids), Agent.is_deactivated.is_(False)).all()
        agents = {a.id: a for a in agent_list}
        # Preserve order
        for id_ in agent_ids:
            if id_ in agents:
                result.append(agents[id_])

    result = [a for a in result if a.available_now]
    return result


def schedule_callback(storage):
    from .tasks import callback_lead

    lead_id = storage.lead_id
    routing_config = storage.routing_config

    if not routing_config.get('callBack'):
        return False

    cnt = storage.callback_cnt
    routing = storage.routing
    delay_minutes = None

    if not routing:
        return False

    if cnt == 0:
        delay_minutes = routing_config['firstCallBack']
    elif cnt == 1:
        delay_minutes = routing_config['secondCallBack']
    elif cnt == 2:
        delay_minutes = routing_config['thirdCallBack']

    if not delay_minutes:
        Lead.query.filter(Lead.id == lead_id).update(
            {Lead.status: 'missed'},
            synchronize_session='evaluate'
        )
        return False

    delay_seconds = int(delay_minutes) * 60
    Lead.query.filter(Lead.id == lead_id).update(
        {Lead.status: 'retry-pending'},
        synchronize_session='evaluate'
    )
    storage.state = State.CALL_ME_BACK
    callback_lead.apply_async(args=[lead_id], countdown=delay_seconds)
    return True


def get_agent_number(agent, routing):
    contact_method = 'phone'
    for a in routing['agents']:
        if a['id'] == agent.id:
            if 'contactUsing' in a:
                contact_method = a['contactUsing']

    if contact_method == 'phone':
        log.info('The agent phone number is {}'.format(agent.phonenumber))
        return agent.phonenumber, None
    elif contact_method == 'extension':
        log.info('The agent phone extension is {}'.format(agent.extension))
        return agent.phonenumber, agent.extension
    elif contact_method == 'mobile':
        log.info('The agent mobile number is {}'.format(agent.mobile))
        return agent.mobile, None
    elif contact_method == 'app':
        log.info('The agent app phone number is {}'.format(agent.app_number))
        return agent.app_number, None

    return None, None


def hold_music_url(routing_config):
    if not routing_config.get('playHoldMusic'):
        return DEFAULT_DIALTONE_URL

    if not routing_config.get('customHoldMusic'):
        return DEFAULT_HOLD_MUSIC_URL

    # TODO: Upload hold music to S3
    guid = routing_config.get('holdMusicId')
    hold_music = HoldMusic.query.options(
        load_only("id", "uuid", "url")
    ).filter(HoldMusic.uuid == guid).first()

    if not hold_music or not hold_music.url:
        log.error('Could not find stored custom hold music in DB')
        return DEFAULT_HOLD_MUSIC_URL

    return hold_music.url or url_for(
        'phonenumbers.hold_music',
        file_guid=guid,
        _external=True,
        _scheme='https'
    )


def get_agent_text(
    hidden_information, lead_name=None, callback=False, manual_call=False
):
    """ Builds the text that the call center agent will head before picking up a
    call.
    """
    agent_text = '..'

    if lead_name:
        agent_text += 'The caller name is ..{}.....'.format(lead_name)

    if hidden_information and not manual_call:
        agent_text += '...{}... '.format(hidden_information)

    if callback and not manual_call:
        agent_text += 'This is a scheduled callback to a missed lead. '

    return agent_text