File: //proc/self/root/home/arjun/projects/buyercall/buyercall/blueprints/phonenumbers/endpoints.py
import os
import json
import logging as log
from re import S
from flask import Blueprint, request, current_app
from flask_login import current_user
from buyercall.blueprints.user.decorators import api_role_required
from sqlalchemy import and_
from buyercall.blueprints.phonenumbers import Phone, Audio
from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount
from buyercall.lib.util_rest import api_jsonify
from werkzeug.utils import secure_filename
from buyercall.extensions import db
from buyercall.lib.util_boto3_s3 import generate_presigned_file_url, upload_file_object
from buyercall.lib.util_boto3_polly import AwsPolly
from buyercall.blueprints.notification.utilities import send_notifications
from buyercall.blueprints.sources.models import Source
from buyercall.blueprints.channels.models import Channel, ChannelType
from werkzeug.datastructures import FileStorage
from buyercall.blueprints.widgets.models import Widget
from buyercall.blueprints.phonenumbers.views import purchase_bw
# Flask blueprint for the phone-number API; its URL prefix is /api/phone-numbers.
# NOTE(review): no route registration is visible in this file — presumably the
# view functions below are attached to this blueprint elsewhere; confirm.
phone_number_api = Blueprint('phone_number_api', __name__, url_prefix='/api/phone-numbers')
# Module-level logger, named after this module.
logger = log.getLogger(__name__)
# @api_role_required('admin')
def create_phone_channel():
    """Provision the phone number selected by the user and create its channel.

    The payload is a JSON document carried in the ``data`` field of the
    submitted form. The endpoint creates the ``Phone`` record, wraps it in a
    ``phone-number`` ``Channel`` and sends a ``PHONE_CHANNEL_ADDED``
    notification.

    Returns:
        json: ``api_jsonify`` response.
            * 200 - ``data`` contains the sid of the created phone number
              (``phoneId``) together with its initial configuration.
            * 400 - the ``data`` form field is missing, empty, not valid JSON
              or not a JSON object.
            * 500 - ``Phone.create_phone`` did not return a phone number.
    """
    data = {}

    # Convert form data to dict; the real payload is the JSON string in "data".
    form_data = request.form.to_dict(flat=True)
    try:
        received = json.loads(form_data.get('data', None))
    except (TypeError, ValueError):
        # TypeError: the field is missing (None); ValueError: malformed JSON.
        received = {}
    # Valid JSON that is not an object (list, number, ...) has no ``.get``.
    if not isinstance(received, dict) or not received:
        return api_jsonify(data, 400, "Invalid parameters!", False)

    partnership_id = current_user.partnership_id or 1
    partnership_account_id = current_user.partnership_account_id or 1
    partnership = Partnership.query.filter(Partnership.id == partnership_id).first()
    source_id = Source.get_id_from_sid(received.get('sourceid', None))

    # widget not required
    widget_id = received.get('widget', None)
    widget = Widget.query.filter(Widget.guid == widget_id).first() if widget_id else None

    number_type = received.get('numberType', False)
    params = {
        "phonenumber": received.get('provisionedPhoneNumber', ''),
        "friendly_name": received.get('name', ''),
        "source_id": source_id,
        "local": number_type == 'Local',
        "tollfree": number_type == 'toll-free',
        "routing_config": {
            'enableSMS': received.get('enableSMS', False),
            'requestCallAcceptance': received.get('requestCallAcceptance', False),
        },
        "provider": received.get('provider', 'bandwidth'),
        "partnership_account_id": partnership_account_id,
        "caller_id": received.get('leadNumAsCallerIDForIncomingCalls', False),
    }
    phone_number = Phone.create_phone(params)
    if not phone_number:
        # Previously this case fell through and reported success with no data.
        return api_jsonify(data, 500, "Unable to provision the phone number!", False)

    # Create a channel
    channel = Channel()
    channel.source = source_id
    if widget:
        channel.widgets = [widget]
    channel.content = {
        'sid': str(phone_number.sid),
        'phonenumber': phone_number.phonenumber,
        'is_active': phone_number.active,
    }
    channel.name = phone_number.friendly_name
    channel.description = ''
    _channel_type = ChannelType.query.filter(ChannelType.name == 'phone-number').first()
    if _channel_type:
        channel.type = _channel_type.id
    channel.related_id = phone_number.id
    channel.created_by = current_user.id
    channel.partnership_id = partnership_id
    channel.partnership_account_id = partnership_account_id
    channel.save()

    # NOTE: purchasing the number through purchase_bw(phone_number) is
    # intentionally left disabled here, as in the original implementation.

    # Create notification payload. The phone and channel are already saved at
    # this point, so a missing partnership must not abort the request.
    partner_url = partnership.partner_url if partnership else ''
    es_data = {
        'user_id': current_user.sid,
        'notify_message_type': 'PHONE_CHANNEL_ADDED',
        'user_related_entities': "You've",
        'other_user_related_entities': f'{current_user.firstname} {current_user.lastname}',
        'hyperlink': f'{partner_url}/partnership/sources',
    }
    # Send notification
    send_notifications(**es_data)

    # Add phone data to response data. Audio/voicemail settings are not
    # configured at creation time, so they are reported as unset.
    routing_config = phone_number.routing_config or {}
    data = {
        "phoneId": str(phone_number.sid),
        "name": phone_number.friendly_name,
        "numberType": received.get('numberType', ''),
        "enableSMS": routing_config.get('enableSMS', False),
        "leadNumAsCallerIDForIncomingCalls": phone_number.caller_id,
        "requestCallAcceptance": routing_config.get('requestCallAcceptance', False),
        "enableCallRecording": False,
        "textToSpeechToSpeakCustGreetMsg": None,
        "textToSpeechaccentVoice": None,
        "uploadedCustomAudio": None,
        "uploadedSelfRecordAudio": None,
        "enablePlayCustBeforeConnectMessage": False,
        "usecustomvoicemail": None,
        "texttospeechmessage": None,
        "voicemailaccentVoice": None,
        "uploadedCustomVoiceMail": None,
        "provisionedPhoneNumber": phone_number.phonenumber,
    }
    return api_jsonify(data, 200, "Phone number channel added successfully!", True)
# @api_role_required('admin')
def update_phone_channel(sid):
    """Update a provisioned phone number with configuration data.

    The payload is a JSON document carried in the ``data`` field of the
    submitted form; audio files may additionally be sent as multipart files
    under ``uploadedCustomAudio``, ``uploadedSelfRecordAudio`` and
    ``uploadedCustomVoiceMail``.

    Args:
        sid (str): The sid of the phone number to update.

    Returns:
        json: ``api_jsonify`` response.
            * 200 - ``data`` holds the updated configuration.
            * 400 - ``sid`` is empty or the payload is missing/invalid.
            * 404 - no phone number exists for ``sid``.
    """
    def get_accent_code(lang="English"):
        """Get accent voice and language code for a language.

        Args:
            lang (str, optional): Language for which the accent voice and
                language code are needed. Defaults to "English".

        Returns:
            dict | None: ``voiceId`` and ``languageCode`` for 'English' or
            'Spanish'; ``None`` for any other value.
        """
        if lang == 'Spanish':
            tts_voice = 'Conchita'
            language_code = 'es-ES'
        elif lang == 'English':
            tts_voice = 'Joanna'
            language_code = 'en-US'
        else:
            return None
        return {
            'voiceId': tts_voice,
            'languageCode': language_code
        }

    def get_accent_name(accent):
        """Map a stored accent dict back to 'English'/'Spanish' (else None)."""
        if not isinstance(accent, dict):
            return None
        language_code = accent.get('languageCode', None)
        if language_code == 'en-US':
            return 'English'
        if language_code == 'es-ES':
            return 'Spanish'
        return None

    def resolve_audio(field, audio_type, payload, stored_config, account_sid, phone_sid):
        """Resolve one audio field to ``{'audio_url': ..., 'result': ...}``.

        A string in the payload is treated as an already known URL and keeps
        the value stored in the routing config for that field; an uploaded
        file is saved through ``save_audio_to_s3``. Anything else yields ``{}``.
        """
        value = payload.get(field, '')
        if not value:
            value = request.files.get(field, None)
        if not value:
            return {}
        if isinstance(value, str):
            return {
                'audio_url': value,
                'result': stored_config.get(field, None)
            }
        if isinstance(value, FileStorage):
            return save_audio_to_s3(account_sid, phone_sid, audio_type, value)
        return {}

    data = {}

    # Convert form data to dict; the real payload is the JSON string in "data".
    form_data = request.form.to_dict(flat=True)
    try:
        received = json.loads(form_data.get('data', None))
    except (TypeError, ValueError):
        # TypeError: the field is missing (None); ValueError: malformed JSON.
        received = {}
    # Valid JSON that is not an object (list, number, ...) has no ``.get``.
    if not isinstance(received, dict):
        received = {}

    if not (received and sid):
        return api_jsonify(data, 400, "Invalid parameters!", False)

    phone_number = Phone.query.filter(Phone.sid == sid).first()
    if not phone_number:
        return api_jsonify(data, 404, "Phone number not found!", False)

    partnership = Partnership.query.filter(Partnership.id == current_user.partnership_id).first()
    if not partnership:
        partnership = Partnership.query.get(1)
    partnership_account_id = current_user.partnership_account_id or 1
    partnership_account_sid = PartnershipAccount.get_sid_from_id(partnership_account_id)

    # Snapshot of the configuration before it is replaced below.
    stored_config = phone_number.routing_config or {}

    # Get existing s3 url or new audio files. Each field gets its own dict.
    uploaded_greeting_file = resolve_audio(
        'uploadedCustomAudio', 'customgreeting', received, stored_config,
        partnership_account_sid, phone_number.sid)
    recorded_greeting_file = resolve_audio(
        'uploadedSelfRecordAudio', 'recordedgreeting', received, stored_config,
        partnership_account_sid, phone_number.sid)
    uploaded_voicemail_file = resolve_audio(
        'uploadedCustomVoiceMail', 'customvoicemail', received, stored_config,
        partnership_account_sid, phone_number.sid)

    # Greeting message accent
    greeting_tts_lang = received.get("textToSpeechaccentVoice", None)
    greeting_tts_accent = get_accent_code(greeting_tts_lang) if greeting_tts_lang else None

    # Voicemail message accent
    voicemail_tts_lang = received.get("voicemailaccentVoice", {})
    voicemail_tts_accent = get_accent_code(voicemail_tts_lang) if voicemail_tts_lang else None

    phone_number.active = received.get('isActive', True)
    phone_number.friendly_name = received.get('name', '')
    phone_number.caller_id = received.get('leadNumAsCallerIDForIncomingCalls', False)

    # The source is only changed when the given sid resolves to a known source.
    source_sid = received.get('sourceid', None)
    if source_sid:
        source_id = Source.get_id_from_sid(source_sid)
        if source_id:
            phone_number.source_id = source_id

    # The routing config is replaced as a whole, not merged.
    phone_number.routing_config = {
        "enableCallRecording": received.get('enableCallRecording', False),
        "textToSpeechToSpeakCustGreetMsg": received.get("textToSpeechToSpeakCustGreetMsg", None),
        "enablePlayCustBeforeConnectMessage": received.get("enablePlayCustBeforeConnectMessage", None),
        "textToSpeechaccentVoice": greeting_tts_accent,
        "voicemailaccentVoice": voicemail_tts_accent,
        "usecustomvoicemail": received.get("usecustomvoicemail", False),
        "requestCallAcceptance": received.get("requestCallAcceptance", False),
        "uploadedCustomAudio": uploaded_greeting_file.get('result', None),
        "uploadedSelfRecordAudio": recorded_greeting_file.get('result', None),
        "uploadedCustomVoiceMail": uploaded_voicemail_file.get('result', None),
        "enableSMS": received.get("enableSMS", False),
        "texttospeechmessage": received.get("texttospeechmessage", False),
    }
    phone_number.save()

    # The phone is already saved at this point, so a missing partnership must
    # not abort the request while building the notification link.
    partner_url = partnership.partner_url if partnership else ''
    es_data = {
        'user_id': current_user.sid,
        'notify_message_type': 'PHONE_CHANNEL_EDITED',
        'user_related_entities': "You've",
        'other_user_related_entities': f'{current_user.firstname} {current_user.lastname}',
        'hyperlink': f'{partner_url}/partnership/sources'
    }
    send_notifications(**es_data)

    # Add phone data to response data. Accents are reported by language name.
    saved_config = phone_number.routing_config or {}
    data = {
        "phoneId": str(phone_number.sid),
        "name": phone_number.friendly_name,
        "numberType": received.get('numberType', ''),
        "enableSMS": saved_config.get('enableSMS', False),
        "leadNumAsCallerIDForIncomingCalls": phone_number.caller_id,
        "requestCallAcceptance": saved_config.get('requestCallAcceptance', False),
        "enableCallRecording": saved_config.get('enableCallRecording', False),
        "textToSpeechToSpeakCustGreetMsg": saved_config.get('textToSpeechToSpeakCustGreetMsg', None),
        "textToSpeechaccentVoice": get_accent_name(saved_config.get('textToSpeechaccentVoice', None)),
        "uploadedCustomAudio": uploaded_greeting_file.get('audio_url', None),
        "uploadedSelfRecordAudio": recorded_greeting_file.get('audio_url', None),
        "uploadedCustomVoiceMail": uploaded_voicemail_file.get('audio_url', None),
        "enablePlayCustBeforeConnectMessage": saved_config.get('enablePlayCustBeforeConnectMessage', False),
        "usecustomvoicemail": saved_config.get('usecustomvoicemail', None),
        "texttospeechmessage": saved_config.get('texttospeechmessage', None),
        "voicemailaccentVoice": get_accent_name(saved_config.get('voicemailaccentVoice', None)),
        "provisionedPhoneNumber": phone_number.phonenumber
    }
    return api_jsonify(data, 200, "Phone number channel updated successfully!", True)
def get_presigned_url_from_routing_config(config):
    """Turn a stored ``bucket::key`` reference into a presigned file URL.

    Args:
        config (str | None): Routing-config value of the form
            ``'<bucket>::<key>'``.

    Returns:
        str: The result of ``generate_presigned_file_url(key, bucket)``, or an
        empty string when ``config`` is empty, is not a string, has no ``::``
        separator, or has an empty bucket or key (e.g. the ``'::'``
        placeholder).
    """
    if not isinstance(config, str):
        return ''
    # partition splits on the first '::' only, so unpacking can never fail
    # even if the key itself happens to contain the separator.
    bucket, _, key = config.partition('::')
    if not bucket or not key:
        # Nothing to sign; avoid asking S3 for an empty bucket/key.
        return ''
    return generate_presigned_file_url(key, bucket)
# @api_role_required('admin')
def get_phone_number(sid):
    """Get a provisioned phone number together with its configuration data.

    Args:
        sid (str): The sid of the phone number.

    Returns:
        json: ``api_jsonify`` response.
            * 200 - ``data`` holds the phone number configuration.
            * 400 - ``sid`` is empty.
            * 404 - no phone number exists for ``sid``.
            * 500 - building the response failed (the error is logged).
    """
    def get_accent_name(accent):
        """Map a stored accent dict back to 'English'/'Spanish' (else None)."""
        if not isinstance(accent, dict):
            return None
        language_code = accent.get('languageCode', None)
        if language_code == 'en-US':
            return 'English'
        if language_code == 'es-ES':
            return 'Spanish'
        return None

    if not sid:
        return api_jsonify({}, 400, "Invalid parameters!", False)

    phone_number = Phone.query.filter(Phone.sid == sid).first()
    if not phone_number:
        return api_jsonify({}, 404, "Phone number not found!", False)

    try:
        routing_config = phone_number.routing_config or {}

        # Stored audio references are converted to presigned URLs; '::' is the
        # placeholder passed on when a field was never stored.
        uploaded_greeting = get_presigned_url_from_routing_config(
            routing_config.get('uploadedCustomAudio', '::'))
        recorded_greeting = get_presigned_url_from_routing_config(
            routing_config.get('uploadedSelfRecordAudio', '::'))
        uploaded_voicemail = get_presigned_url_from_routing_config(
            routing_config.get('uploadedCustomVoiceMail', '::'))

        # Add phone data to response data. Accents are reported by language name.
        data = {
            "phoneId": str(phone_number.sid),
            "name": phone_number.friendly_name,
            "source": Source.get_sid_from_id(phone_number.source_id) if phone_number.source_id else None,
            "numberType": 'Local' if phone_number.local else 'toll-free',
            "enableSMS": routing_config.get('enableSMS', False),
            "leadNumAsCallerIDForIncomingCalls": phone_number.caller_id,
            "requestCallAcceptance": routing_config.get('requestCallAcceptance', False),
            "enableCallRecording": routing_config.get('enableCallRecording', False),
            "textToSpeechToSpeakCustGreetMsg": routing_config.get('textToSpeechToSpeakCustGreetMsg', None),
            "textToSpeechaccentVoice": get_accent_name(routing_config.get('textToSpeechaccentVoice', None)),
            "uploadedCustomAudio": uploaded_greeting,
            "uploadedSelfRecordAudio": recorded_greeting,
            "uploadedCustomVoiceMail": uploaded_voicemail,
            "enablePlayCustBeforeConnectMessage": routing_config.get('enablePlayCustBeforeConnectMessage', None),
            "usecustomvoicemail": routing_config.get('usecustomvoicemail', None),
            "texttospeechmessage": routing_config.get('texttospeechmessage', None),
            "voicemailaccentVoice": get_accent_name(routing_config.get('voicemailaccentVoice', None)),
            "provisionedPhoneNumber": phone_number.phonenumber
        }
    except Exception:
        # Endpoint boundary: log with traceback and report the failure instead
        # of answering "success" with an empty payload.
        logger.exception('Failed to build phone number response for sid %s', sid)
        return api_jsonify({}, 500, "Unable to fetch phone number!", False)

    return api_jsonify(data, 200, "Phone number fetched successfully!", True)
# @api_role_required('admin')
def search_phone_number():
    """Search Bandwidth for available phone numbers.

    The criteria are read from the JSON request body:
        numberType (str): 'toll-free' requests toll-free numbers; any other
            value requests non-toll-free numbers.
        areaCode (str | int): Area code to search in (passed on as a string).
        phrase (str): Requested vanity phrase; it is wrapped in ``*`` on both
            sides before being passed on.
        city (str): The name of the city.
        state (str): The state.

    Returns:
        json: ``api_jsonify`` response whose ``data`` is a list of dicts, each
        holding one phone number under the key ``value``.
    """
    criteria = request.get_json()
    if not criteria:
        return api_jsonify([], 400, "Invalid parameters", False)

    from buyercall.blueprints.phonenumbers.views import bandwidth_search_api

    partnership_id = current_user.partnership_id or 1
    if not partnership_id:
        return api_jsonify([], 404, "Partnership account not found", False)

    wants_tollfree = criteria.get('numberType', '') == 'toll-free'

    raw_area_code = criteria.get('areaCode', None)
    area_code = None
    if raw_area_code:
        area_code = str(raw_area_code)

    city = criteria.get('city', None)
    state = criteria.get('state', None)

    raw_phrase = criteria.get('phrase', None)
    phrase = None
    if raw_phrase:
        phrase = '*{}*'.format(raw_phrase)

    # Search bandwidth
    search_result = bandwidth_search_api(wants_tollfree, area_code, phrase,
                                         city, state, partnership_id)
    if not search_result.get('success', False):
        failure_message = search_result.get('message', 'Invalid input parameters')
        return api_jsonify([], 500, failure_message, False)

    found_numbers = [{'value': number} for number in search_result.get('numbers', [])]
    return api_jsonify(found_numbers, 200, "Phone numbers fetched successfully!", True)
def upload_audio_to_local():
    """Append one chunk of a chunked audio upload to a file under ``upload/``.

    Expects the file part ``audio`` plus the fields ``chunk_index``,
    ``chunk_offset``, ``total_chunks`` and ``file_size``.

    Returns:
        json: ``api_jsonify`` response.
            * 200 - the chunk was stored; after the last chunk ``data``
              contains the stored file name under ``file``.
            * 400 - file or fields are missing/invalid, or the target file
              already exists when the first chunk arrives.
            * 500 - the chunk could not be written, or the final size does not
              match ``file_size``.
    """
    data = {}
    audio_file = request.files.get('audio', None)

    # A request that carries a file is multipart, so the chunk metadata arrives
    # as form fields; a JSON body is still accepted as a fallback.
    received = request.form.to_dict(flat=True) or request.get_json(silent=True) or {}
    if audio_file is None or not isinstance(received, dict):
        return api_jsonify(data, 400, "Invalid parameters!", False)

    try:
        current_chunk = int(received.get('chunk_index', 0))
        chunk_offset = int(received.get('chunk_offset', 0))
        total_chunks = int(received.get('total_chunks', 0))
        file_size = int(received.get('file_size', 0))
    except (TypeError, ValueError):
        return api_jsonify(data, 400, "Invalid parameters!", False)

    # create secure filename
    filename = secure_filename(audio_file.filename or '')
    if not filename:
        return api_jsonify(data, 400, "Invalid parameters!", False)
    save_path = os.path.join('upload/', filename)

    # Stop here instead of appending to somebody else's file.
    if os.path.exists(save_path) and current_chunk == 0:
        return api_jsonify(data, 400, 'File already exists', False)

    try:
        # NOTE(review): in append mode writes may go to the end of the file
        # regardless of the seek position (platform dependent), so chunks
        # presumably have to arrive in order — confirm with the client.
        with open(save_path, 'ab') as f:
            f.seek(chunk_offset)
            f.write(audio_file.stream.read())
    except OSError:
        logger.exception('Could not write chunk to %s', save_path)
        return api_jsonify(data, 500, 'Error', False)

    if current_chunk + 1 == total_chunks:
        # This was the last chunk, the file should be complete and the size we expect
        if os.path.getsize(save_path) != file_size:
            return api_jsonify(data, 500, "Size mismatch!", False)
        data = {'file': filename}
        logger.info('File %s has been uploaded successfully', audio_file.filename)
    else:
        logger.debug('Chunk %s of %s for file %s complete',
                     current_chunk + 1, total_chunks, audio_file.filename)

    return api_jsonify(data, 200, "Audio uploaded successfully!", True)
def save_audio_to_s3(partnership_account_id, phone_id, audio_type, file):
    """Save an audio file to the AWS S3 bucket and record it as an ``Audio``.

    Args:
        partnership_account_id (str or uuid): The sid of the partnership account
        phone_id (str or uuid): The sid of the phone number
        audio_type (str): The type of audio (customgreeting, recordedgreeting,
            customvoicemail etc.)
        file (file object): The file that needs to be uploaded to s3

    Returns:
        dict: ``audio_url`` (presigned URL of the object) and ``result``
        (``'<bucket>::<key>'``, or ``None`` when the upload helper reported
        no success). An empty dict is returned when any step raises.
    """
    try:
        key = create_audio_filename(partnership_account_id, phone_id, audio_type)
        bucket_name = current_app.config['WHISPER_BUCKET']

        result = None
        if upload_file_object(file, bucket_name, key):
            result = f'{bucket_name}::{key}'
        else:
            logger.warning('Upload of %s to bucket %s reported no success', key, bucket_name)

        audio = Audio()
        audio.whisper_message_type = audio_type
        audio.enabled = True
        audio.audio_url = result if result else ''
        db.session.add(audio)
        try:
            db.session.commit()
        except Exception:
            # A failed commit leaves the session unusable until rolled back.
            db.session.rollback()
            raise

        presigned_url = generate_presigned_file_url(key, bucket_name)
        return {'audio_url': presigned_url, 'result': result}
    except Exception:
        # Best effort: callers treat an empty dict as "nothing saved".
        logger.exception('Failed to save %s audio for phone %s', audio_type, phone_id)
        return {}
def create_audio_filename(pa_id, phone_id, type):
    """Build the audio file name ``<pa_id>-<phone_id>-<type>``.

    Args:
        pa_id: First component of the name (callers in this file pass the
            partnership account sid).
        phone_id: Second component of the name (the phone number sid).
        type: Third component of the name (the audio type).

    Returns:
        str: The three values joined by hyphens.
    """
    return '{}-{}-{}'.format(pa_id, phone_id, type)
def text_to_speech_audio():
    """Synthesize speech with AWS Polly for the ``text`` of the JSON request body.

    Returns:
        The value returned by ``AwsPolly.synthesize_speech`` for the text, or
        ``None`` when the request has no JSON object body or no non-empty
        ``text`` field.
    """
    # get_json() may return None (no body) or a non-object JSON value.
    received = request.get_json()
    if not isinstance(received, dict):
        return None

    text = received.get('text', None)
    if not text:
        return None

    config = {
        'access_key': current_app.config['AMAZON_ACCESS_KEY'],
        'secret_key': current_app.config['AMAZON_SECRET_KEY'],
        'region': current_app.config['REGION_NAME']
    }
    polly_client = AwsPolly(config)
    return polly_client.synthesize_speech(text=text)
def get_us_states():
    """Return the name and abbreviation of every entry in ``us.states.STATES``.

    Returns:
        json: ``api_jsonify`` response whose ``data`` is a list of dicts with
        the keys ``name`` and ``abbr``.
    """
    from us import states

    # Collect one {'name', 'abbr'} record per state.
    state_records = []
    for entry in states.STATES:
        state_records.append({'name': entry.name, 'abbr': entry.abbr})

    return api_jsonify(state_records, 200, "Successfully fetched US states!", True)