HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/widgets/bw_tasks.py
import logging as log
import random
import traceback

import redis
from flask import url_for, current_app

from buyercall.app import create_celery_app
from buyercall.extensions import db
from buyercall.lib.util_twilio import (
    CallStorage,
    InboundCallState as State,
    select_voice,
)
from buyercall.lib.util_bandwidth import bw_client
from buyercall.lib.bandwidth import (
    BandwidthException,
)
from buyercall.blueprints.leads.models import Lead
from buyercall.blueprints.agents.models import Agent


DAYS = 86400  # The length of a day in seconds

DIAL_TONE_URL = "https://s3.amazonaws.com/buyercall-static-sounds/dialtone.mp3"

MSG_LEAD_CALL_ERROR = 'We\'re sorry, there was an error calling the leed.'

celery = create_celery_app(current_app)


@celery.task()
def call_again(lead_id, **kwargs):
    # Declare redis url
    redis_db = redis.StrictRedis(host=celery.conf['REDIS_CONFIG_URL'], port=celery.conf['REDIS_CONFIG_PORT'])

    from .routing import (
        BandwidthRouting as Routing, send_notify_email
    )

    log.info('Trying to call lead again...')

    lead = Lead.query.filter(Lead.id == lead_id).first()
    if not lead:
        log.warning('Lead {} not available.'.format(lead_id))
        return

    # import partnership information to get partnership id
    from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount
    partner_account = PartnershipAccount.query \
        .filter(PartnershipAccount.id == lead.partnership_account_id).first()

    subscription = lead.partnership_account.subscription
    if not subscription:
        subscription = lead.partnership_account.partnership.subscription

    if subscription.usage_over_limit:
        log.warning('Partnership account {} ({}) has exceeded their quota.'.format(
            lead.partnership_account.name,
            lead.partnership_account_id
        ))
        return

    if lead.status in ['ringing', 'completed']:
        log.debug('Lead is currently in another call')
        return

    storage = CallStorage(redis_db, lead.id)
    storage.inc_callback_cnt()
    storage.agent_id = None

    # Route the call
    agent_ids = [
        asg.agent_id
        for asg in sorted(lead.widget.assignments, key=lambda x: x.order)
        if asg.agent.available_now
    ]

    if not agent_ids:
        log.warning('No agents were configured for this widget.')
        return

    lead.status = 'ringing'
    lead.call_count += 1
    db.session.commit()

    if lead.widget.options['routeSimultaneously']:
        call_parallel(lead.id)
        return

    if lead.widget.options['routeRandomly']:
        random.shuffle(agent_ids)

    client = bw_client(partner_account.partnership_id, 'voice')
    r = Routing(client)

    try:
        r.route_sequential(agent_ids, lead.widget, lead)
    except Exception as e:
        log.error(traceback.format_exc())
        lead.status = 'missed'
        db.session.commit()
        send_notify_email(lead, lead.widget)


@celery.task
def call_parallel(lead_id, **kwargs):
    # Declare redis url
    redis_db = redis.StrictRedis(host=celery.conf['REDIS_CONFIG_URL'], port=celery.conf['REDIS_CONFIG_PORT'])

    from .routing import (
        BandwidthRouting as Routing,
        # AGENT_TIMEOUT,
        send_notify_email,
        get_available_agent_ids
    )

    log.info('Parallel call task started.')

    lead = Lead.query.join(Lead.widget).filter(Lead.id == lead_id).one()
    widget = lead.widget

    # import partnership information to get partnership id
    from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount
    partner_account = PartnershipAccount.query \
        .filter(PartnershipAccount.id == lead.partnership_account_id).first()

    agent_ids = get_available_agent_ids(widget)
    print('Agent ids: ' + str(agent_ids))

    if not agent_ids:
        # TODO: Notify lead
        log.warning('No agents are available for widget {}.'.format(widget.id))
        lead.status = 'missed'
        db.session.commit()
        send_notify_email(lead, lead.widget)

    storage = CallStorage(redis_db, lead_id)
    storage.state = State.NEW
    storage.clear_retry_cnt()

    called_agents_key = 'LIST{}'.format(lead_id)
    lead_capture_key = 'SUCC{}'.format(lead_id)  # 1 if the lead was captured
    connect_key = 'CONNECT{}'.format(lead_id)

    redis_db.setex(lead_capture_key, 2 * DAYS, '0')
    redis_db.setex(connect_key, 2 * DAYS, '0')

    agents = Agent.query.filter(Agent.id.in_(agent_ids)).all()
    log.info('Calling {} agents...'.format(len(agents)))
    call = None

    client = bw_client(partner_account.partnership_id, 'voice')
    r = Routing(client)

    for agent in agents:
        try:
            agent_busy = redis_db.get('BUSY_{}'.format(agent.id))
            log.info('agent busy: {}'.format(agent_busy))
            called_agents_key = 'LIST{}'.format(lead_id)
            calls = redis_db.lrange(called_agents_key, 0, -1)
            log.info('The calls are: {}'.format(calls))
            call = r.call_agent_parallel(lead, agent, widget)

            # Did any agent pick up?
            with storage.lock():
                if storage.agent_id and storage.agent_id != str(agent.id):
                    call.hangup()
                    break
                redis_db.lpush(
                    called_agents_key, '{}_{}'.format(agent.id, call)
                )
        except Exception as e:
            log.warning(traceback.format_exc())
            log.warning('Error calling agent {}...'.format(
                agent.id
            ))

    if call is None:
        # None of the calls got through
        redis_db.setex(connect_key, 2 * DAYS, '-1')
        lead.status = 'missed'
        db.session.commit()

    redis_db.expire(called_agents_key, 2 * DAYS)


@celery.task
def bw_call_lead(lead_id, agent_id, **kwargs):
    """ Play dial tone on the agent's side and call the lead.
    """
    # Declare redis url
    redis_db = redis.StrictRedis(host=celery.conf['REDIS_CONFIG_URL'], port=celery.conf['REDIS_CONFIG_PORT'])

    storage = CallStorage(redis_db, lead_id)

    lead = Lead.query.join(Lead.widget).filter(Lead.id == lead_id).one()
    widget = lead.widget
    # import partnership information to get partnership id
    from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount
    partner_account = PartnershipAccount.query \
        .filter(PartnershipAccount.id == lead.partnership_account_id).first()
    partner = Partnership.query.filter(Partnership.id == partner_account.partnership_id).first()

    client = bw_client(partner.id)
    agent_call_id_decoded = storage.agent_call_id.decode()
    try:
        client.calls[agent_call_id_decoded].audio(
            file_url=DIAL_TONE_URL,
            loop_enabled='true'
        )
    except BandwidthException:
        log.error(traceback.format_exc())
    try:
        call_lead = client.call(
            from_=widget.inbound.phonenumber,
            to=lead.phonenumber,
            callback_url=url_for(
                'bw_outbound.lead_status_callback',
                lead_id=lead.id,
                agent_id=agent_id,
                _external=True,
                _scheme='https'
            ),
            callback_http_method='POST',
            conference_id=storage.bw_conf_id
        )
        # Save the lead id to Redis storage
        storage.call_sid = call_lead.id
        lead.call_sid = call_lead.id
        db.session.commit()
    except BandwidthException:
        log.error(traceback.format_exc())

        language = lead.widget.options.get('language', 'en')
        gender, locale, voice = select_voice(language)

        client.calls[storage.agent_call_id].audio(file_url='')
        client.calls[storage.agent_call_id].audio(
            sentence=MSG_LEAD_CALL_ERROR,
            tag='lead_call_error',
            gender=gender,
            locale=locale,
            voice=voice,
        )