HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_new/buyercall/buyercall/blueprints/widgets/tasks.py
import os
import logging as log
import random
import traceback

import redis
from flask import url_for, current_app

from buyercall.app import create_celery_app
from buyercall.extensions import db
from buyercall.lib.util_twilio import (
    CallStorage,
    InboundCallState as State,
    subaccount_client,
)
from boto.s3.key import Key
import boto
from buyercall.blueprints.leads.models import Lead
from buyercall.blueprints.agents.models import Agent
from buyercall.blueprints.partnership.models import PartnershipAccount, Partnership
from buyercall.blueprints.contacts.models import Contact


DAYS = 86400  # The length of a day in seconds

celery = create_celery_app(current_app)


@celery.task()
def call_again(lead_id, **kwargs):
    # Declare redis url
    redis_db = redis.StrictRedis(host=celery.conf['REDIS_CONFIG_URL'], port=celery.conf['REDIS_CONFIG_PORT'])

    from .routing import (
        Routing,
        after_call_events,
    )

    log.info('Trying to call lead again...')

    lead = Lead.query.join(Lead.partnership_account).join(PartnershipAccount.partnership).filter(Lead.id == lead_id).first()
    if not lead:
        log.warning('Lead {} not available.'.format(lead_id))
        return

    # import partnership information to get partnership id
    partner_account = PartnershipAccount.query \
        .filter(PartnershipAccount.id == lead.partnership_account_id).first()
    # Get the partner id to get relevant twilio credentails with twilio client
    partner = Partnership.query.filter(Partnership.id == partner_account.partnership_id).first()

    subscription = lead.partnership_account.subscription
    if not subscription:
        subscription = lead.partnership_account.partnership.subscription

    if subscription.usage_over_limit:
        log.warning('Partnership account {} has exceeded their quota.'.format(
            lead.partnership_account_id
        ))
        return

    if lead.status in ['ringing', 'completed']:
        log.debug('Lead is currently in another call')
        return

    storage = CallStorage(redis_db, lead.id)
    storage.inc_callback_cnt()
    storage.agent_id = None

    # Route the call
    agent_ids = [
        asg.agent_id
        for asg in sorted(lead.widget.assignments, key=lambda x: x.order)
        if asg.agent.available_now
    ]

    if not agent_ids:
        log.warning('No agents were configured for this widget.')
        return

    lead.status = 'ringing'
    lead.call_count += 1
    db.session.commit()

    if lead.widget.options['routeSimultaneously']:
        call_parallel(lead.id)
        return

    if lead.widget.options['routeRandomly']:
        random.shuffle(agent_ids)

    subaccount_sid = subscription.twilio_subaccount_sid
    client = subaccount_client(subaccount_sid, partner.id)
    r = Routing(client)

    try:
        r.route_sequential(agent_ids, lead.widget, lead)
    except Exception as e:
        log.error(traceback.format_exc())
        lead.status = 'missed'
        db.session.commit()
        after_call_events(lead, lead.widget)
        # send_notify_email(lead, lead.widget)


@celery.task
def call_parallel(lead_id, **kwargs):
    # Declare redis url
    redis_db = redis.StrictRedis(host=celery.conf['REDIS_CONFIG_URL'], port=celery.conf['REDIS_CONFIG_PORT'])

    from .routing import (
        Routing,
        # AGENT_TIMEOUT,
        after_call_events,
        get_available_agent_ids
    )

    log.info('Parallel call task started.')

    lead = Lead.query.join(Lead.widget).filter(Lead.id == lead_id).one()

    # import partnership information to get partnership id
    partner_account = PartnershipAccount.query \
        .filter(PartnershipAccount.id == lead.partnership_account_id).first()
    # Get the partner id to get relevant twilio credentails with twilio client
    partner = Partnership.query.filter(Partnership.id == partner_account.partnership_id).first()

    widget = lead.widget

    agent_ids = get_available_agent_ids(widget)
    print('Agent ids: {}'.format(agent_ids))

    if not agent_ids:
        # TODO: Notify lead
        log.warning('No agents are available for widget {}.'.format(widget.id))
        lead.status = 'missed'
        db.session.commit()
        after_call_events(lead, lead.widget)

    storage = CallStorage(redis_db, lead_id)
    storage.state = State.NEW
    storage.clear_retry_cnt()

    called_agents_key = 'LIST{}'.format(lead_id)
    lead_capture_key = 'SUCC{}'.format(lead_id)  # 1 if the lead was captured
    connect_key = 'CONNECT{}'.format(lead_id)

    redis_db.setex(lead_capture_key, 2 * DAYS, '0')
    redis_db.setex(connect_key, 2 * DAYS, '0')

    agents = Agent.query.filter(Agent.id.in_(agent_ids)).all()
    log.info('Calling {} agents...'.format(len(agents)))
    call = None

    subscription = widget.partnership_account.subscription
    if not subscription:
        subscription = widget.partnership_account.partnership.subscription
    subaccount_sid = subscription.twilio_subaccount_sid
    client = subaccount_client(subaccount_sid, partner.id)
    r = Routing(client)

    for agent in agents:
        try:
            call = r.call_agent_parallel(lead, agent, widget)

            # Did any agent pick up?
            with storage.lock():
                if storage.agent_id and storage.agent_id != str(agent.id):
                    call.hangup()
                    break
                redis_db.lpush(
                    called_agents_key, '{}_{}'.format(agent.id, call.sid)
                )
        except Exception as e:
            log.warning(traceback.format_exc())
            log.warning('Error calling agent {}...'.format(
                agent.id
            ))

    if call is None or call.sid is None:
        # None of the calls got through
        redis_db.setex(connect_key, 2 * DAYS, '-1')
        lead.status = 'missed'
        db.session.commit()

    redis_db.expire(called_agents_key, 2 * DAYS)


@celery.task
def call_lead(lead_id, agent_id, **kwargs):
    lead = Lead.query.join(Lead.widget).filter(Lead.id == lead_id).one()

    # import partnership information to get partnership id
    partner_account = PartnershipAccount.query \
        .filter(PartnershipAccount.id == lead.partnership_account_id).first()
    # Get the partner id to get relevant twilio credentails with twilio client
    partner = Partnership.query.filter(Partnership.id == partner_account.partnership_id).first()

    widget = lead.widget

    subscription = lead.partnership_account.subscription
    if not subscription:
        subscription = lead.partnership_account.partnership.subscription
    subaccount_sid = subscription.twilio_subaccount_sid
    client = subaccount_client(subaccount_sid, partner.id)

    client.calls.create(
        from_=widget.inbound.phonenumber,
        to=lead.phonenumber,
        url=url_for(
            'twilio_api.lead_greeting',
            lead_id=lead.id,
            agent_id=agent_id,
            _external=True,
            _scheme='https'
        ),
        status_callback=url_for(
            'twilio_api.lead_outbound_call_status',
            lead_id=lead.id,
            _external=True,
            _scheme='https'
        )
    )


@celery.task
def add_contact(phonenumber, firstname, lastname, email, partnership_account_id, **kwargs):
    # Query the leads table to retrieve the latest lead that was added
    latest_lead = Lead.query.filter(Lead.phonenumber == phonenumber).order_by(Lead.created_on.desc()).first()
    log.info('latest lead: {}'.format(latest_lead))

    # Query contacts to see if lead already exist in the contacts table
    contact = Contact.query.\
        filter(Contact.phonenumber_1 == latest_lead.phonenumber).\
        filter(Contact.partnership_account_id == partnership_account_id).first()

    if contact:
        log.info('a lead exist with number {}'.format(contact.phonenumber_1))
        contact.updated_on = latest_lead.updated_on
        if contact.firstname == '':
            contact.firstname = firstname
        if contact.lastname == '':
            contact.lastname = lastname
        if contact.email == '':
            contact.email = email
        log.info('The updated date is: {}'.format(latest_lead.updated_on))
        db.session.commit()
    else:
        contact = Contact(
            firstname=firstname,
            lastname=lastname,
            phonenumber_1=phonenumber,
            email=email,
            partnership_account_id=partnership_account_id,
        )
        db.session.add(contact)
        db.session.commit()
        log.info('The contact has been added')

    new_contact = Contact.query.\
        filter(Contact.phonenumber_1 == latest_lead.phonenumber).\
        filter(Contact.partnership_account_id == partnership_account_id).first()
    if new_contact:
        latest_lead.contact_id = new_contact.id
        db.session.commit()
    else:
        log.info('no lead contact exist')


@celery.task
def upload_widget_image(partner_name, partnership_account_id, file_name, file_path, file_ext, **kwargs):
    """ Upload the given MMS Image file to S3.
    """
    try:
        key = '{}_{}/{}_{}{}'.format(
            partner_name, partnership_account_id, 'widget_image', file_name, file_ext)
        log.debug('The widget image key is {}'.format(key))
    finally:
        log.debug('Could not set key for widget image url')

    with open(file_path, 'rb') as f:
        conn = boto.connect_s3(
            celery.conf['AMAZON_ACCESS_KEY'], celery.conf['AMAZON_SECRET_KEY']
        )
        bucket = conn.get_bucket(celery.conf['WIDGET_BUCKET'])
        k = Key(bucket)
        k.key = key
        k.set_contents_from_file(f)
        k.set_acl('public-read')
        log.info('Uploaded file {}'.format(k.key))

    os.remove(file_path)
    widget_url = k.generate_url(expires_in=0, query_auth=False)

    return widget_url