HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/env/lib64/python3.10/site-packages/flower/views/__init__.py
import re
import inspect
import traceback
import copy
import logging
import hmac

from base64 import b64decode

import tornado

from ..utils import template, bugreport, strtobool

logger = logging.getLogger(__name__)


class BaseHandler(tornado.web.RequestHandler):
    def set_default_headers(self):
        if not (self.application.options.basic_auth or self.application.options.auth):
            self.set_header("Access-Control-Allow-Origin", "*")
            self.set_header("Access-Control-Allow-Headers",
                            "x-requested-with,access-control-allow-origin,authorization,content-type")
            self.set_header('Access-Control-Allow-Methods',
                            ' PUT, DELETE, OPTIONS, POST, GET, PATCH')

    def options(self, *_, **__):
        self.set_status(204)
        self.finish()

    def render(self, *args, **kwargs):
        app_options = self.application.options
        functions = inspect.getmembers(template, inspect.isfunction)
        assert not set(map(lambda x: x[0], functions)) & set(kwargs.keys())
        kwargs.update(functions)
        kwargs.update(url_prefix=app_options.url_prefix)
        super().render(*args, **kwargs)

    def write_error(self, status_code, **kwargs):
        if status_code in (404, 403):
            message = ''
            if 'exc_info' in kwargs and kwargs['exc_info'][0] == tornado.web.HTTPError:
                message = kwargs['exc_info'][1].log_message
            self.render('404.html', message=message)
        elif status_code == 500:
            error_trace = "".join(traceback.format_exception(*kwargs['exc_info']))

            self.render('error.html',
                    debug=self.application.options.debug,
                    status_code=status_code,
                    error_trace=error_trace,
                    bugreport=bugreport())
        elif status_code == 401:
            self.set_status(status_code)
            self.set_header('WWW-Authenticate', 'Basic realm="flower"')
            self.finish('Access denied')
        else:
            message = ''
            if 'exc_info' in kwargs and kwargs['exc_info'][0] == tornado.web.HTTPError:
                message = kwargs['exc_info'][1].log_message
                self.set_header('Content-Type', 'text/plain')
                self.write(str(message))
            self.set_status(status_code)
            self.finish()

    def get_current_user(self):
        # Basic Auth
        basic_auth = self.application.options.basic_auth
        if basic_auth:
            auth_header = self.request.headers.get("Authorization", "")
            try:
                basic, credentials = auth_header.split()
                credentials = b64decode(credentials.encode()).decode()
                if basic != 'Basic':
                    raise tornado.web.HTTPError(401)
                for stored_credential in basic_auth:
                    if hmac.compare_digest(stored_credential, credentials):
                        break
                else:
                    raise tornado.web.HTTPError(401)
            except ValueError as exc:
                raise tornado.web.HTTPError(401) from exc

        # OAuth2
        if not self.application.options.auth:
            return True
        user = self.get_secure_cookie('user')
        if user:
            if not isinstance(user, str):
                user = user.decode()
            if re.match(self.application.options.auth, user):
                return user
        return None

    # pylint: disable=dangerous-default-value
    def get_argument(self, name, default=[], strip=True, type=None):
        arg = super().get_argument(name, default, strip)
        if arg and isinstance(arg, str):
            arg = tornado.escape.xhtml_escape(arg)
        if type is not None:
            try:
                if type is bool:
                    arg = strtobool(str(arg))
                else:
                    arg = type(arg)
            except (ValueError, TypeError) as exc:
                if arg is None and default is None:
                    return arg
                raise tornado.web.HTTPError(
                        400,
                        f"Invalid argument '{arg}' of type '{type.__name__}'") from exc
        return arg

    @property
    def capp(self):
        "return Celery application object"
        return self.application.capp

    def format_task(self, task):
        custom_format_task = self.application.options.format_task
        if custom_format_task:
            try:
                task = custom_format_task(copy.copy(task))
            except Exception:
                logger.exception("Failed to format '%s' task", task.uuid)
        return task

    def get_active_queue_names(self):
        queues = set([])
        for _, info in self.application.workers.items():
            for queue in info.get('active_queues', []):
                queues.add(queue['name'])

        if not queues:
            queues = set([self.capp.conf.task_default_queue]) |\
                {q.name for q in self.capp.conf.task_queues or [] if q.name}
        return sorted(queues)