File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/widgets/tasks.py
import os
import logging as log
import random
import traceback
import redis
from flask import url_for, current_app
from buyercall.app import create_celery_app
from buyercall.extensions import db
from buyercall.lib.util_twilio import (
CallStorage,
InboundCallState as State,
subaccount_client,
)
from boto.s3.key import Key
import boto
from buyercall.blueprints.leads.models import Lead
from buyercall.blueprints.agents.models import Agent
from buyercall.blueprints.partnership.models import PartnershipAccount, Partnership
from buyercall.blueprints.contacts.models import Contact
DAYS = 86400 # The length of a day in seconds
# Celery application created from the current Flask app; the @celery.task
# decorators in this module register their tasks on it, and the tasks read
# their configuration from celery.conf.
celery = create_celery_app(current_app)
@celery.task()
def call_again(lead_id, **kwargs):
    """Retry the widget call for a lead.

    Loads the lead, checks the subscription quota and the lead's current
    status, then routes the call to the agents assigned to the lead's widget
    that are available right now: simultaneously (via ``call_parallel``) when
    the widget option ``routeSimultaneously`` is set, otherwise sequentially
    (shuffled first when ``routeRandomly`` is set).

    Args:
        lead_id: Primary key of the ``Lead`` to call again.
        **kwargs: Accepted for compatibility with callers; not used here.

    Returns:
        None. The task logs and returns early when the lead or a subscription
        cannot be found, the quota is exceeded, the lead is already
        'ringing'/'completed', or no assigned agent is available.
    """
    # Declare redis url
    redis_db = redis.StrictRedis(
        host=celery.conf['REDIS_CONFIG_URL'],
        port=celery.conf['REDIS_CONFIG_PORT'],
    )
    from .routing import (
        Routing,
        after_call_events,
    )

    log.info('Trying to call lead again...')
    lead = (
        Lead.query
        .join(Lead.partnership_account)
        .join(PartnershipAccount.partnership)
        .filter(Lead.id == lead_id)
        .first()
    )
    if not lead:
        log.warning('Lead {} not available.'.format(lead_id))
        return

    # Import partnership information to get the partnership id.
    partner_account = PartnershipAccount.query \
        .filter(PartnershipAccount.id == lead.partnership_account_id).first()
    # The partnership id is passed to subaccount_client() below to get the
    # relevant Twilio credentials.
    partner = Partnership.query \
        .filter(Partnership.id == partner_account.partnership_id).first()

    # Prefer the account-level subscription; fall back to the partnership's.
    subscription = lead.partnership_account.subscription
    if not subscription:
        subscription = lead.partnership_account.partnership.subscription
    if not subscription:
        # Neither level has a subscription: without this guard the quota
        # check below would fail with an AttributeError on None.
        log.warning('No subscription found for partnership account {}.'.format(
            lead.partnership_account_id
        ))
        return

    if subscription.usage_over_limit:
        log.warning('Partnership account {} has exceeded their quota.'.format(
            lead.partnership_account_id
        ))
        return

    if lead.status in ['ringing', 'completed']:
        log.debug('Lead is currently in another call')
        return

    storage = CallStorage(redis_db, lead.id)
    storage.inc_callback_cnt()
    storage.agent_id = None

    # Route the call: agents in assignment order, available ones only.
    agent_ids = [
        asg.agent_id
        for asg in sorted(lead.widget.assignments, key=lambda x: x.order)
        if asg.agent.available_now
    ]
    if not agent_ids:
        log.warning('No agents were configured for this widget.')
        return

    lead.status = 'ringing'
    lead.call_count += 1
    db.session.commit()

    if lead.widget.options['routeSimultaneously']:
        # Runs the parallel routing in-process (direct call, not .delay()).
        call_parallel(lead.id)
        return

    if lead.widget.options['routeRandomly']:
        random.shuffle(agent_ids)

    subaccount_sid = subscription.twilio_subaccount_sid
    client = subaccount_client(subaccount_sid, partner.id)
    r = Routing(client)
    try:
        r.route_sequential(agent_ids, lead.widget, lead)
    except Exception:
        # Routing failed: record the traceback and mark the lead as missed
        # so it does not stay in the 'ringing' state.
        log.error(traceback.format_exc())
        lead.status = 'missed'
        db.session.commit()
        after_call_events(lead, lead.widget)
@celery.task
def call_parallel(lead_id, **kwargs):
    """Call all available agents of the lead's widget at the same time.

    Initialises the per-lead call state in Redis, places one call per
    available agent and records each placed call in a Redis list. If an
    agent other than the one just called is already recorded as having
    picked up, the new call is hung up and no further agents are called.
    When no call could be placed the lead is marked as 'missed'.

    Args:
        lead_id: Primary key of the ``Lead`` to connect.
        **kwargs: Accepted for compatibility with callers; not used here.

    Returns:
        None.

    Raises:
        Whatever ``Query.one()`` raises when the lead does not exist.
    """
    # Declare redis url
    redis_db = redis.StrictRedis(
        host=celery.conf['REDIS_CONFIG_URL'],
        port=celery.conf['REDIS_CONFIG_PORT'],
    )
    from .routing import (
        Routing,
        after_call_events,
        get_available_agent_ids
    )

    log.info('Parallel call task started.')
    lead = Lead.query.join(Lead.widget).filter(Lead.id == lead_id).one()

    # Import partnership information to get the partnership id.
    partner_account = PartnershipAccount.query \
        .filter(PartnershipAccount.id == lead.partnership_account_id).first()
    # The partnership id is passed to subaccount_client() below to get the
    # relevant Twilio credentials.
    partner = Partnership.query \
        .filter(Partnership.id == partner_account.partnership_id).first()

    widget = lead.widget
    agent_ids = get_available_agent_ids(widget)
    # Goes through the logger rather than stdout so the message follows the
    # application's logging configuration.
    log.info('Agent ids: {}'.format(agent_ids))

    if not agent_ids:
        # TODO: Notify lead
        log.warning('No agents are available for widget {}.'.format(widget.id))
        lead.status = 'missed'
        db.session.commit()
        after_call_events(lead, lead.widget)
        # NOTE(review): there is no return here, so execution continues with
        # an empty agent list and ends by setting the CONNECT key to '-1'
        # below. Confirm whether that fall-through is intended before adding
        # an early return.

    storage = CallStorage(redis_db, lead_id)
    storage.state = State.NEW
    storage.clear_retry_cnt()

    called_agents_key = 'LIST{}'.format(lead_id)
    lead_capture_key = 'SUCC{}'.format(lead_id)  # 1 if the lead was captured
    connect_key = 'CONNECT{}'.format(lead_id)
    redis_db.setex(lead_capture_key, 2 * DAYS, '0')
    redis_db.setex(connect_key, 2 * DAYS, '0')

    agents = Agent.query.filter(Agent.id.in_(agent_ids)).all()
    log.info('Calling {} agents...'.format(len(agents)))

    call = None
    # Prefer the account-level subscription; fall back to the partnership's.
    subscription = widget.partnership_account.subscription
    if not subscription:
        subscription = widget.partnership_account.partnership.subscription
    subaccount_sid = subscription.twilio_subaccount_sid
    client = subaccount_client(subaccount_sid, partner.id)
    r = Routing(client)

    for agent in agents:
        try:
            call = r.call_agent_parallel(lead, agent, widget)
            # Did any agent pick up?
            with storage.lock():
                if storage.agent_id and storage.agent_id != str(agent.id):
                    # A different agent is already recorded for this lead:
                    # drop the call just placed and stop dialling.
                    call.hangup()
                    break
                redis_db.lpush(
                    called_agents_key, '{}_{}'.format(agent.id, call.sid)
                )
        except Exception:
            # A failure for one agent must not prevent calling the others.
            log.warning(traceback.format_exc())
            log.warning('Error calling agent {}...'.format(
                agent.id
            ))

    if call is None or call.sid is None:
        # None of the calls got through
        redis_db.setex(connect_key, 2 * DAYS, '-1')
        lead.status = 'missed'
        db.session.commit()

    redis_db.expire(called_agents_key, 2 * DAYS)
@celery.task
def call_lead(lead_id, agent_id, **kwargs):
    """Place an outbound call from the widget's inbound number to the lead.

    The call is created through the Twilio sub-account client of the lead's
    subscription. The call's ``url`` points at the ``lead_greeting`` endpoint
    (with the lead and agent ids) and its ``status_callback`` at the
    ``lead_outbound_call_status`` endpoint; both are external https URLs.

    Args:
        lead_id: Primary key of the ``Lead`` to call.
        agent_id: Agent id forwarded to the greeting endpoint.
        **kwargs: Accepted for compatibility with callers; not used here.
    """
    lead = Lead.query.join(Lead.widget).filter(Lead.id == lead_id).one()

    # Resolve the partnership behind the lead's account; its id is needed to
    # obtain the Twilio client.
    account_filter = PartnershipAccount.id == lead.partnership_account_id
    partner_account = PartnershipAccount.query.filter(account_filter).first()
    partnership_filter = Partnership.id == partner_account.partnership_id
    partner = Partnership.query.filter(partnership_filter).first()

    widget = lead.widget

    # Account-level subscription first, partnership-level as the fallback.
    account = lead.partnership_account
    subscription = account.subscription or account.partnership.subscription

    twilio = subaccount_client(subscription.twilio_subaccount_sid, partner.id)

    call_args = {
        'from_': widget.inbound.phonenumber,
        'to': lead.phonenumber,
        'url': url_for(
            'twilio_api.lead_greeting',
            lead_id=lead.id,
            agent_id=agent_id,
            _external=True,
            _scheme='https',
        ),
        'status_callback': url_for(
            'twilio_api.lead_outbound_call_status',
            lead_id=lead.id,
            _external=True,
            _scheme='https',
        ),
    }
    twilio.calls.create(**call_args)
@celery.task
def add_contact(phonenumber, firstname, lastname, email, partnership_account_id, **kwargs):
    """Create or update the contact for a phone number and link the lead to it.

    Finds the most recently created lead with ``phonenumber``. If a contact
    with that number already exists for the partnership account, its
    ``updated_on`` is refreshed and any empty-string name/email fields are
    filled in; otherwise a new contact is created. Finally the lead's
    ``contact_id`` is set to the contact's id.

    Args:
        phonenumber: Phone number used to find the lead and the contact.
        firstname: First name for a new contact / to fill an empty one.
        lastname: Last name for a new contact / to fill an empty one.
        email: Email for a new contact / to fill an empty one.
        partnership_account_id: Account the contact belongs to.
        **kwargs: Accepted for compatibility with callers; not used here.

    Returns:
        None. Returns early, without writing anything, when no lead with
        ``phonenumber`` exists.
    """
    # Query the leads table to retrieve the latest lead that was added.
    # NOTE(review): this lookup is not restricted to partnership_account_id;
    # confirm that matching leads across accounts is intended.
    latest_lead = Lead.query.filter(Lead.phonenumber == phonenumber) \
        .order_by(Lead.created_on.desc()).first()
    log.info('latest lead: {}'.format(latest_lead))
    if latest_lead is None:
        # Without a lead there is nothing to link; previously this case
        # failed with an AttributeError on None a few lines further down.
        log.warning('No lead found with number {}; contact not added.'.format(
            phonenumber
        ))
        return

    # Query contacts to see if the lead already exists in the contacts table.
    contact = Contact.query \
        .filter(Contact.phonenumber_1 == latest_lead.phonenumber) \
        .filter(Contact.partnership_account_id == partnership_account_id) \
        .first()

    if contact:
        log.info('a lead exist with number {}'.format(contact.phonenumber_1))
        contact.updated_on = latest_lead.updated_on
        # Only fill in fields that are currently empty strings.
        if contact.firstname == '':
            contact.firstname = firstname
        if contact.lastname == '':
            contact.lastname = lastname
        if contact.email == '':
            contact.email = email
        log.info('The updated date is: {}'.format(latest_lead.updated_on))
        db.session.commit()
    else:
        contact = Contact(
            firstname=firstname,
            lastname=lastname,
            phonenumber_1=phonenumber,
            email=email,
            partnership_account_id=partnership_account_id,
        )
        db.session.add(contact)
        db.session.commit()
        log.info('The contact has been added')

    # Re-read the contact so the lead can be linked to its id.
    new_contact = Contact.query \
        .filter(Contact.phonenumber_1 == latest_lead.phonenumber) \
        .filter(Contact.partnership_account_id == partnership_account_id) \
        .first()
    if new_contact:
        latest_lead.contact_id = new_contact.id
        db.session.commit()
    else:
        log.info('no lead contact exist')
@celery.task
def upload_widget_image(partner_name, partnership_account_id, file_name, file_path, file_ext, **kwargs):
    """Upload a widget image file to S3 and return its URL.

    The object key is
    ``<partner_name>_<partnership_account_id>/widget_image_<file_name><file_ext>``
    in the bucket named by the ``WIDGET_BUCKET`` setting. The object is made
    'public-read' and the local file is deleted after the upload.

    Args:
        partner_name: Partner name used in the key prefix.
        partnership_account_id: Account id used in the key prefix.
        file_name: Base name of the image, used in the key.
        file_path: Path of the local file to upload; removed afterwards.
        file_ext: File extension appended to the key as given.
        **kwargs: Accepted for compatibility with callers; not used here.

    Returns:
        The URL from ``Key.generate_url(expires_in=0, query_auth=False)``.
    """
    try:
        key = '{}_{}/{}_{}{}'.format(
            partner_name, partnership_account_id, 'widget_image', file_name, file_ext)
        log.debug('The widget image key is {}'.format(key))
    except Exception:
        # Log only on failure (a `finally` clause logged this message on
        # every call, including successful ones), then let the error surface.
        log.debug('Could not set key for widget image url')
        raise

    with open(file_path, 'rb') as f:
        conn = boto.connect_s3(
            celery.conf['AMAZON_ACCESS_KEY'], celery.conf['AMAZON_SECRET_KEY']
        )
        bucket = conn.get_bucket(celery.conf['WIDGET_BUCKET'])
        k = Key(bucket)
        k.key = key
        k.set_contents_from_file(f)
        k.set_acl('public-read')
        log.info('Uploaded file {}'.format(k.key))

    # The file handle is closed at this point, so the local copy can go.
    os.remove(file_path)
    widget_url = k.generate_url(expires_in=0, query_auth=False)
    return widget_url