HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall/buyercall/lib/util_boto3_s3_encrypted.py
from datetime import datetime, timedelta
import logging
import boto3
from botocore.client import Config as conf
from botocore.exceptions import ClientError
from urllib.parse import unquote, urlparse, parse_qs
from dateutil import parser
from pytz import UTC
from flask import current_app


log = logging.getLogger(__name__)


class AwsS3:
    def init_app(self, app) -> None:
        """Create the S3 client stored on ``self.s3_client`` from app settings.

        Each setting is read with ``app.config.get`` and falls back to an
        empty string when it is absent.

        NOTE(review): the region is read from ``S3_REGION_NAME`` here, while
        other methods of this class read ``AWS_S3_REGION_NAME`` -- confirm
        which key is intended.

        :param app: object exposing a ``config`` mapping
        """
        settings = app.config
        client_options = {
            'region_name': settings.get('S3_REGION_NAME', ''),
            'aws_access_key_id': settings.get('AMAZON_ACCESS_KEY', ''),
            'aws_secret_access_key': settings.get('AMAZON_SECRET_KEY', ''),
        }
        self.s3_client = boto3.client('s3', **client_options)

    # Create an S3 bucket
    def create_bucket(self, bucket_name='test', acl='private'):
        """Create an S3 bucket in the region configured for the application.

        The region is read from ``current_app.config['AWS_S3_REGION_NAME']``.

        :param bucket_name: name of the bucket to create
        :param acl: canned ACL applied to the new bucket
        :return: the response returned by the S3 client's ``create_bucket``
            call, or an empty dict if anything in the attempt raised
        """
        try:
            region = current_app.config['AWS_S3_REGION_NAME']
            request = {'ACL': acl, 'Bucket': bucket_name}

            # S3 rejects an explicit 'us-east-1' LocationConstraint; that
            # region is selected by omitting CreateBucketConfiguration.
            if region != 'us-east-1':
                request['CreateBucketConfiguration'] = {
                    'LocationConstraint': region
                }

            response = self.s3_client.create_bucket(**request)
            log.info('Created bucket %s: %s', bucket_name, response)
        except Exception as e:
            # Deliberately best-effort: any failure is reported through the
            # module logger and signalled to the caller as an empty dict.
            log.error('Error creating bucket %s. Error: %s', bucket_name, e)
            response = {}
        return response

    # Retrieves the key and bucket from the url passed to it

    def get_recording_url_details(self, url):
        details = {}
        bucket = None
        key = None
        pos = 0
        url = unquote(url)

        if 'http' in url and '.com' in url:
            split_string = url.split('.com')

            if len(split_string) == 2:
                draft_bucket = split_string[0].replace("https://", "").replace("http://", "")
                bucket_split = draft_bucket.split(".s3.")

                if len(draft_bucket) >= 2:
                    bucket = bucket_split[0]

                if ':' in split_string[1]:
                    pos = split_string[1].index('/')

                key = split_string[1]
                key = key[pos+1:]
        elif '::' in url:
            split_string = url.split("::")

            if len(split_string) == 2:
                bucket = split_string[0]
                key = split_string[1]

        if not key and not bucket:
            log.error('No bucket or key found.')

            return None
        else:
            details['key'] = key
            details['bucket'] = bucket
            log.error('Key: ' + str(key) + '  Bucket: ' + str(bucket))
            return details

    def generate_presigned_audio_url(self, key, bucket, frontend):
        """Generate a presigned ``get_object`` URL using ``self.s3_client``.

        The lifetime defaults to 300 seconds and is overridden by an integer
        application setting: ``AMAZON_PRESIGNED_URL_FRONTEND_EXPIRY`` when
        ``frontend`` is truthy, ``AMAZON_PRESIGNED_URL_WEBHOOK_EXPIRY``
        otherwise. A missing or non-integer setting leaves the default.

        :param key: S3 object key
        :param bucket: S3 bucket name
        :param frontend: selects which expiry setting is consulted
        :return: the presigned URL, or '' if the client raised ClientError
        """
        default_expiry = 300

        if frontend:
            expiry_setting = 'AMAZON_PRESIGNED_URL_FRONTEND_EXPIRY'
        else:
            expiry_setting = 'AMAZON_PRESIGNED_URL_WEBHOOK_EXPIRY'

        # .get() so that an unset expiry falls back to the default instead of
        # raising KeyError.
        configured_expiry = current_app.config.get(expiry_setting)
        if isinstance(configured_expiry, int):
            default_expiry = configured_expiry

        try:
            return self.s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket, 'Key': key},
                ExpiresIn=default_expiry,
            )
        except ClientError as e:
            log.error("Error generating presigned url. Error: {}".format(e))
            return ''

    def generate_presigned_file_url(self, key, bucket):
        default_expiry = 3600
        if key and bucket:
            try:
                if 'AWS_S3_REGION_NAME' in current_app.config:
                    s3_client = boto3.client(
                        's3',
                        aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
                        aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY'],
                        config=conf(signature_version='s3v4'),
                        region_name=current_app.config['AWS_S3_REGION_NAME']
                    )
                else:
                    s3_client = boto3.client(
                        's3',
                        aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
                        aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY'],
                        config=conf(signature_version='s3v4')
                    )

                if 'AMAZON_PRESIGNED_URL_CUSTOM_EXPIRY' in current_app.config:
                    aws_presigned_url_expiry = current_app.config['AMAZON_PRESIGNED_URL_CUSTOM_EXPIRY']
                    if isinstance(aws_presigned_url_expiry, int):
                        default_expiry = aws_presigned_url_expiry

                return s3_client.generate_presigned_url('get_object',
                                                        Params={'Bucket': bucket,
                                                                'Key': key},
                                                        ExpiresIn=default_expiry)

            except ClientError as e:
                log.error("Error generating presigned url. Error: {}".format(e))
                return
        else:
            return ''

    def upload_file_object(self, file_obj, bucket, key, extra=None):
        """Upload a file-like object to S3 with a freshly built client.

        :param file_obj: file-like object to upload
        :param bucket: destination bucket name
        :param key: destination object key
        :param extra: optional extra arguments forwarded to the upload as
            boto3 ``ExtraArgs``; presumably a dict of upload options such as
            encryption or content-type settings -- verify against callers
        :return: True on success, False if the client raised ClientError
        """
        try:
            s3_client = boto3.client(
                's3',
                aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
                aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY']
            )
            # Forward the caller's extra arguments; previously they were
            # accepted but silently dropped.
            s3_client.upload_fileobj(file_obj, bucket, key, ExtraArgs=extra)
            return True
        except ClientError as e:
            log.error(f"Error in uploading to the s3 bucket. {e}")
            return False

    def refresh_presigned_url(self, url, bucket):
        """
        Create new presigned url for s3 asset if it is expired
        :param url: s3 resource url
        :param bucket: s3 bucket name
        """
        response = {'valid_presigned_url': url, 'is_new': False}

        if url and bucket:
            parsed_url = urlparse(url)
            created_date = parse_qs(parsed_url.query).get('X-Amz-Date', None)
            validity = parse_qs(parsed_url.query).get('X-Amz-Expires', None)

            if created_date and validity:
                created_datetime = parser.parse(created_date[0])
                expiry_datetime = created_datetime + timedelta(seconds=int(validity[0]))

                if expiry_datetime < datetime.now(tz=UTC):
                    key = parsed_url.path[1:]
                    valid_presigned_url = self.generate_presigned_file_url(key, bucket)

                    if valid_presigned_url:
                        response = {'valid_presigned_url': valid_presigned_url, 'is_new': True}
            else:
                response = {'valid_presigned_url': '', 'is_new': True}

        return response