HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //proc/self/root/home/arjun/projects/buyercall/buyercall/blueprints/phonenumbers/endpoints.py
import os
import json
import logging as log
from re import S
from flask import Blueprint, request, current_app
from flask_login import current_user
from buyercall.blueprints.user.decorators import api_role_required
from sqlalchemy import and_
from buyercall.blueprints.phonenumbers import Phone, Audio
from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount
from buyercall.lib.util_rest import api_jsonify
from werkzeug.utils import secure_filename
from buyercall.extensions import db
from buyercall.lib.util_boto3_s3 import generate_presigned_file_url, upload_file_object
from buyercall.lib.util_boto3_polly import AwsPolly
from buyercall.blueprints.notification.utilities import send_notifications
from buyercall.blueprints.sources.models import Source
from buyercall.blueprints.channels.models import Channel, ChannelType
from werkzeug.datastructures import FileStorage
from buyercall.blueprints.widgets.models import Widget
from buyercall.blueprints.phonenumbers.views import purchase_bw

# Flask blueprint for this module, using the /api/phone-numbers URL prefix.
# NOTE(review): no route registrations are visible in this file — presumably the
# view functions below are attached to the blueprint elsewhere; confirm.
phone_number_api = Blueprint('phone_number_api', __name__, url_prefix='/api/phone-numbers')
# Logger named after this module.
logger = log.getLogger(__name__)


# @api_role_required('admin')
def create_phone_channel():
    """Provision the phone number selected by the user and create its channel.

    Reads a JSON document from the ``data`` form field, creates the phone
    number through ``Phone.create_phone``, stores a matching ``Channel``
    record and sends a ``PHONE_CHANNEL_ADDED`` notification.

    Returns:
        json: ``api_jsonify`` response. On success ``data`` contains the sid
        and configuration of the phone number created. A 400 is returned when
        the ``data`` field is missing or is not a JSON object, and a 500 when
        the phone number could not be created.
    """
    # The payload arrives as a JSON string inside the "data" form field.
    form_data = request.form.to_dict(flat=True)
    try:
        received = json.loads(form_data.get('data', None))
    except (TypeError, ValueError):
        # TypeError: field missing (None); ValueError: malformed JSON.
        received = {}

    if not received or not isinstance(received, dict):
        return api_jsonify({}, 400, "Invalid parameters!", False)

    partnership_id = current_user.partnership_id or 1
    partnership_account_id = current_user.partnership_account_id or 1
    partnership = Partnership.query.filter(Partnership.id == partnership_id).first()
    source_id = Source.get_id_from_sid(received.get('sourceid', None))

    # widget not required
    widget_id = received.get('widget', None)
    widget = Widget.query.filter(Widget.guid == widget_id).first() if widget_id else None

    number_type = received.get('numberType', False)
    params = {
        "phonenumber": received.get('provisionedPhoneNumber', ''),
        "friendly_name": received.get('name', ''),
        "source_id": source_id,
        "local": number_type == 'Local',
        "tollfree": number_type == 'toll-free',
        "routing_config": {
            'enableSMS': received.get('enableSMS', False),
            'requestCallAcceptance': received.get('requestCallAcceptance', False)
        },
        "provider": received.get('provider', 'bandwidth'),
        "partnership_account_id": partnership_account_id,
        "caller_id": received.get('leadNumAsCallerIDForIncomingCalls', False)
    }

    phone_number = Phone.create_phone(params)
    if not phone_number:
        # Previously this path fell through and reported success with empty data.
        return api_jsonify({}, 500, "Unable to create phone number channel!", False)

    # Create a channel
    channel = Channel()
    channel.source = source_id
    if widget:
        channel.widgets = [widget]
    channel.content = {
        'sid': str(phone_number.sid),
        'phonenumber': phone_number.phonenumber,
        'is_active': phone_number.active
    }
    channel.name = phone_number.friendly_name
    channel.description = ''
    _channel_type = ChannelType.query.filter(ChannelType.name == 'phone-number').first()
    if _channel_type:
        channel.type = _channel_type.id
    channel.related_id = phone_number.id
    channel.created_by = current_user.id
    channel.partnership_id = partnership_id
    channel.partnership_account_id = partnership_account_id
    channel.save()

    # Purchase bandwidth
    # purchase_bw(phone_number)

    # The partnership row may be missing; avoid failing after the channel is saved.
    partner_url = partnership.partner_url if partnership else ''

    # Create notification payload and send it
    send_notifications(
        user_id=current_user.sid,
        notify_message_type='PHONE_CHANNEL_ADDED',
        user_related_entities="You've",
        other_user_related_entities=f'{current_user.firstname} {current_user.lastname}',
        hyperlink=f'{partner_url}/partnership/sources'
    )

    # Add phone data to response data
    routing_config = phone_number.routing_config or {}
    data = {
        # Stringified to match the update/get endpoints.
        "phoneId": str(phone_number.sid),
        "name": phone_number.friendly_name,
        "numberType": received.get('numberType', ''),
        "enableSMS": routing_config.get('enableSMS', False),
        "leadNumAsCallerIDForIncomingCalls": phone_number.caller_id,
        "requestCallAcceptance": routing_config.get('requestCallAcceptance', False),
        "enableCallRecording": False,
        "textToSpeechToSpeakCustGreetMsg": None,
        "textToSpeechaccentVoice": None,
        "uploadedCustomAudio": None,
        "uploadedSelfRecordAudio": None,
        "enablePlayCustBeforeConnectMessage": False,
        "usecustomvoicemail": None,
        "texttospeechmessage": None,
        "voicemailaccentVoice": None,
        "uploadedCustomVoiceMail": None,
        "provisionedPhoneNumber": phone_number.phonenumber
    }

    return api_jsonify(data, 200, "Phone number channel added successfully!", True)


# @api_role_required('admin')
def update_phone_channel(sid):
    """This endpoint updates the provisioned phone number with configuration data

    The new configuration is read as JSON from the ``data`` form field; audio
    files may be supplied either as existing URL strings inside that JSON or
    as uploaded files with the same field names.

    Args:
        sid (str): The sid of the phone number created
    Returns:
        json: The success/error response. 400 when the payload or sid is
        missing/invalid, 404 when no phone number matches ``sid``.
    """

    def get_accent_code(lang="English"):
        """Get accent voice and language code for a language
        Args:
            lang (str, optional): Language for which we need accent voice and language code.
            Defaults to "English".
        Returns:
            dict | None: accent voice id and language code
        """
        if lang == 'Spanish':
            tts_voice = 'Conchita'
            language_code = 'es-ES'
        elif lang == 'English':
            tts_voice = 'Joanna'
            language_code = 'en-US'
        else:
            return
        voicemail_accent = {
            'voiceId': tts_voice,
            'languageCode': language_code
        }
        return voicemail_accent

    def get_accent_language(accent):
        """Map a stored accent dict back to its language name.

        Falsy values are returned unchanged; unknown language codes give None.
        """
        if not accent:
            return accent
        language_code = accent.get('languageCode', None)
        if language_code == 'en-US':
            return 'English'
        if language_code == 'es-ES':
            return 'Spanish'
        return None

    # Convert form data to dict
    form_data = request.form.to_dict(flat=True)
    try:
        received = json.loads(form_data.get('data', None))
    except (TypeError, ValueError):
        # TypeError: field missing (None); ValueError: malformed JSON.
        received = {}

    if not (received and sid) or not isinstance(received, dict):
        return api_jsonify({}, 400, "Invalid parameters!", False)

    phone_number = Phone.query.filter(Phone.sid == sid).first()
    if not phone_number:
        return api_jsonify({}, 404, "Phone number not found!", False)

    partnership = Partnership.query.filter(Partnership.id == current_user.partnership_id).first()
    if not partnership:
        partnership = Partnership.query.get(1)
    partnership_account_id = current_user.partnership_account_id or 1
    partnership_account_sid = PartnershipAccount.get_sid_from_id(partnership_account_id)

    # Snapshot of the configuration before it is overwritten below.
    existing_config = phone_number.routing_config or {}

    def resolve_audio(field, audio_type):
        """Return {'audio_url', 'result'} for one audio field, or {} if absent.

        A string value is treated as an already stored audio URL and keeps the
        existing storage reference; an uploaded file is saved to S3.
        """
        audio = received.get(field, '') or request.files.get(field, None)
        if not audio:
            return {}
        if isinstance(audio, str):
            return {
                'audio_url': audio,
                'result': existing_config.get(field, None)
            }
        if isinstance(audio, FileStorage):
            return save_audio_to_s3(partnership_account_sid, phone_number.sid, audio_type, audio)
        return {}

    # Each field gets its own dict (the original aliased one shared {} to all three).
    uploaded_greeting_file = resolve_audio('uploadedCustomAudio', 'customgreeting')
    recorded_greeting_file = resolve_audio('uploadedSelfRecordAudio', 'recordedgreeting')
    uploaded_voicemail_file = resolve_audio('uploadedCustomVoiceMail', 'customvoicemail')

    # Greeting message accent
    greeting_tts_lang = received.get("textToSpeechaccentVoice", None)
    greeting_tts_accent = get_accent_code(greeting_tts_lang) if greeting_tts_lang else None

    # Voicemail message accent
    voicemail_tts_lang = received.get("voicemailaccentVoice", {})
    voicemail_tts_accent = get_accent_code(voicemail_tts_lang) if voicemail_tts_lang else None

    phone_number.active = received.get('isActive', True)
    phone_number.friendly_name = received.get('name', '')
    phone_number.caller_id = received.get('leadNumAsCallerIDForIncomingCalls', False)
    source_sid = received.get('sourceid', None)
    if source_sid:
        source_id = Source.get_id_from_sid(source_sid)
        if source_id:
            phone_number.source_id = source_id

    phone_number.routing_config = {
        "enableCallRecording": received.get('enableCallRecording', False),
        "textToSpeechToSpeakCustGreetMsg": received.get("textToSpeechToSpeakCustGreetMsg", None),
        "enablePlayCustBeforeConnectMessage": received.get("enablePlayCustBeforeConnectMessage", None),
        "textToSpeechaccentVoice": greeting_tts_accent,
        "voicemailaccentVoice": voicemail_tts_accent,
        "usecustomvoicemail": received.get("usecustomvoicemail", False),
        "requestCallAcceptance": received.get("requestCallAcceptance", False),
        "uploadedCustomAudio": uploaded_greeting_file.get('result', None),
        "uploadedSelfRecordAudio": recorded_greeting_file.get('result', None),
        "uploadedCustomVoiceMail": uploaded_voicemail_file.get('result', None),
        "enableSMS": received.get("enableSMS", False),
        "texttospeechmessage": received.get("texttospeechmessage", False),
    }

    phone_number.save()

    # The partnership lookup (including the id=1 fallback) may still find nothing.
    partner_url = partnership.partner_url if partnership else ''
    send_notifications(
        user_id=current_user.sid,
        notify_message_type='PHONE_CHANNEL_EDITED',
        user_related_entities="You've",
        other_user_related_entities=f'{current_user.firstname} {current_user.lastname}',
        hyperlink=f'{partner_url}/partnership/sources'
    )

    saved_config = phone_number.routing_config or {}
    tts_accent_voice = get_accent_language(saved_config.get('textToSpeechaccentVoice', {}))
    tts_voicemail_accent = get_accent_language(saved_config.get('voicemailaccentVoice', None))

    # Add phone data to response data
    data = {
        "phoneId": str(phone_number.sid),
        "name": phone_number.friendly_name,
        "numberType": received.get('numberType', ''),
        "enableSMS": saved_config.get('enableSMS', False),
        "leadNumAsCallerIDForIncomingCalls": phone_number.caller_id,
        "requestCallAcceptance": saved_config.get('requestCallAcceptance', False),
        "enableCallRecording": saved_config.get('enableCallRecording', False),
        "textToSpeechToSpeakCustGreetMsg": saved_config.get('textToSpeechToSpeakCustGreetMsg', None),
        "textToSpeechaccentVoice": tts_accent_voice,
        "uploadedCustomAudio": uploaded_greeting_file.get('audio_url', None),
        "uploadedSelfRecordAudio": recorded_greeting_file.get('audio_url', None),
        "uploadedCustomVoiceMail": uploaded_voicemail_file.get('audio_url', None),
        "enablePlayCustBeforeConnectMessage": saved_config.get('enablePlayCustBeforeConnectMessage', False),
        "usecustomvoicemail": saved_config.get('usecustomvoicemail', None),
        "texttospeechmessage": saved_config.get('texttospeechmessage', None),
        "voicemailaccentVoice": tts_voicemail_accent,
        "provisionedPhoneNumber": phone_number.phonenumber
    }

    return api_jsonify(data, 200, "Phone number channel updated successfully!", True)


def get_presigned_url_from_routing_config(config):
    """Build a presigned URL from a stored ``'<bucket>::<key>'`` reference.

    Args:
        config (str | None): Storage reference in the form ``bucket::key``.

    Returns:
        str: The value returned by ``generate_presigned_file_url``, or an
        empty string when ``config`` is empty, is not a string, or does not
        name both a bucket and a key.
    """
    if not config or not isinstance(config, str):
        return ''
    # Split on the first delimiter only, so a key containing '::' cannot
    # break the two-value unpack.
    bucket, delimiter, key = config.partition('::')
    if not delimiter or not bucket or not key:
        # Nothing to sign (e.g. the bare '::' placeholder); skip the S3 call.
        return ''
    return generate_presigned_file_url(key, bucket)


# @api_role_required('admin')
def get_phone_number(sid):
    """This endpoint gets the provisioned phone number with configuration data
    Args:
        sid (str): The sid of the phone number
    Returns:
        json: The success/error response. 400 when ``sid`` is empty, 404 when
        no phone number matches, 500 when the response data cannot be built.
    """

    def get_accent_language(accent):
        """Map a stored accent dict back to its language name.

        Falsy values are returned unchanged; unknown language codes give None.
        """
        if not accent:
            return accent
        language_code = accent.get('languageCode', None)
        if language_code == 'en-US':
            return 'English'
        if language_code == 'es-ES':
            return 'Spanish'
        return None

    if not sid:
        return api_jsonify({}, 400, "Invalid parameters!", False)

    phone_number = Phone.query.filter(Phone.sid == sid).first()
    if not phone_number:
        return api_jsonify({}, 404, "Phone number not found!", False)

    try:
        config = phone_number.routing_config or {}

        # Missing audio references default to None so no presigned URL is
        # requested for an empty bucket/key.
        uploaded_greeting = get_presigned_url_from_routing_config(
            config.get('uploadedCustomAudio', None))
        recorded_greeting = get_presigned_url_from_routing_config(
            config.get('uploadedSelfRecordAudio', None))
        uploaded_voicemail = get_presigned_url_from_routing_config(
            config.get('uploadedCustomVoiceMail', None))

        tts_accent_voice = get_accent_language(config.get('textToSpeechaccentVoice', {}))
        tts_voicemail_accent = get_accent_language(config.get('voicemailaccentVoice', None))

        # Add phone data to response data
        data = {
            "phoneId": str(phone_number.sid),
            "name": phone_number.friendly_name,
            "source": Source.get_sid_from_id(phone_number.source_id) if phone_number.source_id else None,
            "numberType": 'Local' if phone_number.local else 'toll-free',
            "enableSMS": config.get('enableSMS', False),
            "leadNumAsCallerIDForIncomingCalls": phone_number.caller_id,
            "requestCallAcceptance": config.get('requestCallAcceptance', False),
            "enableCallRecording": config.get('enableCallRecording', False),
            "textToSpeechToSpeakCustGreetMsg": config.get('textToSpeechToSpeakCustGreetMsg', None),
            "textToSpeechaccentVoice": tts_accent_voice,
            "uploadedCustomAudio": uploaded_greeting,
            "uploadedSelfRecordAudio": recorded_greeting,
            "uploadedCustomVoiceMail": uploaded_voicemail,
            "enablePlayCustBeforeConnectMessage": config.get('enablePlayCustBeforeConnectMessage', None),
            "usecustomvoicemail": config.get('usecustomvoicemail', None),
            "texttospeechmessage": config.get('texttospeechmessage', None),
            "voicemailaccentVoice": tts_voicemail_accent,
            "provisionedPhoneNumber": phone_number.phonenumber
        }
    except Exception:
        # Previously the error was printed and a 200 success with empty data
        # was returned; report the failure to the caller instead.
        logger.exception('Failed to build phone number response for sid %s', sid)
        return api_jsonify({}, 500, "Unable to fetch phone number!", False)

    return api_jsonify(data, 200, "Phone number fetched successfully!", True)


# @api_role_required('admin')
def search_phone_number():
    """API for searching phone numbers in bandwidth

    Args(via JSON request body):
        numberType (str): 'toll-free' searches toll-free numbers; any other
            value searches non-toll-free numbers
        areaCode (str | int): Area code in which phone number should be searched
        phrase (str): The requested vanity phrase; it is wrapped in '*' wildcards
        city (str): The name of the city.
        state (str): The state to search in.

    The partnership id is taken from the logged-in user (falling back to 1).

    Returns:
        dict: data key with list of phone numbers having dict with each number on a key "value".
        400 when the body is missing or not a JSON object, 500 when the search fails.
    """
    # silent=True returns None for a missing/invalid JSON body so the 400
    # response below is used instead of aborting the request.
    received = request.get_json(silent=True)
    if not received or not isinstance(received, dict):
        return api_jsonify([], 400, "Invalid parameters", False)

    from buyercall.blueprints.phonenumbers.views import bandwidth_search_api

    # Always truthy because of the fallback, so no "not found" branch is needed.
    partnership_id = current_user.partnership_id or 1

    tollfree_number = received.get('numberType', '') == 'toll-free'
    area_code = received.get('areaCode', None)
    area_code = str(area_code) if area_code else None
    city = received.get('city', None)
    state = received.get('state', None)
    phrase_text = received.get('phrase', None)
    phrase = '*{}*'.format(phrase_text) if phrase_text else None

    # Search bandwidth
    phone_numbers = bandwidth_search_api(tollfree_number, area_code, phrase,
                                         city, state, partnership_id) or {}

    if not phone_numbers.get('success', False):
        message = phone_numbers.get('message', 'Invalid input parameters')
        return api_jsonify([], 500, message, False)

    data = [{'value': ph} for ph in phone_numbers.get('numbers', [])]
    return api_jsonify(data, 200, "Phone numbers fetched successfully!", True)


def upload_audio_to_local():
    """Upload one chunk of an audio file to the server's ``upload/`` directory.

    Expects the file in the ``audio`` upload field plus the chunk metadata
    ``chunk_index``, ``chunk_offset``, ``total_chunks`` and ``file_size``,
    read from the JSON body when there is one and from the form fields
    otherwise.

    Returns:
        json: ``api_jsonify`` response; ``data`` holds the stored file name
        once the last chunk has been written. 400 for missing/invalid input
        or when the first chunk targets an existing file, 500 on write
        errors or a final size mismatch.
    """
    audio_file = request.files.get('audio', None)
    if not audio_file:
        return api_jsonify({}, 400, 'Invalid parameters!', False)

    # A multipart upload cannot also carry a JSON body, so fall back to the
    # form fields sent alongside the file.
    payload = request.get_json(silent=True)
    received = payload if isinstance(payload, dict) else request.form

    try:
        current_chunk = int(received.get('chunk_index', 0))
        chunk_offset = int(received.get('chunk_offset', 0))
        total_chunks = int(received.get('total_chunks', 0))
        file_size = int(received.get('file_size', 0))
    except (TypeError, ValueError):
        return api_jsonify({}, 400, 'Invalid parameters!', False)

    # create secure filename
    filename = secure_filename(audio_file.filename or '')
    if not filename:
        return api_jsonify({}, 400, 'Invalid parameters!', False)
    save_path = os.path.join('upload/', filename)

    if os.path.exists(save_path) and current_chunk == 0:
        # Stop here; the original fell through and appended to the existing file.
        return api_jsonify({}, 400, 'File already exists', False)

    try:
        # NOTE(review): in append mode writes normally land at the end of the
        # file regardless of seek(), so chunks are expected to arrive in order.
        with open(save_path, 'ab') as f:
            f.seek(chunk_offset)
            f.write(audio_file.stream.read())
    except OSError:
        logger.exception('Could not write chunk %s of file %s', current_chunk + 1, filename)
        return api_jsonify({}, 500, 'Error', False)

    data = {}
    if current_chunk + 1 == total_chunks:
        # This was the last chunk, the file should be complete and the size we expect
        if os.path.getsize(save_path) != file_size:
            return api_jsonify({}, 500, 'Size mismatch!', False)
        data = {'file': filename}
        logger.info('File %s has been uploaded successfully', filename)
    else:
        logger.debug('Chunk %s of %s for file %s complete',
                     current_chunk + 1, total_chunks, filename)

    return api_jsonify(data, 200, 'Audio uploaded successfully!', True)


def save_audio_to_s3(partnership_account_id, phone_id, audio_type, file):
    """Save an audio file to AWS S3 bucket and record it as an ``Audio`` row

    Args:
        partnership_account_id (str or uuid): The sid of the partnership account
        phone_id (str or uuid): The sid of the phone number
        audio_type (str): The type of audio (customgreeting, recordedgreeting, customvoicemail etc)
        file (file object): The file that need to be uploaded to s3

    Returns:
        dict: ``{'audio_url': <presigned url>, 'result': '<bucket>::<key>'}``
        on success, or an empty dict when the upload or the database write
        fails.
    """
    try:
        key = create_audio_filename(partnership_account_id, phone_id, audio_type)
        bucket_name = current_app.config['WHISPER_BUCKET']

        if not upload_file_object(file, bucket_name, key):
            # Do not record or sign an object that was never stored.
            logger.error('Upload of %s audio to bucket %s failed', audio_type, bucket_name)
            return {}
        result = f'{bucket_name}::{key}'

        audio = Audio()
        audio.whisper_message_type = audio_type
        audio.enabled = True
        audio.audio_url = result
        db.session.add(audio)
        try:
            db.session.commit()
        except Exception:
            # Leave the session usable for the caller's later writes.
            db.session.rollback()
            raise

        presigned_url = generate_presigned_file_url(key, bucket_name)
        return {'audio_url': presigned_url, 'result': result}

    except Exception:
        logger.exception('Could not save %s audio to S3', audio_type)
    return {}


def create_audio_filename(pa_id, phone_id, type):
    """Build the audio file name from its three parts, joined by hyphens.

    Args:
        pa_id: First part of the name.
        phone_id: Second part of the name.
        type: Third part of the name.

    Returns:
        str: ``'<pa_id>-<phone_id>-<type>'``
    """
    return '{}-{}-{}'.format(pa_id, phone_id, type)


def text_to_speech_audio():
    """Create speech audio for a given text using AWS Polly.

    Reads ``text`` from the JSON request body.

    Returns:
        The value returned by ``AwsPolly.synthesize_speech`` for the text, or
        a 400 ``api_jsonify`` response when the body is missing, is not a
        JSON object, or has no ``text``.
    """
    received = request.get_json(silent=True)
    text = received.get('text', None) if isinstance(received, dict) else None
    if not text:
        # Previously this path raised UnboundLocalError on the return below.
        return api_jsonify({}, 400, "Invalid parameters!", False)

    config = {
        'access_key': current_app.config['AMAZON_ACCESS_KEY'],
        'secret_key': current_app.config['AMAZON_SECRET_KEY'],
        'region': current_app.config['REGION_NAME']
    }
    polly_client = AwsPolly(config)

    return polly_client.synthesize_speech(text=text)


def get_us_states():
    """Return the name and abbreviation of every entry in ``us.states.STATES``."""
    from us import states as us_states

    # Collect one {'name', 'abbr'} record per state.
    state_records = []
    for entry in us_states.STATES:
        state_records.append({'name': entry.name, 'abbr': entry.abbr})

    return api_jsonify(state_records, 200, "Successfully fetched US states!", True)