HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall/buyercall/lib/util_boto3_s3.py
from datetime import datetime, timedelta
import logging
import boto3
from botocore.client import Config as conf
from botocore.exceptions import ClientError
from urllib.parse import unquote, urlparse, parse_qs
from dateutil import parser
from pytz import UTC
from flask import current_app


log = logging.getLogger(__name__)


# Create and s3 bucket
def create_bucket(bucket_name='test', acl='private'):
    try:
        s3_client = boto3.client(
            's3',
            region_name=current_app.config['AWS_S3_REGION_NAME'],
            aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
            aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY']
        )
        response = s3_client.create_bucket(
            ACL=acl,
            Bucket=bucket_name,
            CreateBucketConfiguration={
                'LocationConstraint': current_app.config['AWS_S3_REGION_NAME']
            }
        )
        print(response)
    except Exception as e:
        print(e)
        response = {}
    return response


#  Retrieves the key and bucket from the url passed to it
def get_recording_url_details(url):
    details = {}
    bucket = None
    key = None
    pos = 0
    url = unquote(url)

    if 'http' in url and '.com' in url:
        split_string = url.split('.com')

        if len(split_string) == 2:
            draft_bucket = split_string[0].replace("https://", "").replace("http://", "")
            bucket_split = draft_bucket.split(".s3.")

            if len(draft_bucket) >= 2:
                bucket = bucket_split[0]

            if ':' in split_string[1]:
                pos = split_string[1].index('/')

            key = split_string[1]
            key = key[pos+1:]
    elif '::' in url:
        split_string = url.split("::")

        if len(split_string) == 2:
            bucket = split_string[0]
            key = split_string[1]

    if not key and not bucket:
        log.error('No bucket or key found.')

        return None
    else:
        details['key'] = key
        details['bucket'] = bucket
        log.error('Key: ' + str(key) + '  Bucket: ' + str(bucket))
        return details


def generate_presigned_aws_url(key, bucket, frontend=None, expiration=None):
    default_expiry = 300

    try:
        s3_client = boto3.client(
            's3',
            aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
            aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY']
        )

        if frontend:
            if isinstance(current_app.config['AMAZON_PRESIGNED_URL_FRONTEND_EXPIRY'], int):
                default_expiry = current_app.config['AMAZON_PRESIGNED_URL_FRONTEND_EXPIRY']
        else:
            if isinstance(current_app.config['AMAZON_PRESIGNED_URL_WEBHOOK_EXPIRY'], int):
                default_expiry = current_app.config['AMAZON_PRESIGNED_URL_WEBHOOK_EXPIRY']

        if expiration:
            default_expiry = expiration

        return s3_client.generate_presigned_url('get_object',
                                                Params={'Bucket': bucket,
                                                        'Key': key},
                                                ExpiresIn=default_expiry)

    except ClientError as e:
        log.error("Error generating presigned url. Error: {}".format(e))
        return ''


<<<<<<< HEAD
def generate_presigned_file_url(key, bucket):
    default_expiry = 3600
    if key and bucket:
        try:
            if 'AWS_S3_REGION_NAME' in current_app.config:
                s3_client = boto3.client(
                    's3',
                    aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
                    aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY'],
                    config=conf(signature_version='s3v4'),
                    region_name=current_app.config['AWS_S3_REGION_NAME']
                )
            else:
                s3_client = boto3.client(
                    's3',
                    aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
                    aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY'],
                    config=conf(signature_version='s3v4')
                )

            if 'AMAZON_PRESIGNED_URL_CUSTOM_EXPIRY' in current_app.config:
                aws_presigned_url_expiry = current_app.config['AMAZON_PRESIGNED_URL_CUSTOM_EXPIRY']
                if isinstance(aws_presigned_url_expiry, int):
                    default_expiry = aws_presigned_url_expiry

            return s3_client.generate_presigned_url('get_object',
                                                    Params={'Bucket': bucket,
                                                            'Key': key},
                                                    ExpiresIn=default_expiry)

        except ClientError as e:
            print('Presigned URL Error : ', e)
            log.error("Error generating presigned url. Error: {}".format(e))
            return
    else:
        return ''


def upload_file_object(file_obj, bucket, key, extra=None):
    try:
        s3_client = boto3.client(
            's3',
            aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
            aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY']
        )
        resp = s3_client.upload_fileobj(file_obj, bucket, key)
        return True
    except ClientError as e:
        print('Upload file error : ', e)
        log.error(f"Error in uploading to the s3 bucket. {e}")
        return False

=======
>>>>>>> 2d4ba41568adf348959bbf7b64ffa061df87d07b
def upload_file(file_path, bucket, key_name, content_type=None):
    try:
        s3_client = boto3.client(
            's3',
            aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
            aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY']
        )
        bucket = bucket
        file_name = file_path
        key_name = key_name
        if content_type:
            s3_client.upload_file(file_name, bucket, key_name, ExtraArgs={'ContentType': content_type})
            # Optionally if the file needs to be pulic, we can set the ACL as args as well
            # s3_client.upload_file(file_name, bucket, key_name,
            # ExtraArgs={'ContentType': "application/json", 'ACL': "public-read"})
        else:
            s3_client.upload_file(file_name, bucket, key_name)

        file_url = generate_presigned_aws_url(key_name, bucket, expiration=432000)

        return file_url
    except ClientError as e:
        log.error("Error uploading file to S3. Error: {}".format(e))
<<<<<<< HEAD
        return ''


def refresh_presigned_url(url, bucket):
    """
    Create new presigned url for s3 asset if it is expired
    :param url: s3 resource url
    :param bucket: s3 bucket name
    """
    response = {'valid_presigned_url': url, 'is_new': False}

    if url and bucket:
        parsed_url = urlparse(url)
        created_date = parse_qs(parsed_url.query).get('X-Amz-Date', None)
        validity = parse_qs(parsed_url.query).get('X-Amz-Expires', None)

        if created_date and validity:
            created_datetime = parser.parse(created_date[0])
            expiry_datetime = created_datetime + timedelta(seconds=int(validity[0]))

            if expiry_datetime < datetime.now(tz=UTC):
                key = parsed_url.path[1:]
                valid_presigned_url = generate_presigned_file_url(key, bucket)

                if valid_presigned_url:
                    response = {'valid_presigned_url': valid_presigned_url, 'is_new': True}
        else:
            response = {'valid_presigned_url': '', 'is_new': True}

    return response


def download_file(key, bucket):
    try:
        s3_client = boto3.client(
            's3',
            region_name=current_app.config['AWS_S3_REGION_NAME'],
            aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
            aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY'],
            config=conf(signature_version='s3v4'),
        )
        return s3_client.get_object(Bucket=bucket, Key=key)
    except Exception as e:
        print('Error getting s3 object : ', key)
        return
=======
        return ''
>>>>>>> 2d4ba41568adf348959bbf7b64ffa061df87d07b