HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/app.py
from flask import Flask, has_request_context, request
from celery import Celery

from buyercall.blueprints.user.models import User
from buyercall.register import (
    api_keys,
    blueprints,
    error_templates,
    exception_handler,
    extensions,
    logging_handler,
    middleware,
    template_processors
)
from buyercall.initialize import authentication, locale

CELERY_TASK_LIST = [
    'buyercall.blueprints.activity.tasks',
    'buyercall.blueprints.user.tasks',
    'buyercall.blueprints.issue.tasks',
    'buyercall.blueprints.billing.tasks',
    'buyercall.blueprints.admin.tasks',
    'buyercall.blueprints.contacts.contact_tasks',
    'buyercall.blueprints.widgets.tasks',
    'buyercall.blueprints.widgets.bw_tasks',
    'buyercall.blueprints.form_leads.form_tasks',
    'buyercall.blueprints.reports.tasks',
    'buyercall.blueprints.partnership.tasks',
    'buyercall.blueprints.phonenumbers.tasks',
    'buyercall.blueprints.phonenumbers.bw_tasks',
    'buyercall.blueprints.phonenumbers.bw_operational_tasks',
    'buyercall.blueprints.mobile.tasks',
    'buyercall.blueprints.sms.bw_sms_tasks',
    'buyercall.blueprints.sms.tw_sms_tasks',
    'buyercall.blueprints.sms.tasks',
    # request-log tasks
    'buyercall.blueprints.sysadmin.tasks',
]


def create_celery_app(app=None):
    """
    Create a new celery object and tie together the celery config to the app's
    config. Wrap all tasks in the context of the application.

    :param app: Flask app
    :return: Celery app
    """
    app = app or create_app()

    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'],
                    result_backend=app.config['CELERY_RESULT_BACKEND'])
    celery.autodiscover_tasks(CELERY_TASK_LIST, force=True)
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        CONTEXT_ARG_NAME = '_flask_request_context'

        def _include_request_context(self, kwargs):
            """Includes all the information about current HTTP request context
            as an additional argument to the task.
            """
            if not has_request_context():
                return

            context = {
                'path': request.path,
                'base_url': request.url_root,
                'method': request.method,
                'headers': dict(request.headers),
            }
            if '?' in request.url:
                context['query_string'] = request.url[(request.url.find('?') + 1):]

            kwargs[self.CONTEXT_ARG_NAME] = context

        def apply_async(self, args=None, kwargs=None, **rest):
            args = args or ()
            kwargs = kwargs or {}

            self._include_request_context(kwargs)
            return super(ContextTask, self).apply_async(args, kwargs, **rest)

        def __call__(self, *args, **kwargs):
            context = kwargs.pop(self.CONTEXT_ARG_NAME, None)
            if context is None or has_request_context():
                return TaskBase.__call__(self, *args, **kwargs)

            with app.test_request_context(**context):
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery


def create_app(application_name=__name__, settings_override=None):
    """
    Create an application using the Flask app factory pattern:
    http://flask.pocoo.org/docs/0.10/patterns/appfactories

    :param application_name: Name of the application
    :param settings_override: Override settings
    :type settings_override: dict
    :return: Flask app
    """
    app = Flask(application_name, instance_relative_config=True)
    configure_settings(app, settings_override)

    # Register.
    api_keys(app)
    middleware(app)
    blueprints(app)
    extensions(app)
    template_processors(app)
    logging_handler(app)
    exception_handler(app)
    error_templates(app)

    # Initialize.
    authentication(app, User)
    if not app.babel_instance.locale_selector_func:
        locale(app)

    return app


def configure_settings(app, settings_override=None):
    """
    Modify the settings of the application (mutates the app passed in).

    :param app: Flask application instance
    :param settings_override: Override settings
    :type settings_override: dict
    :return: Add configuration settings
    """
    app.config.from_object('config.settings')
    app.config.from_pyfile('settings.py', silent=True)
    app.config.SWAGGER_UI_DOC_EXPANSION = 'none'

    if settings_override:
        app.config.update(settings_override)

    return app.config