File: //home/arjun/projects/buyercall/buyercall/lib/util_boto3_s3_encrypted.py
from datetime import datetime, timedelta
import logging
import boto3
from botocore.client import Config as conf
from botocore.exceptions import ClientError
from urllib.parse import unquote, urlparse, parse_qs
from dateutil import parser
from pytz import UTC
from flask import current_app
log = logging.getLogger(__name__)
class AwsS3:
    """Helper around boto3 S3 clients configured from a Flask application.

    ``init_app`` must be called before ``create_bucket`` or
    ``generate_presigned_audio_url`` are used, because those methods rely on
    ``self.s3_client``. The remaining methods build their own client from
    ``current_app.config`` on each call.
    """

    def init_app(self, app) -> None:
        """Create the shared S3 client from ``app.config``.

        Reads ``S3_REGION_NAME``, ``AMAZON_ACCESS_KEY`` and
        ``AMAZON_SECRET_KEY``; each falls back to an empty string when absent.

        :param app: object exposing a ``config`` mapping (a Flask app)
        """
        # NOTE(review): the other methods read 'AWS_S3_REGION_NAME' rather
        # than 'S3_REGION_NAME' - confirm which config key is canonical.
        self.s3_client = boto3.client(
            's3',
            region_name=app.config.get('S3_REGION_NAME', ''),
            aws_access_key_id=app.config.get('AMAZON_ACCESS_KEY', ''),
            aws_secret_access_key=app.config.get('AMAZON_SECRET_KEY', '')
        )

    def create_bucket(self, bucket_name='test', acl='private'):
        """Create an S3 bucket in the region named by ``AWS_S3_REGION_NAME``.

        This is best-effort: any failure (including a missing config key) is
        logged and an empty dict is returned instead of raising.

        :param bucket_name: name of the bucket to create
        :param acl: canned ACL applied to the new bucket
        :return: the boto3 response, or ``{}`` on failure
        """
        try:
            response = self.s3_client.create_bucket(
                ACL=acl,
                Bucket=bucket_name,
                CreateBucketConfiguration={
                    'LocationConstraint': current_app.config['AWS_S3_REGION_NAME']
                }
            )
            log.info('Created bucket %s. Response: %s', bucket_name, response)
        except Exception as e:  # deliberate catch-all: callers expect {} on failure
            log.error('Error creating bucket %s. Error: %s', bucket_name, e)
            response = {}
        return response

    def get_recording_url_details(self, url):
        """Extract the bucket and key from a recording reference.

        Two formats are recognised after URL-unquoting:

        * an http(s) URL containing exactly one ``.com``, where the bucket is
          the host part preceding ``.s3.`` and the key is the path;
        * a ``bucket::key`` pair.

        :param url: recording URL or ``bucket::key`` string
        :return: ``{'key': ..., 'bucket': ...}``, or ``None`` when neither a
            key nor a bucket could be found. Either value may individually be
            ``None``/empty when only the other one was recognised.
        """
        bucket = None
        key = None
        url = unquote(url)
        if 'http' in url and '.com' in url:
            parts = url.split('.com')
            if len(parts) == 2:
                host, rest = parts
                host = host.replace("https://", "").replace("http://", "")
                bucket_split = host.split(".s3.")
                # Only trust the prefix as a bucket when ".s3." was really
                # present; otherwise the host is not a virtual-hosted S3 name.
                if len(bucket_split) >= 2:
                    bucket = bucket_split[0]
                if ':' in rest:
                    # A port (e.g. ":443") may sit between host and path, so
                    # the key starts after the first slash. Without any slash
                    # there is no path at all, hence an empty key.
                    slash = rest.find('/')
                    key = rest[slash + 1:] if slash >= 0 else ''
                else:
                    # Drop the single leading "/" of the path.
                    key = rest[1:]
        elif '::' in url:
            parts = url.split("::")
            if len(parts) == 2:
                bucket, key = parts

        if not key and not bucket:
            log.error('No bucket or key found.')
            return None

        log.debug('Key: %s Bucket: %s', key, bucket)
        return {'key': key, 'bucket': bucket}

    def generate_presigned_audio_url(self, key, bucket, frontend):
        """Generate a presigned GET URL for an audio object.

        The expiry defaults to 300 seconds and is overridden by the integer
        config value ``AMAZON_PRESIGNED_URL_FRONTEND_EXPIRY`` (when
        ``frontend`` is truthy) or ``AMAZON_PRESIGNED_URL_WEBHOOK_EXPIRY``
        (otherwise). A missing or non-integer setting keeps the default.

        :param key: S3 object key
        :param bucket: S3 bucket name
        :param frontend: selects which expiry setting applies
        :return: the presigned URL, or ``''`` if boto3 raises ``ClientError``
        """
        expiry = 300
        setting = ('AMAZON_PRESIGNED_URL_FRONTEND_EXPIRY' if frontend
                   else 'AMAZON_PRESIGNED_URL_WEBHOOK_EXPIRY')
        # .get() so an absent setting falls back to the default instead of
        # raising a KeyError that the ClientError handler would not catch.
        configured = current_app.config.get(setting)
        if isinstance(configured, int):
            expiry = configured
        try:
            return self.s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket, 'Key': key},
                ExpiresIn=expiry
            )
        except ClientError as e:
            log.error("Error generating presigned url. Error: {}".format(e))
            return ''

    def generate_presigned_file_url(self, key, bucket):
        """Generate a SigV4 presigned GET URL for an arbitrary object.

        A dedicated client using signature version ``s3v4`` is created per
        call, with ``AWS_S3_REGION_NAME`` applied when configured. The expiry
        defaults to 3600 seconds and is overridden by an integer
        ``AMAZON_PRESIGNED_URL_CUSTOM_EXPIRY`` setting.

        :param key: S3 object key
        :param bucket: S3 bucket name
        :return: the presigned URL, or ``''`` when ``key``/``bucket`` is empty
            or boto3 raises ``ClientError``
        """
        if not (key and bucket):
            return ''

        app_config = current_app.config
        expiry = 3600
        client_kwargs = {
            'aws_access_key_id': app_config['AMAZON_ACCESS_KEY'],
            'aws_secret_access_key': app_config['AMAZON_SECRET_KEY'],
            'config': conf(signature_version='s3v4'),
        }
        if 'AWS_S3_REGION_NAME' in app_config:
            client_kwargs['region_name'] = app_config['AWS_S3_REGION_NAME']

        configured = app_config.get('AMAZON_PRESIGNED_URL_CUSTOM_EXPIRY')
        if isinstance(configured, int):
            expiry = configured

        try:
            s3_client = boto3.client('s3', **client_kwargs)
            return s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket, 'Key': key},
                ExpiresIn=expiry
            )
        except ClientError as e:
            log.error("Error generating presigned url. Error: {}".format(e))
            # Same falsy string as every other failure path of this class.
            return ''

    def upload_file_object(self, file_obj, bucket, key, extra=None):
        """Upload a file-like object to S3.

        :param file_obj: binary file-like object to upload
        :param bucket: destination bucket name
        :param key: destination object key
        :param extra: optional dict forwarded to boto3 as ``ExtraArgs``
            (e.g. server-side encryption or content-type settings)
        :return: ``True`` on success, ``False`` if boto3 raises ``ClientError``
        """
        try:
            s3_client = boto3.client(
                's3',
                aws_access_key_id=current_app.config['AMAZON_ACCESS_KEY'],
                aws_secret_access_key=current_app.config['AMAZON_SECRET_KEY']
            )
            # Forward the caller's extra arguments; None is boto3's default.
            s3_client.upload_fileobj(file_obj, bucket, key, ExtraArgs=extra)
            return True
        except ClientError as e:
            log.error(f"Error in uploading to the s3 bucket. {e}")
            return False

    def refresh_presigned_url(self, url, bucket):
        """
        Create new presigned url for s3 asset if it is expired

        The expiry is computed from the ``X-Amz-Date`` and ``X-Amz-Expires``
        query parameters of ``url``. URLs lacking either parameter, or not
        yet expired, are returned unchanged.

        :param url: s3 resource url
        :param bucket: s3 bucket name
        :return: dict with ``valid_presigned_url`` and ``is_new``. When
            ``url`` or ``bucket`` is empty the result is
            ``{'valid_presigned_url': '', 'is_new': True}``.
        """
        if not (url and bucket):
            return {'valid_presigned_url': '', 'is_new': True}

        response = {'valid_presigned_url': url, 'is_new': False}
        parsed_url = urlparse(url)
        query = parse_qs(parsed_url.query)
        created_date = query.get('X-Amz-Date')
        validity = query.get('X-Amz-Expires')
        if not (created_date and validity):
            return response

        created_datetime = parser.parse(created_date[0])
        if created_datetime.tzinfo is None:
            # Treat a zone-less timestamp as UTC so the aware/naive
            # comparison below cannot raise TypeError.
            created_datetime = created_datetime.replace(tzinfo=UTC)
        expiry_datetime = created_datetime + timedelta(seconds=int(validity[0]))

        if expiry_datetime < datetime.now(tz=UTC):
            # The URL path is percent-encoded; decode it so the key is not
            # encoded a second time when the new URL is signed.
            key = unquote(parsed_url.path[1:])
            valid_presigned_url = self.generate_presigned_file_url(key, bucket)
            if valid_presigned_url:
                response = {'valid_presigned_url': valid_presigned_url, 'is_new': True}
        return response