File: //home/arjun/projects/unlimited-leads/Unlimited-Leads-Be/user/views.py
import re
import io
import csv
import os
import math
from typing import Counter
from django.forms import ValidationError
from rest_framework.parsers import MultiPartParser, FormParser
from rest_framework.exceptions import NotFound, AuthenticationFailed
import uuid
from django.shortcuts import get_object_or_404, render
from rest_framework.decorators import api_view
from rest_framework.response import Response
import logging
from rest_framework.views import APIView
from rest_framework.response import Response
from user.models import EmailOrPhoneVerifyFileUpload,EmailOrPhoneVerifyProcessedFile,ThirdPartyServicePerRequestCost,ScrappingSearchTask,SrcappingSearchResult
from django.core.exceptions import ObjectDoesNotExist
import io
import csv
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from rest_framework.permissions import IsAuthenticated
from rest_framework.views import APIView
from rest_framework.generics import ListAPIView, CreateAPIView
from rest_framework import status
from rest_framework.parsers import MultiPartParser
from user.serializers import UserProfileSerializer, ChangePasswordSerializer, MyPlanSerializer,uploadEmailOrPhoneSeriaizer,EmailOrPhoneVerifyProcessedFileSerializer,EmailOrPhoneVerifyFileUploadSerializer,ScrappingSearchTaskGetSerializer,SrcappingSearchResultSerializer
from django.contrib.auth import update_session_auth_hash
from services.s3_handler import S3Handler
import os
from rest_framework.exceptions import PermissionDenied
from django.contrib.auth.hashers import make_password
from payment.models import Transaction
import pandas as pd
from user.tasks import process_uploaded_data,start_scrape_task
from user.paginations import UserPagination
from rest_framework.generics import ListAPIView
from rest_framework.parsers import MultiPartParser, FormParser
from rest_framework.filters import OrderingFilter
from django.contrib.auth import update_session_auth_hash
from django.db.models import Max
from django_filters.rest_framework import DjangoFilterBackend
from services.s3_handler import S3Handler
from utils.utils import get_first_error
from Admin.models import Lead
from Admin.serializers import LeadSerializer
from payment.models import Transaction
from django.conf import settings
from user.serializers import UserProfileSerializer, ChangePasswordSerializer, MyPlanSerializer, UserLeadSavedSearchCreateSerializer, UserLeadSavedSearchSerializer, UserLeadSearchParamSerializer, UserLeadDownloadCreateSerializer,ChangePasswordSerializer,ScrappingSearchTaskSerializer
from .models import UserLeadsSearchUsage, UserLeadSearch
from .helpers import UserSearchLimit
from .paginations import UserSearchPagination
from .filters import UserLeadFilter
from .tasks import download_user_leads_search, download_notify_email,download_notify_email_scrapping
from authorization.models import UnlimitedLeadUser
# Module-level logger named after this module's import path.
# NOTE(review): the same name is rebound further down in this file to
# logging.getLogger('emailverification'); because functions look the name up
# at call time, that later binding is the one every view ends up using.
logger = logging.getLogger(__name__)
# @api_view(['GET'])
# def hello_world(request):
# logger.debug("hello_world API called - DEBUG level") # Log a debug message
# logger.info("hello_world API called - INFO level") # Log an info message
# logger.warning("hello_world API - WARNING test message") # Log a warning (for testing)
# logger.error("hello_world API - ERROR test message") # Log an error (for testing)
# return Response({"message": "Hello, World!"})
class UserProfileView(APIView):
    """Read and update the authenticated user's profile.

    ``GET`` returns the profile fields plus a flag telling whether the user
    currently has an active subscription transaction. ``PATCH`` applies a
    partial update and can upload a new profile image through ``S3Handler``
    or remove the stored one.
    """

    permission_classes = [IsAuthenticated]
    parser_classes = [MultiPartParser, FormParser]

    # Extensions accepted for an uploaded profile image (compared lowercase).
    _ALLOWED_IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def _get_serializer(self, request, partial=False):
        """Build the profile serializer matching the request method.

        Returns ``None`` for any method other than GET or PATCH.
        """
        user = request.user
        if request.method == "GET":
            return UserProfileSerializer(user, context={"request": request})
        if request.method == "PATCH":
            return UserProfileSerializer(
                user, data=request.data, partial=partial, context={"request": request}
            )
        return None

    def _image_error(self, message, http_status):
        """Return an image-related failure in the view's standard envelope.

        The ``error`` key is what these responses always carried; ``success``,
        ``message`` and ``statusCode`` are added so the payload has the same
        shape as every other response of this view.
        """
        return Response(
            {
                "error": message,
                "success": False,
                "message": "Failed to update profile.",
                "statusCode": http_status,
            },
            status=http_status,
        )

    def get(self, request):
        """Return the current user's profile and subscription flag."""
        serializer = self._get_serializer(request)
        s3_handler = S3Handler()
        user = request.user
        # Most recently started active subscription transaction, if any.
        active_subscription = Transaction.objects.filter(
            customer=user,
            subscription_status='active',
            is_subscription=True
        ).order_by('-subscription_start').first()
        is_subscription = active_subscription is not None
        profile = serializer.data
        response_data = {
            "data": {
                "id": str(user.id),
                "profile_image": s3_handler.generate_presigned_url(user.profile_image) if user.profile_image else None,
                "first_name": profile['first_name'],
                "last_name": profile['last_name'],
                "phone_number": profile['phone_number'],
                "email": profile['email'],
                "credit_limit": profile['credit_limit'],
                "is_subscription": is_subscription,
            },
            "success": True,
            "message": "User profile retrieved successfully.",
            "statusCode": 200,
        }
        return Response(response_data, status=status.HTTP_200_OK)

    @swagger_auto_schema(
        manual_parameters=[
            openapi.Parameter(
                name="profile_image",
                in_=openapi.IN_FORM,
                type=openapi.TYPE_FILE,
                description="Profile image to upload.",
                required=False,  # Because it's not mandatory in your serializer
            ),
        ],
        request_body=UserProfileSerializer,
        responses={
            200: openapi.Response(
                description="User profile updated successfully.",
                schema=openapi.Schema(
                    type=openapi.TYPE_OBJECT,
                    properties={
                        "data": openapi.Schema(
                            type=openapi.TYPE_OBJECT,
                            properties={
                                "id": openapi.Schema(type=openapi.TYPE_STRING, description="User ID"),
                                "profile_image": openapi.Schema(
                                    type=openapi.TYPE_STRING,
                                    format=openapi.FORMAT_URI,
                                    description="Profile image URL or None",
                                ),
                                "first_name": openapi.Schema(type=openapi.TYPE_STRING, description="First name"),
                                "last_name": openapi.Schema(type=openapi.TYPE_STRING, description="Last name"),
                                "email": openapi.Schema(type=openapi.TYPE_STRING, description="Email address"),
                                "phone_number": openapi.Schema(type=openapi.TYPE_STRING, description="Phone number"),
                            },
                        ),
                        "success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
                        "message": openapi.Schema(type=openapi.TYPE_STRING),
                        "statusCode": openapi.Schema(type=openapi.TYPE_INTEGER),
                    },
                ),
            ),
            400: openapi.Response(
                description="Validation error or bad request.",
                schema=openapi.Schema(
                    type=openapi.TYPE_OBJECT,
                    properties={
                        "error": openapi.Schema(type=openapi.TYPE_OBJECT),
                        "success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
                        "message": openapi.Schema(type=openapi.TYPE_STRING),
                        "statusCode": openapi.Schema(type=openapi.TYPE_INTEGER),
                    },
                ),
            ),
        },
    )
    def patch(self, request):
        """Partially update the profile, optionally replacing or removing the image.

        Returns 200 with the updated fields, 400 on validation failure or an
        unsupported image extension, and 500 when the image upload fails.
        """
        serializer = self._get_serializer(request, partial=True)
        user = request.user
        s3_handler = S3Handler()

        if not serializer.is_valid():
            return Response(
                {
                    "error": get_first_error(serializer.errors, "Profile edit failed"),
                    "success": False,
                    "message": "Failed to update profile.",
                    "statusCode": 400,
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        image_file = request.FILES.get('profile_image')
        if image_file:
            try:
                file_extension = os.path.splitext(image_file.name)[1]
                if file_extension.lower() not in self._ALLOWED_IMAGE_EXTENSIONS:
                    return self._image_error(
                        "Only JPG and PNG formats are supported.",
                        status.HTTP_400_BAD_REQUEST,
                    )
                key = f"{user.id}/profile/avatar{file_extension}"
                if not s3_handler.upload_image(image_file, key):
                    return self._image_error(
                        "Failed to upload image. Please try again.",
                        status.HTTP_500_INTERNAL_SERVER_ERROR,
                    )
                serializer.validated_data['profile_image'] = s3_handler.get_key(key)
            except Exception as e:
                # logger.exception keeps the traceback that logger.error dropped.
                logger.exception(f"Image upload failed: {e}")
                return self._image_error(
                    "An unexpected error occurred during image upload.",
                    status.HTTP_500_INTERNAL_SERVER_ERROR,
                )
        elif 'profile_image' in request.data and request.data['profile_image'] is None:
            # NOTE(review): with the multipart/form parsers configured on this
            # view, submitted values normally arrive as strings, so `is None`
            # may never match - confirm how clients request image removal.
            if user.profile_image:
                try:
                    s3_handler.delete_object(user.profile_image)
                    serializer.validated_data['profile_image'] = None
                except Exception as e:
                    # Best effort: a failed delete must not block the profile update.
                    logger.exception(f"Image deletion failed: {e}")

        serializer.save()

        p_image = serializer.validated_data.get('profile_image', user.profile_image)
        # NOTE(review): get() signs the stored value directly while this path
        # passes it through get_key() again (after get_key() was already
        # applied on upload) - confirm get_key() is idempotent.
        response_data = {
            "data": {
                "id": str(user.id),
                "profile_image": s3_handler.generate_presigned_url(s3_handler.get_key(p_image)) if p_image else None,
                "first_name": serializer.validated_data.get('first_name', user.first_name),
                "last_name": serializer.validated_data.get('last_name', user.last_name),
                "email": serializer.validated_data.get('email', user.email),
                "phone_number": serializer.validated_data.get('phone_number', user.phone_number),
            },
            "success": True,
            "message": "User profile updated successfully.",
            "statusCode": 200,
        }
        return Response(response_data, status=status.HTTP_200_OK)
class ChangePasswordView(APIView):
    """Change the authenticated user's password after checking the old one."""

    permission_classes = [IsAuthenticated]

    @staticmethod
    def _rejection(errors, message):
        """Build the 400 response shared by every failure path."""
        body = {
            "errors": errors,
            "success": False,
            "message": message,
            "statusCode": 400,
        }
        return Response(body, status=status.HTTP_400_BAD_REQUEST)

    @swagger_auto_schema(
        request_body=ChangePasswordSerializer,
        responses={
            200: openapi.Response(
                description="Password changed successfully.",
                schema=openapi.Schema(
                    type=openapi.TYPE_OBJECT,
                    properties={
                        "success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
                        "message": openapi.Schema(type=openapi.TYPE_STRING),
                        "statusCode": openapi.Schema(type=openapi.TYPE_INTEGER),
                    },
                ),
            ),
            400: openapi.Response(
                description="Validation error or bad request.",
                schema=openapi.Schema(
                    type=openapi.TYPE_OBJECT,
                    properties={
                        "error": openapi.Schema(type=openapi.TYPE_OBJECT),
                        "success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
                        "message": openapi.Schema(type=openapi.TYPE_STRING),
                        "statusCode": openapi.Schema(type=openapi.TYPE_INTEGER),
                    },
                ),
            ),
        },
    )
    def post(self, request):
        """Validate the payload, verify the old password and store the new one.

        Responds 400 when the payload is invalid, the old password does not
        match, or the new password equals the old one; 200 otherwise.
        """
        serializer = ChangePasswordSerializer(data=request.data)
        if not serializer.is_valid():
            return self._rejection(
                get_first_error(serializer.errors, "Failed to change password."),
                "Failed to change password.",
            )

        account = request.user
        current = serializer.validated_data['old_password']

        if not account.check_password(current):
            return self._rejection("Incorrect password", "Incorrect password")

        replacement = serializer.validated_data['new_password']
        if replacement == current:
            reason = "New password should not be the same as the old password"
            return self._rejection(reason, reason)

        account.set_password(replacement)
        account.save()
        # Refresh the session hash so the password change does not log the user out.
        update_session_auth_hash(request, account)

        return Response(
            {
                "success": True,
                "message": "Password changed successfully.",
                "statusCode": 200
            },
            status=status.HTTP_200_OK,
        )
class MyPlanView(APIView):
    """Expose the authenticated user's current subscription plan."""

    permission_classes = [IsAuthenticated]

    def get(self, request):
        """Return the newest active subscription, or an empty payload.

        A missing subscription is reported as a successful 200 response with
        ``data`` set to ``{}`` rather than as an error.
        """
        newest_active = (
            Transaction.objects
            .filter(
                customer=request.user,
                subscription_status='active',
                is_subscription=True,
            )
            .order_by('-subscription_start')
            .first()
        )

        if newest_active:
            payload = MyPlanSerializer(newest_active).data
            note = "Active subscription retrieved successfully."
        else:
            payload = {}
            note = "No active subscription found."

        body = {
            "data": payload,
            "success": True,
            "message": note,
            "statusCode": 200
        }
        return Response(body, status=status.HTTP_200_OK)
# NOTE(review): this rebinds the module-level `logger` that was created with
# logging.getLogger(__name__) near the top of the file. Functions resolve the
# name at call time, so once the module has been imported every class in it
# (including the ones defined above this line) logs to 'emailverification'.
logger = logging.getLogger('emailverification')
class UploadEmailOrPhoneUploadView(APIView):
    """Accept emails / phone numbers for verification and price the job.

    Input comes either from uploaded CSV files (``files``) or from a
    comma-separated ``data`` form field. Valid values are de-duplicated,
    written to a CSV that is uploaded through ``S3Handler``, and recorded in
    an ``EmailOrPhoneVerifyFileUpload`` row together with the credit/payment
    calculation returned to the client.
    """

    parser_classes = [MultiPartParser, FormParser]
    permission_classes = [IsAuthenticated]

    def __init__(self, **kwargs):
        # Accept and forward initkwargs so as_view(**initkwargs) keeps working
        # and the base view initialisation is not skipped.
        super().__init__(**kwargs)
        self.MIN_STRIPE_AMOUNT = float(settings.MIN_STRIPE_AMOUNT)

    @swagger_auto_schema(
        manual_parameters=[
            openapi.Parameter(
                name="files",
                in_=openapi.IN_FORM,
                type=openapi.TYPE_ARRAY,  # Accept multiple files
                items=openapi.Items(type=openapi.TYPE_FILE),
                description="One or more files containing emails or phone numbers.",
                required=False,
            ),
            openapi.Parameter(
                name="data_type",
                in_=openapi.IN_FORM,
                type=openapi.TYPE_STRING,
                description="Data type: 'email', 'phone', or 'both'.",
                required=True,
            ),
            openapi.Parameter(
                name="data",
                in_=openapi.IN_FORM,
                type=openapi.TYPE_STRING,
                description="Comma-separated list of emails or phone numbers.",
                required=False,
            ),
            openapi.Parameter(
                name="notification_type",
                in_=openapi.IN_FORM,
                type=openapi.TYPE_STRING,
                description="notification_type: 'email', 'sms', or 'both'.",
                required=True,
            ),
        ],
        tags=['Email or Number Verification'],
        # request_body=uploadEmailOrPhoneSeriaizer,
        responses={
            200: openapi.Response(
                description="Upload files for email or phone verification.",
                schema=openapi.Schema(
                    type=openapi.TYPE_OBJECT,
                    properties={
                        "data": openapi.Schema(
                            type=openapi.TYPE_OBJECT,
                            properties={
                                "data_type": openapi.Schema(type=openapi.TYPE_STRING, description="email or phone or both"),
                                "notification_type": openapi.Schema(type=openapi.TYPE_STRING, description="email or sms or both"),
                                "data": openapi.Schema(type=openapi.TYPE_STRING, description="emailId or phone number"),
                                "files": openapi.Schema(
                                    type=openapi.TYPE_ARRAY,
                                    items=openapi.Items(type=openapi.TYPE_FILE),
                                    description="Uploaded files.",
                                ),
                            },
                        ),
                        "success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
                        "message": openapi.Schema(type=openapi.TYPE_STRING),
                        "statusCode": openapi.Schema(type=openapi.TYPE_INTEGER),
                    },
                ),
            ),
            400: openapi.Response(
                description="Validation error or bad request.",
                schema=openapi.Schema(
                    type=openapi.TYPE_OBJECT,
                    properties={
                        "error": openapi.Schema(type=openapi.TYPE_OBJECT),
                        "success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
                        "message": openapi.Schema(type=openapi.TYPE_STRING),
                        "statusCode": openapi.Schema(type=openapi.TYPE_INTEGER),
                    },
                ),
            ),
        },
    )
    def post(self, request):
        """Validate the submission, store the cleaned CSV and return pricing.

        Returns 400 for invalid parameters or input, 500 when the CSV upload
        fails, and 200 with the stored upload id, counts and the credit /
        payment calculation otherwise.
        """
        user = request.user
        combined_data = []  # every accepted value, duplicates included
        duplicate_emails = Counter()  # occurrences per email
        duplicate_phone_numbers = Counter()  # occurrences per phone number

        serializer = uploadEmailOrPhoneSeriaizer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        data_type = request.data.get("data_type", "").lower()
        notification_type = request.data.get("notification_type", "").lower()
        if data_type not in ["email", "phone", "both"]:
            return self._bad_request("Invalid data_type. Use 'email', 'phone', or 'both'.")
        if notification_type not in ["email", "sms", "both"]:
            return self._bad_request("Invalid notification_type. Use 'email', 'sms', or 'both'.")

        # Uploaded files take precedence; the text field is only read when no
        # file was sent.
        file_names = []
        if 'files' in request.FILES:
            error_response = self._collect_from_files(
                request.FILES.getlist('files'),
                data_type,
                combined_data,
                duplicate_emails,
                duplicate_phone_numbers,
                file_names,
            )
            if error_response is not None:
                return error_response
            logger.info("Uploaded file names: %s", file_names)
        elif "data" in request.data:
            error_response = self._collect_from_text(
                request.data.get("data", "").strip(),
                data_type,
                combined_data,
                duplicate_emails,
                duplicate_phone_numbers,
            )
            if error_response is not None:
                return error_response

        if not combined_data:
            return self._bad_request("No valid emails or phone numbers found.")

        # De-duplicate while keeping first-seen order so the generated CSV is
        # deterministic (a plain set() would shuffle the rows between runs).
        unique_emails = list(dict.fromkeys(
            item['email'] for item in combined_data if 'email' in item
        ))
        unique_phone_numbers = list(dict.fromkeys(
            item['phone_number'] for item in combined_data if 'phone_number' in item
        ))
        final_data = [{"email": email} for email in unique_emails]
        final_data.extend({"phone_number": number} for number in unique_phone_numbers)

        unique_email_count = len(unique_emails)
        unique_phone_number_count = len(unique_phone_numbers)
        duplicate_email_count = sum(duplicate_emails.values()) - unique_email_count
        duplicate_phone_number_count = sum(duplicate_phone_numbers.values()) - unique_phone_number_count
        # Only counts are logged: the values themselves are personal data.
        logger.debug(
            "Prepared %d unique emails and %d unique phone numbers for upload",
            unique_email_count,
            unique_phone_number_count,
        )

        s3_handler = S3Handler()
        file_name = s3_handler.get_key(f"email-or-phone-validation/{user.id}_{uuid.uuid4()}_data.csv")
        try:
            csv_buffer = self.create_csv(final_data)
            file_key = s3_handler.upload_file(csv_buffer, file_name)
            if not file_key:
                raise Exception("File upload failed.")
        except Exception as e:
            logger.exception(f"Error: {str(e)}")
            return Response({
                "status": False,
                "message": "Error uploading data.",
                "errors": str(e),
                "status_code": status.HTTP_500_INTERNAL_SERVER_ERROR
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        file_names_str = ','.join(file_names) if file_names else "Data From Input Field"
        cost = ThirdPartyServicePerRequestCost.objects.all()
        email_cost = cost.get(service='email')
        phone_cost = cost.get(service='phone')
        calculation = self.calculate_total(
            data_type, unique_email_count, unique_phone_number_count, cost, user, self.MIN_STRIPE_AMOUNT
        )
        # objects.create() already persists the row; no extra save() needed.
        upload = EmailOrPhoneVerifyFileUpload.objects.create(
            user=user,
            file_key=file_name,
            status="uploaded",
            payment_status=False,
            type=data_type,
            email_count=unique_email_count,
            phone_num_count=unique_phone_number_count,
            uploaded_file_name=file_names_str,
            notification_type=notification_type,
            credit_used=calculation["credit_used"],
            remaining_credit=calculation["remaining_credit"],
            credit_to_return=calculation["credit_to_return"],
            payment_mode="credit" if not calculation["payment"] else "normal",
        )

        response_data = {
            "status": True,
            "message": "Files uploaded successfully.",
            "data": self._build_upload_summary(
                data_type,
                upload.id,
                file_name,
                email_cost,
                phone_cost,
                calculation,
                unique_email_count,
                unique_phone_number_count,
                duplicate_email_count,
                duplicate_phone_number_count,
            ),
            "status_code": status.HTTP_200_OK,
        }
        return Response(response_data, status=status.HTTP_200_OK)

    def _bad_request(self, message):
        """Return the view's standard 400 payload carrying ``message``."""
        return Response({
            "status": False,
            "message": message,
            "status_code": status.HTTP_400_BAD_REQUEST
        }, status=status.HTTP_400_BAD_REQUEST)

    def _collect_from_files(self, files, data_type, combined_data,
                            duplicate_emails, duplicate_phone_numbers, file_names):
        """Read each uploaded CSV and append its valid values to the accumulators.

        Returns an error ``Response`` as soon as a file is rejected, otherwise
        ``None``. ``combined_data``, both counters and ``file_names`` are
        filled in place.
        """
        for csv_file in files:
            logger.info("Processing file: %s", csv_file.name)
            file_names.append(csv_file.name)
            try:
                # Force phone numbers to str so a leading '+' / zeros survive parsing.
                df = pd.read_csv(csv_file, dtype={"phone_number": str})

                # The columns present must match the declared data_type exactly.
                if data_type == "email":
                    if "email" not in df.columns:
                        return self._file_error_response(csv_file.name, "email")
                    if "phone_number" in df.columns:
                        return self._file_error_response(csv_file.name, "extraneous phone data")
                elif data_type == "phone":
                    if "phone_number" not in df.columns:
                        return self._file_error_response(csv_file.name, "phone")
                    if "email" in df.columns:
                        return self._file_error_response(csv_file.name, "extraneous email data")
                elif data_type == "both":
                    if "email" not in df.columns or "phone_number" not in df.columns:
                        return self._file_error_response(csv_file.name, "both")

                for _, row in df.iterrows():
                    raw_email = row.get("email", "")
                    email = raw_email.strip() if isinstance(raw_email, str) else ""
                    raw_phone = row.get("phone_number", "")
                    phone_number = str(raw_phone).strip() if pd.notnull(raw_phone) else ""

                    # Invalid emails are skipped; an invalid phone number
                    # rejects the whole request.
                    if email and self.is_valid_email(email):
                        duplicate_emails[email] += 1
                        combined_data.append({"email": email})
                    if phone_number:
                        if not self.is_valid_phone_number(phone_number):
                            return self._bad_request(
                                f"Phone number '{phone_number}' is invalid or missing country code."
                            )
                        duplicate_phone_numbers[phone_number] += 1
                        combined_data.append({"phone_number": phone_number})
            except Exception as e:
                logger.exception(f"Error processing file {csv_file.name}: {str(e)}")
                return Response({
                    "status": False,
                    "message": f"Invalid file format for {csv_file.name}.",
                    "errors": [str(e)],
                    "status_code": status.HTTP_400_BAD_REQUEST
                }, status=status.HTTP_400_BAD_REQUEST)
        return None

    def _collect_from_text(self, text_input, data_type, combined_data,
                           duplicate_emails, duplicate_phone_numbers):
        """Parse the comma-separated ``data`` field into the accumulators.

        Returns an error ``Response`` for the first invalid item, or when
        ``data_type`` is ``both`` and either kind is missing; otherwise ``None``.
        """
        has_email = False
        has_phone = False
        for item in text_input.split(','):
            item = item.strip()
            if data_type in ["email", "both"] and self.is_valid_email(item):
                has_email = True
                duplicate_emails[item] += 1
                combined_data.append({"email": item})
            elif data_type in ["phone", "both"] and self.is_valid_phone_number(item):
                has_phone = True
                duplicate_phone_numbers[item] += 1
                combined_data.append({"phone_number": item})
            else:
                return self._bad_request(
                    f"Invalid input: '{item}'. Ensure phone numbers include a country code."
                )

        if data_type == "both":
            if not has_email or not has_phone:
                # This payload historically uses "success" instead of "status".
                return Response({
                    "success": False,
                    "status_code": status.HTTP_400_BAD_REQUEST,
                    "message": "Both email and phone number must be provided for 'both' data type."
                }, status=status.HTTP_400_BAD_REQUEST)
            logger.info("Both email and phone numbers are present in the input data.")
        return None

    def _build_upload_summary(self, data_type, upload_id, file_name, email_cost, phone_cost,
                              calculation, unique_email_count, unique_phone_number_count,
                              duplicate_email_count, duplicate_phone_number_count):
        """Assemble the ``data`` section of the success response for ``data_type``."""
        summary = {"id": upload_id, "file_key": file_name}
        if data_type in ("email", "both"):
            summary["unique_email_count"] = unique_email_count
            summary["email_per_request_cost"] = email_cost.amount
            summary["duplicate_email_count"] = duplicate_email_count
        if data_type in ("phone", "both"):
            summary["unique_phone_number_count"] = unique_phone_number_count
            summary["phone_per_request_cost"] = phone_cost.amount
            summary["duplicate_phone_number_count"] = duplicate_phone_number_count

        if data_type == "both":
            summary["email_total_amount"] = round(email_cost.amount * unique_email_count, 2)
            summary["phone_total_amount"] = round(phone_cost.amount * unique_phone_number_count, 2)
            summary["currency"] = email_cost.currency or phone_cost.currency
        elif data_type == "email":
            summary["currency"] = email_cost.currency
        else:
            summary["currency"] = phone_cost.currency

        summary["total_amount"] = calculation["total_amount"]
        summary["credit_used"] = calculation["credit_used"]
        summary["remaining_credit"] = calculation["remaining_credit"]
        summary["credit_to_return"] = calculation["credit_to_return"]
        summary["payment"] = calculation["payment"]
        return summary

    def _file_error_response(self, file_name, expected_type):
        """Return a 400 response saying ``file_name`` lacks the expected columns."""
        return Response({
            "status": False,
            "message": f"The file {file_name} must contain required columns for '{expected_type}'.",
            "status_code": status.HTTP_400_BAD_REQUEST
        }, status=status.HTTP_400_BAD_REQUEST)

    def is_valid_email(self, email):
        """Return True when ``email`` contains an ``@`` (deliberately minimal check)."""
        return "@" in email

    def is_valid_phone_number(self, phone_number):
        """Return True for ``+<1-3 digit code>[-. ]<1-14 digits>`` formatted numbers."""
        phone_number_regex = r'^\+\d{1,3}[-.\s]?\d{1,14}$'
        return re.match(phone_number_regex, phone_number) is not None

    def create_csv(self, data):
        """Serialise ``data`` rows into an in-memory CSV and return the buffer.

        Every row is written with ``email`` and ``phone_number`` columns; a
        missing key becomes an empty cell (``DictWriter``'s default
        ``restval``), so the caller's dicts are not modified.
        """
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=["email", "phone_number"])
        writer.writeheader()
        writer.writerows(data)
        buffer.seek(0)  # rewind so the consumer reads from the start
        return buffer

    def calculate_total(self, data_type, unique_email_count, unique_phone_number_count, cost, user, min_stripe_cost):
        """Work out how many credits are consumed and what must still be paid.

        Args:
            data_type: ``'email'``, ``'phone'`` or ``'both'``.
            unique_email_count: number of distinct emails to verify.
            unique_phone_number_count: number of distinct phone numbers to verify.
            cost: queryset of per-service costs, looked up by ``service``.
            user: the requesting user; the credit is re-read from the database.
            min_stripe_cost: smallest amount that can be charged.

        Returns:
            dict with ``payment`` (bool), ``credit_used``, ``remaining_credit``,
            ``total_amount`` and ``credit_to_return``.
        """
        min_stripe_amount = min_stripe_cost
        credit = UnlimitedLeadUser.objects.filter(id=user.id).first().credit_limit

        email_cost = cost.get(service='email').amount if data_type in ['email', 'both'] else 0
        phone_cost = cost.get(service='phone').amount if data_type in ['phone', 'both'] else 0

        total_amount = email_cost * unique_email_count + phone_cost * unique_phone_number_count
        total_count = unique_email_count + unique_phone_number_count

        # One credit pays for one item; enough credit means nothing to charge.
        if credit > 0 and total_count <= credit:
            return {
                "payment": False,
                "credit_used": total_count,
                "remaining_credit": credit - total_count,
                "total_amount": 0.0,
                "credit_to_return": 0,
            }

        if credit > 0:
            # Partial cover: bill every uncovered item at its own service rate.
            # Previously all uncovered items were billed at the email rate,
            # which is 0 for phone-only uploads.
            # NOTE(review): credits are applied to emails first, then phone
            # numbers - confirm this ordering with billing for 'both' uploads.
            emails_covered = min(credit, unique_email_count)
            phones_covered = credit - emails_covered
            total_amount = (
                email_cost * (unique_email_count - emails_covered)
                + phone_cost * (unique_phone_number_count - phones_covered)
            )
            credit_used = credit
        else:
            credit_used = 0

        # Charges below the minimum are rounded up; the difference is handed
        # back as credits (amount difference * 100).
        if total_amount < min_stripe_amount:
            adjusted_amount = min_stripe_amount
            credit_to_return = round(adjusted_amount - total_amount, 2) * 100
        else:
            adjusted_amount = round(total_amount, 2)
            credit_to_return = 0

        return {
            "payment": True,
            "credit_used": credit_used,
            "remaining_credit": 0 if credit <= 0 else credit - credit_used,
            "total_amount": adjusted_amount,
            "credit_to_return": credit_to_return,
        }
class ProcessUploadedFileView(APIView):
    """Start background verification of a previously uploaded file."""

    permission_classes = [IsAuthenticated]

    @swagger_auto_schema(
        operation_summary="Trigger File Processing Task",
        operation_description=(
            "This endpoint triggers a background task to process the uploaded file after payment. "
            "It validates the provided `file_key` and `payment_id`, checks payment status, and initiates the processing."
        ),
        tags=['Email or Number Verification'],
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=["file_key"],  # Make payment_id optional
            properties={
                "file_key": openapi.Schema(
                    type=openapi.TYPE_STRING,
                    description="Unique identifier for the uploaded file in S3."
                ),
                "payment_id": openapi.Schema(
                    type=openapi.TYPE_STRING,
                    description="Payment identifier for verifying payment status. Required for non-credit payments."
                ),
            },
        ),
        responses={
            200: openapi.Response(
                description="Processing started successfully.",
                examples={
                    "application/json": {
                        "status": True,
                        "message": "Processing started. Please check the status later.",
                        "status_code": 200
                    }
                }
            ),
            400: openapi.Response(
                description="Validation or payment error.",
                examples={
                    "application/json": {
                        "status": False,
                        "message": "Payment not successful.",
                        "status_code": 400
                    }
                }
            ),
            404: openapi.Response(
                description="File or payment record not found.",
                examples={
                    "application/json": {
                        "detail": "Not found."
                    }
                }
            ),
        },
    )
    def post(self, request, *args, **kwargs):
        """
        Trigger the Celery task that processes the uploaded data after payment.

        Requires ``file_key`` and, unless the upload is paid with credit,
        ``payment_id``. Returns 404 when the upload (or the transaction) does
        not belong to the requesting user, 400 on missing input, unsuccessful
        payment or an upload that is already processing/completed, and 200
        once the task has been queued.
        """
        file_key = request.data.get("file_key")
        payment_id = request.data.get("payment_id")
        user = request.user

        if not file_key:
            return self._bad_request("file_key is required.")

        # Scope the lookup to the requesting user: without it any
        # authenticated user could start processing of someone else's upload
        # and have their own credit overwritten from that record.
        file_upload = get_object_or_404(
            EmailOrPhoneVerifyFileUpload, file_key=file_key, user=user
        )

        # Credit-paid uploads skip payment verification entirely.
        if file_upload.payment_mode != 'credit':
            if not payment_id:
                return self._bad_request("payment_id is required for non-credit payments.")
            # NOTE(review): nothing visible here ties the transaction to this
            # particular upload - confirm a payment cannot be reused.
            transaction = get_object_or_404(Transaction, transaction_id=payment_id, customer=user)
            if transaction.payment_status in ['pending', 'failed']:
                return self._bad_request("Payment not successful.")

        if file_upload.status in ["processing", "completed"]:
            return self._bad_request("File is already being processed.")

        file_upload.status = "processing"
        file_upload.save()

        # Apply the credit figures computed at upload time. (The previous
        # `status != 'failed'` guard was always true right after the status
        # was set to "processing", so it has been removed.)
        # NOTE(review): these figures may be stale if the user's credit
        # changed between upload and processing - confirm this is acceptable.
        leadUser = UnlimitedLeadUser.objects.get(id=user.id)
        leadUser.credit_limit = file_upload.remaining_credit + file_upload.credit_to_return
        leadUser.save()

        process_uploaded_data.apply_async(
            args=[file_key, file_upload.user.id, payment_id],
            countdown=10  # delay, in seconds, before the task starts
        )

        return Response({
            "status": True,
            "message": "Processing started. Please check the status later.",
            "status_code": status.HTTP_200_OK
        }, status=status.HTTP_200_OK)

    def _bad_request(self, message):
        """Return this view's 400 payload carrying ``message``."""
        return Response({
            "status": False,
            "message": message,
            "status_code": status.HTTP_400_BAD_REQUEST
        }, status=status.HTTP_400_BAD_REQUEST)
class ProcessedFileListView(ListAPIView):
    """Paginated list of the requesting user's processed verification files."""

    permission_classes = [IsAuthenticated]
    serializer_class = EmailOrPhoneVerifyProcessedFileSerializer
    pagination_class = UserPagination

    def get_queryset(self):
        """Return the caller's processed files, most recently processed first."""
        requester = self.request.user
        if not requester.is_authenticated:
            raise AuthenticationFailed("Authentication credentials were not provided.")
        return (
            EmailOrPhoneVerifyProcessedFile.objects
            .filter(user=self.request.user)
            .order_by('-processed_at')
        )

    def list(self, request, *args, **kwargs):
        """Serialise the current page, attaching a presigned URL to each file.

        Counters that do not apply to a file's ``type`` are removed from its
        entry; files of type ``both`` keep every field.
        """
        # Counter fields to drop, keyed by the file's type.
        irrelevant_fields = {
            'email': ('total_valid_phone_numbers', 'total_phone_num_count'),
            'phone': ('total_deliverable_emails', 'total_email_count'),
        }

        records = self.filter_queryset(self.get_queryset())
        current_page = self.paginate_queryset(records)

        if not records.exists():
            empty_body = {
                "data": [],
                "success": True,
                "message": "No data found.",
                "statusCode": 200
            }
            return Response(empty_body, status=status.HTTP_200_OK)

        storage = S3Handler()
        entries = []
        for record in current_page:
            download_url = storage.generate_presigned_url(record.file_key)
            entry = self.get_serializer(record).data
            entry['file_url'] = download_url
            for field_name in irrelevant_fields.get(entry.get('type'), ()):
                entry.pop(field_name, None)
            entries.append(entry)

        return self.get_paginated_response(entries)
class EmailOrPhoneVerifyFileUploadListView(ListAPIView):
    """Paginated list of the requesting user's verification file uploads."""

    permission_classes = [IsAuthenticated]
    serializer_class = EmailOrPhoneVerifyFileUploadSerializer
    pagination_class = UserPagination

    def get_queryset(self):
        """
        Filters the file uploads for the authenticated user, newest first.
        """
        return EmailOrPhoneVerifyFileUpload.objects.filter(user=self.request.user).order_by('-uploaded_at')

    def list(self, request, *args, **kwargs):
        """
        Override the list method to customize the response format.

        Returns a 200 payload with an empty ``data`` list when the user has no
        uploads, the paginated response when pagination applies, and the full
        list otherwise. Unexpected failures are logged and reported as 500;
        framework API errors (for example an invalid page number) keep their
        own status code.
        """
        # Imported locally so this block does not depend on a file-level import.
        from rest_framework.exceptions import APIException

        try:
            queryset = self.get_queryset()
            if not queryset.exists():
                return Response({
                    'success': False,
                    'message': 'No file uploads found for the user.',
                    'statusCode': status.HTTP_200_OK,
                    'data': []
                }, status=status.HTTP_200_OK)

            page = self.paginate_queryset(queryset)
            if page is not None:
                serializer = self.get_serializer(page, many=True)
                return self.get_paginated_response(serializer.data)

            serializer = self.get_serializer(queryset, many=True)
            return Response({
                'success': True,
                'message': 'File uploads retrieved successfully',
                'statusCode': status.HTTP_200_OK,
                'data': serializer.data
            }, status=status.HTTP_200_OK)
        except APIException:
            # Let the framework render its own errors instead of masking them
            # as 500s in the generic handler below.
            raise
        except Exception as e:
            logger.exception("Failed to list file uploads")
            return Response({
                'success': False,
                'message': f"An error occurred: {str(e)}",
                'statusCode': status.HTTP_500_INTERNAL_SERVER_ERROR
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class UserLeadsView(ListAPIView, UserSearchLimit):
    # Lead search endpoint that meters results against the user's plan.
    #
    # A search is identified by (user, validated filter parameters, unsaved).
    # The first time a search runs, the number of leads it exposes is charged
    # to the user's usage counter (capped by what is left on the plan) and
    # recorded on a new UserLeadSearch. Re-running the same search only
    # charges for leads that matched since the last run.
    queryset = Lead.objects.filter(is_deleted=False)
    serializer_class = LeadSerializer
    filter_backends = [DjangoFilterBackend, OrderingFilter]
    filterset_class = UserLeadFilter
    ordering_fields = ["created_on", "first_name", "last_name", "city", "state"]
    ordering = ["-created_on"]
    pagination_class = UserSearchPagination
    permission_classes = (IsAuthenticated,)

    @staticmethod
    def _bad_request(message):
        """Build the 400 error envelope used by every rejection in this view."""
        return Response(
            {
                "success": False,
                "message": message,
                "statusCode": status.HTTP_400_BAD_REQUEST,
            },
            status=status.HTTP_400_BAD_REQUEST,
        )

    @staticmethod
    def _repeat_search_top_up(new_matches, plan_limit, used):
        """Return how many newly matching leads a repeated search may unlock.

        ``new_matches`` is the growth of the result set since the search was
        last charged, ``plan_limit`` is ``None`` (no plan), ``math.inf``
        (unlimited) or a number, and ``used`` is the current usage counter.
        """
        if new_matches <= 0 or plan_limit is None:
            return 0
        if plan_limit == math.inf:
            return new_matches
        limit_left = plan_limit - used
        if limit_left <= 0:
            return 0
        return min(new_matches, limit_left)

    def list(self, request, *args, **kwargs):
        search_param_serializer = UserLeadSearchParamSerializer(data=request.query_params)
        if not search_param_serializer.is_valid():
            return self._bad_request(
                get_first_error(search_param_serializer.errors, "Save failed")
            )

        # filter().first() instead of get(): duplicate rows for the same
        # search (e.g. created by concurrent first requests) would otherwise
        # raise MultipleObjectsReturned and turn every later request into a 500.
        user_search = UserLeadSearch.objects.filter(
            user=request.user,
            search_filter_parameter__exact=search_param_serializer.data,
            is_saved=False,
        ).first()

        usage, _ = UserLeadsSearchUsage.objects.get_or_create(user=request.user)
        plan_limit = self.get_user_plan_limit(request.user)
        queryset = self.filter_queryset(self.get_queryset())
        query_count = queryset.count()

        if user_search is None:
            # First time this search is run: it must fit in the plan.
            if plan_limit is None:
                return self._bad_request("Purchase any available plan")
            if self.is_user_limit_exceed(usage.usage_count, plan_limit):
                return self._bad_request("Limit Reached")

            visible_queryset = queryset
            if plan_limit != math.inf:
                # Cap the exposed results at what is left on the plan.
                limit_left = plan_limit - usage.usage_count
                if limit_left < 0:
                    return self._bad_request("Please try again later")
                if query_count > limit_left:
                    query_count = limit_left
                    visible_queryset = queryset[:query_count]

            # Paginate before charging so that an invalid page number (which
            # raises NotFound) does not consume any of the user's allowance.
            page = self.paginate_queryset(visible_queryset)

            max_version = queryset.aggregate(Max("version_num"))
            user_search = UserLeadSearch.objects.create(
                user=request.user,
                search_filter_parameter=search_param_serializer.data,
                lead_version=max_version["version_num__max"] or 1,
                leads_search_count=query_count,
            )
            usage.usage_count += query_count
            usage.save()
        else:
            # Already searched: only leads that matched since the last run
            # are charged, and only as far as the plan allows.
            top_up = self._repeat_search_top_up(
                query_count - user_search.leads_search_count,
                plan_limit,
                usage.usage_count,
            )
            if top_up:
                user_search.leads_search_count += top_up
                user_search.save()
                usage.usage_count += top_up
                usage.save()
            page = self.paginate_queryset(queryset[:user_search.leads_search_count])

        serializer = self.get_serializer(page, many=True)
        response = self.get_paginated_response(serializer.data)
        response.data["search_id"] = user_search.id
        return response
class UserSavedSearchView(APIView):
    # CRUD-style endpoint for a user's saved lead searches:
    #   GET    - paginated list of saved, non-deleted searches
    #   POST   - mark an existing (unsaved) search as saved
    #   PATCH  - rename a saved search (?id=<uuid>)
    #   DELETE - soft-delete a saved search (?id=<uuid>)
    permission_classes = [IsAuthenticated]

    @staticmethod
    def _error(message, status_code):
        """Build the error envelope shared by all handlers of this view."""
        return Response(
            {
                "success": False,
                "message": message,
                "statusCode": status_code,
            },
            status=status_code,
        )

    def _get_saved_search(self, request):
        """Resolve the ``id`` query parameter to the user's saved search.

        Returns a ``(lead_search, error_response)`` pair; exactly one of the
        two is ``None``. Errors are 400 for a missing or malformed id and 404
        when no saved, non-deleted search with that id belongs to the user.
        """
        search_id = request.query_params.get("id")
        if not search_id:
            return None, self._error("Search ID is required.", status.HTTP_400_BAD_REQUEST)

        # Validate the format up front; uuid.UUID raises ValueError otherwise.
        try:
            uuid.UUID(search_id)
        except ValueError:
            return None, self._error("Invalid UUID format.", status.HTTP_400_BAD_REQUEST)

        try:
            lead_search = UserLeadSearch.objects.get(
                id=search_id, user=request.user, is_saved=True, is_deleted=False
            )
        except UserLeadSearch.DoesNotExist:
            return None, self._error("Search not found.", status.HTTP_404_NOT_FOUND)
        return lead_search, None

    @swagger_auto_schema(
        operation_summary="Saved Search List",
        responses={
            status.HTTP_200_OK: openapi.Response(
                description="",
                schema=openapi.Schema(
                    type=openapi.TYPE_OBJECT,
                    properties={
                        "success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
                        "message": openapi.Schema(type=openapi.TYPE_STRING),
                        "statusCode": openapi.Schema(type=openapi.TYPE_INTEGER),
                        "count": openapi.Schema(type=openapi.TYPE_INTEGER),
                        "current_page": openapi.Schema(type=openapi.TYPE_INTEGER),
                        "next_page": openapi.Schema(type=openapi.TYPE_INTEGER),
                        "previous_page": openapi.Schema(type=openapi.TYPE_INTEGER),
                        "total_pages": openapi.Schema(type=openapi.TYPE_INTEGER),
                        "data": openapi.Schema(
                            type=openapi.TYPE_ARRAY,
                            items=openapi.Schema(
                                type=openapi.TYPE_OBJECT,
                                properties={
                                    "new_records_found": openapi.Schema(
                                        type=openapi.TYPE_BOOLEAN
                                    ),
                                    "title": openapi.Schema(type=openapi.TYPE_STRING),
                                    "receive_notification": openapi.Schema(
                                        type=openapi.TYPE_INTEGER
                                    ),
                                    "search_filter_parameter": openapi.Schema(
                                        type=openapi.TYPE_OBJECT
                                    ),
                                },
                            ),
                        ),
                    },
                ),
            ),
            status.HTTP_400_BAD_REQUEST: openapi.Response(
                description="Bad request, validation error.",
                schema=openapi.Schema(
                    type=openapi.TYPE_OBJECT,
                    properties={
                        "errors": openapi.Schema(
                            type=openapi.TYPE_ARRAY,
                            items=openapi.Schema(type=openapi.TYPE_STRING),
                        ),
                        "message": openapi.Schema(type=openapi.TYPE_STRING),
                        "success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
                        "statusCode": openapi.Schema(type=openapi.TYPE_INTEGER),
                    },
                ),
            ),
        },
    )
    def get(self, request):
        # Saved, non-deleted searches of the caller, most recently updated first.
        user_search = UserLeadSearch.objects.filter(
            user=request.user, is_saved=True, is_deleted=False
        )
        pagination = UserSearchPagination()
        paginated_queryset = pagination.paginate_queryset(
            user_search.order_by("-updated_at"), request
        )
        serializer = UserLeadSavedSearchSerializer(paginated_queryset, many=True)
        return pagination.get_paginated_response(serializer.data)

    @swagger_auto_schema(
        operation_summary="Saved search create",
        request_body=UserLeadSavedSearchCreateSerializer,
        responses={
            # The handler answers 201 on success, so document 201 (not 200).
            status.HTTP_201_CREATED: openapi.Response(
                description="",
                schema=openapi.Schema(
                    type=openapi.TYPE_OBJECT,
                    properties={
                        "data": openapi.Schema(type=openapi.TYPE_STRING),
                        "message": openapi.Schema(type=openapi.TYPE_STRING),
                        "success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
                        "statusCode": openapi.Schema(type=openapi.TYPE_INTEGER),
                    },
                ),
            ),
            status.HTTP_400_BAD_REQUEST: openapi.Response(
                description="Bad request, validation error.",
                schema=openapi.Schema(
                    type=openapi.TYPE_OBJECT,
                    properties={
                        "errors": openapi.Schema(
                            type=openapi.TYPE_ARRAY,
                            items=openapi.Schema(type=openapi.TYPE_STRING),
                        ),
                        "message": openapi.Schema(type=openapi.TYPE_STRING),
                        "success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
                        "statusCode": openapi.Schema(type=openapi.TYPE_INTEGER),
                    },
                ),
            ),
        },
    )
    def post(self, request):
        # Promote an existing search of the caller to a saved search.
        serializer = UserLeadSavedSearchCreateSerializer(
            data=request.data, context={"request": request}
        )
        if not serializer.is_valid():
            return self._error(
                get_first_error(serializer.errors, "Save failed"),
                status.HTTP_400_BAD_REQUEST,
            )

        validated_data = serializer.validated_data
        lead_search = validated_data.get("lead_search")

        # A search may only be saved by the user who ran it.
        if lead_search.user.id != request.user.id:
            return self._error("Invalid search ID.", status.HTTP_400_BAD_REQUEST)
        if lead_search.is_saved:
            return self._error("Search already saved", status.HTTP_400_BAD_REQUEST)

        # The stored filter parameters are kept as they are; only the
        # save-related fields change.
        lead_search.title = validated_data.get("title")
        lead_search.receive_notification = validated_data.get("receive_notification")
        lead_search.is_saved = True
        lead_search.scrapped_search_id = validated_data.get("scrapped_search_id")
        lead_search.save()

        return Response(
            {
                "success": True,
                "message": "Saved successfully",
                "statusCode": status.HTTP_201_CREATED,
            },
            status=status.HTTP_201_CREATED,
        )

    @swagger_auto_schema(
        operation_summary="Update Saved Search",
        manual_parameters=[
            openapi.Parameter(
                "id",  # The parameter name (matches the query param)
                openapi.IN_QUERY,  # The parameter is in the query string
                description="UUID of the saved search to update",
                type=openapi.TYPE_STRING,  # UUIDs are strings
            ),
        ],
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
            properties={
                "title": openapi.Schema(
                    type=openapi.TYPE_STRING,
                    description="Updated title for the saved search",
                ),
            },
            required=["title"],
        ),
        responses={
            status.HTTP_200_OK: openapi.Response(
                description="Update successful.",
                schema=openapi.Schema(
                    type=openapi.TYPE_OBJECT,
                    properties={
                        "success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
                        "message": openapi.Schema(type=openapi.TYPE_STRING),
                        "statusCode": openapi.Schema(type=openapi.TYPE_INTEGER),
                    },
                ),
            ),
            status.HTTP_404_NOT_FOUND: "Search not found.",
        },
    )
    def patch(self, request):
        # Rename the saved search identified by ?id=<uuid>.
        lead_search, error_response = self._get_saved_search(request)
        if error_response is not None:
            return error_response

        title = request.data.get("title")
        if not title:
            return self._error("Title is required.", status.HTTP_400_BAD_REQUEST)

        lead_search.title = title
        lead_search.save()
        return Response(
            {
                "success": True,
                "message": "Title updated successfully.",
                "statusCode": status.HTTP_200_OK,
            },
            status=status.HTTP_200_OK,
        )

    @swagger_auto_schema(
        operation_summary="Delete Saved Search",
        manual_parameters=[
            openapi.Parameter(
                "id",  # The parameter name (matches the query param)
                openapi.IN_QUERY,  # The parameter is in the query string
                description="UUID of the saved search to delete",
                type=openapi.TYPE_STRING,  # UUIDs are strings
            ),
        ],
        responses={
            status.HTTP_200_OK: openapi.Response(
                description="Deletion successful.",
                schema=openapi.Schema(
                    type=openapi.TYPE_OBJECT,
                    properties={
                        "success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
                        "message": openapi.Schema(type=openapi.TYPE_STRING),
                        "statusCode": openapi.Schema(type=openapi.TYPE_INTEGER),
                    },
                ),
            ),
            status.HTTP_404_NOT_FOUND: "Search not found.",
        },
    )
    def delete(self, request):
        # Soft-delete the saved search identified by ?id=<uuid>.
        lead_search, error_response = self._get_saved_search(request)
        if error_response is not None:
            return error_response

        # Rows are flagged rather than removed.
        lead_search.is_deleted = True
        lead_search.save()
        return Response(
            {
                "success": True,
                "message": "Search deleted successfully.",
                "statusCode": status.HTTP_200_OK,
            },
            status=status.HTTP_200_OK,
        )
class UserLeadsSavedSearchResultsView(ListAPIView):
    # Replays a saved search: applies the stored filter parameters to leads no
    # newer than the lead version recorded on the search, and exposes at most
    # the number of leads recorded on it.
    queryset = Lead.objects.filter(is_deleted=False)
    serializer_class = LeadSerializer
    pagination_class = UserSearchPagination
    permission_classes = (IsAuthenticated,)

    # Used when the saved parameters carry no ordering; mirrors the default
    # ordering of UserLeadsView so replayed results line up with the original.
    default_ordering = ("-created_on",)

    def list(self, request, *args, **kwargs):
        saved_search_id = kwargs.get("id")
        try:
            saved_search = UserLeadSearch.objects.get(
                id=saved_search_id, user=request.user, is_saved=True, is_deleted=False
            )
        except UserLeadSearch.DoesNotExist:
            return Response(
                {
                    "success": False,
                    "message": "Search not found",
                    "statusCode": status.HTTP_404_NOT_FOUND,
                },
                status=status.HTTP_404_NOT_FOUND,
            )

        search_filter_param = saved_search.search_filter_parameter

        # Only leads up to the version recorded on the search are replayed.
        queryset = self.get_queryset().filter(
            version_num__lte=saved_search.lead_version
        )
        filter_set = UserLeadFilter(data=search_filter_param, queryset=queryset)
        queryset = filter_set.qs

        # "ordering" is a comma-separated field list. A missing, null or empty
        # value must not reach order_by() as "" (that raises FieldError), so
        # blank tokens are dropped and the default ordering is used instead.
        raw_ordering = search_filter_param.get("ordering") or ""
        ordering = [
            token.strip() for token in raw_ordering.split(",") if token.strip()
        ]
        if not ordering:
            ordering = list(self.default_ordering)
        queryset = queryset.order_by(*ordering)

        page = self.paginate_queryset(queryset[:saved_search.leads_search_count])
        serializer = self.get_serializer(page, many=True)
        return self.get_paginated_response(serializer.data)
class UserLeadsDownload(CreateAPIView):
    # Starts preparation of a leads download for one of the user's searches;
    # the user is notified by email once it is ready.
    permission_classes = (IsAuthenticated,)
    serializer_class = UserLeadDownloadCreateSerializer

    @swagger_auto_schema(
        operation_summary="Download leads",
        request_body=UserLeadDownloadCreateSerializer,
        responses={
            status.HTTP_201_CREATED: openapi.Response(
                description="Saved succesfully.",
                schema=openapi.Schema(
                    type=openapi.TYPE_OBJECT,
                    properties={
                        "success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
                        "message": openapi.Schema(type=openapi.TYPE_STRING),
                        "statusCode": openapi.Schema(type=openapi.TYPE_INTEGER),
                    },
                ),
            ),
            status.HTTP_400_BAD_REQUEST: openapi.Response(
                description="Bad request, validation error.",
                schema=openapi.Schema(
                    type=openapi.TYPE_OBJECT,
                    properties={
                        "success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
                        "message": openapi.Schema(type=openapi.TYPE_STRING),
                        "statusCode": openapi.Schema(type=openapi.TYPE_INTEGER),
                    },
                ),
            ),
        },
    )
    def post(self, request, *args, **kwargs):
        return super().post(request, *args, **kwargs)

    def create(self, request, *args, **kwargs):
        """Validate the request and kick off the download/notification work.

        Responds 404 when validation fails or the referenced scraping task
        does not belong to the caller, otherwise 201 once the work is started.
        """
        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            return Response(
                {
                    "success": False,
                    "message": get_first_error(serializer.errors, "Invalid search ID"),
                    "statusCode": status.HTTP_404_NOT_FOUND,
                },
                status=status.HTTP_404_NOT_FOUND,
            )

        search = serializer.validated_data["id"]
        scrapped_search_id = serializer.validated_data.get("scrapped_search_id")  # Optional field

        if scrapped_search_id:
            # Scope the lookup to the caller: without the user filter any
            # authenticated user could trigger a download for someone else's
            # scraping task, and an unknown id would surface as a 500.
            try:
                scrape_task = ScrappingSearchTask.objects.get(
                    id=scrapped_search_id, user=request.user
                )
            except ScrappingSearchTask.DoesNotExist:
                return Response(
                    {
                        "success": False,
                        "message": "Invalid scrapped search ID",
                        "statusCode": status.HTTP_404_NOT_FOUND,
                    },
                    status=status.HTTP_404_NOT_FOUND,
                )
            # NOTE(review): this is called directly, unlike the .delay() calls
            # below - confirm whether it is meant to run inside the request.
            download_notify_email_scrapping(request.user.id, scrape_task.download_key)

        # NOTE(review): the lead-search download below also runs when a
        # scraping task was handled above - confirm that both are intended.
        if search.download_key != "" and search.is_upload_success:
            # A file was already produced for this search: just notify.
            download_notify_email.delay(request.user.id, search.download_key)
        else:
            download_user_leads_search.delay(request.user.id, search.id)

        return Response(
            {
                "success": True,
                "message": "Preparing download started. Please check your email after sometime.",
                "statusCode": status.HTTP_201_CREATED,
            },
            status=status.HTTP_201_CREATED,
        )
class SrappingSearchTaskView(APIView):
    # Accepts a scraping request, stores it as a search task owned by the
    # caller and queues the background scrape for it.
    permission_classes = [IsAuthenticated]

    @swagger_auto_schema(
        operation_summary="Create a Search Task",
        operation_description=(
            "Create a new search task by providing a search keyword, lead type, location, and SIC code. "
            "This will trigger a background Celery task to scrape data."
        ),
        tags=['scrapping'],
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
            properties={
                "search_keyword": openapi.Schema(
                    type=openapi.TYPE_STRING,
                    description="Search keyword for the scraping task.",
                ),
                "lead_type": openapi.Schema(
                    type=openapi.TYPE_STRING,
                    enum=["Business", "Consumer"],
                    description="Type of lead to search for (Business or Consumer).",
                ),
                "location": openapi.Schema(
                    type=openapi.TYPE_STRING,
                    description="Location for the search (e.g., city, state, country).",
                ),
                "notification_type": openapi.Schema(
                    type=openapi.TYPE_STRING,
                    description="notification_type",
                ),
            },
            required=["search_keyword", "lead_type", "notification_type"],
        ),
        responses={
            status.HTTP_201_CREATED: openapi.Response(
                description="Task started successfully!",
                schema=openapi.Schema(
                    type=openapi.TYPE_OBJECT,
                    properties={
                        "message": openapi.Schema(
                            type=openapi.TYPE_STRING, description="Success message."
                        ),
                        "task_id": openapi.Schema(
                            type=openapi.TYPE_INTEGER, description="ID of the created task."
                        ),
                    },
                ),
            ),
            status.HTTP_400_BAD_REQUEST: openapi.Response(
                description="Validation error.",
                schema=openapi.Schema(
                    type=openapi.TYPE_OBJECT,
                    properties={
                        "error": openapi.Schema(
                            type=openapi.TYPE_STRING, description="Error message."
                        )
                    },
                ),
            ),
        },
    )
    def post(self, request):
        task_serializer = ScrappingSearchTaskSerializer(data=request.data)

        # Reject invalid payloads first; the serializer errors are returned as-is.
        if not task_serializer.is_valid():
            return Response(task_serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        # Persist the task for the caller, then hand it to the background worker.
        created_task = task_serializer.save(user=request.user)
        start_scrape_task.delay(created_task.id, request.user.id)

        body = {"message": "Task started successfully!", "task_id": created_task.id}
        return Response(body, status=status.HTTP_201_CREATED)
class SrcappingSearchResultListView(ListAPIView):
    # Paginated listing of the scraping results that belong to the caller for
    # the task given in the URL.
    permission_classes = [IsAuthenticated]
    serializer_class = SrcappingSearchResultSerializer
    pagination_class = UserPagination

    def get_queryset(self):
        """Return the caller's results for the ``task_id`` URL kwarg."""
        task_id = self.kwargs.get('task_id')
        # All rows share one task, so '-task__created_at' alone ties on every
        # row; '-id' makes the order total so pages neither repeat nor skip
        # rows between requests.
        return SrcappingSearchResult.objects.filter(
            user=self.request.user, task_id=task_id
        ).order_by('-task__created_at', '-id')

    @swagger_auto_schema(
        operation_description="Retrieve scraping results for a specific task.",
        responses={
            status.HTTP_200_OK: 'Scraping results retrieved successfully.',
            status.HTTP_400_BAD_REQUEST: 'Bad request.',
            status.HTTP_500_INTERNAL_SERVER_ERROR: 'Internal server error.'
        },
        tags=['Scraping Results']
    )
    def list(self, request, *args, **kwargs):
        """Return the task's results in the project's response envelope."""
        try:
            queryset = self.get_queryset()

            # An empty result is reported as 200 with success=False and an
            # empty data list (existing client contract).
            if not queryset.exists():
                return Response({
                    'success': False,
                    'message': 'No results found for the given task.',
                    'statusCode': status.HTTP_200_OK,
                    'data': []
                }, status=status.HTTP_200_OK)

            page = self.paginate_queryset(queryset)
            if page is not None:
                serializer = self.get_serializer(page, many=True)
                return self.get_paginated_response(serializer.data)

            # Only reached when pagination is disabled for this view.
            serializer = self.get_serializer(queryset, many=True)
            return Response({
                'success': True,
                'message': 'Scraping Search Results Retrieved Successfully',
                'statusCode': status.HTTP_200_OK,
                'data': serializer.data
            }, status=status.HTTP_200_OK)
        except NotFound as exc:
            # An invalid/out-of-range page is a client error; keep it from
            # being reported as a 500 by the generic handler below.
            return Response({
                'success': False,
                'message': str(exc.detail),
                'statusCode': status.HTTP_404_NOT_FOUND
            }, status=status.HTTP_404_NOT_FOUND)
        except Exception as e:
            return Response({
                'success': False,
                'message': f"An error occurred: {str(e)}",
                'statusCode': status.HTTP_500_INTERNAL_SERVER_ERROR
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class ScrappingTaskResultsView(APIView):
    # Reports the status of one of the caller's scraping tasks and, once the
    # task is completed, its paginated results.
    permission_classes = [IsAuthenticated]
    pagination_class = UserPagination

    def get_queryset(self, task):
        """Return the caller's results for ``task``, highest id first."""
        return SrcappingSearchResult.objects.filter(
            user=self.request.user, task=task
        ).order_by('-id')

    @swagger_auto_schema(
        operation_description="Retrieve results for a specific scraping task.",
        responses={
            status.HTTP_200_OK: 'Task results retrieved successfully.',
            status.HTTP_404_NOT_FOUND: 'Task not found.',
            status.HTTP_500_INTERNAL_SERVER_ERROR: 'Internal server error.'
        },
        tags=['scrapping'],
        manual_parameters=[
            openapi.Parameter(
                'page',
                openapi.IN_QUERY,
                description="Page number for pagination",
                type=openapi.TYPE_INTEGER,
                required=False,
                default=1
            ),
            openapi.Parameter(
                'page_size',
                openapi.IN_QUERY,
                description="Number of results per page",
                type=openapi.TYPE_INTEGER,
                required=False,
                default=10
            )
        ]
    )
    def get(self, request, task_id):
        """Return the task status, or its results when it has completed.

        Responds 404 when the task does not exist or belongs to another user,
        200 with only ``task_id``/``status`` while the task is not completed,
        and 200 with the paginated results otherwise.
        """
        try:
            # Explicit lookup instead of get_object_or_404: the Http404 it
            # raises would be caught by the generic handler below and turned
            # into a 500, hiding the documented 404.
            task = ScrappingSearchTask.objects.filter(
                id=task_id, user=request.user
            ).first()
            if task is None:
                return Response({
                    'success': False,
                    'message': 'Task not found.',
                    'statusCode': status.HTTP_404_NOT_FOUND
                }, status=status.HTTP_404_NOT_FOUND)

            # Results are only served for completed tasks.
            if task.task_status != "completed":
                return Response({
                    "task_id": task.id,
                    "status": task.task_status
                }, status=status.HTTP_200_OK)

            queryset = self.get_queryset(task)

            paginator = self.pagination_class()
            page = paginator.paginate_queryset(queryset, request, view=self)
            if page is not None:
                serializer = SrcappingSearchResultSerializer(page, many=True)
                return paginator.get_paginated_response(serializer.data)

            # Only reached when the paginator declines to paginate.
            serializer = SrcappingSearchResultSerializer(queryset, many=True)
            return Response({
                "task_id": task.id,
                "status": task.task_status,
                "results": serializer.data
            }, status=status.HTTP_200_OK)
        except NotFound as exc:
            # An invalid/out-of-range page is a client error, not a 500.
            return Response({
                'success': False,
                'message': str(exc.detail),
                'statusCode': status.HTTP_404_NOT_FOUND
            }, status=status.HTTP_404_NOT_FOUND)
        except Exception as e:
            return Response({
                'success': False,
                'message': f"An error occurred: {str(e)}",
                'statusCode': status.HTTP_500_INTERNAL_SERVER_ERROR
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)