File: //home/arjun/projects/unlimited-leads/Unlimited-Leads-Be/services/s3_handler.py
import boto3
import io
from botocore.exceptions import ClientError
import logging
from django.conf import settings
import chardet
logger = logging.getLogger(__name__)
class S3Handler:
def __init__(self):
self.AWS_STORAGE_BUCKET_NAME = settings.AWS_STORAGE_BUCKET_NAME
self.client = boto3.client(
's3',
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
region_name=settings.AWS_S3_REGION_NAME
)
expiration = 86400 # 1 day
def get_key(self, key:str) -> str:
return f"unlimited-leads/{key}"
def generate_presigned_url(self, key:str, expiration:int=expiration):
"""Generate a presigned URL for an S3 object."""
signed_url = None
try:
signed_url = self.client.generate_presigned_url(
ClientMethod='get_object',
Params={
'Bucket': self.AWS_STORAGE_BUCKET_NAME,
'Key': key
},
ExpiresIn=expiration
)
except ClientError as e:
logger.error(f"Error generating presigned URL: {str(e)}")
return signed_url
def upload_file(self, file, key:str) -> bool:
"""Upload a file to S3."""
try:
response = self.client.create_multipart_upload(Bucket=self.AWS_STORAGE_BUCKET_NAME, Key=key)
upload_id = response['UploadId']
part_number = 1
parts = []
while chunk := file.read(10 * 1024 * 1024): # 10MB chunks
response = self.client.upload_part(
Bucket=self.AWS_STORAGE_BUCKET_NAME,
Key=key,
PartNumber=part_number,
UploadId=upload_id,
Body=chunk
)
parts.append({'ETag': response['ETag'], 'PartNumber': part_number})
part_number += 1
self.client.complete_multipart_upload(
Bucket=self.AWS_STORAGE_BUCKET_NAME,
Key=key,
MultipartUpload={'Parts': parts},
UploadId=upload_id
)
return True
except ClientError as e:
logger.error(f"Error uploading file: {str(e)}")
return False
def upload_image(self, image_file, key):
"""Upload an image to S3."""
try:
self.client.upload_fileobj(image_file, self.AWS_STORAGE_BUCKET_NAME, self.get_key(key))
return True
except ClientError as e:
logger.error(f"Error uploading image: {str(e)}")
return False
def delete_object(self, key:str) -> bool:
try:
self.client.delete_object(Bucket=self.AWS_STORAGE_BUCKET_NAME, Key=key)
return True
except ClientError as e:
logger.error(f"Error deleting object: {str(e)}")
return False
def get_object(self, key:str) -> io.StringIO | bool:
"""
Retrieve a file from S3 .
Args:
key (str): The S3 object key for the file.
Returns:
io.StringIO or None: A file-like object containing the data, or None if an error occurs.
"""
try:
logger.info(f"get_object key : {key}")
# Fetch the object from S3
response = self.client.get_object(
Bucket=self.AWS_STORAGE_BUCKET_NAME,
Key=key
)
raw_content = response['Body'].read()
# Detect the encoding of the content
detected_encoding = chardet.detect(raw_content)
logger.info(f"Detected encoding: {detected_encoding}")
# Decode the content with the detected encoding, default to utf-8 with errors replaced
file_content = raw_content.decode(detected_encoding['encoding'] or 'utf-8', errors='replace')
return io.StringIO(file_content)
except Exception as e:
logger.error(f"Error retrieving file from S3: {str(e)}")
return None