File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/lib/bandwidth_api_v2.py
import os
import json
import logging
import xml.etree.ElementTree as ET
import xmltodict
import urllib.parse
import requests
import hashlib
from requests.auth import HTTPBasicAuth
from buyercall.blueprints.filters import format_phone_number
log = logging.getLogger(__name__)
def camel_case(thing):
if isinstance(thing, dict):
return {camel_case(key): val for key, val in thing.items()}
words = thing.split('_')
return words[0] + ''.join(x.capitalize() for x in words[1:])
# ET SubElement does not naturally allow the passing of of a text value in constructor
# Hence we need to define a function to do it for us. ET.SubElement should be replaced
# with text_element where ever you need to add a text value to XML note. This is not for
# and attribute. The ET.SubElement allows for attributes like; ET.SubElement(parent, attrb='')
def text_element(parent, tag, text, *args, **kwargs):
element = ET.SubElement(parent, tag, *args, **kwargs)
element.text = text
return element
def is_absolute(url):
return bool(urllib.parse.urlparse(url).netloc)
class BandwidthException(Exception):
pass
# Handle Bandwidth Dashboard number search and provisioning using Bandwidth API v2. We are on a Hybrid account
# meaning our code base is still on V1 of the API, but Bandwidth is forcing all provisioning to happen with V2 and
# therefore we need to provision with V2 and then import the numbers to application platform with V1 until full
# V2 version is available.
def api_root_url(request_type):
if request_type == 'messaging':
api_root = os.environ.get(
'BANDWIDTH_MESSAGING_API_ROOT', 'https://messaging.bandwidth.com/api/v2/users'
)
elif request_type == 'voice':
api_root = os.environ.get(
'BANDWIDTH_VOICE_API_ROOT', 'https://voice.bandwidth.com/api/v2/accounts'
)
else:
api_root = os.environ.get(
'BANDWIDTH_DASHBOARD_API_ROOT', 'https://dashboard.bandwidth.com/api/accounts'
)
return api_root
class Bandwidth(object):
def __init__(self, request_type, username, password, account_id, site_id, location_id,
sms_application_id, voice_application_id, cnam_password):
api_root = api_root_url(request_type)
self.auth = HTTPBasicAuth(username, password)
self.account_id = account_id
self.api_root = api_root
self.username = username
self.password = password
self.site_id = site_id
self.location_id = location_id
self.sms_application_id = sms_application_id
self.voice_application_id = voice_application_id
self.cnam_password = cnam_password
self.url_root = '{}/{}'.format(self.api_root, self.account_id)
log.info('The root url is: {}'.format(self.url_root))
@property
def available_numbers(self):
return AvailableNumbers(self)
@property
def subscriptions(self):
return Subscriptions(self)
@property
def phone_numbers(self):
return PhoneNumbers(self)
@property
def phone_numbers_options(self):
return PhoneNumbersOptions(self)
@property
def in_service_number(self):
return InserviceNumber(self)
@property
def media(self):
return Media(self)
@property
def messages(self):
return Messages(self)
@property
def call(self):
return PhoneCall(self)
@property
def realms(self):
return Realms(self)
def cnam_lookup(self, number, test=False):
url = 'https://cnam.dashcs.com/?'
password = urllib.parse.quote_plus(self.cnam_password)
format_number = format_phone_number(number)
params = {"companyId": self.account_id, "password": password,
"number": format_number, "dnis": format_number, "test": test}
url_parse = urllib.parse.urlparse(url)
query = url_parse.query
url_dict = dict(urllib.parse.parse_qsl(query))
url_dict.update(params)
url_new_query = urllib.parse.urlencode(url_dict)
url_parse = url_parse._replace(query=url_new_query)
new_url = urllib.parse.urlunparse(url_parse)
r = requests.get(new_url)
if 300 <= r.status_code:
raise BandwidthException('STATUS_CODE:{} - DETAIL: {}'
.format(r.status_code, r.content.decode()))
return r.content.decode()
def global_url(self, resource):
return '{}/{}/{}'.format(self.api_root, self.account_id, resource)
def get(self, url, **kwargs):
kwargs = {k: v for k, v in kwargs.items() if v is not None}
log.debug('GET {}?{}\n'.format(
url,
'&'.join('{}={}'.format(key, val) for key, val in kwargs.items())
))
r = requests.get(url, auth=self.auth, params=camel_case(kwargs))
if 300 <= r.status_code:
error_content = xmltodict.parse(r.content.decode())
json_error_content = json.dumps(error_content)
return_error_json = json.loads(json_error_content)
error_desc = return_error_json['SearchResult']['Error']['Description']
error_code = return_error_json['SearchResult']['Error']['Code']
raise BandwidthException('STATUS_CODE:{} - CATEGORY: {} - REASON: {} - DETAIL: {}'
.format(r.status_code, r.reason, error_code, error_desc))
return r.content.decode()
def post_tn(self, url, data, headers, request_type=None):
r = requests.post(url, auth=self.auth, data=data, headers=headers)
if 300 <= r.status_code:
error_content = xmltodict.parse(r.content.decode())
json_error_content = json.dumps(error_content)
return_error_json = json.loads(json_error_content)
if request_type == 'order':
error_desc = return_error_json['OrderResponse']['ErrorList']['Error']['Description']
error_code = return_error_json['OrderResponse']['ErrorList']['Error']['Code']
elif request_type == 'subscription':
error_desc = return_error_json['SubscriptionResponse']['ResponseStatus']['Description']
error_code = return_error_json['SubscriptionResponse']['ResponseStatus']['ErrorCode']
elif request_type == 'tn_option':
error_desc = return_error_json['TnOptionOrderResponse']['ResponseStatus']['Description']
error_code = return_error_json['TnOptionOrderResponse']['ResponseStatus']['ErrorCode']
else:
error_desc = return_error_json['ErrorList']['Error']['Description']
error_code = return_error_json['ErrorList']['Error']['Code']
raise BandwidthException('STATUS_CODE:{} - CATEGORY: {} - REASON: {} - DETAIL: {}'
.format(r.status_code, r.reason, error_code, error_desc))
return r.headers.get('Location')
def post_sip(self, url, data, headers):
r = requests.post(url, auth=self.auth, data=data, headers=headers)
if 300 <= r.status_code:
raise BandwidthException('STATUS_CODE:{} - CATEGORY: {} - DETAIL: {}'
.format(r.status_code, r.reason, r.content.decode()))
return r
def put_sip(self, url, data, headers):
r = requests.put(url, auth=self.auth, data=data, headers=headers)
if 300 <= r.status_code:
raise BandwidthException('STATUS_CODE:{} - CATEGORY: {} - DETAIL: {}'
.format(r.status_code, r.reason, r.content.decode()))
return r
def post_msg_json(self, url, data, headers):
r = requests.post(url, auth=self.auth, data=data, headers=headers)
log.info('The msg request data: {}'.format(data))
return r.content.decode()
def post_call_json(self, url, data, headers):
r = requests.post(url, auth=self.auth, data=data, headers=headers)
log.info('The request data: {}'.format(data))
return r.content.decode()
def delete(self, url):
log.debug('DELETE {}\n'.format(url))
r = requests.delete(url, auth=self.auth)
if 300 <= r.status_code:
raise BandwidthException('STATUS_CODE:{} - CATEGORY: {} - DETAIL: {}'
.format(r.status_code, r.reason, r.content.decode()))
return r.status_code
class Collection(object):
def __init__(self, client, name, child_class=None):
self.client, self.name = client, name
self.base_url = self.client.resource_url(self.name)
self.child_class = child_class
def __getitem__(self, id_):
url = '{}/{}'.format(self.base_url, id_)
return self.child_class(self.client, url)
class AvailableNumbers(Collection):
def __init__(self, client):
self.client, self.name = client, 'availableNumbers'
self.base_url = self.client.global_url(self.name)
log.info('The base url is: {}'.format(self.base_url))
log.info('The client account id looks like: {}'.format(client.account_id))
self.child_class = None
def search(self, **kwargs):
url = '{}'.format(self.base_url)
r = xmltodict.parse(self.client.get(url, **kwargs))
return json.dumps(r)
class Subscriptions(Collection):
def __init__(self, client):
self.client, self.name = client, 'subscriptions'
self.base_url = self.client.global_url(self.name)
self.child_class = None
def subscribe(self, order_type, callback_url, expiry):
url = '{}'.format(self.base_url)
root = ET.Element('Subscription')
text_element(root, 'OrderType', order_type)
callback_subscription_elt = ET.SubElement(root, 'CallbackSubscription')
text_element(callback_subscription_elt, 'URL', callback_url)
text_element(callback_subscription_elt, 'Expiry', expiry)
callback_credentials_elt = ET.SubElement(callback_subscription_elt, 'CallbackCredentials')
callback_auth_elt = ET.SubElement(callback_credentials_elt, 'BasicAuthentication')
text_element(callback_auth_elt, 'Username', self.client.username)
text_element(callback_auth_elt, 'Password', self.client.password)
subscribe_xml = ET.tostring(root)
headers = {'Content-Type': 'application/xml; charset=UTF-8'}
return self.client.post_tn(url, subscribe_xml, headers, 'subscription')
def all_subscriptions(self):
self.base_url = self.client.global_url(self.name)
return self.client.get('{}'.format(self.base_url))
def single_subscription(self, id_):
self.base_url = self.client.global_url(self.name)
return self.client.get('{}/{}'.format(self.base_url, id_))
def delete_subscription(self, id_):
self.base_url = self.client.global_url(self.name)
return self.client.delete('{}/{}'.format(self.base_url, id_))
class PhoneNumbers(Collection):
def __init__(self, client):
self.client, self.name = client, 'orders'
self.base_url = self.client.global_url(self.name)
self.child_class = None
def order(self, partner_account_id, name, phonenumber):
url = '{}'.format(self.base_url)
root = ET.Element('Order')
text_element(root, 'CustomerOrderId', partner_account_id)
text_element(root, 'Name', name)
existing_pn_order = ET.SubElement(root, 'ExistingTelephoneNumberOrderType')
pn_list = ET.SubElement(existing_pn_order, 'TelephoneNumberList')
text_element(pn_list, 'TelephoneNumber', phonenumber)
text_element(root, 'SiteId', self.client.site_id)
text_element(root, 'PeerId', self.client.location_id)
order_xml = ET.tostring(root)
log.info('The xml looks like: {}'.format(order_xml))
headers = {'Content-Type': 'application/xml; charset=UTF-8'}
return self.client.post_tn(url, order_xml, headers, 'order')
def disconnect(self, partner_account_name, phonenumber):
url = '{}'.format(self.client.global_url('disconnects'))
root = ET.Element('DisconnectTelephoneNumberOrder')
text_element(root, 'Name', partner_account_name)
disconnect_pn_order = ET.SubElement(root, 'DisconnectTelephoneNumberOrderType')
pn_list = ET.SubElement(disconnect_pn_order, 'TelephoneNumberList')
text_element(pn_list, 'TelephoneNumber', phonenumber)
disconnect_xml = ET.tostring(root)
log.info('The xml looks like: {}'.format(disconnect_xml))
headers = {'Content-Type': 'application/xml; charset=UTF-8'}
return self.client.post_tn(url, disconnect_xml, headers)
class Messages(Collection):
def __init__(self, client):
self.client, self.name = client, 'messages'
self.base_url = self.client.global_url(self.name)
self.child_class = None
def create(self, m_to, m_from, m_body, media=None, m_tag=None):
url = '{}'.format(self.base_url)
if media:
data = {'to': m_to,
'from': m_from,
'text': m_body,
'applicationId': self.client.sms_application_id,
'media': media,
'tag': m_tag}
else:
data = {'to': m_to,
'from': m_from,
'text': m_body,
'applicationId': self.client.sms_application_id,
'tag': m_tag}
json_data = json.dumps(data)
headers = {'Content-Type': 'application/json'}
return self.client.post_msg_json(url, json_data, headers)
# Bandwidth REST API V2 Class and functions
class PhoneCall(Collection):
def __init__(self, client):
self.client, self.name = client, 'calls'
self.base_url = self.client.global_url(self.name)
self.child_class = None
def create(self, c_from, c_to, answer_callback_url, answer_fallback_callback_url, disconnect_callback_url,
call_timeout=30, tag=None, machine_detection=None):
url = '{}'.format(self.base_url)
data = {'from': c_from,
'to': c_to,
'applicationId': self.client.voice_application_id,
'answerUrl': answer_callback_url,
'disconnectUrl': disconnect_callback_url,
'answerFallbackUrl': answer_fallback_callback_url,
'callTimeout': call_timeout,
'tag': tag,
'machineDetection': machine_detection}
json_data = json.dumps(data)
headers = {'Content-Type': 'application/json'}
return self.client.post_call_json(url, json_data, headers)
def recording(self, call_id, recording_id):
url = '{}/{}/recordings/{}/media'.format(self.base_url, call_id, recording_id)
r = requests.get(
url,
auth=self.client.auth,
stream=True)
return r
def transcription(self, call_id, recording_id):
url = '{}/{}/recordings/{}/transcription'.format(self.base_url, call_id, recording_id)
r = requests.get(
url,
auth=self.client.auth)
r_json = json.loads(r.content.decode())
return r_json
def info(self, call_id):
url = '{}/{}'.format(self.base_url, call_id)
r = requests.get(
url,
auth=self.client.auth)
r_json = json.loads(r.content.decode())
return r_json
def update(self, call_id, state, redirect_url=None, redirect_fallback_url=None, tag=None):
url = '{}/{}'.format(self.base_url, call_id)
data = {'state': state,
'redirectUrl': redirect_url,
'redirectFallbackUrl': redirect_fallback_url,
'tag': tag
}
json_data = json.dumps(data)
headers = {'Content-Type': 'application/json'}
return self.client.post_call_json(url, json_data, headers)
class PhoneNumbersOptions(Collection):
def __init__(self, client):
self.client, self.name = client, 'tnoptions'
self.base_url = self.client.global_url(self.name)
self.child_class = None
def update(self, partner_account_id, phonenumber, sms_option, sms_campaign_id=None, sms_campaign_class=None):
url = '{}'.format(self.base_url)
root = ET.Element('TnOptionOrder')
text_element(root, 'CustomerOrderId', partner_account_id)
tn_options_groups = ET.SubElement(root, 'TnOptionGroups')
tn_options_group = ET.SubElement(tn_options_groups, 'TnOptionGroup')
text_element(tn_options_group, 'Sms', sms_option)
if sms_option == 'on':
sms_settings = ET.SubElement(tn_options_group, 'A2pSettings')
if sms_campaign_id:
text_element(sms_settings, 'MessageClass', sms_campaign_class)
text_element(sms_settings, 'CampaignId', sms_campaign_id)
text_element(sms_settings, 'Action', 'asSpecified')
else:
text_element(sms_settings, 'Action', 'unchanged')
pn_list = ET.SubElement(tn_options_group, 'TelephoneNumbers')
text_element(pn_list, 'TelephoneNumber', phonenumber)
order_xml = ET.tostring(root)
log.info('The xml looks like: {}'.format(order_xml))
headers = {'Content-Type': 'application/xml; charset=UTF-8'}
return self.client.post_tn(url, order_xml, headers, 'tn_option')
class InserviceNumber(Collection):
def __init__(self, client):
self.client, self.name = client, 'inserviceNumbers'
self.base_url = self.client.global_url(self.name)
self.child_class = None
def validate(self, tn):
url = '{}/{}'.format(self.base_url, tn)
r = requests.get(
url,
auth=self.client.auth)
return r.status_code
class Media(Collection):
def __init__(self, client):
self.client, self.name = client, 'media'
self.base_url = self.client.global_url(self.name)
self.child_class = None
def list_media(self):
url = '{}'.format(self.base_url)
r = requests.get(
url,
auth=self.client.auth,
stream=True
)
return r.content
def get_media(self, media_name):
url = '{}/{}'.format(self.base_url, media_name)
r = requests.get(
url,
auth=self.client.auth,
stream=True
)
return r
# A V2 realm is equivalent to a sip domain in V1
class Realms(Collection):
def __init__(self, client):
self.client, self.name = client, 'realms'
self.base_url = self.client.global_url(self.name)
self.child_class = None
# A V2 sip credentials are equivalent to V1 endpoints
def create_sip_credentials(self, realm_id, realm, sip_username, sip_password):
url = '{}/{}/sipcredentials'.format(self.base_url, realm_id)
root = ET.Element('SipCredentials')
sip_cred = ET.SubElement(root, 'SipCredential')
text_element(sip_cred, 'UserName', sip_username)
if sip_password:
hash_1_composite = (sip_username + ':' + realm + ':' + sip_password).encode()
hash_1 = hashlib.md5(hash_1_composite).hexdigest()
hash_1_b_composite = (sip_username + '@' + realm + ':' + realm + ':' + sip_password).encode()
hash_1_b = hashlib.md5(hash_1_b_composite).hexdigest()
text_element(sip_cred, 'Hash1', hash_1)
text_element(sip_cred, 'Hash1b', hash_1_b)
text_element(sip_cred, 'HttpVoiceV2AppId', self.client.voice_application_id)
order_xml = ET.tostring(root)
log.info('The xml looks like: {}'.format(order_xml))
headers = {'Content-Type': 'application/xml; charset=UTF-8'}
return self.client.post_sip(url, order_xml, headers)
def delete_sip_credentials(self, realm_id, endpoint_id):
url = '{}/{}/sipcredentials/{}'.format(self.base_url, realm_id, endpoint_id)
log.info('The url is: {}'.format(url))
return self.client.delete(url)
def update_sip_credentials(self, realm_id, realm, endpoint_id, sip_username, sip_password):
url = '{}/{}/sipcredentials/{}'.format(self.base_url, realm_id, endpoint_id)
root = ET.Element('SipCredential')
if sip_password:
hash_1_composite = (sip_username + ':' + realm + ':' + sip_password).encode()
hash_1 = hashlib.md5(hash_1_composite).hexdigest()
hash_1_b_composite = (sip_username + '@' + realm + ':' + realm + ':' + sip_password).encode()
hash_1_b = hashlib.md5(hash_1_b_composite).hexdigest()
text_element(root, 'Hash1', hash_1)
text_element(root, 'Hash1b', hash_1_b)
text_element(root, 'Realm', realm)
text_element(root, 'RealmId', realm_id)
text_element(root, 'HttpVoiceV2AppId', self.client.voice_application_id)
order_xml = ET.tostring(root)
log.info('The xml looks like: {}'.format(order_xml))
headers = {'Content-Type': 'application/xml; charset=UTF-8'}
return self.client.put_sip(url, order_xml, headers)