HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/env/lib/python3.10/site-packages/wtforms_alchemy/validators.py
from collections.abc import Iterable, Mapping

from sqlalchemy import Column
from sqlalchemy.orm.attributes import InstrumentedAttribute
from wtforms import ValidationError


class Unique:
    """Check that field values are unique among the rows of a model.

    :param column:
        InstrumentedAttribute object, eg. User.name, or
        Column object, eg. user.c.name, or
        a field name, eg. 'name' or
        a tuple of InstrumentedAttributes, eg. (User.name, User.email) or
        a dictionary mapping field names to InstrumentedAttributes, eg.
        {
            'name': User.name,
            'email': User.email
        }
    :param get_session:
        A function that returns a SQLAlchemy Session. This parameter is not
        needed if the given model supports Flask-SQLAlchemy styled query
        parameter.
    :param message:
        The error message. When omitted, a default message is obtained from
        ``field.gettext`` each time validation fails.
    """
    field_flags = {'unique': True}

    def __init__(self, column, get_session=None, message=None):
        self.column = column
        self.message = message
        self.get_session = get_session

    @property
    def query(self):
        """Return a query over ``self.model``.

        Uses ``get_session().query(model)`` when ``get_session`` was given,
        otherwise the model's own ``query`` attribute.

        :raises Exception: if neither source of a query is available.
        """
        self._check_for_session(self.model)
        if self.get_session:
            return self.get_session().query(self.model)
        if hasattr(self.model, 'query'):
            return self.model.query
        # With the stock _check_for_session this point is never reached; it
        # is kept as a safety net in case that hook is overridden.
        raise Exception(
            'Validator requires either get_session or Flask-SQLAlchemy'
            ' styled query parameter'
        )

    def _check_for_session(self, model):
        """Raise if neither ``model.query`` nor ``get_session`` is usable."""
        if not hasattr(model, 'query') and not self.get_session:
            raise Exception('Could not obtain SQLAlchemy session.')

    def _syntaxes_as_tuples(self, form, field, column):
        """Converts a set of different syntaxes into a tuple of tuples.

        Each inner tuple is ``(form field name, column)``.

        :raises TypeError: if ``column`` is none of the supported syntaxes.
        """
        # str must be tested before Iterable: strings are iterable too.
        if isinstance(column, str):
            return ((column, getattr(form.Meta.model, column)),)
        elif isinstance(column, Mapping):
            # Keys name the form fields; each value is resolved recursively
            # and only the column of its first resolved pair is kept.
            return tuple(
                (name, self._syntaxes_as_tuples(form, field, spec)[0][1])
                for name, spec in column.items()
            )
        elif isinstance(column, Iterable):
            return tuple(
                self._syntaxes_as_tuples(form, field, spec)[0]
                for spec in column
            )
        elif isinstance(column, (Column, InstrumentedAttribute)):
            return ((column.key, column),)
        else:
            raise TypeError("Invalid syntax for column")

    def __call__(self, form, field):
        """Validate that no other row matches the form's current values.

        :raises TypeError: if the column specification is invalid or empty.
        :raises Exception: if the form has no ``_obj`` attribute or no query
            can be obtained for the model.
        :raises ValidationError: if a matching row exists and it is not
            equal to ``form._obj``.
        """
        columns = self._syntaxes_as_tuples(form, field, self.column)
        if not columns:
            # An empty tuple/dict would otherwise surface as an opaque
            # IndexError on columns[0] below.
            raise TypeError("Invalid syntax for column")

        # Fail fast on a misconfigured form, before any database round trip.
        if not hasattr(form, '_obj'):
            raise Exception(
                "Couldn't access Form._obj attribute. Either make your form "
                "inherit WTForms-Alchemy ModelForm or WTForms-Components "
                "ModelForm or make this attribute available in your form."
            )

        # The model is taken from the first column; the ``query`` property
        # reads it back from ``self.model``.
        self.model = columns[0][1].class_
        query = self.query
        for field_name, column in columns:
            query = query.filter(column == form[field_name].data)

        obj = query.first()

        # A match is only a conflict when it is not the form's own object
        # (presumably the instance being edited -- confirm against the form
        # base class in use).
        if obj and not form._obj == obj:
            # Resolve the default message into a local: the validator
            # instance is shared between forms, so storing the translated
            # text on ``self`` would pin the first caller's translation for
            # every later validation.
            message = self.message
            if message is None:
                message = field.gettext(u'Already exists.')
            raise ValidationError(message)