File: //home/arjun/projects/unlimited-leads/Unlimited-Leads-Be/Admin/tasks.py
import time
import logging

import pandas as pd
from celery import shared_task
from django.db import connection, transaction
from django.db.models import Max
from Admin.models import TempLead, Lead, LeadData
from Admin.helper import compute_hash, normalize_empty_to_none, convert_to_int_string, remove_timestamp_from_filename
from services.email.email_service import AdminEmailService
from services.s3_handler import S3Handler

s3_handler = S3Handler()
logger = logging.getLogger(__name__)


@shared_task
def process_csv_file(keys: list, file_ids: list, file_names: list, user_id) -> None:
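    """
    Celery task that loads admin-uploaded CSV files from S3 into the Lead table.

    Each file is read in chunks, staged into TempLead, then copied into Lead with
    duplicate rows skipped. Progress is recorded on the matching LeadData row and a
    summary email is sent to the admin once every file has been processed.

    Args:
        keys: S3 object keys of the uploaded CSV files.
        file_ids: LeadData file_id values corresponding to each key.
        file_names: Original file names, used in the summary email.
        user_id: Admin user to notify via AdminEmailService.
    """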
admin_email_service = AdminEmailService(user_id)
    # Fetch the current version value (defaults to 0 when no versions exist yet)
    current_version_num = LeadData.objects.aggregate(Max('version_num'))['version_num__max'] or 0
    logger.info(f"Current version number: {current_version_num}")
file_name_and_status = {}
    for file_key, uploaded_file_id, file_name in zip(keys, file_ids, file_names):
        # Fetch the initial Lead count so skipped (duplicate) rows can be computed later
        initial_count = Lead.objects.count()
        logger.info(f"Lead table count: {initial_count}")
        logger.info(f"Processing file_id {uploaded_file_id}")
lead_data = LeadData.objects.get(file_id=uploaded_file_id)
lead_data.status = LeadData.PROCESSING
lead_data.save()
        start_time = time.time()
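        # s3_handler.get_object is expected to return a file-like object that pandas
        # can read directly; a falsy return value means the key was not found in S3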
try:
csv_object = s3_handler.get_object(file_key)
except Exception as e:
logger.exception(f"Unable to get csv_object from get_object, Error : {str(e)}")
lead_data.status = LeadData.LOADING_FAILED
lead_data.save()
file_name_and_status[file_name] = False
continue
if not csv_object:
logger.debug(f"Error CSV file not found in S3 Key : {file_key}")
lead_data.status = LeadData.LOADING_FAILED
lead_data.save()
file_name_and_status[file_name] = False
continue
try:
# Read the CSV file in chunks and insert into TempLead
chunks = pd.read_csv(csv_object, chunksize=10000)
# Map CSV headers from sample csv to expected column names
header_map = {
"Type (Business or Consumer)": "lead_type",
"Business Name": "business_name",
"First Name": "first_name",
"Last Name": "last_name",
"Street Address": "street_address",
"City": "city",
"State": "state",
"Country": "country",
"Zip Code": "zipcode",
"Phone": "phone",
"Phone Type (Land or Cell)": "phone_type",
"Email": "email",
"Email Verified (Yes/No)": "is_email_verified",
"SIC Code (Business Only)": "sic_code",
}
# New data counter
new_data_count = 0
# Read the CSV in chunks
for chunk in chunks:
# Update new_data_count
new_data_count += chunk.shape[0]
# logger.debug(f"CSV Columns: {chunk.columns.tolist()}")
# logger.info(f"First Rows: {chunk.head(3).to_dict(orient='records')}")
# csv_columns = [col.strip().lower() for col in chunk.columns]
# logger.error(f"CSV Columns Found: {list(csv_columns)}")
missing_columns = [col for col in header_map.keys() if col not in chunk.columns]
                if missing_columns:
                    # Raising here aborts the whole file and lets the outer handler mark it as
                    # failed; `continue` would only skip this chunk and the file would later be
                    # reported as completed
                    raise ValueError(f"Missing columns in CSV: {', '.join(missing_columns)}")
# Rename headers and fill nan fields in the chunk
chunk.rename(columns=header_map, inplace=True)
                # Convert 'zipcode', 'phone' and 'sic_code' to strings: valid numeric values
                # become integer strings, NaN/invalid values become empty strings
chunk['zipcode'] = chunk['zipcode'].apply(convert_to_int_string)
chunk['phone'] = chunk['phone'].apply(convert_to_int_string)
chunk['sic_code'] = chunk['sic_code'].apply(convert_to_int_string)
# Fill nan values with empty string
chunk.fillna("", inplace=True)
logger.info(f"CSV Columns after renaming: {chunk.columns.tolist()}")
temp_leads = [
TempLead(
lead_type=normalize_empty_to_none(row.get('lead_type')),
business_name=normalize_empty_to_none(row.get('business_name')),
first_name=normalize_empty_to_none(row.get('first_name')),
last_name=normalize_empty_to_none(row.get('last_name')),
street_address=normalize_empty_to_none(row.get('street_address')),
city=normalize_empty_to_none(row.get('city')),
state=normalize_empty_to_none(row.get('state')),
country=normalize_empty_to_none(row.get('country')),
zipcode=normalize_empty_to_none(row.get('zipcode')),
phone=normalize_empty_to_none(row.get('phone')),
phone_type=normalize_empty_to_none(row.get('phone_type')),
email=normalize_empty_to_none(row.get('email')),
                        is_email_verified=str(row.get('is_email_verified')).lower().strip() == 'yes',
sic_code=normalize_empty_to_none(row.get('sic_code')),
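                        # hash_value presumably backs the unique constraint that the
                        # ON CONFLICT DO NOTHING clause below uses to skip duplicate leads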
hash_value=compute_hash(
normalize_empty_to_none(row.get('email')),
normalize_empty_to_none(row.get('phone')),
normalize_empty_to_none(row.get('lead_type')),
normalize_empty_to_none(row.get('business_name')),
normalize_empty_to_none(row.get('first_name')),
normalize_empty_to_none(row.get('last_name')),
normalize_empty_to_none(row.get('street_address')),
normalize_empty_to_none(row.get('city')),
normalize_empty_to_none(row.get('state')),
normalize_empty_to_none(row.get('country')),
normalize_empty_to_none(row.get('zipcode')),
normalize_empty_to_none(row.get('sic_code'))
),
                        version_num=current_version_num + 1  # tag this batch with the next version number
)
for _, row in chunk.iterrows()
]
TempLead.objects.bulk_create(temp_leads)
                # Copy the staged rows from TempLead into Lead in a single INSERT ... SELECT,
                # skipping rows that collide with an existing unique value (duplicates)
                try:
                    with transaction.atomic(), connection.cursor() as cursor:
                        cursor.execute(
                            """
                            INSERT INTO "public"."Admin_lead" (lead_type, lead_id, business_name, first_name,
                                last_name, street_address, city, state, country, zipcode, phone, phone_type,
                                email, is_email_verified, sic_code, created_on, is_deleted, hash_value,
                                version_num)
                            SELECT lead_type, lead_id, business_name, first_name, last_name, street_address,
                                city, state, country, zipcode, phone, phone_type, email, is_email_verified,
                                sic_code, created_on, is_deleted, hash_value, version_num
                            FROM "public"."Admin_templead"
                            ON CONFLICT DO NOTHING;
                            """
                        )
                except Exception:
                    # Re-raise so the outer handler marks this file as failed instead of
                    # silently reporting success after a failed insert
                    logger.exception("Failed to copy staged rows from TempLead into Lead")
                    raise
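                # Clear the staging table so already-copied rows are not re-inserted for the next chunk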
TempLead.objects.all().delete()
# Calculate skipped count
final_count = Lead.objects.count()
skipped_count = (initial_count + new_data_count) - final_count
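            # Rows that were read but did not increase the Lead count were dropped by
            # ON CONFLICT DO NOTHING, i.e. they duplicated existing leads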
# Update LeadData record on success
lead_data.status = LeadData.LOADING_COMPLETED
lead_data.skipped_count = skipped_count
lead_data.version_num = current_version_num + 1
lead_data.save()
file_name_and_status[file_name] = True
logger.info(f"File processing completed successfully. Skipped count: {skipped_count}")
logger.info(f"Time taken for celery : {time.time() - start_time:.2f} seconds")
        except Exception as e:
            logger.exception(f"Error processing CSV file {file_name}: {e}")
            # Remove any rows left in the staging table so the next file starts clean
            TempLead.objects.all().delete()
            # Update LeadData record on failure
            lead_data.status = LeadData.LOADING_FAILED
            lead_data.save()
            file_name_and_status[file_name] = False
    successful_files = '\n'.join(remove_timestamp_from_filename(name) for name, ok in file_name_and_status.items() if ok)
    failed_files = '\n'.join(remove_timestamp_from_filename(name) for name, ok in file_name_and_status.items() if not ok)
    # Send a summary email to the admin
    if successful_files and failed_files:
        admin_email_service.send_confirmation_email(
            subject='CSV Processing Report',
            message=f"Successful files:\n{successful_files}\n\nFailed files:\n{failed_files}",
        )
    elif successful_files:
        admin_email_service.send_confirmation_email(
            subject='CSV Processing Successful',
            message="All CSV files were processed successfully!",
        )
    else:
        admin_email_service.send_confirmation_email(
            subject='CSV Processing Failed',
            message="All CSV files failed to process.",
        )
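
# Illustrative invocation (hypothetical values, shown only as a usage sketch):
# process_csv_file.delay(
#     keys=["uploads/leads.csv"],      # S3 object keys of the uploaded files
#     file_ids=[42],                   # matching LeadData.file_id values
#     file_names=["leads.csv"],        # original file names for the summary email
#     user_id=1,                       # admin user to notify
# )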