File: //home/arjun/projects/aigenerator/venv/lib/python3.12/site-packages/social_core/backends/open_id.py
from openid.consumer.consumer import CANCEL, FAILURE, SUCCESS, Consumer
from openid.consumer.discover import DiscoveryFailure
from openid.extensions import ax, pape, sreg
from ..exceptions import (
AuthCanceled,
AuthException,
AuthFailed,
AuthMissingParameter,
AuthUnknownError,
)
from ..utils import url_add_parameters
from .base import BaseAuth
# OpenID configuration
OLD_AX_ATTRS = [
("http://schema.openid.net/contact/email", "old_email"),
("http://schema.openid.net/namePerson", "old_fullname"),
("http://schema.openid.net/namePerson/friendly", "old_nickname"),
]
AX_SCHEMA_ATTRS = [
# Request both the full name and first/last components since some
# providers offer one but not the other.
("http://axschema.org/contact/email", "email"),
("http://axschema.org/namePerson", "fullname"),
("http://axschema.org/namePerson/first", "first_name"),
("http://axschema.org/namePerson/last", "last_name"),
("http://axschema.org/namePerson/friendly", "nickname"),
]
SREG_ATTR = [("email", "email"), ("fullname", "fullname"), ("nickname", "nickname")]
OPENID_ID_FIELD = "openid_identifier"
SESSION_NAME = "openid"
class OpenIdAuth(BaseAuth):
"""Generic OpenID authentication backend"""
name = "openid"
URL = None
USERNAME_KEY = "username"
def get_user_id(self, details, response):
"""Return user unique id provided by service"""
return response.identity_url
def get_ax_attributes(self):
attrs = self.setting("AX_SCHEMA_ATTRS", [])
if attrs and self.setting("IGNORE_DEFAULT_AX_ATTRS", True):
return attrs
return attrs + AX_SCHEMA_ATTRS + OLD_AX_ATTRS
def get_sreg_attributes(self):
return self.setting("SREG_ATTR") or SREG_ATTR
def values_from_response(self, response, sreg_names=None, ax_names=None):
"""Return values from SimpleRegistration response or
AttributeExchange response if present.
@sreg_names and @ax_names must be a list of name and aliases
for such name. The alias will be used as mapping key.
"""
values = {}
# Use Simple Registration attributes if provided
if sreg_names:
resp = sreg.SRegResponse.fromSuccessResponse(response)
if resp:
values.update(
(alias, resp.get(name) or "") for name, alias in sreg_names
)
# Use Attribute Exchange attributes if provided
if ax_names:
resp = ax.FetchResponse.fromSuccessResponse(response)
if resp:
for src, alias in ax_names:
name = alias.replace("old_", "")
values[name] = resp.getSingle(src, "") or values.get(name)
return values
def get_user_details(self, response):
"""Return user details from an OpenID request"""
values = {
"username": "",
"email": "",
"fullname": "",
"first_name": "",
"last_name": "",
}
# update values using SimpleRegistration or AttributeExchange
# values
values.update(
self.values_from_response(
response, self.get_sreg_attributes(), self.get_ax_attributes()
)
)
fullname = values.get("fullname") or ""
first_name = values.get("first_name") or ""
last_name = values.get("last_name") or ""
email = values.get("email") or ""
if not fullname and first_name and last_name:
fullname = first_name + " " + last_name
elif fullname:
try:
first_name, last_name = fullname.rsplit(" ", 1)
except ValueError:
last_name = fullname
username_key = self.setting("USERNAME_KEY") or self.USERNAME_KEY
values.update(
{
"fullname": fullname,
"first_name": first_name,
"last_name": last_name,
"username": values.get(username_key)
or (first_name.title() + last_name.title()),
"email": email,
}
)
return values
def extra_data(self, user, uid, response, details=None, *args, **kwargs):
"""Return defined extra data names to store in extra_data field.
Settings will be inspected to get more values names that should be
stored on extra_data field. Setting name is created from current
backend name (all uppercase) plus _SREG_EXTRA_DATA and
_AX_EXTRA_DATA because values can be returned by SimpleRegistration
or AttributeExchange schemas.
Both list must be a value name and an alias mapping similar to
SREG_ATTR, OLD_AX_ATTRS or AX_SCHEMA_ATTRS
"""
sreg_names = self.setting("SREG_EXTRA_DATA")
ax_names = self.setting("AX_EXTRA_DATA")
values = self.values_from_response(response, sreg_names, ax_names)
from_details = super().extra_data(user, uid, {}, details, *args, **kwargs)
values.update(from_details)
return values
def auth_url(self):
"""Return auth URL returned by service"""
openid_request = self.setup_request(self.auth_extra_arguments())
# Construct completion URL, including page we should redirect to
return_to = self.strategy.absolute_uri(self.redirect_uri)
return openid_request.redirectURL(self.trust_root(), return_to)
def auth_html(self):
"""Return auth HTML returned by service"""
openid_request = self.setup_request(self.auth_extra_arguments())
return_to = self.strategy.absolute_uri(self.redirect_uri)
form_tag = {"id": "openid_message"}
return openid_request.htmlMarkup(
self.trust_root(), return_to, form_tag_attrs=form_tag
)
def trust_root(self):
"""Return trust-root option"""
return self.setting("OPENID_TRUST_ROOT") or self.strategy.absolute_uri("/")
def continue_pipeline(self, partial):
"""Continue previous halted pipeline"""
response = self.consumer().complete(
dict(self.data.items()), self.strategy.absolute_uri(self.redirect_uri)
)
return self.strategy.authenticate(
self,
response=response,
pipeline_index=partial.next_step,
*partial.args,
**partial.kwargs,
)
def auth_complete(self, *args, **kwargs):
"""Complete auth process"""
response = self.consumer().complete(
dict(self.data.items()), self.strategy.absolute_uri(self.redirect_uri)
)
self.process_error(response)
return self.strategy.authenticate(self, response=response, *args, **kwargs)
def process_error(self, data):
if not data:
raise AuthException(self, "OpenID relying party endpoint")
elif data.status == FAILURE:
raise AuthFailed(self, data.message)
elif data.status == CANCEL:
raise AuthCanceled(self)
elif data.status != SUCCESS:
raise AuthUnknownError(self, data.status)
def setup_request(self, params=None):
"""Setup request"""
request = self.openid_request(params)
# Request some user details. Use attribute exchange if provider
# advertises support.
if request.endpoint.supportsType(ax.AXMessage.ns_uri):
fetch_request = ax.FetchRequest()
# Mark all attributes as required, Google ignores optional ones
for attr, alias in self.get_ax_attributes():
fetch_request.add(ax.AttrInfo(attr, alias=alias, required=True))
else:
fetch_request = sreg.SRegRequest(
optional=list(dict(self.get_sreg_attributes()).keys())
)
request.addExtension(fetch_request)
# Add PAPE Extension for if configured
preferred_policies = self.setting("OPENID_PAPE_PREFERRED_AUTH_POLICIES")
preferred_level_types = self.setting("OPENID_PAPE_PREFERRED_AUTH_LEVEL_TYPES")
max_age = self.setting("OPENID_PAPE_MAX_AUTH_AGE")
if max_age is not None:
try:
max_age = int(max_age)
except (ValueError, TypeError):
max_age = None
if max_age is not None or preferred_policies or preferred_level_types:
pape_request = pape.Request(
max_auth_age=max_age,
preferred_auth_policies=preferred_policies,
preferred_auth_level_types=preferred_level_types,
)
request.addExtension(pape_request)
return request
def consumer(self):
"""Create an OpenID Consumer object for the given Django request."""
if not hasattr(self, "_consumer"):
self._consumer = self.create_consumer(self.strategy.openid_store())
return self._consumer
def create_consumer(self, store=None):
return Consumer(self.strategy.openid_session_dict(SESSION_NAME), store)
def uses_redirect(self):
"""Return true if openid request will be handled with redirect or
HTML content will be returned.
"""
return self.openid_request().shouldSendRedirect()
def openid_request(self, params=None):
"""Return openid request"""
try:
return self.consumer().begin(url_add_parameters(self.openid_url(), params))
except DiscoveryFailure as err:
raise AuthException(self, f"OpenID discovery error: {err}")
def openid_url(self):
"""Return service provider URL.
This base class is generic accepting a POST parameter that specifies
provider URL."""
if self.URL:
return self.URL
elif OPENID_ID_FIELD in self.data:
return self.data[OPENID_ID_FIELD]
else:
raise AuthMissingParameter(self, OPENID_ID_FIELD)