File: //home/arjun/projects/unlimited-leads/Unlimited-Leads-Be/user/tasks.py
import io
import random
import re
from typing import List
import pandas as pd
import logging
import time
import uuid
from celery import shared_task
from django.utils import timezone
from django.utils.text import slugify
from django.core.paginator import Paginator
from django.db.models import Max
from django.db.models import Q
from tempfile import TemporaryDirectory, NamedTemporaryFile
from services.numverify import NUMVERIFY
from services.proofy_validation import ProofyEmailVerifier
from user.models import (
    EmailOrPhoneVerifyProcessedFile,
    EmailOrPhoneVerifyFileUpload,
    UserLeadsSearchUsage,
    UserLeadSearch,
    ScrappingSearchTask,
    SrcappingSearchResult,
)
from services.s3_handler import S3Handler
from authorization.models import UnlimitedLeadUser
from phonenumbers import parse, region_code_for_country_code
from payment.models import Transaction
from Admin.models import Lead
from utils.csv_splitter import to_csv, zip_csv_files
from utils.utils import format_count_to_short
from services.twilio_service import TwilioService
from services.email.email_service import UserLeadsEmailService
from .serializers import UserLeadDownloadSerializer
from .filters import UserLeadFilter
import requests
from bs4 import BeautifulSoup
from googlesearch import search
from itertools import cycle
logger = logging.getLogger('user')
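# Max rows per CSV file; larger result sets are split into parts and zipped.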
DOWNLOAD_RESULT_LIMIT = 1_000_000
def clean_phone_number(number: str):
    """
    Clean a phone number by removing non-numeric characters, retaining a
    leading '+' if present.
    """
    if isinstance(number, str):
        # Keep a leading '+', then strip every non-digit character
        prefix = '+' if number.startswith('+') else ''
        return prefix + ''.join(char for char in number if char.isdigit())
    elif isinstance(number, float):
        # Floats come from pandas reading numeric columns; cast through int
        # to drop the trailing '.0'
        return str(int(number))
    else:
        # Fall back to the string representation for any other type
        return str(number)
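# Illustrative behaviour:
#   clean_phone_number("+1 (415) 555-0123") -> "+14155550123"
#   clean_phone_number(14155550123.0)       -> "14155550123"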
def retry_email_verification(emails: List[str], delay: int = 5, max_attempts: int = 3) -> List[dict]:
"""
Retries initiating and fetching bulk email verification results until they are complete.
If 'cid' is not retrieved, retries initiation a fixed number of times.
"""
email_verifier = ProofyEmailVerifier()
cid = None
attempts = 0
# Step 1: Retry initiating bulk verification until a CID is obtained
while cid is None and attempts < max_attempts:
try:
            cid = email_verifier.initiate_bulk_verification(emails)
            logger.info(f"Successfully initiated bulk email verification. CID: {cid}")
            time.sleep(2)  # Give the provider a moment before the first fetch
except ValueError as e:
attempts += 1
logger.error(f"Attempt {attempts}/{max_attempts} failed to initiate verification: {e}")
if attempts < max_attempts:
logger.info(f"Retrying to initiate verification in {delay} seconds...")
time.sleep(delay)
else:
logger.error("Max attempts reached. Could not initiate verification.")
raise ValueError("Failed to initiate bulk email verification after multiple attempts.") from e
# Step 2: Retry fetching results until verification is complete
while True:
try:
results = email_verifier.fetch_bulk_verification_results(cid)
# Check for the "Check not yet complete" message
if any(result.get("message") == "Check not yet complete. Please retry later." for result in results):
logger.warning(f"Verification results not yet complete for CID {cid}. Retrying in {delay} seconds...")
time.sleep(delay)
continue
# Return final results when complete
return results
except ValueError as e:
logger.error(f"Error fetching verification results for CID {cid}: {e}")
time.sleep(delay)
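# Illustrative usage (result shape inferred from how results are consumed in
# process_uploaded_data below; results align with the input order):
#   retry_email_verification(["a@example.com"])
#   # -> e.g. [{"statusName": "deliverable", ...}]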
def get_country_alpha_code(phone_number: str):
    """
    Extract the numeric country code from a phone number and convert it to an
    alpha-2 country code. Returns (alpha_2_code, national_number), or
    (None, None) if the number cannot be parsed.
    """
    try:
        # Parse the phone number to extract the numeric country code
        parsed_number = parse(phone_number, None)
        country_code = parsed_number.country_code
        # Convert the numeric country code to an alpha-2 code (e.g. 1 -> 'US')
        country_alpha_code = region_code_for_country_code(country_code)
        logger.info(
            f"Parsed {phone_number}: country code {country_code} ({country_alpha_code}), "
            f"national number {parsed_number.national_number}"
        )
        return country_alpha_code, parsed_number.national_number
    except Exception as e:
        logger.error(f"Error extracting country alpha code from {phone_number}: {e}")
        return None, None
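# Illustrative behaviour:
#   get_country_alpha_code("+14155550123") -> ("US", 4155550123)
#   get_country_alpha_code("garbage")      -> (None, None)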
def retry_phone_verification(phone_numbers, delay=5):
"""
Retries the phone verification using NUMVERIFY indefinitely until valid results are returned.
"""
phone_verifier = NUMVERIFY()
while True:
try:
            # Clean phone numbers before verification
            cleaned_numbers = []
            for number in phone_numbers:
                clean_number = clean_phone_number(number)
                country_code, national_number = get_country_alpha_code(clean_number)
                if country_code is None:
                    # Parsing failed; default to 'US' and verify the cleaned number as-is
                    country_code, national_number = 'US', clean_number
                cleaned_numbers.append({'phone_number': national_number, 'country_code': country_code})
phone_results = phone_verifier.verify_numbers(cleaned_numbers)
return phone_results
except Exception as e:
logger.error(f"Error verifying phone numbers: {e}")
time.sleep(delay) # Wait before retrying in case of error
continue # Retry the verification
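# Illustrative usage (result shape inferred from how results are consumed in
# process_uploaded_data below):
#   retry_phone_verification(["+14155550123"])
#   # -> e.g. [{"valid": True, ...}] per number, in input order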
@shared_task(bind=True)
def process_uploaded_data(self, file_key: str, user_id: int, payment_id: int):
"""
Download a CSV file from S3, process the data (verify emails and phone numbers),
and upload the results back to S3.
"""
s3_handler = S3Handler()
try:
# Retrieve user and file upload objects
user = UnlimitedLeadUser.objects.get(id=user_id)
file_upload = EmailOrPhoneVerifyFileUpload.objects.get(file_key=file_key)
except UnlimitedLeadUser.DoesNotExist:
logger.error(f"User with ID {user_id} not found.")
return {"error": f"User with ID {user_id} not found."}
except EmailOrPhoneVerifyFileUpload.DoesNotExist:
logger.error(f"FileUpload record not found for file_key: {file_key}")
return {"error": f"FileUpload record not found for file_key: {file_key}"}
try:
# Download file from S3
file_obj = s3_handler.get_object(key=file_key)
if not file_obj:
raise ValueError(f"Failed to retrieve file from S3 for file_key: {file_key}")
# Read the file content
file_content = file_obj.read()
if isinstance(file_content, bytes):
csv_data = file_content.decode('utf-8').splitlines()
else:
csv_data = file_content.splitlines()
logger.debug(f"Read {len(csv_data)} lines from file.")
# Parse the CSV data
df = pd.read_csv(io.StringIO("\n".join(csv_data)), dtype={'phone_number': str})
logger.debug(f"Original DataFrame:\n{df}")
# Initialize result lists
emails = []
phone_numbers = []
results = []
# Process emails if the column exists
if 'email' in df.columns:
emails = df['email'].dropna().astype(str).str.strip().tolist()
emails = [email for email in emails if email != ''] # Filter out empty emails
logger.debug(f"Filtered emails: {emails}")
# Process phone numbers if the column exists
if 'phone_number' in df.columns:
phone_numbers = df['phone_number'].dropna().astype(str).str.strip().tolist()
phone_numbers = [phone for phone in phone_numbers if phone != ''] # Filter out empty phone numbers
# Clean phone numbers to handle decimals
phone_numbers = [clean_phone_number(phone) for phone in phone_numbers]
logger.debug(f"Filtered phone numbers: {phone_numbers}")
deliverable_email_count = 0
valid_phone_count = 0
# Only trigger email verification if email list has emails
if emails:
try:
email_results = retry_email_verification(emails)
logger.info(f"email_results: {email_results}")
except Exception as e:
logger.error(f"Error verifying emails: {e}")
raise
# Store email verification results
for email, result in zip(emails, email_results):
status = result.get('statusName', 'unknown')
if status == 'deliverable':
deliverable_email_count += 1
results.append({'email': email, 'status': status, 'phone_number': None, 'phone_status': None})
# Only trigger phone number verification if phone number list has phone numbers
if phone_numbers:
try:
phone_results = retry_phone_verification(phone_numbers)
logger.info(f"phone_results: {phone_results}")
except Exception as e:
logger.error(f"Error verifying phone numbers: {e}")
raise
# Store phone verification results
for phone_number, result in zip(phone_numbers, phone_results):
valid = result.get('valid', False)
if valid:
valid_phone_count += 1
results.append({'email': None, 'status': None, 'phone_number': phone_number, 'phone_status': valid})
# Prepare the combined results
combined_results = []
# Separate results into email and phone buckets
email_results = [{'email': result['email'], 'email_status': result['status']} for result in results if result['email']]
phone_results = [{'phone_number': result['phone_number'], 'phone_status': result['phone_status']} for result in results if result['phone_number']]
# Check if both email and phone results exist
if email_results and phone_results:
# Align emails and phone numbers by length
max_len = max(len(email_results), len(phone_results))
email_results.extend([{'email': '', 'email_status': ''}] * (max_len - len(email_results)))
phone_results.extend([{'phone_number': '', 'phone_status': ''}] * (max_len - len(phone_results)))
# Combine aligned data into rows
for email, phone in zip(email_results, phone_results):
combined_results.append({
'email': email['email'],
'email_status': email['email_status'],
'phone_number': phone['phone_number'],
'phone_status': phone['phone_status']
})
else:
# Process individual email and phone results separately
for result in results:
if result['email']:
combined_results.append({
'email': result['email'],
'email_status': result['status'],
})
if result['phone_number']:
combined_results.append({
'phone_number': result['phone_number'],
'phone_status': result['phone_status']
})
# Convert to DataFrame
result_df = pd.DataFrame(combined_results)
logger.info(f"final: {result_df}")
# Save results to a new CSV file in memory
buffer = io.StringIO()
result_df.to_csv(buffer, index=False)
buffer.seek(0)
# Upload the processed file to S3
result_file_key = s3_handler.get_key(f"email-or-phone-validation/processed-file/{file_key}")
if not s3_handler.upload_file(buffer, result_file_key):
raise Exception(f"Failed to upload the results file to S3 for file_key: {file_key}")
# Update database records
        EmailOrPhoneVerifyProcessedFile.objects.create(
            file_key=result_file_key,
            user=user,
            total_deliverable_emails=deliverable_email_count,
            total_valid_phone_numbers=valid_phone_count,
            total_email_count=file_upload.email_count,
            total_phone_num_count=file_upload.phone_num_count,
            type=file_upload.type,
            uploaded_file_name=file_upload.uploaded_file_name,
            notification_type=file_upload.notification_type,
        )
        file_upload.status = 'completed'
        file_upload.processed_at = timezone.now()  # timezone-aware, unlike datetime.now()
        file_upload.save()
download_link = s3_handler.generate_presigned_url(result_file_key)
if file_upload.notification_type == 'email':
email_service = UserLeadsEmailService(user_id)
subject = "Your Unlimited Leads Phone or Email Validation is ready."
try:
email_service.send_user_validation_link(download_link, subject)
except Exception:
logger.exception("Failed to send email notification")
# Optionally, take additional action, like retrying or alerting
elif file_upload.notification_type == 'sms':
sms_service = TwilioService()
phone_number = user.phone_number # Assuming user.phone_number exists
message = f"Your file is ready for download. Use the following link: {download_link}"
try:
sms_sid = sms_service.send_sms(to=phone_number, message=message)
if sms_sid:
logger.info(f"SMS sent successfully, SID: {sms_sid}")
else:
logger.warning("SMS notification failed")
except Exception:
logger.exception("Failed to send SMS notification")
return {"status": "completed", "results_file_key": result_file_key}
    except Exception as e:
        logger.exception(f"Error processing uploaded data for file_key {file_key}: {e}")
        file_upload.status = 'failed'
        file_upload.save()
        return {"error": f"Failed to process data: {e}"}
@shared_task
def reset_user_usage_limit():
users = UnlimitedLeadUser.objects.filter(is_verified=True, is_deleted=False)
for user in users:
transaction = (
Transaction.objects.filter(
customer=user, subscription_status="active", is_subscription=True
)
.order_by("-created_at")
.first()
)
if transaction is None:
continue
if transaction.product.billing_period == 'day':
usage, _ = UserLeadsSearchUsage.objects.get_or_create(user=user)
usage.usage_count = 0
usage.usage_reset_at = timezone.now()
usage.save()
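# Typically wired to Celery beat (illustrative; the real schedule lives in
# the Celery/Django settings, and crontab comes from celery.schedules):
#   CELERY_BEAT_SCHEDULE = {
#       "reset-user-usage-limit": {
#           "task": "user.tasks.reset_user_usage_limit",
#           "schedule": crontab(hour=0, minute=0),
#       },
#   }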
@shared_task
def download_user_leads_search(user_id, search_id):
search = UserLeadSearch.objects.get(id=search_id, user=user_id)
queryset = Lead.objects.filter(is_deleted=False)
header = [
"Type (Business or Consumer)",
"Business Name",
"First Name",
"Last Name",
"Street Address",
"City",
"State",
"Country",
"Zip Code",
"Phone",
"Phone Type (Land or Cell)",
"Email",
"Email Verified (Yes/No)",
"SIC Code (Business Only)",
]
search_filter_param = search.search_filter_parameter
ordering = search_filter_param.get("ordering", "")
queryset = queryset.filter(
version_num__lte=search.lead_version
)
filter_set = UserLeadFilter(data=search_filter_param, queryset=queryset)
queryset = filter_set.qs
    ordering = [order for order in ordering.split(",") if order]
    if ordering:
        queryset = queryset.order_by(*ordering)
    if search.title:
filename = f"unlimited-leads-{slugify(search.title)}"
else:
filename = f"unlimited-leads-{timezone.now().strftime('%y%m%d')}"
s3_handler = S3Handler()
paginator = Paginator(queryset, DOWNLOAD_RESULT_LIMIT)
if paginator.num_pages == 1:
with TemporaryDirectory() as download_dir:
page = paginator.page(1)
serializer = UserLeadDownloadSerializer(page.object_list, many=True)
csv_filepath = to_csv(
[data.values() for data in serializer.data],
header,
download_dir,
filename,
)
download_key = f"downloads/leads/{uuid.uuid4()}/{filename}.csv"
with open(csv_filepath, "rb") as csv_file:
is_success = s3_handler.upload_file(csv_file, s3_handler.get_key(download_key))
if is_success:
search.download_key = download_key
search.is_upload_success = is_success
search.save()
download_notify_email.delay(user_id, download_key)
else:
with TemporaryDirectory() as download_dir:
for page_number in paginator.page_range:
page = paginator.page(page_number)
serializer = UserLeadDownloadSerializer(page.object_list, many=True)
csv_filename = f"part_{page_number}_({len(page)})"
to_csv(
[data.values() for data in serializer.data],
header,
download_dir,
csv_filename,
)
download_key = f"downloads/leads/{uuid.uuid4()}/{filename}.zip"
with NamedTemporaryFile(suffix=".zip") as csv_file:
zip_csv_files(download_dir, csv_file)
csv_file.seek(0)
is_success = s3_handler.upload_file(csv_file, s3_handler.get_key(download_key))
if is_success:
search.download_key = download_key
search.is_upload_success = is_success
search.save()
download_notify_email.delay(user_id, download_key)
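# Searches over DOWNLOAD_RESULT_LIMIT rows are written as part_N CSVs and
# zipped before upload; smaller searches upload as a single CSV. In both
# cases download_notify_email sends the user a presigned S3 link.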
@shared_task
def download_notify_email(user_id, download_key):
s3_handler = S3Handler()
download_link = s3_handler.generate_presigned_url(s3_handler.get_key(download_key))
email_service = UserLeadsEmailService(user_id)
subject = "Your Unlimited Leads download is ready."
    email_service.send_user_lead_download_link(download_link, subject)
@shared_task
def download_notify_email_scrapping(user_id, download_key):
s3_handler = S3Handler()
download_link = s3_handler.generate_presigned_url(s3_handler.get_key(download_key))
email_service = UserLeadsEmailService(user_id)
subject = "Your Unlimited Leads scrapping is ready."
email_service.send_user_lead_download_link(download_link,subject)
@shared_task
def notify_saved_search_new_records():
saved_searches = UserLeadSearch.objects.filter(
Q(is_saved=True)
& Q(is_deleted=False)
& Q(new_records_found=False)
& ~Q(receive_notification=UserLeadSearch.NO)
)
logger.info(f"Found {saved_searches.count()} saved searches to notify")
leads = Lead.objects.filter(is_deleted=False)
for saved_search in saved_searches:
filter_set = UserLeadFilter(
data=saved_search.search_filter_parameter, queryset=leads
)
queryset = filter_set.qs
max_version = queryset.aggregate(max_version=Max("version_num", default=0))["max_version"]
if max_version <= saved_search.lead_version:
continue
new_records_count = queryset.filter(version_num=max_version).count()
saved_search.new_records_found = True
saved_search.save()
twilio = TwilioService()
email_service = UserLeadsEmailService(saved_search.user.id)
phone_number = saved_search.user.phone_number
if saved_search.receive_notification == UserLeadSearch.EMAIL:
email_service.send_saved_search_new_records_email(new_records_count)
else:
if phone_number is not None:
twilio.send_sms(
phone_number,
f"Hi, we've found {format_count_to_short(new_records_count)} new leads that match the filter you saved in Unlimited Leads. Check them out now!",
)
if saved_search.receive_notification == UserLeadSearch.BOTH:
email_service.send_saved_search_new_records_email(new_records_count)
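# Notification matrix above: EMAIL -> email only; BOTH -> SMS (when a phone
# number is on file) plus email; any other opted-in value -> SMS only.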
PROXIES = [
"http://45.77.24.239:3128",
"http://103.152.5.254:8080",
"http://89.223.121.160:8080",
"http://159.65.69.157:9300",
"http://20.99.152.172:3128",
]
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
]
proxy_pool = cycle(PROXIES)
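# next(proxy_pool) rotates through PROXIES round-robin without exhausting;
# the hardcoded addresses above are sample public proxies and may go stale.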
def scrape_emails_phones(url):
    """Fetch a page through the next proxy and extract up to 5 emails and phone numbers."""
    proxy = next(proxy_pool)
    headers = {"User-Agent": random.choice(USER_AGENTS)}
    try:
        # Route the request through the rotated proxy (previously fetched but unused)
        response = requests.get(
            url,
            headers=headers,
            proxies={"http": proxy, "https": proxy},
            timeout=10,
        )
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        # Extract the visible text content
        page_text = soup.get_text(separator=" ")
        # Regexes for emails and loosely formatted international phone numbers
        phone_pattern = r"\+?\d{1,4}?[-.\s]?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4,6}"
        email_pattern = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"
        phones = set(re.findall(phone_pattern, page_text))
        emails = set(re.findall(email_pattern, page_text))
        return list(emails)[:5], list(phones)[:5]
    except Exception as e:
        logger.warning(f"Failed to scrape {url} via {proxy}: {e}")
        return [], []
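# Illustrative behaviour (hypothetical page content):
#   scrape_emails_phones("https://example.com/contact")
#   # -> (["info@example.com"], ["+1 415 555 0123"]), each list capped at 5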
@shared_task
def start_scrape_task(task_id, user_id):
"""Celery task to perform scraping."""
try:
task = ScrappingSearchTask.objects.get(id=task_id)
user = UnlimitedLeadUser.objects.get(id=user_id)
logger.info(f"[TASK START] Task ID: {task.id}, User ID: {user.id}")
task.task_status = "In Progress"
task.save()
query = f"{task.search_keyword} {task.lead_type} {task.location} (contact OR 'get in touch' OR 'about us' OR 'customer service' OR 'business directory' OR 'contact details' OR 'phone number' OR 'email')"
logger.info(f"[SEARCH QUERY] {query}")
# List to store the results for CSV generation
results = []
for url in search(query, num_results=10):
logger.info(f"[PROCESSING URL] {url}")
emails, phones = scrape_emails_phones(url)
if emails or phones:
result = SrcappingSearchResult.objects.create(
user=user,
task=task,
website_name=url.split("//")[-1].split("/")[0],
website_link=url,
phone_number=", ".join(phones),
email=", ".join(emails),
)
logger.info(f"[RESULT SAVED] URL: {url}, Emails: {emails}, Phones: {phones}")
# Add result to CSV list
results.append({
'website_name': result.website_name,
'website_link': result.website_link,
'phone_number': result.phone_number,
'email': result.email,
})
time.sleep(random.uniform(1, 5)) # Random delay
# Create the CSV file in memory using StringIO
if results:
result_df = pd.DataFrame(results)
# Use StringIO as an in-memory file-like object
buffer = io.StringIO()
result_df.to_csv(buffer, index=False) # Convert DataFrame to CSV and write to buffer
buffer.seek(0) # Go to the beginning of the buffer
            # Upload the CSV to S3
            s3_handler = S3Handler()
            result_file_key = s3_handler.get_key(f"scrapped-data/{task.id}_results.csv")
            if not s3_handler.upload_file(buffer, result_file_key):
                logger.error(f"[UPLOAD FAILED] Task ID: {task.id}, S3 Key: {result_file_key}")
                task.task_status = "failed"
                task.save()
                return
            # Store the result file key on the task and mark it complete
            logger.info(f"[CSV UPLOADED] Task ID: {task.id}, S3 Key: {result_file_key}")
            task.download_key = f"scrapped-data/{task.id}_results.csv"
            task.task_status = "completed"
            task.save()
download_link = s3_handler.generate_presigned_url(result_file_key)
if task.notification_type == 'email':
email_service = UserLeadsEmailService(user_id)
subject = "Your website search is completed."
try:
email_service.send_user_scrapping_link(download_link, subject)
except Exception:
logger.exception("Failed to send email notification")
# Optionally, take additional action, like retrying or alerting
            elif task.notification_type == 'sms':
                sms_service = TwilioService()
                phone_number = user.phone_number  # Assuming user.phone_number exists
                message = f"Your website search is complete. Download your results here: {download_link}"
try:
sms_sid = sms_service.send_sms(to=phone_number, message=message)
if sms_sid:
logger.info(f"SMS sent successfully, SID: {sms_sid}")
else:
logger.warning("SMS notification failed")
except Exception:
logger.exception("Failed to send SMS notification")
logger.info(f"[TASK COMPLETED] Task ID: {task.id}")
    except ScrappingSearchTask.DoesNotExist:
        logger.error(f"[TASK NOT FOUND] Task ID: {task_id}")
    except Exception as e:
        logger.exception(f"[TASK ERROR] Task ID: {task_id}, Error: {e}")
        # Mark the task failed so it doesn't stay stuck "In Progress"
        ScrappingSearchTask.objects.filter(id=task_id).update(task_status="failed")