File: //home/arjun/projects/aigenerator/venv/lib64/python3.12/site-packages/allauth/core/ratelimit.py
import hashlib
import time
from collections import namedtuple
from typing import Optional
from django.conf import settings
from django.core.cache import cache
from django.core.exceptions import ImproperlyConfigured
from django.http import HttpResponse
from django.shortcuts import render
from allauth import app_settings
from allauth.utils import import_callable
Rate = namedtuple("Rate", "amount duration per")
def _parse_duration(duration):
if len(duration) == 0:
raise ValueError(duration)
unit = duration[-1]
value = duration[0:-1]
unit_map = {"s": 1, "m": 60, "h": 3600, "d": 86400}
if unit not in unit_map:
raise ValueError("Invalid duration unit: %s" % unit)
if len(value) == 0:
value = 1
else:
value = float(value)
return value * unit_map[unit]
def _parse_rate(rate):
parts = rate.split("/")
if len(parts) == 2:
amount, duration = parts
per = "ip"
elif len(parts) == 3:
amount, duration, per = parts
else:
raise ValueError(rate)
amount = int(amount)
duration = _parse_duration(duration)
return Rate(amount, duration, per)
def _parse_rates(rates):
ret = []
if rates:
rates = rates.strip()
if rates:
parts = rates.split(",")
for part in parts:
ret.append(_parse_rate(part.strip()))
return ret
def _cache_key(request, *, action, rate, key=None, user=None):
from allauth.account.adapter import get_adapter
if rate.per == "ip":
source = ("ip", get_adapter().get_client_ip(request))
elif rate.per == "user":
if user is None:
if not request.user.is_authenticated:
raise ImproperlyConfigured(
"ratelimit configured per user but used anonymously"
)
user = request.user
source = ("user", str(user.pk))
elif rate.per == "key":
if key is None:
raise ImproperlyConfigured(
"ratelimit configured per key but no key specified"
)
key_hash = hashlib.sha256(key.encode("utf8")).hexdigest()
source = (key_hash,)
else:
raise ValueError(rate.per)
keys = ["allauth", "rl", action, *source]
return ":".join(keys)
def clear(request, *, action, key=None, user=None):
from allauth.account import app_settings
rates = _parse_rates(app_settings.RATE_LIMITS.get(action))
for rate in rates:
cache_key = _cache_key(request, action=action, rate=rate, key=key, user=user)
cache.delete(cache_key)
def consume(request, *, action, key=None, user=None, dry_run: bool = False):
from allauth.account import app_settings
if not request or request.method == "GET":
return True
rates = _parse_rates(app_settings.RATE_LIMITS.get(action))
if not rates:
return True
allowed = True
for rate in rates:
if not _consume_rate(
request, action=action, rate=rate, key=key, user=user, dry_run=dry_run
):
allowed = False
return allowed
def _consume_rate(request, *, action, rate, key=None, user=None, dry_run: bool = False):
cache_key = _cache_key(request, action=action, rate=rate, key=key, user=user)
history = cache.get(cache_key, [])
now = time.time()
while history and history[-1] <= now - rate.duration:
history.pop()
allowed = len(history) < rate.amount
if allowed and not dry_run:
history.insert(0, now)
cache.set(cache_key, history, rate.duration)
return allowed
def _handler429(request):
from allauth.account import app_settings
return render(request, "429." + app_settings.TEMPLATE_EXTENSION, status=429)
def respond_429(request) -> HttpResponse:
if app_settings.HEADLESS_ENABLED and hasattr(request.allauth, "headless"):
from allauth.headless.base.response import RateLimitResponse
return RateLimitResponse(request)
try:
handler429 = import_callable(settings.ROOT_URLCONF + ".handler429")
handler429 = import_callable(handler429)
except (ImportError, AttributeError):
handler429 = _handler429
return handler429(request)
def consume_or_429(request, *args, **kwargs) -> Optional[HttpResponse]:
if not consume(request, *args, **kwargs):
return respond_429(request)
return None