File: //home/arjun/projects/unlimited-leads/Unlimited-Leads-Be/Admin/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from rest_framework.permissions import IsAdminUser, IsAuthenticated
from Admin.tasks import process_csv_file
from Admin.models import LeadData, TemplateFile, Lead
from services.s3_handler import S3Handler
from datetime import datetime, timezone
from celery.result import AsyncResult
import logging
from Admin.serializers import LeadSerializer, UploadCSVSerializer
from rest_framework.parsers import MultiPartParser
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from rest_framework.pagination import PageNumberPagination
from authorization.models import UnlimitedLeadUser
from payment.models import Transaction
from Admin.serializers import UserDetailSerializer,UserListSerializer
from django.db.models import Q
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from rest_framework.permissions import IsAdminUser
from django.db.models import Min
from Admin.helper import download_template_csv_file
from rest_framework.generics import ListAPIView
from rest_framework.filters import OrderingFilter
from django_filters.rest_framework import DjangoFilterBackend
from django.shortcuts import get_object_or_404
from Admin.paginations import LeadPagination, UserPagination
from Admin.filters import LeadFilter, UserFilter
from django.db.models import Count, Sum, F, Value, CharField, When, IntegerField, Case
from django.db.models.functions import Coalesce
from user.models import EmailOrPhoneVerifyProcessedFile
from django.db.models.functions import Concat
from .models import Country, State, City
logger = logging.getLogger(__name__)
s3_handler = S3Handler()
class UploadLeadCSVView(APIView):
"""
APIView to upload csv files to S3 bucket and insert unique data to DB and notify the Admin via email.
"""
permission_classes = [IsAdminUser, IsAuthenticated]
parser_classes = [MultiPartParser] # Required for handling file uploads
@swagger_auto_schema(
operation_summary="Add Lead data",
operation_description="Only admin users can upload lead data in CSV format.",
manual_parameters=[
openapi.Parameter(
name="files",
in_=openapi.IN_FORM,
type=openapi.TYPE_ARRAY,
items=openapi.Items(type=openapi.TYPE_FILE),
description="CSV files to upload.",
required=True,
)
],
responses={
status.HTTP_201_CREATED: openapi.Response(
description="Saved successfully.",
schema=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
"data": openapi.Schema(type=openapi.TYPE_STRING),
"status": openapi.Schema(type=openapi.TYPE_INTEGER),
},
),
),
status.HTTP_400_BAD_REQUEST: openapi.Response(
description="Bad request, validation error.",
schema=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
"error": openapi.Schema(
type=openapi.TYPE_ARRAY,
items=openapi.Schema(type=openapi.TYPE_STRING),
),
"status": openapi.Schema(type=openapi.TYPE_INTEGER),
},
),
),
status.HTTP_503_SERVICE_UNAVAILABLE: "Currently Unavailable.",
},
request_body=None, # Must be None because we define `manual_parameters`
)
def post(self, request):
logger.info(f"User: {request.user}")
# Validate the input using the serializer
serializer = UploadCSVSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
files = serializer.validated_data['files']
if not files:
logger.info("No files provided for upload")
return Response({
"success": False,
"message": "No files provided for upload",
"errors": "No files provided for upload",
"statusCode": status.HTTP_400_BAD_REQUEST,
}, status=status.HTTP_400_BAD_REQUEST)
uploaded_files = []
year, month, day = (
str(datetime.now().year),
str(datetime.now().month).zfill(2),
str(datetime.now().day).zfill(2),
)
for file in files:
logger.info(f"Processing {file.name}")
unique_file_name = f"{file.name}-{datetime.now(timezone.utc)}"
key_path = f"lead-csv-files/{year}/{month}/{day}/{unique_file_name}"
key = s3_handler.get_key(key_path)
# Create LeadData record
lead_data = LeadData.objects.create(s3_key=key)
success = s3_handler.upload_file(file, key)
if not success:
logger.debug(f"Uploading failed for {file.name}")
lead_data.status = LeadData.UPLOAD_FAILED
lead_data.upload_end_time = datetime.now(timezone.utc)
lead_data.save()
continue
logger.info(f"{file.name} uploaded to S3 successfully!")
lead_data.status = LeadData.UPLOAD_COMPLETED
lead_data.upload_end_time = datetime.now(timezone.utc)
lead_data.save()
uploaded_files.append({"file_id": lead_data.file_id, "file_name": file.name})
return Response({
"success": True,
"message": "Files uploaded successfully.",
"uploaded_files": uploaded_files,
"statusCode": status.HTTP_201_CREATED,
}, status=status.HTTP_201_CREATED)
class TriggerUploadLeadCSVView(APIView):
"""
APIView to trigger the Celery task for processing uploaded CSV files.
"""
permission_classes = [IsAdminUser, IsAuthenticated]
@swagger_auto_schema(
operation_summary="Trigger CSV Processing Task",
operation_description="Triggers a Celery task to process uploaded CSV files.",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
"file_ids": openapi.Schema(
type=openapi.TYPE_ARRAY,
items=openapi.Schema(type=openapi.TYPE_STRING),
description="List of file IDs to process.",
),
},
),
responses={
status.HTTP_202_ACCEPTED: openapi.Response(
description="Task triggered successfully.",
schema=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
"success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
"message": openapi.Schema(type=openapi.TYPE_STRING),
"task_id": openapi.Schema(type=openapi.TYPE_STRING),
},
),
),
status.HTTP_400_BAD_REQUEST: "Bad request, validation error.",
},
)
def post(self, request):
file_ids = request.data.get("file_ids", [])
if not file_ids:
return Response({
"success": False,
"message": "No file IDs provided.",
"statusCode": status.HTTP_400_BAD_REQUEST,
}, status=status.HTTP_400_BAD_REQUEST)
# Fetch LeadData objects
lead_data_records = LeadData.objects.filter(file_id__in=file_ids, status=LeadData.UPLOAD_COMPLETED)
if not lead_data_records.exists():
return Response({
"success": False,
"message": "No valid files found for processing.",
"statusCode": status.HTTP_400_BAD_REQUEST,
}, status=status.HTTP_400_BAD_REQUEST)
# Prepare task data
keys = [record.s3_key for record in lead_data_records]
file_ids = [record.file_id for record in lead_data_records]
file_names = [record.s3_key.split("/")[-1] for record in lead_data_records]
# Trigger Celery task
task = process_csv_file.delay(keys, file_ids, file_names, request.user.id)
return Response({
"success": True,
"message": "Task triggered successfully.",
"task_id": task.id,
"statusCode": status.HTTP_202_ACCEPTED,
}, status=status.HTTP_202_ACCEPTED)
class TaskStatusView(APIView):
"""Endpoint to view the status of uploaded files"""
def get(self, request, file_id):
try:
lead_data = LeadData.objects.get(file_id=file_id)
task_result = AsyncResult(lead_data.task_id)
return Response({
"file_id": str(lead_data.file_id),
"task_id": lead_data.task_id,
"status": task_result.status,
"result": task_result.result if task_result.status == "FAILURE" else None
}, status=status.HTTP_200_OK)
except LeadData.DoesNotExist:
return Response({"error": "File not found"}, status=status.HTTP_404_NOT_FOUND)
class UploadTemplateCSVView(APIView):
"""
APIView to upload sample.csv
"""
permission_classes = [IsAdminUser, IsAuthenticated]
parser_classes = [MultiPartParser] # Required for handling file uploads
@swagger_auto_schema(
operation_summary="Upload sample CSV template",
operation_description="Only admin users can upload lead data in CSV format. The `type` parameter specifies the category of the data.",
manual_parameters=[
openapi.Parameter(
name="files",
in_=openapi.IN_FORM,
type=openapi.TYPE_ARRAY,
items=openapi.Items(type=openapi.TYPE_FILE),
description="CSV files to upload.",
required=True,
),
openapi.Parameter(
name="type",
in_=openapi.IN_FORM,
type=openapi.TYPE_STRING,
description=(
"Type of the data being uploaded. "
"Valid values are: 'lead', 'email', 'phone', 'both'."
),
enum=["lead", "email", "phone", "both"], # Define valid values
required=True,
),
],
responses={
status.HTTP_201_CREATED: openapi.Response(
description="Saved successfully.",
schema=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
"data": openapi.Schema(type=openapi.TYPE_STRING, description="Success message"),
"status": openapi.Schema(type=openapi.TYPE_INTEGER, description="HTTP status code"),
},
),
),
status.HTTP_400_BAD_REQUEST: openapi.Response(
description="Bad request, validation error.",
schema=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
"error": openapi.Schema(
type=openapi.TYPE_ARRAY,
items=openapi.Schema(type=openapi.TYPE_STRING, description="Validation errors"),
),
"status": openapi.Schema(type=openapi.TYPE_INTEGER, description="HTTP status code"),
},
),
),
status.HTTP_503_SERVICE_UNAVAILABLE: openapi.Response(
description="Currently Unavailable.",
),
},
request_body=None, # Must be None because we define `manual_parameters`
)
def post(self, request):
# Validate the input using the serializer
serializer = UploadCSVSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
files = serializer.validated_data['files']
type = serializer.validated_data['type']# Assuming 'files' is the key in FE
for file in files:
if not file:
logger.info("No file provided for upload")
return Response({
"success" : False,
"errors": "No file provided for upload",
"statusCode": status.HTTP_400_BAD_REQUEST
}, status=status.HTTP_400_BAD_REQUEST)
# Unique file name
unique_file_name = f"{file.name}-{datetime.now(timezone.utc)}"
key_path = s3_handler.get_key(f"template-csv-files/{unique_file_name}")
# Create a template file
template_file = TemplateFile.objects.create()
template_file.s3_key = key_path
# Save the file to S3
success = s3_handler.upload_file(file, key_path)
if not success:
template_file.status = TemplateFile.FAILED
template_file.save()
return Response({
"success" : False,
"errors": "Failed to upload file to S3",
"statusCode": status.HTTP_500_INTERNAL_SERVER_ERROR
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
template_file.status = TemplateFile.COMPLETED
template_file.type = type
template_file.save()
return Response({
"success" : True,
"message": "File uploaded successfully.",
"statusCode" : status.HTTP_202_ACCEPTED
}, status=status.HTTP_202_ACCEPTED)
class DownloadTemplateCSVFileView(APIView):
"""
APIView to handle downloading a template CSV file via a presigned S3 URL based on the specified type.
"""
permission_classes = [IsAuthenticated]
@swagger_auto_schema(
operation_summary="Download Template CSV File",
operation_description=(
"Provides a presigned S3 URL for downloading a template CSV file based on the specified type. "
"Valid types are: 'lead', 'email', 'phone', 'both'."
),
manual_parameters=[
openapi.Parameter(
"type",
openapi.IN_QUERY,
description=(
"The type of the template to download. Valid values are: 'lead', 'email', 'phone', 'both'."
),
type=openapi.TYPE_STRING,
required=True,
example="lead",
),
],
responses={
200: openapi.Response(
description="File is ready for download.",
examples={
"application/json": {
"success": True,
"message": "File is ready for download.",
"data": "https://example-s3-url.com/file.csv",
"statusCode": 200,
}
},
),
400: openapi.Response(
description="Invalid request parameters.",
examples={
"application/json": {
"success": False,
"message": "Type query parameter is required.",
"statusCode": 400,
}
},
),
404: openapi.Response(
description="No templates found.",
examples={
"application/json": {
"success": False,
"message": "No template found for type 'lead'. Kindly upload if you have any.",
"statusCode": 404,
}
},
),
500: openapi.Response(
description="An unexpected error occurred.",
examples={
"application/json": {
"success": False,
"message": "An unexpected error occurred: error details.",
"statusCode": 500,
}
},
),
},
)
def get(self, request):
try:
# Get the type from query parameters
template_type = request.query_params.get("type")
if not template_type:
return Response(
{
"success": False,
"message": "Type query parameter is required.",
"statusCode": status.HTTP_400_BAD_REQUEST,
},
status=status.HTTP_400_BAD_REQUEST,
)
# Validate the type value
if template_type not in dict(TemplateFile.TYPE_CHOICES).keys():
return Response(
{
"success": False,
"message": f"Invalid type value. Allowed values are: {', '.join(dict(TemplateFile.TYPE_CHOICES).keys())}.",
"statusCode": status.HTTP_400_BAD_REQUEST,
},
status=status.HTTP_400_BAD_REQUEST,
)
# Filter TemplateFile based on type and is_deleted
template_file = TemplateFile.objects.filter(
is_deleted=False, type=template_type, status=TemplateFile.COMPLETED
).first()
if template_file is None:
return Response(
{
"success": False,
"message": f"No template found for type '{template_type}'. Kindly upload if you have any.",
"statusCode": status.HTTP_404_NOT_FOUND,
},
status=status.HTTP_404_NOT_FOUND,
)
# Generate the presigned URL
presigned_url = download_template_csv_file(template_file.s3_key)
if presigned_url:
return Response(
{
"success": True,
"message": "File is ready for download.",
"data": presigned_url,
"statusCode": status.HTTP_200_OK,
},
status=status.HTTP_200_OK,
)
else:
return Response(
{
"success": False,
"message": "Failed to generate download URL.",
"errors": "Failed to generate download URL.",
"statusCode": status.HTTP_500_INTERNAL_SERVER_ERROR,
},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
return Response(
{
"success": False,
"message": f"An unexpected error occurred: {str(e)}.",
"errors": f"An unexpected error occurred: {str(e)}.",
"statusCode": status.HTTP_500_INTERNAL_SERVER_ERROR,
},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
class LeadListView(ListAPIView):
"""
API view to list and filter Leads.
"""
permission_classes = [IsAuthenticated]
queryset = Lead.objects.filter(is_deleted=False)
serializer_class = LeadSerializer
filter_backends = [DjangoFilterBackend, OrderingFilter]
filterset_class = LeadFilter
ordering_fields = ['created_on', 'first_name', 'last_name', 'city', 'state', 'full_name']
ordering = ['-created_on']
pagination_class = LeadPagination
def get_queryset(self):
"""
Annotate the queryset with a computed 'full_name' field.
"""
return Lead.objects.filter(is_deleted=False).annotate(
full_name=Concat(
F('first_name'), Value(' '), F('last_name'),
output_field=CharField()
)
)
@swagger_auto_schema(
operation_summary="List Leads",
operation_description=(
"Retrieve a paginated list of leads with filtering and sorting capabilities. "
"You can filter by email, phone, city, state, country, and SIC code. "
"Ordering is supported on fields like `created_on`, `first_name`, `last_name` `city` and `state`."
),
manual_parameters=[
openapi.Parameter(
'search', openapi.IN_QUERY,
description="Search leads by keywords in business name, first name, last name, email, phone, city, state, country or date in MM-DD-YYYY format.",
type=openapi.TYPE_STRING
),
openapi.Parameter(
'email_present', openapi.IN_QUERY,
description="Filter leads that have an email address. Use `true` or `false`.",
type=openapi.TYPE_BOOLEAN
),
openapi.Parameter(
'phone_present', openapi.IN_QUERY,
description="Filter leads that have a phone number. Use `true` or `false`.",
type=openapi.TYPE_BOOLEAN
),
openapi.Parameter(
'lead_type', openapi.IN_QUERY,
description="Filter by lead type. Options: `Business`, `Consumer`.",
type=openapi.TYPE_STRING,
enum=['Business', 'Consumer']
),
openapi.Parameter(
'city', openapi.IN_QUERY,
description="Filter by city name (case-insensitive, wildcard).",
type=openapi.TYPE_STRING
),
openapi.Parameter(
'state', openapi.IN_QUERY,
description="Filter by state/province name (case-insensitive, wildcard).",
type=openapi.TYPE_STRING
),
openapi.Parameter(
'country', openapi.IN_QUERY,
description="Filter by country name (case-insensitive, wildcard).",
type=openapi.TYPE_STRING
),
openapi.Parameter(
'sic_code', openapi.IN_QUERY,
description="Filter by SIC code (exact match).",
type=openapi.TYPE_INTEGER
),
openapi.Parameter(
'ordering', openapi.IN_QUERY,
description=(
"Specify ordering fields, e.g., `?ordering=created_on` for ascending or "
"`?ordering=-created_on` for descending."
),
type=openapi.TYPE_STRING
),
openapi.Parameter(
'page', openapi.IN_QUERY,
description="Page number for paginated results.",
type=openapi.TYPE_INTEGER
),
openapi.Parameter(
'page_size', openapi.IN_QUERY,
description="Number of items per page (default is 10, max is 1000).",
type=openapi.TYPE_INTEGER
),
],
responses={
200: openapi.Response(
description="A paginated list of leads.",
examples={
"application/json": {
"success": True,
"message": "Leads retrieved successfully.",
"statusCode": 200,
"count": 100,
"current_page": 1,
"next_page": 2,
"previous_page": None,
"total_pages": 10,
"data": [
{
"lead_type": "Consumer",
"lead_id": "d2a109db-ed8e-4ff9-9dfa-1e38fb489c8f",
"business_name": "ABC Corp",
"first_name": "John",
"last_name": "Doe",
"email": "john.doe@example.com",
"phone": "1234567890",
"city": "New York",
"state": "NY",
"country": "USA",
"sic_code": 1234,
"created_on": "2024-11-22T09:11:38.089774Z"
},
]
}
},
),
400: openapi.Response(description="Invalid request parameters."),
401: openapi.Response(description="Authentication credentials were not provided or are invalid."),
500: openapi.Response(description="An unexpected server error occurred."),
},
)
def get(self, request, *args, **kwargs):
try:
return super().get(request, *args, **kwargs)
except Exception as e:
logger.error(f"Error occurred on LeadListView: {str(e)}")
return Response(
{
"success": False,
"message": f"An unexpected error occurred: {str(e)}.",
"errors": f"An unexpected error occurred: {str(e)}.",
"statusCode": status.HTTP_500_INTERNAL_SERVER_ERROR
},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
class DeleteLeadView(APIView):
"""
APIView to soft delete a Lead object by its lead_id (UUID).
"""
def delete(self, request, lead_id):
try:
# Fetch the lead object or return 404 if not found
lead = get_object_or_404(Lead, lead_id=lead_id)
# Perform soft delete operation
lead.is_deleted = True
lead.save()
return Response(
{
"success" : True,
"message": "Lead deleted successfully.",
"statusCode" : status.HTTP_204_NO_CONTENT
},
status=status.HTTP_200_OK
)
except Exception as e:
return Response(
{
"success" : False,
"errors": f"An error occurred: {str(e)}",
"statusCode" : status.HTTP_500_INTERNAL_SERVER_ERROR
},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
class UserDetailView(APIView):
permission_classes = [IsAdminUser]
@swagger_auto_schema(
operation_description="Retrieve detailed information about a specific user.",
responses={
200: openapi.Response(
description="User details",
schema=UserDetailSerializer
),
}
)
def get(self, request, user_id):
user = UnlimitedLeadUser.objects.filter(id=user_id, is_deleted=False).first()
if not user:
return Response({"message": "User not found"}, status=status.HTTP_404_NOT_FOUND)
serializer = UserDetailSerializer(user)
return Response({"data": serializer.data, "message": "Operation completed successfully."}, status=status.HTTP_200_OK)
class BlockUnblockUserView(APIView):
permission_classes = [IsAdminUser]
@swagger_auto_schema(
operation_description="Block or unblock a user by action (block/unblock).",
responses={
200: openapi.Response(
description="Action successful",
schema=openapi.Schema(type=openapi.TYPE_OBJECT, properties={
'message': openapi.Schema(type=openapi.TYPE_STRING)
})
),
400: "Invalid action"
}
)
def patch(self, request, user_id, action):
user = UnlimitedLeadUser.objects.filter(id=user_id).first()
if not user:
return Response({"message": "User not found"}, status=status.HTTP_404_NOT_FOUND)
if action == "block":
user.is_active = False
user.save()
elif action == "unblock":
user.is_active = True
user.save()
else:
return Response({"message": "Invalid action. Use 'block' or 'unblock'."}, status=status.HTTP_400_BAD_REQUEST)
return Response(
{
"message": f"User has been {'blocked' if action == 'block' else 'unblocked'} successfully.",
"is_active": user.is_active
},
status=status.HTTP_200_OK
)
class UserListView(ListAPIView):
permission_classes = [IsAdminUser]
queryset = UnlimitedLeadUser.objects.filter(is_deleted=False)
serializer_class = UserListSerializer
filter_backends = [DjangoFilterBackend, OrderingFilter]
filterset_class = UserFilter
ordering_fields = ['name', 'email', 'date_joined', 'first_name', 'subscription_status', 'credit_limit']
ordering = ['date_joined']
pagination_class = UserPagination
def get_queryset(self):
try:
return UnlimitedLeadUser.objects.filter(is_deleted=False, is_superuser=False).annotate(
name=Concat(
F('first_name'), Value(' '), F('last_name'), output_field=CharField()
),
)
except Exception as e:
return Response(
{
"success": False,
"message": f"An unexpected error occurred: {str(e)}.",
"statusCode": status.HTTP_500_INTERNAL_SERVER_ERROR,
},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
class AdminDashboardSummary(APIView):
permission_classes = [IsAdminUser] # Restrict access to admin users
@swagger_auto_schema(method='GET')
def get(self, request):
try:
current_date = datetime.now()
current_month = current_date.month
current_year = current_date.year
# Signed-up users count
signed_up_users_count = UnlimitedLeadUser.objects.filter(is_deleted=False,is_superuser=False).count()
# Users count based on subscription plans
users_count_by_plans = (
Transaction.objects.filter(subscription_status='active')
.values("product__plan_name")
.annotate(user_count=Count("customer", distinct=True))
)
active_users_count = UnlimitedLeadUser.objects.filter(is_verified=True, is_deleted=False ,is_superuser=False).count()
inactive_users_count = UnlimitedLeadUser.objects.filter(is_verified=False, is_deleted=False ,is_superuser=False).count()
# Active users count
subscribed_users_count = UnlimitedLeadUser.objects.filter(
transaction__subscription_status='active'
).distinct().count()
# Total revenue
total_revenue = (
Transaction.objects.filter(payment_status="succeeded")
.aggregate(total_revenue=Sum("amount"))["total_revenue"]
or 0
)
# Total revenue by plans with currency fallback to "USD"
revenue_by_plans = (
Transaction.objects.filter(payment_status="succeeded")
.values("product__plan_name")
.annotate(
total_revenue=Sum("amount"),
currency=Coalesce(F("product__currency"), Value("USD", output_field=CharField()))
)
)
monthly_revenue = (
Transaction.objects.filter(
payment_status="succeeded",
transaction_date__month=current_month,
transaction_date__year=current_year
)
.aggregate(total_monthly_revenue=Sum("amount"))["total_monthly_revenue"]
or 0
)
# Annual revenue (based on the current year)
annual_revenue = (
Transaction.objects.filter(
payment_status="succeeded",
transaction_date__year=current_year
)
.aggregate(total_annual_revenue=Sum("amount"))["total_annual_revenue"]
or 0
)
# Prepare response data
response_data = {
"active_users_count": active_users_count,
"inactive_users_count": inactive_users_count,
"signed_up_users_count": signed_up_users_count,
"subscribed_users_count": subscribed_users_count,
"users_count_by_plans": list(users_count_by_plans),
"total_revenue": {
"amount": total_revenue,
"currency": "usd", # Assuming the total revenue is in USD
},
"monthly_revenue": {
"amount": monthly_revenue,
"currency": "usd", # Assuming default to USD
},
"annual_revenue": {
"amount": annual_revenue,
"currency": "usd", # Assuming default to USD
},
"revenue_by_plans": list(revenue_by_plans),
}
return Response({
"success": True,
"message": "Dashboard summary fetched successfully.",
"data": response_data
}, status=200)
except Exception as e:
return Response({
"success": False,
"message": f"An error occurred: {str(e)}",
}, status=500)
class ValidationCountGraph(APIView):
permission_classes = [IsAdminUser] # Restrict access to admin users
@swagger_auto_schema(
operation_summary="Get Validation Count Graph Data",
operation_description="Returns validation counts grouped by date. Optionally filter results by a date range.",
manual_parameters=[
openapi.Parameter(
"start_date",
openapi.IN_QUERY,
description="Filter results from this start date (format: YYYY-MM-DD).",
type=openapi.TYPE_STRING,
required=False,
example="2024-01-01"
),
openapi.Parameter(
"end_date",
openapi.IN_QUERY,
description="Filter results up to this end date (format: YYYY-MM-DD).",
type=openapi.TYPE_STRING,
required=False,
example="2024-12-31"
),
],
responses={
200: openapi.Response(
description="Validation count graph data fetched successfully.",
examples={
"application/json": {
"success": True,
"message": "Validation count graph data fetched successfully.",
"data": [
{
"date": "2024-01-01",
"validation_count": 100
},
{
"date": "2024-01-02",
"validation_count": 150
}
]
}
},
),
400: openapi.Response(
description="Invalid date format",
examples={
"application/json": {
"success": False,
"message": "Invalid date format. Use YYYY-MM-DD."
}
},
),
500: openapi.Response(
description="An error occurred",
examples={
"application/json": {
"success": False,
"message": "An error occurred: error details."
}
},
),
},
)
def get(self, request):
try:
# Get date range from query parameters
start_date = request.query_params.get("start_date")
end_date = request.query_params.get("end_date")
# Validate and parse dates
try:
if start_date:
start_date = datetime.strptime(start_date, "%Y-%m-%d")
if end_date:
end_date = datetime.strptime(end_date, "%Y-%m-%d")
except ValueError:
return Response({
"success": False,
"message": "Invalid date format. Use YYYY-MM-DD."
}, status=400)
# Filter files based on date range (if provided)
queryset = EmailOrPhoneVerifyProcessedFile.objects.all()
if start_date:
queryset = queryset.filter(processed_at__date__gte=start_date)
if end_date:
queryset = queryset.filter(processed_at__date__lte=end_date)
# Aggregate validation counts grouped by date
validation_data = (
queryset
.values("processed_at__date") # Group by date
.annotate(
validation_count=Sum(F('total_email_count') + F('total_phone_num_count'))
)
.order_by("processed_at__date")
)
total_validation_count = sum(data['validation_count'] or 0 for data in validation_data)
total_days = len(validation_data)
average_validation_count = total_validation_count / total_days if total_days > 0 else 0
# Format response data
graph_data = [
{
"date": data['processed_at__date'].strftime("%Y-%m-%d"),
"validation_count": data['validation_count'] or 0
}
for data in validation_data
]
return Response({
"success": True,
"message": "Validation count graph data fetched successfully.",
"data": {
"graph_data": graph_data,
"average_validation_count": average_validation_count
}
}, status=200)
except Exception as e:
return Response({
"success": False,
"message": f"An error occurred: {str(e)}",
}, status=500)
class RevenueGraph(APIView):
permission_classes = [IsAdminUser] # Restrict access to admin users
@swagger_auto_schema(
operation_summary="Get Revenue Graph Data",
operation_description="Returns revenue data grouped by date. Optionally filter results by a date range.",
manual_parameters=[
openapi.Parameter(
"start_date",
openapi.IN_QUERY,
description="Filter results from this start date (format: YYYY-MM-DD).",
type=openapi.TYPE_STRING,
required=False,
example="2024-01-01"
),
openapi.Parameter(
"end_date",
openapi.IN_QUERY,
description="Filter results up to this end date (format: YYYY-MM-DD).",
type=openapi.TYPE_STRING,
required=False,
example="2024-12-31"
),
],
responses={
200: openapi.Response(
description="Revenue graph data fetched successfully.",
examples={
"application/json": {
"success": True,
"message": "Revenue graph data fetched successfully.",
"data": [
{
"date": "2024-01-01",
"revenue": 1500.00
},
{
"date": "2024-01-02",
"revenue": 2000.00
}
]
}
},
),
400: openapi.Response(
description="Invalid date format",
examples={
"application/json": {
"success": False,
"message": "Invalid date format. Use YYYY-MM-DD."
}
},
),
500: openapi.Response(
description="An error occurred",
examples={
"application/json": {
"success": False,
"message": "An error occurred: error details."
}
},
),
},
)
def get(self, request):
try:
# Get date range from query parameters
start_date = request.query_params.get("start_date")
end_date = request.query_params.get("end_date")
# Validate and parse dates
try:
if start_date:
start_date = datetime.strptime(start_date, "%Y-%m-%d")
if end_date:
end_date = datetime.strptime(end_date, "%Y-%m-%d")
except ValueError:
return Response({
"success": False,
"message": "Invalid date format. Use YYYY-MM-DD."
}, status=400)
# Filter transactions based on date range
queryset = Transaction.objects.filter(payment_status="succeeded")
if start_date:
queryset = queryset.filter(transaction_date__date__gte=start_date)
if end_date:
queryset = queryset.filter(transaction_date__date__lte=end_date)
# Aggregate revenue grouped by date
revenue_data = (
queryset
.values("transaction_date__date") # Group by date
.annotate(revenue=Sum("amount")) # Calculate total revenue
.order_by("transaction_date__date")
)
total_revenue = sum(data['revenue'] or 0 for data in revenue_data)
total_days = len(revenue_data)
average_revenue_per_day = total_revenue / total_days if total_days > 0 else 0
# Format response data
graph_data = [
{
"date": data['transaction_date__date'].strftime("%Y-%m-%d"),
"revenue": float(data['revenue'] or 0)
}
for data in revenue_data
]
return Response({
"success": True,
"message": "Revenue graph data fetched successfully.",
"data": {
"graph_data": graph_data,
"average_revenue_per_day": round(average_revenue_per_day, 2) # Rounded to 2 decimal places
}
}, status=200)
except Exception as e:
return Response({
"success": False,
"message": f"An error occurred: {str(e)}",
}, status=500)
class UserCountGraph(APIView):
permission_classes = [IsAdminUser] # Restrict access to admin users
@swagger_auto_schema(
operation_summary="Get User Count Graph Data",
operation_description="Returns user count data grouped by date. Optionally filter results by a date range.",
manual_parameters=[
openapi.Parameter(
"start_date",
openapi.IN_QUERY,
description="Filter results from this start date (format: YYYY-MM-DD).",
type=openapi.TYPE_STRING,
required=False,
example="2024-01-01"
),
openapi.Parameter(
"end_date",
openapi.IN_QUERY,
description="Filter results up to this end date (format: YYYY-MM-DD).",
type=openapi.TYPE_STRING,
required=False,
example="2024-12-31"
),
],
responses={
200: openapi.Response(
description="User count graph data fetched successfully.",
examples={
"application/json": {
"success": True,
"message": "User count graph data fetched successfully.",
"data": [
{
"date": "2024-01-01",
"user_count": 10
},
{
"date": "2024-01-02",
"user_count": 15
}
]
}
},
),
400: openapi.Response(
description="Invalid date format",
examples={
"application/json": {
"success": False,
"message": "Invalid date format. Use YYYY-MM-DD."
}
},
),
500: openapi.Response(
description="An error occurred",
examples={
"application/json": {
"success": False,
"message": "An error occurred: error details."
}
},
),
},
)
def get(self, request):
try:
# Get date range from query parameters
start_date = request.query_params.get("start_date")
end_date = request.query_params.get("end_date")
# Validate and parse dates
try:
if start_date:
start_date = datetime.strptime(start_date, "%Y-%m-%d")
if end_date:
end_date = datetime.strptime(end_date, "%Y-%m-%d")
except ValueError:
return Response({
"success": False,
"message": "Invalid date format. Use YYYY-MM-DD."
}, status=400)
# Filter users based on date range
queryset = UnlimitedLeadUser.objects.filter(is_deleted=False,is_superuser=False)
if start_date:
queryset = queryset.filter(date_joined__gte=start_date)
if end_date:
queryset = queryset.filter(date_joined__lte=end_date)
# Aggregate user count grouped by date_joined
user_data = (
queryset
.values("date_joined__date") # Group by date_joined
.annotate(user_count=Count("id")) # Count users per day
.order_by("date_joined__date")
)
total_users = sum(data["user_count"] for data in user_data)
total_days = len(user_data)
average_user_count_per_day = int(total_users / total_days) if total_days > 0 else 0 # No decimals
# Format response data, excluding None values for date_joined
graph_data = [
{
"date": data['date_joined__date'].strftime("%Y-%m-%d") if data['date_joined__date'] else 'No Date',
"user_count": data['user_count']
}
for data in user_data
]
return Response({
"success": True,
"message": "User count graph data fetched successfully.",
"data": {
"graph_data": graph_data,
"average_user_count_per_day": average_user_count_per_day # Rounded to 2 decimal places
}
}, status=200)
except Exception as e:
return Response({
"success": False,
"message": f"An error occurred: {str(e)}",
}, status=500)
class SearchcountGraph(APIView):
permission_classes = [IsAdminUser] # Restrict access to admin users
# @swagger_auto_schema(
# operation_summary="Get Revenue Graph Data",
# operation_description="Returns revenue data grouped by date. Optionally filter results by a date range.",
# manual_parameters=[
# openapi.Parameter(
# "start_date",
# openapi.IN_QUERY,
# description="Filter results from this start date (format: YYYY-MM-DD).",
# type=openapi.TYPE_STRING,
# required=False,
# example="2024-01-01"
# ),
# openapi.Parameter(
# "end_date",
# openapi.IN_QUERY,
# description="Filter results up to this end date (format: YYYY-MM-DD).",
# type=openapi.TYPE_STRING,
# required=False,
# example="2024-12-31"
# ),
# ],
# responses={
# 200: openapi.Response(
# description="Revenue graph data fetched successfully.",
# examples={
# "application/json": {
# "success": True,
# "message": "Revenue graph data fetched successfully.",
# "data": [
# {
# "date": "2024-01-01",
# "revenue": 1500.00
# },
# {
# "date": "2024-01-02",
# "revenue": 2000.00
# }
# ]
# }
# },
# ),
# 400: openapi.Response(
# description="Invalid date format",
# examples={
# "application/json": {
# "success": False,
# "message": "Invalid date format. Use YYYY-MM-DD."
# }
# },
# ),
# 500: openapi.Response(
# description="An error occurred",
# examples={
# "application/json": {
# "success": False,
# "message": "An error occurred: error details."
# }
# },
# ),
# },
# )
def get(self, request):
try:
# # Get date range from query parameters
# start_date = request.query_params.get("start_date")
# end_date = request.query_params.get("end_date")
# # Validate and parse dates
# try:
# if start_date:
# start_date = datetime.strptime(start_date, "%Y-%m-%d")
# if end_date:
# end_date = datetime.strptime(end_date, "%Y-%m-%d")
# except ValueError:
# return Response({
# "success": False,
# "message": "Invalid date format. Use YYYY-MM-DD."
# }, status=400)
# # Filter transactions based on date range
# queryset = Transaction.objects.filter(payment_status="succeeded")
# if start_date:
# queryset = queryset.filter(transaction_date__date__gte=start_date)
# if end_date:
# queryset = queryset.filter(transaction_date__date__lte=end_date)
# # Aggregate revenue grouped by date
# revenue_data = (
# queryset
# .values("transaction_date__date") # Group by date
# .annotate(revenue=Sum("amount")) # Calculate total revenue
# .order_by("transaction_date__date")
# )
# total_revenue = sum(data['revenue'] or 0 for data in revenue_data)
# total_days = len(revenue_data)
# average_revenue_per_day = total_revenue / total_days if total_days > 0 else 0
# # Format response data
# graph_data = [
# {
# "date": data['transaction_date__date'].strftime("%Y-%m-%d"),
# "revenue": float(data['revenue'] or 0)
# }
# for data in revenue_data
# ]
return Response({
"success": True,
"message": "Search count graph data fetched successfully.",
"data": {
"graph_data": [
{
"date": "2024-11-27",
"search_count": 58
},
{
"date": "2024-11-28",
"search_count": 12
}
],
"average_search_count": 35
}
}, status=200)
except Exception as e:
return Response({
"success": False,
"message": f"An error occurred: {str(e)}",
}, status=500)
class LocationsView(APIView):
@swagger_auto_schema(
operation_summary="Search location list",
manual_parameters=[
openapi.Parameter(
"search",
openapi.IN_QUERY,
description="Location from country, city, state",
type=openapi.TYPE_STRING,
),
],
responses={
status.HTTP_200_OK: openapi.Response(
description="",
schema=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
"success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
"data": openapi.Schema(
type=openapi.TYPE_ARRAY,
items=openapi.Schema(type=openapi.TYPE_STRING),
),
"statusCode": openapi.Schema(type=openapi.TYPE_INTEGER),
},
),
)
},
)
def get(self, request):
location = request.query_params.get("search")
if location:
countries = Country.objects.filter(name__icontains=location)[
:10
].values_list("name", flat=True)
states = State.objects.filter(name__icontains=location)[:10].values_list(
"name", flat=True
)
cities = City.objects.filter(name__icontains=location)[:10].values_list(
"name", flat=True
)
data = list(countries) + list(states) + list(cities)
else:
data = Country.objects.all()[:10].values_list("name", flat=True)
return Response(
{
"success": True,
"data": data,
"statusCode": status.HTTP_200_OK,
},
status=status.HTTP_200_OK,
)