HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_new/buyercall/buyercall/blueprints/phonenumbers/rest_api.py
import os
import os.path
import traceback
import logging as log

import boto
from flask import jsonify, make_response, request, current_app as app
from flask_login import current_user, login_required
from werkzeug.utils import secure_filename
from boto.s3.key import Key

from .views import phonenumbers
from .models import Phone, Audio
from buyercall.lib.util_rest import rest_method, rest_partnership_account
from buyercall.extensions import db


@phonenumbers.route('/api/v1/inbound', methods=['GET'])
@rest_method
def get_inbound():
    """ External REST API endpoint.
    """
    all_inbound = Phone.query.filter(Phone.partnership_account_id == rest_partnership_account.id).all()

    data = []
    for inbound in all_inbound:
        data.append(dict(
            id=inbound.id,
            friendly_name=inbound.friendly_name,
            phonenumber=inbound.phonenumber,
            lead_count=inbound.lead_count,
            created_on=inbound.created_on,
            updated_on=inbound.updated_on,
            local=inbound.local,
            tollfree=inbound.tollfree
        ))

    response = make_response(jsonify(inbound=data))
    response.headers['Cache-Control'] = 'no-cache'
    return response


@login_required
@phonenumbers.route('/api/phonenumbers/', methods=['GET'])
def get_user_phonenumbers():
    partnership_account_id = current_user.partnership_account_id
    viewing_partnership_account = current_user.is_viewing_partnership

    # Check if being viewed by super partner
    if viewing_partnership_account:
        partnership_account_id = current_user.get_user_viewing_partnership_account_id

    all_phonenumbers = Phone.query.filter(
        Phone.partnership_account_id == partnership_account_id
    ).all()

    return jsonify(collection=[
        dict(id=a.id, phoneNumber=a.phonenumber, local=a.local,
             tollfree=a.tollfree, active=a.active, friendlyName=a.friendly_name)
        for a in all_phonenumbers
    ])


MESSAGE_TYPES = {
    'whisperMessage': 'whisperMessage',
    'digitWhisperMessage': 'digitWhisperMessage',
    'noDigitWhisperMessage': 'noDigitWhisperMessage',
    'voicemailMessage': 'voicemailMessage',
    'callBackText': 'CallBackText',
}


@login_required
@phonenumbers.route('/api/phonenumbers/audio', methods=['POST'])
def upload_whisper_message():
    """ Uploads a new whisper message. """
    partnership_account_id = current_user.partnership_account_id
    viewing_partnership_account = current_user.is_viewing_partnership

    # Check if being viewed by super partner
    if viewing_partnership_account:
        partnership_account_id = current_user.get_user_viewing_partnership_account_id

    f = request.files['file']
    filename = secure_filename(f.filename)
    path = os.path.join('/tmp', filename)
    f.save(path)

    audio = Audio()
    audio.whisper_message_type = MESSAGE_TYPES[request.form['type']]
    audio.enabled = True
    audio.audio_url = ''
    db.session.add(audio)
    db.session.commit()

    try:
        key = '{}/{}_{}'.format(
            partnership_account_id,
            audio.id,
            filename
        )
        audio.audio_url = upload_audio_file(key, f.mimetype, path)
        db.session.commit()
    finally:
        os.unlink(path)

    return jsonify(id=audio.id)


def upload_audio_file(key, mimetype, path):
    s3 = boto.connect_s3(
        app.config['AMAZON_ACCESS_KEY'], app.config['AMAZON_SECRET_KEY']
    )

    # Get a handle to the S3 bucket
    bucket_name = app.config['WHISPER_BUCKET']
    bucket = s3.get_bucket(bucket_name)
    k = Key(bucket)

    # Read the contents of the file
    with open(path, 'rb') as f:
        # Use Boto to upload the file to the S3 bucket
        k.key = key
        k.set_metadata('Content-Type', mimetype)
        log.info("Uploading data to {} with key: {}".format(bucket_name, k.key))
        k.set_contents_from_file(f)
        k.set_acl('public-read')

        return k.generate_url(expires_in=0, query_auth=False)