HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_new/buyercall/buyercall/blueprints/sysadmin/tasks.py
import redis
from flask import current_app

from buyercall.app import create_celery_app
from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount
from buyercall.blueprints.sysadmin.models import RequestLog
from buyercall.blueprints.sysadmin.utilities.ip_api import IpApi
from buyercall.blueprints.sysadmin.utilities.utc_to_timezone import FromUTC
from buyercall.blueprints.user.models import User
from buyercall.lib.flask_mailplus import _try_renderer_template as render_template
from buyercall.lib.util_ses_email import send_ses_notification

# Celery application built from Flask's ``current_app``; the ``@celery.task``
# decorators in this module register their tasks on this object.
# NOTE(review): ``current_app`` is resolved at import time, so this module
# presumably has to be imported inside an application context - confirm.
celery = create_celery_app(current_app)


@celery.task
def create_request_log(request_data, **kwargs):
    """Persist a new request-log entry.

    Passes ``request_data`` to ``create_record`` on a fresh ``RequestLog``
    instance. Extra keyword arguments are accepted and ignored.

    Returns an empty dict.
    """
    log_entry = RequestLog()
    log_entry.create_record(request_data)
    return dict()


@celery.task
def update_request_log(request_id, update_data, response_code, **kwargs):
    """Apply ``update_data`` to an existing request-log entry.

    After the record is updated, a failure-notification task is queued
    whenever ``response_code`` is 400 or higher. Extra keyword arguments are
    accepted and ignored.

    Returns an empty dict.
    """
    log_entry = RequestLog()
    log_entry.update_record(request_id, update_data)

    # Status codes of 400 and above are treated as failures worth an email.
    is_failure = response_code >= 400
    if is_failure:
        send_request_fail_notification.delay(request_id, "System Request Failure Notification")

    return dict()


@celery.task
def send_request_fail_notification(request_id, subject, **kwargs):
    """Email the support addresses about a failed request, once per interval.

    The request log identified by ``request_id`` is loaded and a Redis key
    derived from ``record.hash()`` is used to suppress repeat notifications
    for ``REQUEST_FAIL_NOTIFY_INTERVAL`` seconds (default 7200; expected to be
    a positive number of seconds). If the notification is not a duplicate,
    the HTML template ``request_log/mail/failed_request_notification`` is
    rendered and sent through ``send_ses_notification`` to
    ``SUPPORT_EMAILS``.

    :param request_id: value matched against ``RequestLog.request_id``.
    :param subject: subject line of the email; also passed to the template.
    :param kwargs: accepted and ignored.
    :return: always ``None``.
    """
    record = RequestLog.query.filter_by(request_id=request_id).first()
    if record is None:
        # Without a record there is nothing to report; bail out instead of
        # failing with AttributeError on the attribute accesses below.
        current_app.logger.warning(
            "Request log %s not found; skipping failure notification", request_id)
        return None

    # De-duplicate before doing any further work (DB queries, IP lookup).
    # SET with NX + EX claims the key and sets its expiry in one atomic
    # command: concurrent workers cannot both pass the check, and the key can
    # never be left behind without a TTL.
    # NOTE: the key is claimed before the email is built, so a failure further
    # down still suppresses repeats for the interval.
    redis_client = redis.StrictRedis(host=current_app.config.get('REDIS_CONFIG_URL'),
                                     port=current_app.config.get('REDIS_CONFIG_PORT'))
    dedupe_key = f"requests:{record.hash()}"
    notify_interval = current_app.config.get('REQUEST_FAIL_NOTIFY_INTERVAL', 7200)
    if not redis_client.set(dedupe_key, 0, ex=notify_interval, nx=True):
        # Key already present: an identical request was reported recently.
        return None

    user = User.query.filter_by(id=record.user_id).first() if record.user_id else None

    emails = current_app.config['SUPPORT_EMAILS']

    # Logo comes from the partnership with id 1; tolerate it being absent.
    default_partner = Partnership.query.filter(Partnership.id == 1).first()
    logo = default_partner.logo if default_partner else None

    # Enrich the IP address with city / region / country when available.
    details = IpApi.get_request_complete_details(record.remote_ip_address)
    if details:
        ip_and_values = f"{record.remote_ip_address}, {details.get('city')}," \
                        f" {details.get('regionName')}, {details.get('country')}"
    else:
        ip_and_values = record.remote_ip_address

    # Convert the record's creation time for display (labelled EST below).
    est_datetime = FromUTC.get_est_custom_date(record.created_on)

    ctx = {
        'date_and_time': est_datetime.strftime("%B %d %Y, %H:%M EST"),
        'partnership_name': record.partnership_name,
        'partnership_account_name': record.partnership_account_name,
        'user': user.username if user else None,
        'end_point': record.path_info,
        'method': record.method,
        'remote_ip_address': ip_and_values,
        'response_code': record.response_code,
        'subject': subject,
        'user_agent': record.user_agent,
        'logo': logo,
    }

    # Render html template for email
    notification_html = render_template('request_log/mail/failed_request_notification',
                                        ext='html', **ctx)
    send_ses_notification(recipients=emails,
                          subject=subject,
                          sender=current_app.config['SES_EMAIL_SOURCE'],
                          html=notification_html)
    return None


@celery.task
def delete_request_log():
    """Remove old request-log records.

    Delegates to ``delete_old_records`` on a fresh ``RequestLog`` instance
    and returns whatever that call returns.
    """
    outcome = RequestLog().delete_old_records()
    return outcome


@celery.task()
def send_request_limit_exceeded_mail(user_id, endpoint, limit, **kwargs):
    """Send a notification email to sysadmin about overloaded requests.

    Renders ``rate_limit/mail/suspicious_request_mail`` and sends it through
    ``send_ses_notification`` to the addresses configured in
    ``REQUEST_RATE_LIMIT_NOTIFY_EMAILS``. Nothing is sent when no recipients
    are configured or when the user cannot be found.

    :param user_id: primary key of the ``User`` that exceeded the limit.
    :param endpoint: endpoint value shown in the email.
    :param limit: request limit value shown in the email.
    :param kwargs: optional ``method``, ``user_ip`` and ``location`` values
        that are passed straight into the email template.
    :return: always ``None``.
    """
    # Check the cheap configuration lookup first so that no database queries
    # are made when there is nobody to notify.
    _recipients = current_app.config.get('REQUEST_RATE_LIMIT_NOTIFY_EMAILS')
    if not _recipients:
        return None

    _user = User.query.get(user_id)
    if not _user:
        return None

    # Fall back to the partnership with id 1 for branding; tolerate it being
    # absent instead of failing with AttributeError.
    default_partner = Partnership.query.filter(Partnership.id == 1).first()
    logo = default_partner.logo if default_partner else None
    partner_name = default_partner.name if default_partner else None

    _partnership_account_name = ''
    _partnership = Partnership.query.filter(Partnership.id == _user.partnership_id).first()
    if _partnership:
        # The user's own partnership overrides the default name and logo.
        _partnership_account = PartnershipAccount.query.filter(
            PartnershipAccount.id == _user.partnership_account_id).first()
        if _partnership_account:
            _partnership_account_name = _partnership_account.name
        partner_name = _partnership.name
        logo = _partnership.logo

    _subject = f'Too many requests from {partner_name}'

    ctx = {
        'date_and_time': FromUTC.get_est_now(),
        'partnership_name': partner_name,
        'partnership_account_name': _partnership_account_name,
        'user': _user.email,
        'end_point': endpoint,
        'request_limit': limit,
        'method': kwargs.get('method'),
        'ip_address': kwargs.get('user_ip'),
        'location': kwargs.get('location'),
        'subject': _subject,
        'logo': logo,
    }

    # Render html template for email
    request_limit_exceeded_mail_template = render_template('rate_limit/mail/suspicious_request_mail',
                                                           ext='html', **ctx)
    send_ses_notification(recipients=_recipients,
                          subject=_subject,
                          sender=current_app.config['SES_EMAIL_SOURCE'],
                          html=request_limit_exceeded_mail_template)

    return None