HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/lib/bandwidth_api_v2.py
import os
import json
import logging
import xml.etree.ElementTree as ET
import xmltodict
import urllib.parse
import requests
import hashlib
from requests.auth import HTTPBasicAuth

from buyercall.blueprints.filters import format_phone_number


log = logging.getLogger(__name__)


def camel_case(thing):
    """Convert a snake_case string, or the keys of a dict, to camelCase.

    For a dict a new dict is returned whose keys are converted and whose
    values are carried over untouched. Anything else is treated as a string:
    it is split on underscores, the first word is kept as it is and every
    following word is capitalized.
    """
    if not isinstance(thing, dict):
        head, *tail = thing.split('_')
        return head + ''.join(map(str.capitalize, tail))

    converted = {}
    for key, val in thing.items():
        converted[camel_case(key)] = val
    return converted


# ET.SubElement does not accept a text value in its constructor, so we define
# a function that sets it for us. Use text_element instead of ET.SubElement
# wherever you need to add a text value to an XML node. This is not needed for
# attributes: ET.SubElement already takes those, e.g. ET.SubElement(parent, tag, attrb='')
def text_element(parent, tag, text, *args, **kwargs):
    """Append a child element with a text value to *parent* and return it.

    Args:
        parent: the element that receives the new child.
        tag: tag name of the new child.
        text: text content of the child. ``None`` leaves the element empty;
            bytes are decoded; any other non-string value (for example an
            integer id) is converted with ``str()``.
        *args, **kwargs: passed through to ``ET.SubElement`` (attributes).

    Returns:
        The newly created element.
    """
    element = ET.SubElement(parent, tag, *args, **kwargs)
    # ElementTree only serializes string text: a non-string value would be
    # accepted here and then make ET.tostring() raise later on.
    if isinstance(text, bytes):
        text = text.decode()
    elif text is not None and not isinstance(text, str):
        text = str(text)
    element.text = text
    return element


def is_absolute(url):
    """Return True when *url* parses with a network location, else False."""
    netloc = urllib.parse.urlparse(url).netloc
    return True if netloc else False


class BandwidthException(Exception):
    """Error type this module raises for failed Bandwidth requests."""


# Handle Bandwidth Dashboard number search and provisioning using Bandwidth API v2. We are on a Hybrid account
# meaning our code base is still on V1 of the API, but Bandwidth is forcing all provisioning to happen with V2 and
# therefore we need to provision with V2 and then import the numbers to application platform with V1 until full
# V2 version is available.
def api_root_url(request_type):
    """Return the API root URL for the given kind of request.

    'messaging' and 'voice' each select their own root; every other value
    selects the dashboard root. Each root is read from an environment
    variable and falls back to a built-in default when it is not set.
    """
    if request_type == 'messaging':
        env_name = 'BANDWIDTH_MESSAGING_API_ROOT'
        fallback = 'https://messaging.bandwidth.com/api/v2/users'
    elif request_type == 'voice':
        env_name = 'BANDWIDTH_VOICE_API_ROOT'
        fallback = 'https://voice.bandwidth.com/api/v2/accounts'
    else:
        env_name = 'BANDWIDTH_DASHBOARD_API_ROOT'
        fallback = 'https://dashboard.bandwidth.com/api/accounts'
    return os.environ.get(env_name, fallback)


class Bandwidth(object):
    """Client holding Bandwidth credentials and the HTTP helpers built on them.

    The resource helpers (numbers, subscriptions, messages, calls, ...) are
    exposed as properties; each access builds a fresh helper bound to this
    client. The request methods below authenticate with HTTP basic auth and,
    except for the two JSON posts, raise BandwidthException when the response
    status is 300 or above.
    """

    # Where post_tn() looks for the error details of a failed response, keyed
    # by request_type: (path to the error container, name of its code field).
    _TN_ERROR_LOCATIONS = {
        'order': (('OrderResponse', 'ErrorList', 'Error'), 'Code'),
        'subscription': (('SubscriptionResponse', 'ResponseStatus'), 'ErrorCode'),
        'tn_option': (('TnOptionOrderResponse', 'ResponseStatus'), 'ErrorCode'),
    }
    _DEFAULT_TN_ERROR_LOCATION = (('ErrorList', 'Error'), 'Code')

    def __init__(self, request_type, username, password, account_id, site_id, location_id,
                 sms_application_id, voice_application_id, cnam_password):
        """Store credentials and identifiers and derive the account root URL.

        request_type selects the API root through api_root_url().
        """
        api_root = api_root_url(request_type)
        self.auth = HTTPBasicAuth(username, password)
        self.account_id = account_id
        self.api_root = api_root
        self.username = username
        self.password = password
        self.site_id = site_id
        self.location_id = location_id
        self.sms_application_id = sms_application_id
        self.voice_application_id = voice_application_id
        self.cnam_password = cnam_password
        self.url_root = '{}/{}'.format(self.api_root, self.account_id)
        log.info('The root url is: {}'.format(self.url_root))

    @property
    def available_numbers(self):
        return AvailableNumbers(self)

    @property
    def subscriptions(self):
        return Subscriptions(self)

    @property
    def phone_numbers(self):
        return PhoneNumbers(self)

    @property
    def phone_numbers_options(self):
        return PhoneNumbersOptions(self)

    @property
    def in_service_number(self):
        return InserviceNumber(self)

    @property
    def media(self):
        return Media(self)

    @property
    def messages(self):
        return Messages(self)

    @property
    def call(self):
        return PhoneCall(self)

    @property
    def realms(self):
        return Realms(self)

    @staticmethod
    def _dig(node, *keys):
        """Follow *keys* through nested dicts; return None when the path breaks.

        When a level is a list (xmltodict yields one for repeated elements)
        its first item is used.
        """
        for key in keys:
            if isinstance(node, list):
                node = node[0] if node else None
            if not isinstance(node, dict):
                return None
            node = node.get(key)
        return node

    def _raise_xml_error(self, response, container_path, code_key):
        """Raise BandwidthException describing a failed XML response.

        The error code and description are read from the element found at
        container_path. When the body is not XML or does not have that shape,
        the raw body is reported instead, so the caller always gets a
        BandwidthException rather than a parsing error.
        """
        from xml.parsers.expat import ExpatError

        body = response.content.decode()
        try:
            parsed = xmltodict.parse(body)
        except ExpatError:
            parsed = None
        container = self._dig(parsed, *container_path)
        error_code = self._dig(container, code_key)
        error_desc = self._dig(container, 'Description')
        if error_code is None and error_desc is None:
            error_code, error_desc = 'UNKNOWN', body
        raise BandwidthException('STATUS_CODE:{} - CATEGORY: {} - REASON: {} - DETAIL: {}'
                                 .format(response.status_code, response.reason,
                                         error_code, error_desc))

    @staticmethod
    def _raise_unless_ok(response):
        """Raise BandwidthException carrying the raw body for a status >= 300."""
        if 300 <= response.status_code:
            raise BandwidthException('STATUS_CODE:{} - CATEGORY: {} - DETAIL: {}'
                                     .format(response.status_code, response.reason,
                                             response.content.decode()))

    def cnam_lookup(self, number, test=False):
        """Query the CNAM service for *number* and return the response body.

        Raises:
            BandwidthException: when the response status is 300 or above.
        """
        # NOTE(review): the password is percent-encoded here and then encoded a
        # second time by urlencode() below. Behaviour is kept as it was;
        # confirm whether the service really expects the double encoding.
        password = urllib.parse.quote_plus(self.cnam_password)
        format_number = format_phone_number(number)
        params = {"companyId": self.account_id, "password": password,
                  "number": format_number, "dnis": format_number, "test": test}
        new_url = 'https://cnam.dashcs.com/?' + urllib.parse.urlencode(params)
        r = requests.get(new_url)
        if 300 <= r.status_code:
            raise BandwidthException('STATUS_CODE:{} - DETAIL: {}'
                                     .format(r.status_code, r.content.decode()))
        return r.content.decode()

    def global_url(self, resource):
        """Return the URL of *resource* under this account."""
        return '{}/{}/{}'.format(self.api_root, self.account_id, resource)

    def get(self, url, **kwargs):
        """GET *url* and return the decoded body.

        Keyword arguments whose value is None are dropped; the remaining ones
        are sent as query parameters with camelCase names.

        Raises:
            BandwidthException: when the response status is 300 or above.
        """
        kwargs = {k: v for k, v in kwargs.items() if v is not None}
        log.debug('GET {}?{}\n'.format(
            url,
            '&'.join('{}={}'.format(key, val) for key, val in kwargs.items())
        ))
        r = requests.get(url, auth=self.auth, params=camel_case(kwargs))
        if 300 <= r.status_code:
            self._raise_xml_error(r, ('SearchResult', 'Error'), 'Code')
        return r.content.decode()

    def post_tn(self, url, data, headers, request_type=None):
        """POST *data* to *url* and return the response's Location header.

        request_type ('order', 'subscription', 'tn_option' or None) selects
        where the error details are looked up in a failed response.

        Raises:
            BandwidthException: when the response status is 300 or above.
        """
        r = requests.post(url, auth=self.auth, data=data, headers=headers)
        if 300 <= r.status_code:
            container_path, code_key = self._TN_ERROR_LOCATIONS.get(
                request_type, self._DEFAULT_TN_ERROR_LOCATION)
            self._raise_xml_error(r, container_path, code_key)
        return r.headers.get('Location')

    def post_sip(self, url, data, headers):
        """POST *data* to *url* and return the response object.

        Raises:
            BandwidthException: when the response status is 300 or above.
        """
        r = requests.post(url, auth=self.auth, data=data, headers=headers)
        self._raise_unless_ok(r)
        return r

    def put_sip(self, url, data, headers):
        """PUT *data* to *url* and return the response object.

        Raises:
            BandwidthException: when the response status is 300 or above.
        """
        r = requests.put(url, auth=self.auth, data=data, headers=headers)
        self._raise_unless_ok(r)
        return r

    def post_msg_json(self, url, data, headers):
        """POST *data* to *url* and return the decoded body (status not checked)."""
        r = requests.post(url, auth=self.auth, data=data, headers=headers)
        log.info('The msg request data: {}'.format(data))
        return r.content.decode()

    def post_call_json(self, url, data, headers):
        """POST *data* to *url* and return the decoded body (status not checked)."""
        r = requests.post(url, auth=self.auth, data=data, headers=headers)
        log.info('The request data: {}'.format(data))
        return r.content.decode()

    def delete(self, url):
        """DELETE *url* and return the response status code.

        Raises:
            BandwidthException: when the response status is 300 or above.
        """
        log.debug('DELETE {}\n'.format(url))
        r = requests.delete(url, auth=self.auth)
        self._raise_unless_ok(r)
        return r.status_code


class Collection(object):
    """A named group of API resources reached through a client.

    Indexing a collection (``collection[id_]``) builds ``child_class`` for the
    URL of that item.
    """

    def __init__(self, client, name, child_class=None):
        """Bind the collection to *client* and compute its base URL.

        The URL comes from ``client.resource_url(name)`` when the client has
        that method and from ``client.global_url(name)`` otherwise; the
        Bandwidth client in this module only defines the latter.
        """
        self.client, self.name = client, name
        build_url = getattr(client, 'resource_url', None) or client.global_url
        self.base_url = build_url(self.name)
        self.child_class = child_class

    def __getitem__(self, id_):
        """Return ``child_class(client, url)`` for the item with id *id_*.

        Raises:
            TypeError: when the collection has no child class.
        """
        if self.child_class is None:
            raise TypeError(
                "'{}' collection has no child class; items cannot be looked up by id"
                .format(self.name))
        url = '{}/{}'.format(self.base_url, id_)
        return self.child_class(self.client, url)


class AvailableNumbers(Collection):
    """Search the account's 'availableNumbers' resource."""

    def __init__(self, client):
        self.name = 'availableNumbers'
        self.client = client
        self.base_url = client.global_url(self.name)
        log.info('The base url is: {}'.format(self.base_url))
        log.info('The client account id looks like: {}'.format(client.account_id))
        self.child_class = None

    def search(self, **kwargs):
        """Run a search and return the result as a JSON string.

        The keyword arguments are handed to the client's get(); the XML it
        returns is parsed with xmltodict and serialized with json.dumps.
        """
        raw_xml = self.client.get(self.base_url, **kwargs)
        parsed = xmltodict.parse(raw_xml)
        return json.dumps(parsed)


class Subscriptions(Collection):
    """Create, read and delete entries of the account's 'subscriptions' resource."""

    def __init__(self, client):
        self.name = 'subscriptions'
        self.client = client
        self.base_url = client.global_url(self.name)
        self.child_class = None

    def _refresh_base_url(self):
        """Recompute base_url from the client, store it and return it."""
        self.base_url = self.client.global_url(self.name)
        return self.base_url

    def subscribe(self, order_type, callback_url, expiry):
        """POST a Subscription document and return what the client's post_tn returns.

        The document carries the order type, the callback URL with its expiry,
        and the client's username and password as the callback's basic
        authentication credentials.
        """
        subscription = ET.Element('Subscription')
        text_element(subscription, 'OrderType', order_type)

        callback = ET.SubElement(subscription, 'CallbackSubscription')
        text_element(callback, 'URL', callback_url)
        text_element(callback, 'Expiry', expiry)

        credentials = ET.SubElement(callback, 'CallbackCredentials')
        basic_auth = ET.SubElement(credentials, 'BasicAuthentication')
        text_element(basic_auth, 'Username', self.client.username)
        text_element(basic_auth, 'Password', self.client.password)

        return self.client.post_tn(
            self.base_url,
            ET.tostring(subscription),
            {'Content-Type': 'application/xml; charset=UTF-8'},
            'subscription',
        )

    def all_subscriptions(self):
        """GET the subscriptions resource and return the decoded body."""
        return self.client.get(self._refresh_base_url())

    def single_subscription(self, id_):
        """GET the subscription with id *id_* and return the decoded body."""
        item_url = '{}/{}'.format(self._refresh_base_url(), id_)
        return self.client.get(item_url)

    def delete_subscription(self, id_):
        """DELETE the subscription with id *id_*; return the client's delete() result."""
        item_url = '{}/{}'.format(self._refresh_base_url(), id_)
        return self.client.delete(item_url)


class PhoneNumbers(Collection):
    """Order and disconnect telephone numbers for the account."""

    _XML_HEADERS = {'Content-Type': 'application/xml; charset=UTF-8'}

    def __init__(self, client):
        self.name = 'orders'
        self.client = client
        self.base_url = client.global_url(self.name)
        self.child_class = None

    def order(self, partner_account_id, name, phonenumber):
        """POST an Order document for one existing telephone number.

        The document also carries the client's site id (SiteId) and location
        id (PeerId). Returns what the client's post_tn returns.
        """
        order_elt = ET.Element('Order')
        text_element(order_elt, 'CustomerOrderId', partner_account_id)
        text_element(order_elt, 'Name', name)

        order_type = ET.SubElement(order_elt, 'ExistingTelephoneNumberOrderType')
        numbers = ET.SubElement(order_type, 'TelephoneNumberList')
        text_element(numbers, 'TelephoneNumber', phonenumber)

        text_element(order_elt, 'SiteId', self.client.site_id)
        text_element(order_elt, 'PeerId', self.client.location_id)

        payload = ET.tostring(order_elt)
        log.info('The xml looks like: {}'.format(payload))
        return self.client.post_tn(self.base_url, payload, dict(self._XML_HEADERS), 'order')

    def disconnect(self, partner_account_name, phonenumber):
        """POST a disconnect order for one telephone number to 'disconnects'.

        Returns what the client's post_tn returns.
        """
        target = self.client.global_url('disconnects')

        disconnect_elt = ET.Element('DisconnectTelephoneNumberOrder')
        text_element(disconnect_elt, 'Name', partner_account_name)
        order_type = ET.SubElement(disconnect_elt, 'DisconnectTelephoneNumberOrderType')
        numbers = ET.SubElement(order_type, 'TelephoneNumberList')
        text_element(numbers, 'TelephoneNumber', phonenumber)

        payload = ET.tostring(disconnect_elt)
        log.info('The xml looks like: {}'.format(payload))
        return self.client.post_tn(target, payload, dict(self._XML_HEADERS))


class Messages(Collection):
    """Send messages through the account's 'messages' resource."""

    def __init__(self, client):
        self.name = 'messages'
        self.client = client
        self.base_url = client.global_url(self.name)
        self.child_class = None

    def create(self, m_to, m_from, m_body, media=None, m_tag=None):
        """POST a message as JSON and return what the client's post_msg_json returns.

        The payload carries the recipient, sender, text, the client's SMS
        application id and the tag; 'media' is included only when given.
        """
        payload = {
            'to': m_to,
            'from': m_from,
            'text': m_body,
            'applicationId': self.client.sms_application_id,
        }
        # 'media' goes in before 'tag' so the serialized key order stays the same.
        if media:
            payload['media'] = media
        payload['tag'] = m_tag

        return self.client.post_msg_json(
            self.base_url,
            json.dumps(payload),
            {'Content-Type': 'application/json'},
        )


# Bandwidth REST API V2 Class and functions
class PhoneCall(Collection):
    """Create, inspect and update calls through the account's 'calls' resource."""

    def __init__(self, client):
        self.name = 'calls'
        self.client = client
        self.base_url = client.global_url(self.name)
        self.child_class = None

    def _fetch_json(self, url):
        """GET *url* with the client's credentials and parse the body as JSON."""
        response = requests.get(url, auth=self.client.auth)
        return json.loads(response.content.decode())

    def create(self, c_from, c_to, answer_callback_url, answer_fallback_callback_url, disconnect_callback_url,
               call_timeout=30, tag=None, machine_detection=None):
        """POST a new call as JSON and return what the client's post_call_json returns.

        The payload carries the caller, the callee, the client's voice
        application id, the three callback URLs, the timeout, the tag and the
        machine detection setting.
        """
        payload = json.dumps({
            'from': c_from,
            'to': c_to,
            'applicationId': self.client.voice_application_id,
            'answerUrl': answer_callback_url,
            'disconnectUrl': disconnect_callback_url,
            'answerFallbackUrl': answer_fallback_callback_url,
            'callTimeout': call_timeout,
            'tag': tag,
            'machineDetection': machine_detection,
        })
        return self.client.post_call_json(
            self.base_url, payload, {'Content-Type': 'application/json'})

    def recording(self, call_id, recording_id):
        """GET a recording's media with streaming enabled; return the response object."""
        media_url = '{}/{}/recordings/{}/media'.format(self.base_url, call_id, recording_id)
        return requests.get(media_url, auth=self.client.auth, stream=True)

    def transcription(self, call_id, recording_id):
        """GET a recording's transcription and return the parsed JSON body."""
        return self._fetch_json(
            '{}/{}/recordings/{}/transcription'.format(self.base_url, call_id, recording_id))

    def info(self, call_id):
        """GET the call with id *call_id* and return the parsed JSON body."""
        return self._fetch_json('{}/{}'.format(self.base_url, call_id))

    def update(self, call_id, state, redirect_url=None, redirect_fallback_url=None, tag=None):
        """POST a new state for a call as JSON; return the client's post_call_json result."""
        call_url = '{}/{}'.format(self.base_url, call_id)
        payload = json.dumps({
            'state': state,
            'redirectUrl': redirect_url,
            'redirectFallbackUrl': redirect_fallback_url,
            'tag': tag,
        })
        return self.client.post_call_json(
            call_url, payload, {'Content-Type': 'application/json'})


class PhoneNumbersOptions(Collection):
    """Change telephone number options through the account's 'tnoptions' resource."""

    def __init__(self, client):
        self.name = 'tnoptions'
        self.client = client
        self.base_url = client.global_url(self.name)
        self.child_class = None

    def update(self, partner_account_id, phonenumber, sms_option, sms_campaign_id=None, sms_campaign_class=None):
        """POST a TnOptionOrder for one number; return the client's post_tn result.

        The Sms option is always written. When it is 'on', A2pSettings is
        added: with a campaign id it carries the message class, the campaign
        id and the action 'asSpecified'; without one only the action
        'unchanged'.
        """
        order_elt = ET.Element('TnOptionOrder')
        text_element(order_elt, 'CustomerOrderId', partner_account_id)
        groups = ET.SubElement(order_elt, 'TnOptionGroups')
        group = ET.SubElement(groups, 'TnOptionGroup')
        text_element(group, 'Sms', sms_option)

        if sms_option == 'on':
            a2p = ET.SubElement(group, 'A2pSettings')
            if not sms_campaign_id:
                text_element(a2p, 'Action', 'unchanged')
            else:
                text_element(a2p, 'MessageClass', sms_campaign_class)
                text_element(a2p, 'CampaignId', sms_campaign_id)
                text_element(a2p, 'Action', 'asSpecified')

        numbers = ET.SubElement(group, 'TelephoneNumbers')
        text_element(numbers, 'TelephoneNumber', phonenumber)

        payload = ET.tostring(order_elt)
        log.info('The xml looks like: {}'.format(payload))
        return self.client.post_tn(
            self.base_url,
            payload,
            {'Content-Type': 'application/xml; charset=UTF-8'},
            'tn_option',
        )


class InserviceNumber(Collection):
    """Look up numbers in the account's 'inserviceNumbers' resource."""

    def __init__(self, client):
        self.name = 'inserviceNumbers'
        self.client = client
        self.base_url = client.global_url(self.name)
        self.child_class = None

    def validate(self, tn):
        """GET the entry for number *tn* and return the HTTP status code."""
        number_url = '{}/{}'.format(self.base_url, tn)
        response = requests.get(number_url, auth=self.client.auth)
        return response.status_code


class Media(Collection):
    """Issue GET requests against the account's 'media' resource."""

    def __init__(self, client):
        self.name = 'media'
        self.client = client
        self.base_url = client.global_url(self.name)
        self.child_class = None

    def _open(self, url):
        """GET *url* with the client's credentials and streaming enabled."""
        return requests.get(url, auth=self.client.auth, stream=True)

    def list_media(self):
        """GET the media resource and return the raw response body."""
        return self._open(self.base_url).content

    def get_media(self, media_name):
        """GET the media item *media_name* and return the response object."""
        return self._open('{}/{}'.format(self.base_url, media_name))


# A V2 realm is equivalent to a sip domain in V1
class Realms(Collection):
    """Manage SIP credentials under the account's 'realms' resource."""

    def __init__(self, client):
        self.name = 'realms'
        self.client = client
        self.base_url = client.global_url(self.name)
        self.child_class = None

    @staticmethod
    def _digest_hashes(sip_username, realm, sip_password):
        """Return the hex MD5 digests sent as Hash1 and Hash1b.

        Hash1 covers 'username:realm:password'; Hash1b covers
        'username@realm:realm:password'.
        """
        plain = ':'.join((sip_username, realm, sip_password)).encode()
        qualified = ':'.join((sip_username + '@' + realm, realm, sip_password)).encode()
        return hashlib.md5(plain).hexdigest(), hashlib.md5(qualified).hexdigest()

    # V2 SIP credentials are the equivalent of V1 endpoints
    def create_sip_credentials(self, realm_id, realm, sip_username, sip_password):
        """POST a new SIP credential for the realm; return the response object.

        The two hashes are only included when a password is given.
        """
        target = '{}/{}/sipcredentials'.format(self.base_url, realm_id)

        credentials = ET.Element('SipCredentials')
        credential = ET.SubElement(credentials, 'SipCredential')
        text_element(credential, 'UserName', sip_username)
        if sip_password:
            hash_1, hash_1_b = self._digest_hashes(sip_username, realm, sip_password)
            text_element(credential, 'Hash1', hash_1)
            text_element(credential, 'Hash1b', hash_1_b)
        text_element(credential, 'HttpVoiceV2AppId', self.client.voice_application_id)

        payload = ET.tostring(credentials)
        log.info('The xml looks like: {}'.format(payload))
        return self.client.post_sip(
            target, payload, {'Content-Type': 'application/xml; charset=UTF-8'})

    def delete_sip_credentials(self, realm_id, endpoint_id):
        """DELETE one SIP credential; return what the client's delete() returns."""
        target = '{}/{}/sipcredentials/{}'.format(self.base_url, realm_id, endpoint_id)
        log.info('The url is: {}'.format(target))
        return self.client.delete(target)

    def update_sip_credentials(self, realm_id, realm, endpoint_id, sip_username, sip_password):
        """PUT an updated SIP credential; return the response object.

        The two hashes are only included when a password is given.
        """
        target = '{}/{}/sipcredentials/{}'.format(self.base_url, realm_id, endpoint_id)

        credential = ET.Element('SipCredential')
        if sip_password:
            hash_1, hash_1_b = self._digest_hashes(sip_username, realm, sip_password)
            text_element(credential, 'Hash1', hash_1)
            text_element(credential, 'Hash1b', hash_1_b)
        text_element(credential, 'Realm', realm)
        text_element(credential, 'RealmId', realm_id)
        text_element(credential, 'HttpVoiceV2AppId', self.client.voice_application_id)

        payload = ET.tostring(credential)
        log.info('The xml looks like: {}'.format(payload))
        return self.client.put_sip(
            target, payload, {'Content-Type': 'application/xml; charset=UTF-8'})