File: /home/arjun/projects/unlimited-leads/Unlimited-Leads-Be/user/tasks.py
import io
import random
import re
from typing import List
import pandas as pd
import logging
import time
import uuid
from celery import shared_task
from django.utils import timezone
from django.utils.text import slugify
from django.core.paginator import Paginator
from django.db.models import Max
from django.db.models import Q
from tempfile import TemporaryDirectory, NamedTemporaryFile
from services.numverify import NUMVERIFY
from services.proofy_validation import ProofyEmailVerifier
from user.models import (
    EmailOrPhoneVerifyProcessedFile,
    EmailOrPhoneVerifyFileUpload,
    UserLeadsSearchUsage,
    UserLeadSearch,
    ScrappingSearchTask,
    SrcappingSearchResult,
)
from services.s3_handler import S3Handler
from authorization.models import UnlimitedLeadUser
from phonenumbers import parse, region_code_for_country_code
from payment.models import Transaction

from Admin.models import Lead

from utils.csv_splitter import to_csv, zip_csv_files
from utils.utils import format_count_to_short
from services.twilio_service import TwilioService
from services.email.email_service import UserLeadsEmailService
from .serializers import UserLeadDownloadSerializer
from .filters import UserLeadFilter
import requests
from bs4 import BeautifulSoup
from googlesearch import search
from itertools import cycle


logger = logging.getLogger('user')

DOWNLOAD_RESULT_LIMIT = 1000000
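# DOWNLOAD_RESULT_LIMIT caps the rows written to a single CSV file; larger result
# sets are split into numbered parts and zipped (see download_user_leads_search).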

def clean_phone_number(number: str):
    """
    Clean the phone number by removing non-numeric characters, but retain the '+' symbol if present at the start.
    """
    if isinstance(number, str):
        # Keep '+' only when it is the leading character; strip all other non-digits.
        # (Checking the index via enumerate avoids str.index(), which always returns
        # the first occurrence and would keep '+' characters appearing later on.)
        cleaned_number = ''.join(
            char for index, char in enumerate(number)
            if char.isdigit() or (char == '+' and index == 0)
        )
        return cleaned_number
    elif isinstance(number, float):
        # Floats come from pandas reading numeric columns; drop the decimal part
        cleaned_number = str(int(number))
        return cleaned_number
    else:
        # If the number is not a string or float, return as is
        return str(number)
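
# Illustrative example: clean_phone_number("+1 (415) 555-0134") -> "+14155550134";
# a float input such as 4155550134.0 becomes "4155550134".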

def retry_email_verification(emails: List[str], delay: int = 5, max_attempts: int = 3) -> List[dict]:
    """
    Retries initiating and fetching bulk email verification results until they are complete.
    If 'cid' is not retrieved, retries initiation a fixed number of times.
    """
    email_verifier = ProofyEmailVerifier()
    cid = None
    attempts = 0

    # Step 1: Retry initiating bulk verification until a CID is obtained
    while cid is None and attempts < max_attempts:
        attempts += 1
        try:
            cid = email_verifier.initiate_bulk_verification(emails)
            time.sleep(2)
            logger.info(f"Successfully initiated bulk email verification. CID: {cid}")
        except ValueError as e:
            logger.error(f"Attempt {attempts}/{max_attempts} failed to initiate verification: {e}")
            if attempts < max_attempts:
                logger.info(f"Retrying to initiate verification in {delay} seconds...")
                time.sleep(delay)
            else:
                logger.error("Max attempts reached. Could not initiate verification.")
                raise ValueError("Failed to initiate bulk email verification after multiple attempts.") from e

    if cid is None:
        # initiate_bulk_verification returned None without raising; treat it as a failure
        # rather than looping forever below with an invalid CID.
        raise ValueError("Bulk email verification did not return a CID.")

    # Step 2: Retry fetching results until verification is complete
    while True:
        try:
            results = email_verifier.fetch_bulk_verification_results(cid)
            # Check for the "Check not yet complete" message
            if any(result.get("message") == "Check not yet complete. Please retry later." for result in results):
                logger.warning(f"Verification results not yet complete for CID {cid}. Retrying in {delay} seconds...")
                time.sleep(delay)
                continue
            # Return final results when complete
            return results
        except ValueError as e:
            logger.error(f"Error fetching verification results for CID {cid}: {e}")
            time.sleep(delay)
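
# Usage sketch: retry_email_verification(["a@example.com"]) blocks until Proofy marks
# the batch complete; callers assume one result dict per submitted email (see the
# zip() in process_uploaded_data below).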

def get_country_alpha_code(phone_number: str):
    """
    Extract the numeric country code from a phone number and convert it to an alpha-2 country code.
    """
    try:
        # Parse the phone number to extract the country code
        parsed_number = parse(phone_number, None)
        country_code = parsed_number.country_code
        logger.info(f"parsed,Country Code: {country_code} National Number: {parsed_number.national_number}")
        
        # Convert numeric country code to alpha-2 code
        country_alpha_code = region_code_for_country_code(country_code)
        logger.info(f"code,{country_code}")
        logger.info(f"country,{country_alpha_code}")
        
        return country_alpha_code,parsed_number.national_number
    except Exception as e:
        logger.error(f"Error extracting country alpha code from {phone_number}: {e}")
        return None
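
# Illustrative example: get_country_alpha_code("+14155550134") -> ("US", 4155550134);
# unparseable input yields None, which callers must handle.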

def retry_phone_verification(phone_numbers, delay=5):
    """
    Retries the phone verification using NUMVERIFY indefinitely until valid results are returned.
    """
    phone_verifier = NUMVERIFY()
    while True:
        try:
            # Clean phone numbers before verification
            cleaned_numbers = []
            for number in phone_numbers:
                clean_number = clean_phone_number(number)
                parsed = get_country_alpha_code(clean_number)
                # Fall back to 'US' with the cleaned number when parsing fails
                # (unpacking the old `... or 'US'` fallback would split the string
                # into the two characters 'U' and 'S')
                country_code, national_number = parsed if parsed else ('US', clean_number)
                cleaned_numbers.append({'phone_number': national_number, 'country_code': country_code})

            phone_results = phone_verifier.verify_numbers(cleaned_numbers)
            return phone_results
        except Exception as e:
            logger.error(f"Error verifying phone numbers: {e}")
            time.sleep(delay)  # Wait before retrying in case of error
            continue  # Retry the verification
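
# Note: this helper retries indefinitely on errors; run it from a task with its own
# time limit if unbounded blocking is a concern.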

@shared_task(bind=True)
def process_uploaded_data(self, file_key: str, user_id: int, payment_id: int):
    """
    Download a CSV file from S3, process the data (verify emails and phone numbers),
    and upload the results back to S3.
    """
    s3_handler = S3Handler()

    try:
        # Retrieve user and file upload objects
        user = UnlimitedLeadUser.objects.get(id=user_id)
        file_upload = EmailOrPhoneVerifyFileUpload.objects.get(file_key=file_key)
    except UnlimitedLeadUser.DoesNotExist:
        logger.error(f"User with ID {user_id} not found.")
        return {"error": f"User with ID {user_id} not found."}
    except EmailOrPhoneVerifyFileUpload.DoesNotExist:
        logger.error(f"FileUpload record not found for file_key: {file_key}")
        return {"error": f"FileUpload record not found for file_key: {file_key}"}

    try:
        # Download file from S3
        file_obj = s3_handler.get_object(key=file_key)
        if not file_obj:
            raise ValueError(f"Failed to retrieve file from S3 for file_key: {file_key}")

        # Read the file content
        file_content = file_obj.read()
        if isinstance(file_content, bytes):
            csv_data = file_content.decode('utf-8').splitlines()
        else:
            csv_data = file_content.splitlines()

        logger.debug(f"Read {len(csv_data)} lines from file.")

        # Parse the CSV data
        df = pd.read_csv(io.StringIO("\n".join(csv_data)), dtype={'phone_number': str})
        logger.debug(f"Original DataFrame:\n{df}")

        # Initialize result lists
        emails = []
        phone_numbers = []
        results = []

        # Process emails if the column exists
        if 'email' in df.columns:
            emails = df['email'].dropna().astype(str).str.strip().tolist()
            emails = [email for email in emails if email != '']  # Filter out empty emails
            logger.debug(f"Filtered emails: {emails}")

        # Process phone numbers if the column exists
        if 'phone_number' in df.columns:
            phone_numbers = df['phone_number'].dropna().astype(str).str.strip().tolist()
            phone_numbers = [phone for phone in phone_numbers if phone != '']  # Filter out empty phone numbers
            # Clean phone numbers to handle decimals
            phone_numbers = [clean_phone_number(phone) for phone in phone_numbers]
            logger.debug(f"Filtered phone numbers: {phone_numbers}")

        deliverable_email_count = 0
        valid_phone_count = 0

        # Only trigger email verification if email list has emails
        if emails:
            try:
                email_results = retry_email_verification(emails)
                logger.info(f"email_results: {email_results}") 
            except Exception as e:
                logger.error(f"Error verifying emails: {e}")
                raise

            # Store email verification results
            for email, result in zip(emails, email_results):
                status = result.get('statusName', 'unknown')
                if status == 'deliverable':
                    deliverable_email_count += 1
                results.append({'email': email, 'status': status, 'phone_number': None, 'phone_status': None})

        # Only trigger phone number verification if phone number list has phone numbers
        if phone_numbers:
            try:
                phone_results = retry_phone_verification(phone_numbers)
                logger.info(f"phone_results: {phone_results}")                                                                                                    
            except Exception as e:
                logger.error(f"Error verifying phone numbers: {e}")
                raise

            # Store phone verification results
            for phone_number, result in zip(phone_numbers, phone_results):
                valid = result.get('valid', False)
                if valid:
                    valid_phone_count += 1
                results.append({'email': None, 'status': None, 'phone_number': phone_number, 'phone_status': valid})

        # Prepare the combined results
        combined_results = []

        # Separate results into email and phone buckets
        email_results = [{'email': result['email'], 'email_status': result['status']} for result in results if result['email']]
        phone_results = [{'phone_number': result['phone_number'], 'phone_status': result['phone_status']} for result in results if result['phone_number']]

        # Check if both email and phone results exist
        if email_results and phone_results:
            # Align emails and phone numbers by length
            max_len = max(len(email_results), len(phone_results))
            email_results.extend([{'email': '', 'email_status': ''}] * (max_len - len(email_results)))
            phone_results.extend([{'phone_number': '', 'phone_status': ''}] * (max_len - len(phone_results)))

            # Combine aligned data into rows
            for email, phone in zip(email_results, phone_results):
                combined_results.append({
                    'email': email['email'],
                    'email_status': email['email_status'],
                    'phone_number': phone['phone_number'],
                    'phone_status': phone['phone_status']
                })
        else:
            # Process individual email and phone results separately
            for result in results:
                if result['email']:
                    combined_results.append({
                        'email': result['email'],
                        'email_status': result['status'],
                    })
                if result['phone_number']:
                    combined_results.append({
                        'phone_number': result['phone_number'],
                        'phone_status': result['phone_status']
                    })

        # Convert to DataFrame
        result_df = pd.DataFrame(combined_results)
        logger.info(f"final: {result_df}")

        # Save results to a new CSV file in memory
        buffer = io.StringIO()
        result_df.to_csv(buffer, index=False)
        buffer.seek(0)

        # Upload the processed file to S3
        result_file_key = s3_handler.get_key(f"email-or-phone-validation/processed-file/{file_key}")
        if not s3_handler.upload_file(buffer, result_file_key):
            raise Exception(f"Failed to upload the results file to S3 for file_key: {file_key}")

        # Update database records
        EmailOrPhoneVerifyProcessedFile.objects.create(
            file_key=result_file_key,
            user=user,
            total_deliverable_emails=deliverable_email_count,
            total_valid_phone_numbers=valid_phone_count,
            total_email_count=file_upload.email_count,
            total_phone_num_count=file_upload.phone_num_count,
            type=file_upload.type,
            uploaded_file_name=file_upload.uploaded_file_name,
            notification_type=file_upload.notification_type,
        )
        file_upload.status = 'completed'
        file_upload.processed_at = timezone.now()  # timezone-aware, unlike datetime.now()
        file_upload.save()
        download_link = s3_handler.generate_presigned_url(result_file_key)
        if file_upload.notification_type == 'email':
            email_service = UserLeadsEmailService(user_id)
            subject = "Your Unlimited Leads Phone or Email Validation is ready."
            try:
                email_service.send_user_validation_link(download_link, subject)
            except Exception:
                logger.exception("Failed to send email notification")
                # Optionally, take additional action, like retrying or alerting
        elif file_upload.notification_type == 'sms':
            sms_service = TwilioService()
            phone_number = user.phone_number  # Assuming user.phone_number exists
            message = f"Your file is ready for download. Use the following link: {download_link}"
            try:
                sms_sid = sms_service.send_sms(to=phone_number, message=message)
                if sms_sid:
                    logger.info(f"SMS sent successfully, SID: {sms_sid}")
                else:
                    logger.warning("SMS notification failed")
            except Exception:
                logger.exception("Failed to send SMS notification")

        return {"status": "completed", "results_file_key": result_file_key}

    except Exception as e:
        logger.error(f"Error processing uploaded data for file_key {file_key}: {e}")
        file_upload.status = 'failed'
        file_upload.save()
        return {"error": f"Failed to process data: {e}"}


@shared_task
def reset_user_usage_limit():
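    """Reset the daily search usage counter for verified users whose most recent
    active subscription bills per day."""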
    users = UnlimitedLeadUser.objects.filter(is_verified=True, is_deleted=False)
    for user in users:
        transaction = (
            Transaction.objects.filter(
                customer=user, subscription_status="active", is_subscription=True
            )
            .order_by("-created_at")
            .first()
        )
        
        if transaction is None:
            continue
        
        if transaction.product.billing_period == 'day':
            usage, _ = UserLeadsSearchUsage.objects.get_or_create(user=user)
            usage.usage_count = 0
            usage.usage_reset_at = timezone.now()
            usage.save()


@shared_task
def download_user_leads_search(user_id, search_id):
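    """Export a saved lead search to CSV and upload it to S3; exports larger than
    DOWNLOAD_RESULT_LIMIT rows are split into parts and zipped."""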
    search = UserLeadSearch.objects.get(id=search_id, user=user_id)
    queryset = Lead.objects.filter(is_deleted=False)

    header = [
        "Type (Business or Consumer)",
        "Business Name",
        "First Name",
        "Last Name",
        "Street Address",
        "City",
        "State",
        "Country",
        "Zip Code",
        "Phone",
        "Phone Type (Land or Cell)",
        "Email",
        "Email Verified (Yes/No)",
        "SIC Code (Business Only)",
    ]

    search_filter_param = search.search_filter_parameter
    ordering = search_filter_param.get("ordering", "")

    queryset = queryset.filter(
        version_num__lte=search.lead_version
    )
    filter_set = UserLeadFilter(data=search_filter_param, queryset=queryset)
    queryset = filter_set.qs

    ordering = [order.strip() for order in ordering.split(",") if order.strip()]
    if ordering:
        queryset = queryset.order_by(*ordering)

    if search.title != "":
        filename = f"unlimited-leads-{slugify(search.title)}"
    else:
        filename = f"unlimited-leads-{timezone.now().strftime('%y%m%d')}"
            
    s3_handler = S3Handler()
    paginator = Paginator(queryset, DOWNLOAD_RESULT_LIMIT)
    if paginator.num_pages == 1:
        with TemporaryDirectory() as download_dir:
            page = paginator.page(1)
            serializer = UserLeadDownloadSerializer(page.object_list, many=True)
            csv_filepath = to_csv(
                [data.values() for data in serializer.data],
                header,
                download_dir,
                filename,
            )
            
            download_key = f"downloads/leads/{uuid.uuid4()}/{filename}.csv"
            with open(csv_filepath, "rb") as csv_file:
                is_success = s3_handler.upload_file(csv_file, s3_handler.get_key(download_key))
                if is_success:                
                    search.download_key = download_key
                    search.is_upload_success = is_success
                    search.save()
                    download_notify_email.delay(user_id, download_key)
    else:
        with TemporaryDirectory() as download_dir:
            for page_number in paginator.page_range:
                page = paginator.page(page_number)
                serializer = UserLeadDownloadSerializer(page.object_list, many=True)
                csv_filename = f"part_{page_number}_({len(page)})"
                to_csv(
                    [data.values() for data in serializer.data],
                    header,
                    download_dir,
                    csv_filename,
                )
            
            download_key = f"downloads/leads/{uuid.uuid4()}/{filename}.zip"
            with NamedTemporaryFile(suffix=".zip") as csv_file:
                zip_csv_files(download_dir, csv_file)
                csv_file.seek(0)
                is_success = s3_handler.upload_file(csv_file, s3_handler.get_key(download_key))
                if is_success:                
                    search.download_key = download_key
                    search.is_upload_success = is_success
                    search.save()
                    download_notify_email.delay(user_id, download_key)


@shared_task
def download_notify_email(user_id, download_key):
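    """Email the user a presigned S3 link to their finished leads export."""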
    s3_handler = S3Handler()
    download_link = s3_handler.generate_presigned_url(s3_handler.get_key(download_key))
    email_service = UserLeadsEmailService(user_id)
    subject = "Your Unlimited Leads download is ready."
    email_service.send_user_lead_download_link(download_link, subject)

@shared_task
def download_notify_email_scrapping(user_id, download_key):
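    """Email the user a presigned S3 link to their finished scraping results."""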
    s3_handler = S3Handler()
    download_link = s3_handler.generate_presigned_url(s3_handler.get_key(download_key))
    email_service = UserLeadsEmailService(user_id)
    subject = "Your Unlimited Leads scrapping is ready."
    email_service.send_user_lead_download_link(download_link,subject)


@shared_task
def notify_saved_search_new_records():
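    """Flag saved searches whose filters match newly versioned leads and notify the
    owners by email and/or SMS according to their notification preference."""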
    saved_searches = UserLeadSearch.objects.filter(
        Q(is_saved=True)
        & Q(is_deleted=False)
        & Q(new_records_found=False)
        & ~Q(receive_notification=UserLeadSearch.NO)
    )

    logger.info(f"Found {saved_searches.count()} saved searches to notify")
    leads = Lead.objects.filter(is_deleted=False)
    for saved_search in saved_searches:
        filter_set = UserLeadFilter(
            data=saved_search.search_filter_parameter, queryset=leads
        )

        queryset = filter_set.qs
        max_version = queryset.aggregate(max_version=Max("version_num", default=0))["max_version"]
        if max_version <= saved_search.lead_version:
            continue

        new_records_count = queryset.filter(version_num=max_version).count()
        saved_search.new_records_found = True
        saved_search.save()

        twilio = TwilioService()
        email_service = UserLeadsEmailService(saved_search.user.id)

        phone_number = saved_search.user.phone_number
        if saved_search.receive_notification == UserLeadSearch.EMAIL:
            email_service.send_saved_search_new_records_email(new_records_count)
        else:
            if phone_number is not None:
                twilio.send_sms(
                    phone_number,
                    f"Hi, we've found {format_count_to_short(new_records_count)} new leads that match the filter you saved in Unlimited Leads. Check them out now!",
                )

            if saved_search.receive_notification == UserLeadSearch.BOTH:
                email_service.send_saved_search_new_records_email(new_records_count)



PROXIES = [
    "http://45.77.24.239:3128",
    "http://103.152.5.254:8080",
    "http://89.223.121.160:8080",
    "http://159.65.69.157:9300",
    "http://20.99.152.172:3128",
]

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
]

proxy_pool = cycle(PROXIES)
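# Round-robin rotation: each call to scrape_emails_phones() pulls the next proxy.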

def scrape_emails_phones(url):
    proxy = next(proxy_pool)
    headers = {"User-Agent": random.choice(USER_AGENTS)}

    try:
        # Route the request through the next proxy in the rotation (the proxy was
        # previously fetched but never passed to requests.get)
        response = requests.get(
            url,
            headers=headers,
            proxies={"http": proxy, "https": proxy},
            timeout=10,
        )
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")

        # Extract text content
        page_text = soup.get_text(separator=" ")

        # Regex for extracting emails and phone numbers
        phone_pattern = r"\+?\d{1,4}?[-.\s]?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4,6}"
        email_pattern = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"

        phones = set(re.findall(phone_pattern, page_text))
        emails = set(re.findall(email_pattern, page_text))

        return list(emails)[:5], list(phones)[:5]
    except Exception as e:
        logger.error(f"Error scraping {url}: {e}")
        return [], []
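
# Illustrative example (hypothetical page): scrape_emails_phones("https://example.com/contact")
# might return (["info@example.com"], ["+1-415-555-0134"]), capped at five of each.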

@shared_task
def start_scrape_task(task_id, user_id):
    """Celery task to perform scraping."""
    try:
        task = ScrappingSearchTask.objects.get(id=task_id)
        user = UnlimitedLeadUser.objects.get(id=user_id)

        logger.info(f"[TASK START] Task ID: {task.id}, User ID: {user.id}")
        task.task_status = "In Progress"
        task.save()

        query = f"{task.search_keyword} {task.lead_type} {task.location} (contact OR 'get in touch' OR 'about us' OR 'customer service' OR 'business directory' OR 'contact details' OR 'phone number' OR 'email')"
        logger.info(f"[SEARCH QUERY] {query}")

        # List to store the results for CSV generation
        results = []

        for url in search(query, num_results=10):
            logger.info(f"[PROCESSING URL] {url}")
            emails, phones = scrape_emails_phones(url)

            if emails or phones:
                result = SrcappingSearchResult.objects.create(
                    user=user,
                    task=task,
                    website_name=url.split("//")[-1].split("/")[0],
                    website_link=url,
                    phone_number=", ".join(phones),
                    email=", ".join(emails),
                )
                logger.info(f"[RESULT SAVED] URL: {url}, Emails: {emails}, Phones: {phones}")

                # Add result to CSV list
                results.append({
                    'website_name': result.website_name,
                    'website_link': result.website_link,
                    'phone_number': result.phone_number,
                    'email': result.email,
                })

            time.sleep(random.uniform(1, 5))  # Random delay

        # Build the CSV in memory and upload it; skip the upload when nothing was scraped
        if results:
            result_df = pd.DataFrame(results)

            # Use StringIO as an in-memory file-like object
            buffer = io.StringIO()
            result_df.to_csv(buffer, index=False)  # Write the DataFrame to the buffer as CSV
            buffer.seek(0)  # Rewind to the beginning of the buffer

            # Upload the processed file to S3
            s3_handler = S3Handler()
            result_file_key = s3_handler.get_key(f"scrapped-data/{task.id}_results.csv")

            if not s3_handler.upload_file(buffer, result_file_key):
                # Leave the task "In Progress" so the failed upload is visible for retry
                logger.error(f"[UPLOAD FAILED] Task ID: {task.id}, S3 Key: {result_file_key}")
                return

            logger.info(f"[CSV UPLOADED] Task ID: {task.id}, S3 Key: {result_file_key}")

            # Store the result file key in the task model
            task.download_key = f"scrapped-data/{task.id}_results.csv"
            task.task_status = "completed"
            task.save()

            download_link = s3_handler.generate_presigned_url(result_file_key)
            if task.notification_type == 'email':
                email_service = UserLeadsEmailService(user_id)
                subject = "Your website search is completed."
                try:
                    email_service.send_user_scrapping_link(download_link, subject)
                except Exception:
                    logger.exception("Failed to send email notification")
                    # Optionally, take additional action, like retrying or alerting
            elif task.notification_type == 'sms':
                sms_service = TwilioService()
                phone_number = user.phone_number  # Assuming user.phone_number exists
                message = f"Your website search is completed: {download_link}"
                try:
                    sms_sid = sms_service.send_sms(to=phone_number, message=message)
                    if sms_sid:
                        logger.info(f"SMS sent successfully, SID: {sms_sid}")
                    else:
                        logger.warning("SMS notification failed")
                except Exception:
                    logger.exception("Failed to send SMS notification")
        else:
            # Nothing was scraped; complete the task without a download file
            # (previously this path raised a NameError on the undefined result_file_key)
            logger.info(f"[NO RESULTS] Task ID: {task.id}")
            task.task_status = "completed"
            task.save()

        logger.info(f"[TASK COMPLETED] Task ID: {task.id}")
    except ScrappingSearchTask.DoesNotExist:
        logger.error(f"[TASK NOT FOUND] Task ID: {task_id}")
    except Exception as e:
        logger.error(f"[TASK ERROR] Task ID: {task_id}, Error: {str(e)}")