HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/lib/util_webhooks.py
import logging
import json
import requests

from math import ceil

from sqlalchemy import and_

from buyercall.blueprints.leads.models import Lead
from buyercall.blueprints.sms.models import Message
from buyercall.blueprints.webhooks.models import Webhook
from buyercall.extensions import db

log = logging.getLogger(__name__)


class WebhookUtil(object):
    """
    Helper for outbound webhooks.

    Builds a payload for call and message events and POSTs it as JSON to the
    webhook URL stored for the partnership that owns the call or message.
    """

    # Event names whose payload is built from a call (lead) record.
    call_types = ['operational_start_call', 'operational_agent_call', 'operational_end_call', 'mobile_start_call',
                  'mobile_end_call']

    # Event names whose payload is built from a message record.
    message_types = ['operational_send_message', 'operational_receive_message', 'mobile_send_message',
                     'mobile_receive_message']

    # Base HTTP headers for webhook POSTs. This dict lives on the class and is
    # therefore shared by every instance: treat it as read-only and copy it
    # before adding per-request headers.
    header_content = {"Content-Type": "application/json"}

    def populate_payload(self, event_type, contact_object):
        """
        Build the webhook payload for a call or message event.

        :param event_type: Webhook event name. Values in ``call_types`` produce a
            call payload, values in ``message_types`` produce a message payload;
            anything else is logged as an error.
        :param contact_object: The record the event refers to. Call events read
            call attributes from it (starttime, endtime, duration,
            recording_url, ...); message events read message attributes
            (to, from_, body_text, ...).
        :return: A dict ready to be JSON-serialised, or ``None`` when the event
            type is unknown.
        """
        payload = None

        raw_created_on = contact_object.created_on
        if raw_created_on and isinstance(raw_created_on, str):
            log.info('The date is {}'.format(raw_created_on))
        # Falsy values (None, '') are reported as None for these two fields.
        created_on = self._format_timestamp(raw_created_on or None)
        updated_on = self._format_timestamp(contact_object.updated_on or None)

        if event_type in self.call_types:
            # this is for leads
            starttime = self._format_timestamp(contact_object.starttime)
            endtime = self._format_timestamp(contact_object.endtime)

            agent_id = contact_object.agent_id
            if event_type == 'mobile_start_call':
                from buyercall.blueprints.mobile.models import Endpoint
                endpoint = Endpoint.query.filter(Endpoint.inbound_id == contact_object.inbound_id).first()
                # The agent comes from the mobile endpoint bound to the number.
                # If no endpoint row exists, fall back to the lead's own
                # agent_id instead of crashing on ``None.agent_id``.
                if endpoint is not None:
                    agent_id = endpoint.agent_id

            if contact_object.duration is not None:
                # Duration is converted to minutes, always rounded up. (The
                # previous int-only branch tested the type of ``endtime``
                # rather than ``duration`` and so never applied to real data.)
                duration_min = int(ceil(float(contact_object.duration) / 60.0))
                log.info('The duration is {}'.format(contact_object.duration))
                log.info('The duration type is {}'.format(type(contact_object.duration)))
                log.info('The call duration is: {} minutes'.format(duration_min))
            else:
                duration_min = None

            # Recording counts as enabled when either the phone number or the
            # widget that produced the call has "recordCalls" switched on.
            recording_enabled = False
            recording_url = None

            if contact_object.inbound_id and self.is_recorded(contact_object.inbound_id):
                recording_enabled = True

            if contact_object.widget_guid and contact_object.widget.options.get("recordCalls", ''):
                recording_enabled = True

            if recording_enabled and contact_object.recording_url:
                from buyercall.lib.util_boto3_s3 import generate_presigned_aws_url, get_recording_url_details
                recording_details = get_recording_url_details(contact_object.recording_url)

                # Only expose a URL when both the bucket and the key were resolved.
                if recording_details and recording_details['key'] and recording_details['bucket']:
                    recording_url = generate_presigned_aws_url(recording_details['key'],
                                                               recording_details['bucket'], False)

            # NOTE: voicemail and transcription fields are currently disabled;
            # the corresponding payload keys are left commented out below.
            payload = {
                'id': contact_object.id,
                'created_on': created_on,
                'updated_on': updated_on,
                'agent_id': agent_id,
                'call_source': contact_object.call_source,
                'progress_status': contact_object.progress_status,
                'caller_id': contact_object.caller_id,
                'partnership_account_id': contact_object.partnership_account_id,
                'phonenumber_id': contact_object.inbound_id,
                'phonenumber': contact_object.phonenumber,
                'my_phone': contact_object.my_phone,
                'duration': duration_min,
                'starttime': starttime,
                'endtime': endtime,
                'call_type': contact_object.call_type,
                'recording_enabled': recording_enabled,
                'recording_url': recording_url,
                # 'voicemail_enabled': voicemail_enabled,
                # 'voicemail_url': voicemail_url,
                # 'transcription_channel_1': contact_object.transcription_text,
                # 'transcription_channel_1_confidence': contact_object.transcription_1_confidence,
                # 'transcription_channel_2': contact_object.transcription_text_2,
                # 'transcription_channel_2_confidence': contact_object.transcription_2_confidence,
                'widget_guid': contact_object.widget_guid,
                'call_count': contact_object.call_count,
                'status': contact_object.status,
                'event_type': event_type,
                'originating_number': contact_object.originating_number,
                'missed_call_cause': contact_object.missed_call_cause,
                # 'call_end_cause_description': contact_object.cause_description,
                'type': 'phone'
            }
            log.info('A webhook was sent for the following event: {} '
                     'and partner account id: {}'.format(event_type, contact_object.partnership_account_id))

        elif event_type in self.message_types:
            # this for messages
            payload = {
                'id': contact_object.id,
                'created_on': created_on,
                'updated_on': updated_on,
                'agent_id': self.get_agent(contact_object.inbound_id),
                'partnership_account_id': contact_object.partnership_account_id,
                'to': contact_object.to,
                'from': contact_object.from_,
                'body_text': contact_object.body_text,
                'media_url': contact_object.media_url,
                'status': contact_object.status,
                'delivery_code': contact_object.delivery_code,
                # 'delivery_type': contact_object.delivery_type,
                'delivery_description': contact_object.delivery_description,
                'direction': contact_object.direction,
                'phonenumber_id': contact_object.inbound_id,
                'originating_number': contact_object.originating_number,
                'event_type': event_type,
                'type': 'phone'
            }
            log.info('A webhook was sent for the following event: {} '
                     'and partner account id: {}'.format(event_type, contact_object.partnership_account_id))
        else:
            # format() instead of "+" so a non-string event type (e.g. None)
            # is reported rather than raising TypeError here.
            log.error('Unknown event type "{}" provided. Cannot build webhook JSON payload.'.format(event_type))

        return payload

    @staticmethod
    def _format_timestamp(value):
        """Return *value* as 'YYYY-MM-DD HH:MM:SS'; ``None`` and strings pass through unchanged."""
        if value is None or isinstance(value, str):
            return value
        return value.strftime('%Y-%m-%d %H:%M:%S')

    def get_lead(self, lead_id):
        """Fetch the first Lead whose id equals *lead_id* (``None`` when the query yields no row)."""
        return Lead.query.filter(Lead.id == lead_id).first()

    def get_message(self, message_id):
        """Fetch the first Message whose id equals *message_id* (``None`` when the query yields no row)."""
        return Message.query.filter(Message.id == message_id).first()

    def is_recorded(self, phonenumber_id):
        """
        Tell whether call recording is switched on for a phone number.

        :param phonenumber_id: Id of the Phone record to check.
        :return: ``True`` only when the phone exists and its routing config
            stores the boolean ``True`` under 'recordCalls'; ``False`` otherwise.
        """
        from buyercall.blueprints.phonenumbers.models import Phone

        matching_phones = Phone.query.filter(Phone.id == phonenumber_id)
        phone_record = matching_phones.first()
        if not phone_record:
            return False

        # Identity test on purpose: truthy non-boolean values do not count.
        return phone_record.routing_config.get('recordCalls') is True

    # def is_voicemail(self, phonenumber_id):
    #    from buyercall.blueprints.phonenumbers.models import Phone
    #    phone = Phone.query.filter(Phone.id == phonenumber_id).first()

    #    if phone:
    #        if phone.routing_config.get('voicemail'):
    #            recorded = phone.routing_config.get('voicemail')

    #            if recorded is True:
    #                return True

    #    return False

    def get_agent(self, phonenumber_id):
        """
        Look up an agent id from the routing configuration of a phone number.

        :param phonenumber_id: Id of the Phone record to inspect.
        :return: The agent id as ``int``; ``-1`` when the phone does not exist
            or no routing lists an agent with an id; ``''`` when the phone's
            routing type is neither 'default' nor 'digit'.
        """
        from buyercall.blueprints.phonenumbers.models import Phone

        result_agent_id = -1
        phone = Phone.query.filter(Phone.id == phonenumber_id).first()
        if not phone:
            return result_agent_id

        # A phone without any routing config is treated like an unknown routing type.
        routing_config = phone.routing_config or {}
        routing_type = routing_config.get('routingType')

        if routing_type == 'default':
            routings = [routing_config.get('defaultRouting')]
        elif routing_type == 'digit':
            routings = routing_config.get('digitRoutings')
        else:
            return ''

        for routing in routings or []:
            # 'defaultRouting' may be absent, which would put None in the list.
            if not routing:
                continue
            for agent in routing.get('agents') or []:
                agent_id = agent.get('id') if agent else None
                if agent_id:
                    result_agent_id = int(agent_id)
                    # NOTE(review): this only leaves the inner loop, so with
                    # several routings the first agent of the LAST routing that
                    # has one is returned. Kept as-is; confirm whether the very
                    # first match was intended.
                    break

        return result_agent_id

    def get_webhook(self, partnership_account_id):
        """
        Find the active webhook of the partnership that owns an account.

        :param partnership_account_id: Id of the PartnershipAccount.
        :return: The first Webhook of the account's partnership that has a
            non-empty URL and is not deactivated, or ``None`` when the account
            does not exist, has no partnership, or no such webhook is found.
        """
        from buyercall.blueprints.partnership.models import PartnershipAccount

        partnership_account = PartnershipAccount.query.filter(
            PartnershipAccount.id == partnership_account_id
        ).first()

        # Return explicitly here: previously the result variable was never
        # assigned on this path and the final return raised UnboundLocalError.
        if not partnership_account or not partnership_account.partnership_id:
            return None

        return db.session.query(Webhook).filter(
            and_(Webhook.partnership_id == partnership_account.partnership_id,
                 Webhook.webhook_url != '',
                 Webhook.is_deactivated == False)  # noqa: E712 - builds a SQL comparison
        ).first()

    def trigger_generic_webhook(self, event_type, medium_id, timeout=30):
        """
        Build and POST the webhook for a call or message event.

        Nothing is sent when ``medium_id`` is ``None``, the event type is not
        in ``call_types``/``message_types``, the record cannot be found, or no
        webhook is returned for the record's partnership account. Any exception
        is logged and swallowed so that a failing webhook never propagates to
        the caller.

        :param event_type: Webhook event name, e.g. 'operational_end_call'.
        :param medium_id: Id of the lead (call events) or message (message
            events) that triggered the event.
        :param timeout: Seconds to wait for the webhook endpoint before giving
            up; passed to ``requests.post``.
        :return: N/A
        """
        try:
            # The medium is either a call (lead) or a message.
            medium_triggered = None
            if medium_id is not None:
                if event_type in self.call_types:
                    medium_triggered = self.get_lead(medium_id)
                elif event_type in self.message_types:
                    medium_triggered = self.get_message(medium_id)

            if medium_triggered is None:
                return

            # NOTE: every matching event is posted. A filter that limited
            # 'operational_end_call' to priority numbers with recording enabled
            # existed here only as disabled code and is not applied.
            retrieved_webhook = self.get_webhook(medium_triggered.partnership_account_id)
            if not retrieved_webhook:
                return

            if not retrieved_webhook.webhook_url:
                log.error('Webhook "{}" event found for partnership account {} but no webhook URL set.'
                          .format(event_type, medium_triggered.partnership_account_id))
                return

            post_end_point = retrieved_webhook.webhook_url
            log.info('The medium triggered created on date is {}'.format(medium_triggered.created_on))
            payload = self.populate_payload(event_type, medium_triggered)

            # Work on a copy: header_content is a class attribute, so writing
            # the token into it would leak one partner's Security-Token into
            # every later webhook request.
            headers = dict(self.header_content)
            if retrieved_webhook.security_token:
                headers['Security-Token'] = retrieved_webhook.security_token

            body = json.dumps(payload)
            log.info(f"end_point {post_end_point} , data : {body}")
            # Without a timeout an unresponsive endpoint blocks this call forever.
            response = requests.post(url=post_end_point, data=body, headers=headers, timeout=timeout)
            if not response.ok:
                log.error('Webhook "{}" POST to {} returned HTTP status {}.'
                          .format(event_type, post_end_point, response.status_code))
        except Exception as e:
            # Best-effort delivery: record the failure (with traceback) and carry on.
            log.exception('Error executing webhook event "{}". Error: {}'.format(event_type, e))