File: //home/arjun/projects/unlimited-leads/Unlimited-Leads-Be/services/proofy_validation.py
import requests
import json
from typing import Union, List
from django.conf import settings
import time
import logging
logger = logging.getLogger('services')
class ProofyEmailVerifier:
BASE_URL = "https://api.proofy.io"
def __init__(self):
self.PROOFY_USER_ID = settings.PROOFY_USER_ID
self.PROOFY_API_KEY = settings.PROOFY_API_KEY
def initiate_bulk_verification(self, input_data: List[str]) -> str:
"""
Initiates the bulk email verification process.
Returns the 'cid' to track the verification process.
"""
if not isinstance(input_data, list):
raise ValueError("Input data must be a list of email addresses.")
response = requests.post(
f"{self.BASE_URL}/verifylist",
params={"aid": self.PROOFY_USER_ID, "key": self.PROOFY_API_KEY},
data=json.dumps(input_data),
headers={"Content-Type": "application/json"},
)
if response.status_code == 200:
cid = response.json().get("cid")
logger.info(f"Bulk email verification initiated successfully. CID: {cid}")
return cid
else:
logger.error(f"Error initiating bulk email verification: {response.text}")
raise ValueError("Error initiating bulk email verification: " + response.text)
def fetch_bulk_verification_results(self, cid: str) -> List[dict]:
"""
Fetches the results of bulk email verification using the provided 'cid'.
Returns the verification results or a message if not yet complete.
"""
response = requests.get(
f"{self.BASE_URL}/getlistresult",
params={"aid": self.PROOFY_USER_ID, "key": self.PROOFY_API_KEY, "cid": cid},
)
if response.status_code == 200:
data = response.json()
if data.get("checked"):
logger.info(f"Verification results for CID {cid}: {data.get('result')}")
return data.get("result", [])
else:
logger.warning(f"Verification not yet complete for CID {cid}.")
return [{"message": "Check not yet complete. Please retry later."}]
else:
logger.error(f"Error retrieving results for CID {cid}: {response.text}")
raise ValueError("Error retrieving check result: " + response.text)