File: /home/arjun/projects/unlimited-leads/Unlimited-Leads-Be/services/s3_handler.py
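"""Thin wrapper around the boto3 S3 client used by the Unlimited-Leads backend: presigned URLs, multipart uploads, image uploads, deletes and text-object retrieval."""
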
import boto3
import io
from botocore.exceptions import ClientError
import logging
from django.conf import settings
import chardet

logger = logging.getLogger(__name__)

class S3Handler:
    # Default lifetime of presigned URLs, in seconds (1 day).
    expiration = 86400

    def __init__(self):
        self.AWS_STORAGE_BUCKET_NAME = settings.AWS_STORAGE_BUCKET_NAME
        self.client = boto3.client(
            's3',
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
            region_name=settings.AWS_S3_REGION_NAME
        )

    def get_key(self, key: str) -> str:
        """Prefix a key with the bucket folder used by this project."""
        return f"unlimited-leads/{key}"

    def generate_presigned_url(self, key: str, expiration: int = expiration) -> str | None:
        """Generate a presigned URL for an S3 object."""
        signed_url = None
        try:
            signed_url = self.client.generate_presigned_url(
                ClientMethod='get_object',
                Params={
                    'Bucket': self.AWS_STORAGE_BUCKET_NAME,
                    'Key': key
                },
                ExpiresIn=expiration
            )
        except ClientError as e:
            logger.error(f"Error generating presigned URL: {str(e)}")
        return signed_url

    def upload_file(self, file, key: str) -> bool:
        """Upload a file to S3 as a multipart upload in 10 MB chunks."""
        upload_id = None
        try:
            response = self.client.create_multipart_upload(Bucket=self.AWS_STORAGE_BUCKET_NAME, Key=key)
            upload_id = response['UploadId']
            part_number = 1
            parts = []
            while chunk := file.read(10 * 1024 * 1024):  # 10MB chunks
                response = self.client.upload_part(
                    Bucket=self.AWS_STORAGE_BUCKET_NAME,
                    Key=key,
                    PartNumber=part_number,
                    UploadId=upload_id,
                    Body=chunk
                )
                parts.append({'ETag': response['ETag'], 'PartNumber': part_number})
                part_number += 1

            self.client.complete_multipart_upload(
                Bucket=self.AWS_STORAGE_BUCKET_NAME,
                Key=key,
                MultipartUpload={'Parts': parts},
                UploadId=upload_id
            )
            return True
        except ClientError as e:
            logger.error(f"Error uploading file: {str(e)}")
            # Abort the multipart upload so incomplete parts are not left in the bucket.
            if upload_id:
                try:
                    self.client.abort_multipart_upload(
                        Bucket=self.AWS_STORAGE_BUCKET_NAME, Key=key, UploadId=upload_id
                    )
                except ClientError:
                    logger.error(f"Failed to abort multipart upload {upload_id}")
            return False

    def upload_image(self, image_file, key: str) -> bool:
        """Upload an image to S3."""
        try:
            self.client.upload_fileobj(image_file, self.AWS_STORAGE_BUCKET_NAME, self.get_key(key))
            return True 
        except ClientError as e:
            logger.error(f"Error uploading image: {str(e)}")
            return False

    def delete_object(self, key: str) -> bool:
        """Delete an object from S3."""
        try:
            self.client.delete_object(Bucket=self.AWS_STORAGE_BUCKET_NAME, Key=key)
            return True
        except ClientError as e:
            logger.error(f"Error deleting object: {str(e)}")
            return False

    def get_object(self, key: str) -> io.StringIO | None:
        """
        Retrieve a text file from S3 and decode it.

        Args:
            key (str): The S3 object key for the file.

        Returns:
            io.StringIO or None: A file-like object containing the decoded text, or None if an error occurs.
        """
        try:
            logger.info(f"get_object key: {key}")

            # Fetch the object from S3
            response = self.client.get_object(
                Bucket=self.AWS_STORAGE_BUCKET_NAME,
                Key=key
            )
            raw_content = response['Body'].read()

            # Detect the encoding of the content
            detected_encoding = chardet.detect(raw_content)
            logger.info(f"Detected encoding: {detected_encoding}")

            # Decode the content with the detected encoding, default to utf-8 with errors replaced
            file_content = raw_content.decode(detected_encoding['encoding'] or 'utf-8', errors='replace')

            return io.StringIO(file_content)
        except Exception as e:
            logger.error(f"Error retrieving file from S3: {str(e)}")
            return None
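
# Illustrative usage sketch (assumption: Django settings expose AWS_STORAGE_BUCKET_NAME,
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_S3_REGION_NAME; the key
# "leads/example.csv" and filename "example.csv" below are hypothetical):
#
#     handler = S3Handler()
#     key = handler.get_key("leads/example.csv")
#     with open("example.csv", "rb") as fh:
#         if handler.upload_file(fh, key):
#             url = handler.generate_presigned_url(key)   # valid for 24h by default
#             logger.info(f"Download link: {url}")
#     buffer = handler.get_object(key)
#     if buffer is not None:
#         logger.info(f"First line: {buffer.readline()!r}")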