File: //home/arjun/projects/aigenerator/venv/lib64/python3.12/site-packages/social_django/storage.py
"""Django ORM models for Social Auth"""
import base64
from django.core.exceptions import FieldDoesNotExist
from django.db import router, transaction
from django.db.utils import IntegrityError
from social_core.storage import (
AssociationMixin,
BaseStorage,
CodeMixin,
NonceMixin,
PartialMixin,
UserMixin,
)
class DjangoUserMixin(UserMixin):
"""Social Auth association model"""
@classmethod
def changed(cls, user):
user.save()
def set_extra_data(self, extra_data=None):
if super().set_extra_data(extra_data):
self.save()
@classmethod
def allowed_to_disconnect(cls, user, backend_name, association_id=None):
if association_id is not None:
qs = cls.objects.exclude(id=association_id)
else:
qs = cls.objects.exclude(provider=backend_name)
qs = qs.filter(user=user)
if hasattr(user, "has_usable_password"):
valid_password = user.has_usable_password()
else:
valid_password = True
return valid_password or qs.exists()
@classmethod
def disconnect(cls, entry):
entry.delete()
@classmethod
def username_field(cls):
return getattr(cls.user_model(), "USERNAME_FIELD", "username")
@classmethod
def user_exists(cls, *args, **kwargs):
"""
Return True/False if a User instance exists with the given arguments.
Arguments are directly passed to filter() manager method.
"""
if "username" in kwargs:
kwargs[cls.username_field()] = kwargs.pop("username")
return cls.user_model()._default_manager.filter(*args, **kwargs).exists()
@classmethod
def get_username(cls, user):
return getattr(user, cls.username_field(), None)
@classmethod
def create_user(cls, *args, **kwargs):
username_field = cls.username_field()
if "username" in kwargs:
if username_field not in kwargs:
kwargs[username_field] = kwargs.pop("username")
else:
# If username_field is 'email' and there is no field named "username"
# then latest should be removed from kwargs.
try:
cls.user_model()._meta.get_field("username")
except FieldDoesNotExist:
kwargs.pop("username")
try:
if hasattr(transaction, "atomic"):
# In Django versions that have an "atomic" transaction decorator / context
# manager, there's a transaction wrapped around this call.
# If the create fails below due to an IntegrityError, ensure that the transaction
# stays undamaged by wrapping the create in an atomic.
using = router.db_for_write(cls.user_model())
with transaction.atomic(using=using):
user = cls.user_model()._default_manager.create_user(
*args, **kwargs
)
else:
user = cls.user_model()._default_manager.create_user(*args, **kwargs)
except IntegrityError as exc:
# If email comes in as None it won't get found in the get
if kwargs.get("email", True) is None:
kwargs["email"] = ""
try:
user = cls.user_model()._default_manager.get(*args, **kwargs)
except cls.user_model().DoesNotExist:
raise exc
return user
@classmethod
def get_user(cls, pk=None, **kwargs):
if pk:
kwargs = {"pk": pk}
try:
return cls.user_model()._default_manager.get(**kwargs)
except cls.user_model().DoesNotExist:
return None
@classmethod
def get_users_by_email(cls, email):
user_model = cls.user_model()
email_field = getattr(user_model, "EMAIL_FIELD", "email")
return user_model._default_manager.filter(**{email_field + "__iexact": email})
@classmethod
def get_social_auth(cls, provider, uid):
if not isinstance(uid, str):
uid = str(uid)
try:
return cls.objects.get(provider=provider, uid=uid)
except cls.DoesNotExist:
return None
@classmethod
def get_social_auth_for_user(cls, user, provider=None, id=None):
qs = cls.objects.filter(user=user)
if provider:
qs = qs.filter(provider=provider)
if id:
qs = qs.filter(id=id)
return qs
@classmethod
def create_social_auth(cls, user, uid, provider):
if not isinstance(uid, str):
uid = str(uid)
if hasattr(transaction, "atomic"):
# In Django versions that have an "atomic" transaction decorator / context
# manager, there's a transaction wrapped around this call.
# If the create fails below due to an IntegrityError, ensure that the transaction
# stays undamaged by wrapping the create in an atomic.
using = router.db_for_write(cls)
with transaction.atomic(using=using):
social_auth = cls.objects.create(user=user, uid=uid, provider=provider)
else:
social_auth = cls.objects.create(user=user, uid=uid, provider=provider)
return social_auth
class DjangoNonceMixin(NonceMixin):
@classmethod
def use(cls, server_url, timestamp, salt):
return cls.objects.get_or_create(
server_url=server_url, timestamp=timestamp, salt=salt
)[1]
@classmethod
def get(cls, server_url, salt):
return cls.objects.get(
server_url=server_url,
salt=salt,
)
@classmethod
def delete(cls, nonce):
nonce.delete()
class DjangoAssociationMixin(AssociationMixin):
@classmethod
def store(cls, server_url, association):
# Don't use get_or_create because issued cannot be null
try:
assoc = cls.objects.get(server_url=server_url, handle=association.handle)
except cls.DoesNotExist:
assoc = cls(server_url=server_url, handle=association.handle)
try:
assoc.secret = base64.encodebytes(association.secret).decode()
except AttributeError:
assoc.secret = base64.encodestring(association.secret).decode()
assoc.issued = association.issued
assoc.lifetime = association.lifetime
assoc.assoc_type = association.assoc_type
assoc.save()
@classmethod
def get(cls, *args, **kwargs):
return cls.objects.filter(*args, **kwargs)
@classmethod
def remove(cls, ids_to_delete):
cls.objects.filter(pk__in=ids_to_delete).delete()
class DjangoCodeMixin(CodeMixin):
@classmethod
def get_code(cls, code):
try:
return cls.objects.get(code=code)
except cls.DoesNotExist:
return None
class DjangoPartialMixin(PartialMixin):
@classmethod
def load(cls, token):
try:
return cls.objects.get(token=token)
except cls.DoesNotExist:
return None
@classmethod
def destroy(cls, token):
partial = cls.load(token)
if partial:
partial.delete()
class BaseDjangoStorage(BaseStorage):
user = DjangoUserMixin
nonce = DjangoNonceMixin
association = DjangoAssociationMixin
code = DjangoCodeMixin