File: /home/arjun/projects/unlimited-leads/Unlimited-Leads-Be/Admin/tasks.py
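"""Celery task for bulk-loading lead CSV files from S3 into the Lead table."""
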
import pandas as pd
import logging
import time
from celery import shared_task
from django.db import connection
from django.db.models import Max
from Admin.models import TempLead, Lead, LeadData
from Admin.helper import compute_hash, normalize_empty_to_none, convert_to_int_string, remove_timestamp_from_filename
from services.email.email_service import AdminEmailService
from services.s3_handler import S3Handler

s3_handler = S3Handler()

logger = logging.getLogger(__name__)
  

@shared_task
def process_csv_file(keys: list, file_ids: list, file_names: list, user_id) -> None:
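    """
    Load lead CSV files from S3 into the `Lead` table.

    Each file is streamed in chunks, staged in `TempLead`, copied into
    `Lead` (duplicates skipped via `hash_value`), and its `LeadData` row
    is updated with the outcome. A summary email is sent to the admin
    once every file has been processed.
    """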

    admin_email_service = AdminEmailService(user_id)
  
    # Fetch the current version value (None when the table is empty)
    current_version_num = LeadData.objects.aggregate(Max('version_num'))['version_num__max'] or 0
    logger.info(f"Current version number: {current_version_num}")

    file_name_and_status = {}
    
    for file_key, uploaded_file_id, file_name in zip(keys, file_ids, file_names):

        # Fetch initial count of Lead table
        initial_count = Lead.objects.count()
        logger.info(f"Lead table count : {str(initial_count)}")
            
        logger.info(f"Processing {uploaded_file_id}")
        lead_data = LeadData.objects.get(file_id=uploaded_file_id)
        lead_data.status = LeadData.PROCESSING
        lead_data.save()
        start_time = time.time()
        try:
            csv_object = s3_handler.get_object(file_key)
        except Exception as e:
            logger.exception(f"Unable to get csv_object from get_object, Error : {str(e)}")
            lead_data.status = LeadData.LOADING_FAILED
            lead_data.save()
            file_name_and_status[file_name] = False
            continue

        if not csv_object:
            logger.debug(f"Error CSV file not found in S3 Key : {file_key}")
            lead_data.status = LeadData.LOADING_FAILED
            lead_data.save()
            file_name_and_status[file_name] = False
            continue
             
        try:
            # Read the CSV file in chunks and insert into TempLead
            chunks = pd.read_csv(csv_object, chunksize=10000)

            # Map the CSV headers to the expected model field names
            header_map = {
                "Type (Business or Consumer)": "lead_type",
                "Business Name": "business_name",
                "First Name": "first_name",
                "Last Name": "last_name",
                "Street Address": "street_address",
                "City": "city",
                "State": "state",
                "Country": "country",
                "Zip Code": "zipcode",
                "Phone": "phone",
                "Phone Type (Land or Cell)": "phone_type",
                "Email": "email",
                "Email Verified (Yes/No)": "is_email_verified",
                "SIC Code (Business Only)": "sic_code",
            }
            
            # New data counter
            new_data_count = 0
            # Read the CSV in chunks
            for chunk in chunks:
                
                # Update new_data_count
                new_data_count += chunk.shape[0]
                
                # logger.debug(f"CSV Columns: {chunk.columns.tolist()}")
                # logger.info(f"First Rows: {chunk.head(3).to_dict(orient='records')}")
                
                # csv_columns = [col.strip().lower() for col in chunk.columns]
                # logger.error(f"CSV Columns Found: {list(csv_columns)}")
                
                missing_columns = [col for col in header_map.keys() if col not in chunk.columns]

                if missing_columns:
                    # Fail the whole file; the outer except marks it LOADING_FAILED
                    raise ValueError(f"Missing columns in CSV: {', '.join(missing_columns)}")

                # Rename headers to the model field names
                chunk.rename(columns=header_map, inplace=True)
                
                # Cast 'zipcode', 'phone' and 'sic_code' to strings: valid values become
                # integers rendered as strings, while NaN becomes an empty string
                chunk['zipcode'] = chunk['zipcode'].apply(convert_to_int_string)
                chunk['phone'] = chunk['phone'].apply(convert_to_int_string)
                chunk['sic_code'] = chunk['sic_code'].apply(convert_to_int_string)

                # Fill nan values with empty string
                chunk.fillna("", inplace=True)
                
                logger.info(f"CSV Columns after renaming: {chunk.columns.tolist()}")
                
                temp_leads = [
                    TempLead(
                        lead_type=normalize_empty_to_none(row.get('lead_type')),
                        business_name=normalize_empty_to_none(row.get('business_name')),
                        first_name=normalize_empty_to_none(row.get('first_name')),
                        last_name=normalize_empty_to_none(row.get('last_name')),
                        street_address=normalize_empty_to_none(row.get('street_address')),
                        city=normalize_empty_to_none(row.get('city')),
                        state=normalize_empty_to_none(row.get('state')),
                        country=normalize_empty_to_none(row.get('country')),
                        zipcode=normalize_empty_to_none(row.get('zipcode')),
                        phone=normalize_empty_to_none(row.get('phone')),
                        phone_type=normalize_empty_to_none(row.get('phone_type')),
                        email=normalize_empty_to_none(row.get('email')),
                        is_email_verified=str(row.get('is_email_verified')).strip().lower() == 'yes',
                        sic_code=normalize_empty_to_none(row.get('sic_code')),
                        hash_value=compute_hash(
                                normalize_empty_to_none(row.get('email')), 
                                normalize_empty_to_none(row.get('phone')), 
                                normalize_empty_to_none(row.get('lead_type')),
                                normalize_empty_to_none(row.get('business_name')),
                                normalize_empty_to_none(row.get('first_name')),
                                normalize_empty_to_none(row.get('last_name')),
                                normalize_empty_to_none(row.get('street_address')),
                                normalize_empty_to_none(row.get('city')),
                                normalize_empty_to_none(row.get('state')),
                                normalize_empty_to_none(row.get('country')),
                                normalize_empty_to_none(row.get('zipcode')),                            
                                normalize_empty_to_none(row.get('sic_code'))
                            ),
                        version_num=current_version_num + 1  # increment the version for this load
                    )
                    for _, row in chunk.iterrows()
                ]
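                # Stage the whole chunk with a single bulk INSERT; duplicates are
                # filtered out when the rows are copied into Lead below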
                TempLead.objects.bulk_create(temp_leads)

                # Move the staged rows into the `Lead` table, skipping duplicates
                try:
                    with connection.cursor() as cursor:
                        cursor.execute(
                            """
                            INSERT INTO "public"."Admin_lead" (lead_type, lead_id, business_name, first_name,
                            last_name, street_address, city, state, country, zipcode, phone, phone_type,
                            email, is_email_verified, sic_code, created_on, is_deleted, hash_value,
                            version_num)
                            SELECT lead_type, lead_id, business_name, first_name, last_name, street_address,
                            city, state, country, zipcode, phone, phone_type, email, is_email_verified,
                            sic_code, created_on, is_deleted, hash_value, version_num
                            FROM "public"."Admin_templead"
                            ON CONFLICT DO NOTHING;
                            """
                        )
                    # Django autocommits the statement, so no explicit commit or
                    # connection close is needed here
                except Exception:
                    # Re-raise so the outer handler marks this file LOADING_FAILED
                    logger.exception("Failed to copy TempLead rows into Lead")
                    raise

                # Clear the staging table before the next chunk
                TempLead.objects.all().delete()

            # Calculate skipped count
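            # Duplicate rows (same hash_value) were dropped by ON CONFLICT DO NOTHING,
            # so skipped = (rows before the load + rows read) - rows after the load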
            final_count = Lead.objects.count()
            skipped_count = (initial_count + new_data_count) - final_count

            # Update LeadData record on success
            lead_data.status = LeadData.LOADING_COMPLETED
            lead_data.skipped_count = skipped_count
            lead_data.version_num = current_version_num + 1
            lead_data.save()
            file_name_and_status[file_name] = True

            logger.info(f"File processing completed successfully. Skipped count: {skipped_count}")
            logger.info(f"Time taken for celery : {time.time() - start_time:.2f} seconds")

        except Exception as e:
            logger.exception(f"Error processing CSV file {file_name}: {e}")
            # Update LeadData record on failure
            lead_data.status = LeadData.LOADING_FAILED
            lead_data.save()
            file_name_and_status[file_name] = False
    
    successful_files = '\n'.join(
        remove_timestamp_from_filename(name) for name, ok in file_name_and_status.items() if ok
    )
    failed_files = '\n'.join(
        remove_timestamp_from_filename(name) for name, ok in file_name_and_status.items() if not ok
    )
    
    # Send email to Admin
    if successful_files and failed_files:
        admin_email_service.send_confirmation_email(
            subject='CSV Processing Report',
            message=f"""Successful files : {successful_files}
                        Failed files : {failed_files}
                    """,
        )
    elif successful_files:
        admin_email_service.send_confirmation_email(
            subject='CSV Processing Successful',
            message=f"All CSV files are processed successfully!",
        )       
    else:
        admin_email_service.send_confirmation_email(
            subject='CSV Processing Failed',
            message=f"Failed to process all CSV files.",
        )
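
# Example invocation (illustrative; the real caller is wherever file uploads are handled):
#   process_csv_file.delay(keys, file_ids, file_names, user_id)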