HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/env/lib/python3.10/site-packages/flask_bouncer.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from functools import wraps

from flask import request, g, current_app, _app_ctx_stack as stack
from werkzeug.local import LocalProxy
from werkzeug.exceptions import Forbidden
from bouncer import Ability
from bouncer.constants import *


# Convenient reference: a proxy that resolves, on use, to whatever object is
# stored under ``current_app.extensions['bouncer']`` (set by
# ``Bouncer._init_extension``).
_bouncer = LocalProxy(lambda: current_app.extensions['bouncer'])


def ensure(action, subject):
    """Raise ``Forbidden`` unless the current user may do ``action`` on ``subject``.

    The request is flagged (``request._authorized = True``) before the check
    runs, recording that an authorization call was made for this request.

    :param action: the action being attempted.
    :param subject: the object or class the action targets.
    :raises Forbidden: when the user's ability reports the action as not allowed.
    """
    request._authorized = True

    user = _bouncer.get_current_user()
    checker = Ability(user)
    checker.authorization_method = _bouncer.get_authorization_method()
    checker.aliased_actions = _bouncer.alias_actions

    if not checker.cannot(action, subject):
        return

    raise Forbidden(
        "{0} does not have {1} access to {2}".format(user, action, subject)
    )


def can(action, subject):
    """Return whether the current user may do ``action`` on ``subject``.

    Like ``ensure`` this flags the request as having had an authorization
    call (``request._authorized = True``), but it reports the outcome to the
    caller instead of raising.

    :param action: the action being attempted.
    :param subject: the object or class the action targets.
    :returns: the result of ``Ability.can(action, subject)``.
    """
    request._authorized = True

    checker = Ability(_bouncer.get_current_user())
    checker.authorization_method = _bouncer.get_authorization_method()
    checker.aliased_actions = _bouncer.alias_actions

    allowed = checker.can(action, subject)
    return allowed


# Alias: ``bounce`` is bound to the same function object as ``ensure``.
bounce = ensure


class Condition(object):
    """An (action, subject) pair that can be checked against the current user."""

    def __init__(self, action, subject):
        """Remember the action and the subject it applies to."""
        self.action, self.subject = action, subject

    def test(self):
        """Run ``ensure`` for the stored pair; it raises if access is denied."""
        action, subject = self.action, self.subject
        ensure(action, subject)


def requires(action, subject):
    """Build a view decorator that enforces ``action`` on ``subject``.

    The decorated view is marked with ``_explict_rule_set = True`` so that
    ``Bouncer.method_is_explictly_overwritten`` can detect it, and every call
    first runs ``Condition(action, subject).test()`` before the view itself.

    :param action: the action the caller must be allowed to perform.
    :param subject: the object or class the action targets.
    :returns: a decorator for view functions.
    """
    def decorate(f):
        # Set the marker before wrapping: ``wraps`` copies ``f.__dict__`` onto
        # the wrapper, so the wrapper carries the marker too.
        f._explict_rule_set = True

        def guarded(*args, **kwargs):
            Condition(action, subject).test()
            return f(*args, **kwargs)

        return wraps(f)(guarded)

    return decorate


def skip_authorization(f):
    """Decorate a view so it counts as authorized without any ability check.

    :param f: the view function to wrap.
    :returns: a wrapper that sets ``request._authorized = True`` and then
        calls ``f`` with the same arguments.
    """
    def passthrough(*args, **kwargs):
        # Flag the request before running the view so that
        # ``Bouncer.check_authorization`` accepts the response.
        request._authorized = True
        return f(*args, **kwargs)

    return wraps(f)(passthrough)


class Bouncer(object):

    """This class is used to control the Abilities Integration to one or more Flask applications.

    An instance registers itself under ``app.extensions['bouncer']``, installs
    a ``before_request`` hook that applies implicit rules to monitored
    Flask-Classy style endpoints, and can optionally install an
    ``after_request`` hook that rejects any request for which no
    authorization call was made.
    """

    # Endpoint action names that trigger an implicit permission check when
    # the endpoint belongs to a monitored class.
    special_methods = ["get", "put", "patch", "post", "delete", "index"]

    def __init__(self, app=None, **kwargs):
        """Create the extension and, when ``app`` is given, bind it immediately.

        :param app: optional Flask application passed on to ``init_app``.
        :param kwargs: forwarded unchanged to ``init_app``.
        """
        self.authorization_method_callback = None
        self._alias_actions = self.default_alias_actions()
        self._authorization_method = None
        # Classes registered through ``monitor``.
        self.flask_classy_classes = list()
        # Initialised here but not read anywhere else in this class.
        self.explict_rules = list()
        # Callable returning the current user; replaceable via ``user_loader``.
        self.get_current_user = self.default_user_loader

        self.app = None

        if app is not None:
            self.init_app(app, **kwargs)

    def get_app(self, reference_app=None):
        """Helper method that implements the logic to look up an application.

        Preference order: the explicit ``reference_app``, then the application
        bound with ``init_app``, then the application of the current context.

        :raises RuntimeError: when none of the three is available.
        """
        if reference_app is not None:
            return reference_app

        if self.app is not None:
            return self.app

        ctx = stack.top
        if ctx is not None:
            return ctx.app

        raise RuntimeError('Application not registered on Bouncer'
                           ' instance and no application bound'
                           ' to current context')

    def init_app(self, app, **kwargs):
        """ Initializes the Flask-Bouncer extension for the specified application.

        :param app: The application.
        :param kwargs: when ``ensure_authorization`` is truthy, an
            ``after_request`` hook running ``check_authorization`` is
            installed as well.
        """
        self.app = app

        self._init_extension()

        self.app.before_request(self.check_implicit_rules)

        if kwargs.get('ensure_authorization', False):
            self.app.after_request(self.check_authorization)

    def _init_extension(self):
        """Store this instance under ``app.extensions['bouncer']``."""
        if not hasattr(self.app, 'extensions'):
            self.app.extensions = dict()

        self.app.extensions['bouncer'] = self

    def check_authorization(self, response):
        """checks that an authorization call has been made during the request

        :param response: the response being returned for the request.
        :returns: ``response`` unchanged.
        :raises Forbidden: when ``request._authorized`` is missing or falsy.
        """
        if not getattr(request, '_authorized', False):
            raise Forbidden
        return response

    @staticmethod
    def _split_endpoint(endpoint):
        """Split a ``ClassName:action`` endpoint into ``(class_name, action)``.

        :returns: a 2-tuple, or ``None`` when ``endpoint`` is ``None`` or does
            not contain exactly one ``:``.
        """
        if endpoint is None:
            return None
        parts = endpoint.split(':')
        # Endpoints with no colon or with several colons are not of the
        # ``ClassName:action`` form; report them as unparseable rather than
        # failing on tuple unpacking in the callers.
        if len(parts) != 2:
            return None
        return parts[0], parts[1]

    def check_implicit_rules(self):
        """Apply the implicit permission check for monitored endpoints.

        If the request's endpoint belongs to a monitored class and uses one
        of the standard actions (index, get, put, post, ...), the permission
        for that action on the class's ``__target_model__`` is checked
        automatically, unless the view carries an explicit rule.
        """
        if not self.request_is_managed_by_flask_classy():
            return

        if self.method_is_explictly_overwritten():
            return

        class_name, action = self._split_endpoint(request.endpoint)
        # A match exists because request_is_managed_by_flask_classy() passed.
        clazz = next(classy_class for classy_class in self.flask_classy_classes
                     if classy_class.__name__ == class_name)
        Condition(action, clazz.__target_model__).test()

    def method_is_explictly_overwritten(self):
        """Return True when the current view is marked ``_explict_rule_set = True``."""
        view_func = current_app.view_functions[request.endpoint]
        return getattr(view_func, '_explict_rule_set', False) is True

    def request_is_managed_by_flask_classy(self):
        """Return True when the endpoint is ``<monitored class name>:<special method>``.

        Endpoints that are ``None`` or not of the ``ClassName:action`` form
        are reported as unmanaged.
        """
        parsed = self._split_endpoint(request.endpoint)
        if parsed is None:
            return False
        class_name, action = parsed
        return any(class_name == classy_class.__name__ for classy_class in self.flask_classy_classes) \
            and action in self.special_methods

    def default_user_loader(self):
        """Return ``g.current_user`` if present, otherwise ``g.user``.

        :raises Exception: when neither attribute exists on ``g``.
        """
        if hasattr(g, 'current_user'):
            return g.current_user
        elif hasattr(g, 'user'):
            return g.user
        else:
            raise Exception("Excepting current_user on flask's g")

    def user_loader(self, value):
        """
        Use this method decorator to overwrite the default user loader

        :param value: callable that returns the current user.
        :returns: ``value`` unchanged, so it can be used as a decorator.
        """
        self.get_current_user = value
        return value

    @property
    def alias_actions(self):
        """The mapping of actions to the actions they alias."""
        return self._alias_actions

    @alias_actions.setter
    def alias_actions(self, value):
        """if you want to override your actions"""
        self._alias_actions = value

    def default_alias_actions(self):
        """Return the default alias mapping built from the bouncer constants."""
        return {
            READ: [INDEX, SHOW, GET],
            CREATE: [NEW, PUT, POST],
            UPDATE: [EDIT, PATCH]
        }

    def monitor(self, *classy_routes):
        """Register classes whose endpoints get implicit permission checks."""
        self.flask_classy_classes.extend(classy_routes)

    def authorization_method(self, value):
        """
        the callback for defining user abilities

        :param value: the callback to store.
        :returns: the stored callback, so this can be used as a decorator.
        """
        self._authorization_method = value
        return self._authorization_method

    def get_authorization_method(self):
        """Return the registered authorization callback.

        :raises Exception: when no callback has been registered.
        """
        if self._authorization_method is not None:
            return self._authorization_method
        else:
            raise Exception('Expected authorication method to be set')