File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/lib/util_webhooks.py
import logging
import json
import requests
from math import ceil
from sqlalchemy import and_
from buyercall.blueprints.leads.models import Lead
from buyercall.blueprints.sms.models import Message
from buyercall.blueprints.webhooks.models import Webhook
from buyercall.extensions import db
log = logging.getLogger(__name__)
class WebhookUtil(object):
call_types = ['operational_start_call', 'operational_agent_call', 'operational_end_call', 'mobile_start_call',
'mobile_end_call']
message_types = ['operational_send_message', 'operational_receive_message', 'mobile_send_message',
'mobile_receive_message']
header_content = {"Content-Type": "application/json"}
def populate_payload(self, event_type, contact_object):
"""
Creates the JSON payload object based on webhook event type.
:return: JSON Payload
"""
payload = None
created_on = None
updated_on = None
starttime = None
endtime = None
recording_enabled = False
recording_url = None
# voicemail_enabled = False
# voicemail_url = None
if contact_object.created_on:
if type(contact_object.created_on) == str:
log.info('The date is {}'.format(contact_object.created_on))
created_on = contact_object.created_on
else:
created_on = contact_object.created_on.strftime('%Y-%m-%d %H:%M:%S')
if contact_object.updated_on:
if type(contact_object.updated_on) == str:
updated_on = contact_object.updated_on
else:
updated_on = contact_object.updated_on.strftime('%Y-%m-%d %H:%M:%S')
if event_type in self.call_types:
# this is for leads
if contact_object.starttime is not None:
if type(contact_object.starttime) == str:
starttime = contact_object.starttime
else:
starttime = contact_object.starttime.strftime('%Y-%m-%d %H:%M:%S')
if contact_object.endtime is not None:
if type(contact_object.endtime) == str:
endtime = contact_object.endtime
else:
endtime = contact_object.endtime.strftime('%Y-%m-%d %H:%M:%S')
if event_type == 'mobile_start_call':
from buyercall.blueprints.mobile.models import Endpoint
endpoint = Endpoint.query.filter(Endpoint.inbound_id == contact_object.inbound_id).first()
agent_id = endpoint.agent_id
else:
agent_id = contact_object.agent_id
if contact_object.duration is not None:
if type(contact_object.endtime) == int:
duration_min = contact_object.duration / 60
else:
duration_min = int(ceil(float(contact_object.duration) / 60.0))
log.info('The duration is {}'.format(contact_object.duration))
log.info('The duration type is {}'.format(type(contact_object.duration)))
log.info('The call duration is: {} minutes'.format(duration_min))
else:
duration_min = contact_object.duration
if contact_object.inbound_id and self.is_recorded(contact_object.inbound_id):
recording_enabled = True
if contact_object.widget_guid and contact_object.widget.options.get("recordCalls", ''):
recording_enabled = True
if recording_enabled and contact_object.recording_url:
from buyercall.lib.util_boto3_s3 import get_recording_url_details
recording_details = get_recording_url_details(contact_object.recording_url)
if recording_details and recording_details['key'] and recording_details['bucket']:
from buyercall.lib.util_boto3_s3 import generate_presigned_aws_url
recording_url = generate_presigned_aws_url(recording_details['key'], recording_details['bucket'], False)
# if contact_object.inbound_id and self.is_voicemail(contact_object.inbound_id):
# voicemail_enabled = True
# if voicemail_enabled and contact_object.recording_url:
# from buyercall.lib.util_boto3_s3 import get_recording_url_details
# voicemail_details = get_recording_url_details(contact_object.recording_url)
# if voicemail_details and voicemail_details['key'] and voicemail_details['bucket']:
# from buyercall.lib.util_boto3_s3 import generate_presigned_aws_url
# voicemail_url = generate_presigned_aws_url(voicemail_details['key'],
# voicemail_details['bucket'], False)
payload = {
'id': contact_object.id,
'created_on': created_on,
'updated_on': updated_on,
'agent_id': agent_id,
'call_source': contact_object.call_source,
'progress_status': contact_object.progress_status,
'caller_id': contact_object.caller_id,
'partnership_account_id': contact_object.partnership_account_id,
'phonenumber_id': contact_object.inbound_id,
'phonenumber': contact_object.phonenumber,
'my_phone': contact_object.my_phone,
'duration': duration_min,
'starttime': starttime,
'endtime': endtime,
'call_type': contact_object.call_type,
'recording_enabled': recording_enabled,
'recording_url': recording_url,
# 'voicemail_enabled': voicemail_enabled,
# 'voicemail_url': voicemail_url,
# 'transcription_channel_1': contact_object.transcription_text,
# 'transcription_channel_1_confidence': contact_object.transcription_1_confidence,
# 'transcription_channel_2': contact_object.transcription_text_2,
# 'transcription_channel_2_confidence': contact_object.transcription_2_confidence,
'widget_guid': contact_object.widget_guid,
'call_count': contact_object.call_count,
'status': contact_object.status,
'event_type': event_type,
'originating_number': contact_object.originating_number,
'missed_call_cause': contact_object.missed_call_cause,
# 'call_end_cause_description': contact_object.cause_description,
'type': 'phone'
}
log.info('A webhook was sent for the following event: {} '
'and partner account id: {}'.format(event_type, contact_object.partnership_account_id))
elif event_type in self.message_types:
# this for messages
payload = {
'id': contact_object.id,
'created_on': created_on,
'updated_on': updated_on,
'agent_id': self.get_agent(contact_object.inbound_id),
'partnership_account_id': contact_object.partnership_account_id,
'to': contact_object.to,
'from': contact_object.from_,
'body_text': contact_object.body_text,
'media_url': contact_object.media_url,
'status': contact_object.status,
'delivery_code': contact_object.delivery_code,
# 'delivery_type': contact_object.delivery_type,
'delivery_description': contact_object.delivery_description,
'direction': contact_object.direction,
'phonenumber_id': contact_object.inbound_id,
'originating_number': contact_object.originating_number,
'event_type': event_type,
'type': 'phone'
}
log.info('A webhook was sent for the following event: {} '
'and partner account id: {}'.format(event_type, contact_object.partnership_account_id))
else:
log.error('Unknown event type "' + event_type + '" provided. Cannot build webhook JSON payload.')
return payload
def get_lead(self, lead_id):
lead_obj = Lead.query.filter(Lead.id == lead_id).first()
return lead_obj
def get_message(self, message_id):
message_obj = Message.query.filter(Message.id == message_id).first()
return message_obj
def is_recorded(self, phonenumber_id):
from buyercall.blueprints.phonenumbers.models import Phone
phone = Phone.query.filter(Phone.id == phonenumber_id).first()
if phone:
if phone.routing_config.get('recordCalls'):
recorded = phone.routing_config.get('recordCalls')
if recorded is True:
return True
return False
# def is_voicemail(self, phonenumber_id):
# from buyercall.blueprints.phonenumbers.models import Phone
# phone = Phone.query.filter(Phone.id == phonenumber_id).first()
# if phone:
# if phone.routing_config.get('voicemail'):
# recorded = phone.routing_config.get('voicemail')
# if recorded is True:
# return True
# return False
def get_agent(self, phonenumber_id):
from buyercall.blueprints.phonenumbers.models import Phone
result_agent_id = -1
phone = Phone.query.filter(Phone.id == phonenumber_id).first()
if phone:
if phone.routing_config.get('routingType') == 'default':
routings = [phone.routing_config.get('defaultRouting')]
elif phone.routing_config.get('routingType') == 'digit':
routings = phone.routing_config.get('digitRoutings')
else:
return ''
if routings:
for r in routings:
for agent in r.get('agents', list()):
if agent['id'] and agent['id']:
result_agent_id = int(agent['id'])
break
return result_agent_id
def get_webhook(self, partnership_account_id):
from buyercall.blueprints.partnership.models import PartnershipAccount
partnership_account = PartnershipAccount\
.query\
.filter(PartnershipAccount.id == partnership_account_id)\
.first()
if partnership_account and partnership_account.partnership_id:
retrieved_webhook = db.session.query(Webhook).filter(
and_(Webhook.partnership_id == partnership_account.partnership_id,
Webhook.webhook_url != '',
Webhook.is_deactivated == False)
).first()
return retrieved_webhook
def trigger_generic_webhook(self, event_type, medium_id):
"""
Executes webhook based on event_type. operational_end_call
:return: N/A
"""
post_end_point = ''
medium_triggered = None # medium is either a call or a message
post_event = True
try:
if event_type in self.call_types and medium_id is not None:
medium_triggered = self.get_lead(medium_id)
# if event_type == 'operational_end_call':
# from buyercall.blueprints.phonenumbers.models import Phone
# phonenumber = db.session.query(Phone) \
# .filter(and_(Phone.partnership_account_id == medium_triggered.partnership_account_id,
# Phone.id == medium_triggered.inbound_id)) \
# .first()
# if phonenumber is not None and phonenumber.type == 'priority' and phonenumber.recording_enabled:
# post_event = True
# else:
# post_event = False
elif event_type in self.message_types and medium_id is not None:
medium_triggered = self.get_message(medium_id)
if medium_triggered is not None and post_event:
retrieved_webhook = self.get_webhook(medium_triggered.partnership_account_id)
if retrieved_webhook:
if retrieved_webhook.webhook_url:
post_end_point = retrieved_webhook.webhook_url
log.info('The medium triggered created on date is {}'.format(medium_triggered.created_on))
payload = self.populate_payload(event_type, medium_triggered)
if retrieved_webhook.security_token:
self.header_content['Security-Token'] = retrieved_webhook.security_token
log.info(f"end_point {post_end_point} , data : {json.dumps(payload)}")
r = requests.post(url=post_end_point, data=json.dumps(payload), headers=self.header_content)
else:
log.error('Webhook "{}" event found for partnership account {} but no webhook URL set.'
.format(event_type, medium_triggered.partnership_account_id))
except Exception as e:
log.error('Error executing webhook event "{}". Error: {}'.format(event_type, e))