HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/lib/bandwidth.py
import os
import json
import logging
import re
from urllib.parse import urlparse
import xml.etree.ElementTree as ET

import requests
from requests.auth import HTTPBasicAuth


log = logging.getLogger(__name__)


def camel_case(thing):
    if isinstance(thing, dict):
        return {camel_case(key): val for key, val in thing.items()}

    words = thing.split('_')
    return words[0] + ''.join(x.capitalize() for x in words[1:])


def is_absolute(url):
    return bool(urlparse(url).netloc)


class BandwidthException(Exception):
    def __init__(self, message):
        self.message = message


class Bandwidth(object):
    API_ROOT = os.environ.get(
        'BANDWIDTH_API_ROOT', 'https://api.catapult.inetwork.com/v1'
    )

    def __init__(self, token, secret, user_id, api_root=API_ROOT):
        self.auth = HTTPBasicAuth(token, secret)
        self.user_id = user_id
        self.api_root = api_root
        self.url_root = '{}/{}'.format(self.api_root, self.user_id)

    @property
    def calls(self):
        return Calls(self)

    @property
    def bridges(self):
        return Bridges(self)

    @property
    def recordings(self):
        return Recordings(self)

    @property
    def available_numbers(self):
        return AvailableNumbers(self)

    @property
    def phone_numbers(self):
        return PhoneNumbers(self)

    @property
    def applications(self):
        return Applications(self)

    @property
    def conferences(self):
        return Conferences(self)

    @property
    def messages(self):
        return Messages(self)

    @property
    def domains(self):
        return Domains(self)

    @property
    def media(self):
        return Media(self)

    def number_info(self, number):
        """ Make a CNAM lookup on an E.123-formatted number. """
        if not re.match(r'^\+?\d+$', number):
            log.error("Invalid number format: {}".format(number))
            return None
        try:
            url = '{}/phoneNumbers/numberInfo/{}'.format(self.api_root, number)
            return self.get(url)
        except:
            log.error('No CNAM value was found for number: {}'.format(number))
            return None

    def bridge(self, call_ids=[], bridge_audio=True):
        """ Create a new bridge. """
        url = self.post(
            self.resource_url('bridges'),
            call_ids=call_ids, bridge_audio=bridge_audio
        )
        return Bridge(self, url)

    def call(
        self, from_, to, callback_http_method='GET', **kwargs
    ):
        """ Create a new call. """
        json_data = {
            "from": from_, "to": to,
            "sipHeaders": {},
            "callbackHttpMethod": callback_http_method
        }
        json_data.update(kwargs)

        call_url = self.post(self.resource_url('calls'), json_data)
        return Call(self, call_url)

    def post(self, url, *args, **kwargs):
        params = kwargs
        if args and isinstance(args[0], dict):
            params = args[0]
        params = camel_case(params)
        for key, val in params.items():
            if type(val) is bool:
                params[key] = 'true' if val else 'false'

        log.debug('POST {}\n{}\n'.format(url, json.dumps(params)))

        r = requests.post(url, auth=self.auth, json=params)
        if 300 <= r.status_code:
            error_content = json.loads(r.content.decode())
            raise BandwidthException('STATUS_CODE:{} - CATEGORY: {} - REASON: {} - DETAIL: {}'
                                     .format(r.status_code, r.reason, error_content['code'], error_content['message']))
        return r.headers.get('Location')

    def get(self, url, **kwargs):
        kwargs = {k: v for k, v in kwargs.items() if v is not None}
        log.debug('GET {}?{}\n'.format(
            url,
            '&'.join('{}={}'.format(key, val) for key, val in kwargs.items())
        ))

        r = requests.get(url, auth=self.auth, params=camel_case(kwargs))
        if 300 <= r.status_code:
            error_content = json.loads(r.content.decode())
            raise BandwidthException('STATUS_CODE:{} - CATEGORY:{} - REASON:{} - DETAIL: {}'
                                     .format(r.status_code, r.reason, error_content['code'], error_content['message']))
        return r.json()

    def delete(self, url):
        log.debug('DELETE {}\n'.format(url))
        r = requests.delete(url, auth=self.auth)
        if 300 <= r.status_code:
            error_content = json.loads(r.content.decode())
            raise BandwidthException(r.reason)

    def global_url(self, resource):
        return '{}/{}'.format(self.api_root, resource)

    def resource_url(self, resource, id_=None):
        if id_ is None:
            return '{}/users/{}/{}'.format(
                self.api_root, self.user_id, resource)
        return '{}/users/{}/{}/{}'.format(
            self.api_root, self.user_id, resource, id_)

    def domain_url(self, resource, id_=None):
        if id_ is None:
            return '{}/users/{}/{}'.format(
                self.api_root, self.user_id, resource)
        return '{}/users/{}/{}/{}'.format(
            self.api_root, self.user_id, resource, id_)

    def hangup(self, sid):
        r = requests.post(
            '{}/{}/calls/{}'.format(self.api_root, self.user_id, sid),
            auth=self.auth,
            json={
                "state": "completed"
            }
        )
        if 300 <= r.status_code:
            raise BandwidthException(r.reason)

    def conference(self, conf_id):
        return Conference(self, conf_id)


class Collection(object):
    def __init__(self, client, name, child_class=None):
        self.client, self.name = client, name
        self.base_url = self.client.resource_url(self.name)
        self.child_class = child_class

    def __getitem__(self, id_):
        url = '{}/{}'.format(self.base_url, id_)
        return self.child_class(self.client, url)

    def search(self, **kwargs):
        return self.client.get(self.base_url, **kwargs)

    def delete(self, id_):
        self.client.delete('{}/{}'.format(self.base_url, id_))

    def post(self, **kwargs):
        return self.client.post(self.base_url, **kwargs)

    def create(self, **kwargs):
        return self.child_class(self.client, self.post(**kwargs))


class Resource(object):
    def __init__(self, client, url):
        if not is_absolute(url):
            url = '{}/{}'.format(client.api_root, url)
        self.url = url
        self.client = client
        self.id_ = self.url.split('/')[-1]

    @property
    def id(self):
        return self.id_

    def get(self):
        return self.client.get(self.url)

    def details(self):
        """ A synonym for `Resource.get` """
        return self.client.get(self.url)

    def post(self, **kwargs):
        return self.client.post(self.url, **kwargs)

    def set(self, **kwargs):
        """ A synonym for `Resource.post` """
        return self.post(**kwargs)


# This is the classes to manage the BW messaging
# http://dev.bandwidth.com/ap-docs/methods/messages/postMessages.html
class Messages(Collection):
    def __init__(self, client):
        super(Messages, self).__init__(client, 'messages', Message)


class Message(Resource):
    pass


class Conferences(Collection):
    def __init__(self, client):
        super(Conferences, self).__init__(client, 'conferences', Conference)


class Conference(Resource):
    def add_member(self, call_id):
        return self.client.post(
            '{}/members'.format(self.url),
            call_id=call_id
        )

    def remove_member(self, call_id):
        return self.client.post(
            '{}/members/{}'.format(self.url, call_id),
            state='completed'
        )

    def members(self):
        return self.client.get(
            '{}/members'.format(self.url)
        )

    def audio(self, **kwargs):
        return self.client.post(
            '{}/audio'.format(self.url),
            **kwargs
        )


class Applications(Collection):
    def __init__(self, client):
        super(Applications, self).__init__(client, 'applications', Application)

    def update_incoming_message_url(self, application_id, incoming_message_url):
        application_url = '{}/{}'.format(self.base_url, application_id)
        return self.client.post(application_url, incoming_message_url=incoming_message_url)


class Application(Resource):
    pass


class Bridges(object):
    def __init__(self, client):
        self.client = client

    def __getitem__(self, id_):
        url = self.client.resource_url('bridges', id_)
        return Bridge(self.client, url)


class Recordings(object):
    def __init__(self, client):
        self.client = client

    def __getitem__(self, id_):
        url = self.client.resource_url('recordings', id_)
        return Recording(self.client, url)


class Recording(Resource):
    def __init__(self, client, url, **kwargs):
        # TODO: un-camel-case
        super(Recording, self).__init__(client, url)
        for key, val in kwargs.items():
            if key == 'id':
                key = 'id_'
            setattr(self, key, val)

    def data(self):
        """ Assuming self.media is set """
        r = requests.get(
            self.media,
            auth=self.client.auth,
            stream=True
        )
        return r


class PhoneNumbers(Collection):
    def __init__(self, client):
        super(PhoneNumbers, self).__init__(client, 'phoneNumbers', PhoneNumber)

    def purchase(
        self, number=None, name=None, application_id=None, fallback_number=None
    ):
        url = self.post(
            number=number, application_id=application_id,
            name=name, fallback_number=fallback_number
        )

        return PhoneNumber(self.client, url)

    # This function is used to import a phone number into the Application platform from the Dashboard platform
    def import_number(self, number=None, name=None, application_id=None, provider=None):
        url = self.post(
            number=number, name=name,
            applicationId=application_id, provider=provider
        )

        return PhoneNumber(self.client, url)


class PhoneNumber(Resource):
    pass


class AvailableNumbers(Collection):
    def __init__(self, client):
        self.client, self.name = client, 'availableNumbers'
        self.base_url = self.client.global_url(self.name)
        self.child_class = None

    def search(self, type='local', **kwargs):
        url = '{}/{}'.format(self.base_url, type)
        return self.client.get(url, **kwargs)


class Domains(Collection):
    def __init__(self, client):
        super(Domains, self).__init__(client, 'domains', Domain)

    def create_endpoint(self, domain_id, name, description, application_id, enabled, credentials):
        endpoint_url = '{}/{}/endpoints'.format(self.base_url, domain_id)
        url = self.client.post(endpoint_url,
                               domain_id=domain_id,
                               name=name,
                               description=description,
                               application_id=application_id,
                               enabled=enabled,
                               credentials=credentials)
        return Endpoint(self.client, url)

    def get_endpoint(self, domain_id, endpoint_id):
        url = '{}/{}/endpoints/{}'.format(self.base_url,
                                          domain_id,
                                          endpoint_id)
        return self.client.get(url, domain_id=domain_id, endpoint_id=endpoint_id)

    def get_endpoints(self, domain_id):
        url = '{}/{}/endpoints'.format(self.base_url,
                                          domain_id)
        return self.client.get(url, domain_id=domain_id)

    def update_endpoint(self, domain_id, endpoint_id, description, enabled, credentials):
        url = '{}/{}/endpoints/{}'.format(self.base_url,
                                          domain_id,
                                          endpoint_id,)
        return self.client.post(url,
                                description=description,
                                enabled=enabled,
                                credentials=credentials
                                )

    def delete_endpoint(self, domain_id, endpoint_id):
        domain_url = self.client.domain_url('domains', domain_id)
        endpoint_url = '{}/endpoints/{}'.format(domain_url, endpoint_id)
        log.info('The url is: {}'.format(endpoint_url))
        return self.client.delete(endpoint_url)


class Media(Collection):
    def __init__(self, client):
        super(Media, self).__init__(client, 'media', Medias)

    def get_media(self, media_name):
        url = '{}/{}'.format(self.base_url, media_name)
        r = requests.get(
            url,
            auth=self.client.auth,
            stream=True
        )
        return r.content


class Domain(Resource):
    pass


class Medias(Resource):
    pass


class Endpoint(Resource):
    pass


class Bridge(Resource):
    def set(self, calls=[], bridge_audio=True):
        super(Bridge, self).set(calls, bridge_audio)


class Calls(object):
    def __init__(self, client):
        self.client = client

    def __getitem__(self, call_id):
        url = self.client.resource_url('calls', call_id)
        return Call(self.client, url)

    def hangup(self, call_id):
        call_url = self.client.resource_url('calls', call_id)
        self.client.post(call_url, state='completed')

    def redirect(self, call_id, url):
        call_url = self.client.resource_url('calls', call_id)
        self.client.post(
            call_url, state='active', callback_url=url
        )

    def update(self, call_id):
        call_url = self.client.resource_url('calls', call_id)
        self.client.post(call_url, state='active')

    def transfer(self, call_id, phone_number):
        call_url = self.client.resource_url('calls', call_id)
        self.client.post(call_url, state='transferring', transfer_to=phone_number)

    def info(self, call_id):
        call_url = self.client.resource_url('calls', call_id)
        return self.client.get(call_url)


class Call(Resource):
    def audio(self, *args, **kwargs):
        url = '{}/audio'.format(self.url)
        self.client.post(url, **kwargs)

    def gather(self, *args, **kwargs):
        url = '{}/gather'.format(self.url)
        self.client.post(url, *args, **kwargs)

    def reject(self):
        self.set(state='rejected')

    def accept(self):
        self.set(state='active')

    def hangup(self):
        self.set(state='completed')

    def redirect(self, url):
        self.post(callback_url=url)

    def recordings(self):
        url = '{}/recordings'.format(self.url)
        return [
            Recording(
                self.client, '{}/{}'.format(self.url, item['id']), **item
            ) for item in self.client.get(url)
        ]

    def say(
        self, text, gender='female', locale='en_US', voice='susan', **kwargs
    ):
        url = '{}/audio'.format(self.url)
        self.client.post(
            url, sentence=text, gender=gender, locale=locale, voice=voice,
            **kwargs
        )


def xattrs(method):
    def wrapper(self, *args, **kwargs):
        elt = method(self, *args)
        for key, val in kwargs.items():
            if val:
                elt.set(camel_case(key), str(val))
        return elt

    return wrapper


class Response(object):
    def __init__(self, elt=None):
        self.root = elt if elt is not None else ET.Element('Response')

    def __str__(self):
        return ET.tostring(self.root, encoding="unicode")

    @xattrs
    def say(self, text, gender='female', locale='en_US', voice='susan'):
        elt = ET.Element(
            'SpeakSentence', gender=gender, locale=locale, voice=voice
        )
        elt.text = text
        self.root.append(elt)
        return elt

    @xattrs
    def pause(self):
        elt = ET.Element(
            'Pause', length='4'
        )
        self.root.append(elt)
        return elt

    @xattrs
    def custom_pause(self, length='2'):
        elt = ET.Element(
            'Pause', length=length
        )
        self.root.append(elt)
        return elt

    @xattrs
    def play_audio(self, url):
        elt = ET.Element('PlayAudio', volume='-4')
        elt.text = url
        self.root.append(elt)
        return elt

    @xattrs
    def conference(self):
        """
        :param from_: Required.
        :param status_callback_url: Optional.
        :param join_tone: Optional. 'true'/'false'
        :param leaving_tone: Optional. 'true'/'false'
        :param tag: Optional.
        :param mute: Optional. 'true'/'false'
        :param hold: Optional. 'true'/'false'
        """
        elt = ET.Element('Conference')
        self.root.append(elt)
        return elt

    @xattrs
    def redirect(self, request_url):
        elt = ET.Element(
            'Redirect',
            attrib={
                "requestUrl": request_url,
                "requestUrlTimeout": '5000'
            }
        )
        self.root.append(elt)
        return elt

    def hangup(self):
        elt = ET.Element('Hangup')
        self.root.append(elt)
        return elt

    def reject(self):
        elt = ET.Element('Reject')
        self.root.append(elt)
        return elt

    def gather(self, request_url, **kwargs):
        elt = ET.Element(
            'Gather',
            requestUrl=request_url
        )
        self.root.append(elt)
        for key, val in kwargs.items():
            if val:
                elt.set(camel_case(key), str(val))

        return Gather(elt)

    def transfer(self, **kwargs):
        elt = ET.Element(
            'Transfer'
        )
        self.root.append(elt)
        for key, val in kwargs.items():
            if val:
                elt.set(camel_case(key), str(val))
        return Transfer(elt)

    def record(self, **kwargs):
        elt = ET.Element(
            'Record'
        )
        self.root.append(elt)
        for key, val in kwargs.items():
            if val:
                elt.set(camel_case(key), str(val))
        return Transfer(elt)


class Gather(object):

    def __init__(self, elt):
        self.root = elt

    def __enter__(self):
        return self

    def __exit__(self, type_, value, tb):
        pass

    def say(self, *args, **kwargs):
        return Response(self.root).say(*args, **kwargs)

    def play_audio(self, url):
        elt = ET.Element('PlayAudio', volume='-4')
        elt.text = url
        self.root.append(elt)
        return elt


class Transfer(object):

    def __init__(self, elt):
        self.root = elt
        self.resp = Response(self.root)

    def __enter__(self):
        return self

    def __exit__(self, type_, value, tb):
        pass

    def say(self, *args, **kwargs):
        return self.resp.say(*args, **kwargs)

    def phone_number(self, text):
        elt = ET.Element(
            'PhoneNumber'
        )
        self.root.append(elt)
        elt.text = text
        return elt

    def record(self, **kwargs):
        elt = ET.Element(
            'Record'
        )
        self.root.append(elt)
        for key, val in kwargs.items():
            if val:
                elt.set(camel_case(key), str(val))
        return elt

    def pause(self):
        elt = ET.Element(
            'Pause', length='3'
        )
        self.root.append(elt)
        return elt

    def play_audio(self, url):
        elt = ET.Element('PlayAudio', volume='-4')
        elt.text = url
        self.root.append(elt)
        return elt