HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/user/tasks.py
import pytz
from datetime import datetime

from flask import current_app, url_for

from buyercall.app import create_celery_app
from buyercall.blueprints.user.models import User
from buyercall.lib.flask_mailplus import _try_renderer_template
from buyercall.lib.util_ses_email import send_ses_email, send_ses_notification


celery = create_celery_app(current_app)


@celery.task()
def deliver_password_reset_email(user_id, reset_token, partnership_name='', partnership_logo='', **kwargs):
    """
    Send a reset password e-mail to a user.

    :param user_id: The user id
    :type user_id: int
    :param reset_token: The reset token
    :type reset_token: str
    :param partnership_name: partnership name
    :type partnership_name: str
    :param partnership_logo: The reset token
    :type partnership_logo: str
    :return: None if a user was not found
    """
    user = User.query.get(user_id)

    if user is None:
        return

    ctx = {'user': user, 'reset_token': reset_token, 'company': partnership_name, 'logo': partnership_logo}

    # Render html template for email
    password_reset_email_template = _try_renderer_template('user/mail/password_reset', ext='html', **ctx)
    send_ses_email(recipients=[user.email],
                   p_id=user.partnership_id,
                   subject='{} - Password reset link'.format(partnership_name),
                   html=password_reset_email_template
                   )

    return None


@celery.task()
def send_agent_invitation_email(user_id, reset_token, partnership_name, partnership_logo, partner_url, **kwargs):
    """
    Send an invitation e-mail to an agent.

    :param user_id: The user id
    :type user_id: int
    :param reset_token: The reset token
    :type reset_token: str
    :param reset_token: The reset token
    :type partnership_name: str
    :return: partner_name will be used in subject of the email
    :type partnership_logo: str
    :return: partnership_logo will be used as logo for email
    :type partner_url: str
    :return: partner_ulr will be used as partner url
    """
    user = User.query.get(user_id)

    if user is None:
        return

    if partner_url:
        partnership_url = '{}/account/agent_invite?reset_token={}'.format(partner_url, reset_token)
    else:
        partnership_url = url_for('agents.agent_invite', reset_token=reset_token, _external=True)

    ctx = {
        'user': user,
        'reset_token': reset_token,
        'company': partnership_name,
        'logo': partnership_logo,
        'url': partnership_url
    }

    # Render html template for email
    agent_invite_email_template = _try_renderer_template('agents/mail/invitation', ext='html', **ctx)
    send_ses_email(recipients=[user.email],
                   p_id=user.partnership_id,
                   subject='Invitation to {}'.format(partnership_name),
                   html=agent_invite_email_template
                   )
    return None


@celery.task()
def send_partners_invitation_email(user_id, reset_token, partnership_name, **kwargs):
    """
    Send an invitation e-mail to an agent.

    :param user_id: The user instance id
    :type user_id: int
    :param reset_token: The reset token
    :type reset_token: str
    :param reset_token: The reset token
    :type partnership_name: str
    :return: partnership_name will be used in subject of the email
    """
    user = User.query.get(user_id)

    if user is None:
        return

    ctx = {'user': user, 'reset_token': reset_token}

    # Render html template for email
    partner_invite_email_template = _try_renderer_template('partnership/mail/invitation', ext='html', **ctx)
    send_ses_email(recipients=[user.email],
                   p_id=user.partnership_id,
                   subject='Invitation to {}'.format(partnership_name),
                   html=partner_invite_email_template
                   )
    return None


@celery.task()
def send_partners_invitation_email_by_id(user_id, reset_token, partnership_name, **kwargs):
    """
    Send an invitation e-mail to an agent.

    :param user_id: The user id
    :type user_id: int
    :param reset_token: The reset token
    :type reset_token: str
    :param reset_token: The reset token
    :type partnership_name: str
    :return: partnership_name will be used in subject of the email
    """
    user = User.query.get(user_id)

    send_partners_invitation_email(user, reset_token, partnership_name)

    return None


@celery.task
def send_failed_login_notification(location=None, ip=None, device=None, email=None, partnership_name=None,
                                   partnership_account_name=None, partnership_logo=None, **kwargs):
    """ send failed login email notification"""
    # Get the UTC timezone
    utc_timezone = pytz.timezone('UTC')

    # Get the Eastern Standard Time (EST) timezone
    est_timezone = pytz.timezone('US/Eastern')

    # Get the current time in UTC
    current_utc_time = datetime.now(tz=utc_timezone)

    # Convert to Eastern Standard Time (EST)
    current_est_time = current_utc_time.astimezone(est_timezone)

    formatted_date = current_est_time.strftime("%B %d %Y")
    formatted_time = current_est_time.strftime("%H:%M:%S")

    logo = partnership_logo

    if not partnership_logo:
        from buyercall.blueprints.partnership.models import Partnership
        default_partner = Partnership.query.filter(Partnership.id == 1).first()
        logo = default_partner.logo

    ctx = {
        'location': location,
        'ip': ip,
        'device': device,
        'user': email,
        "date": f"{formatted_date}, {formatted_time} EST",
        "partnership_name": partnership_name,
        "partnership_account_name": partnership_account_name,
        "logo": logo
    }

    # Render html template for email
    login_failed_notify_admin = _try_renderer_template('user/mail/login_failed_limit_exceeded_admin',
                                                       ext='html', **ctx)
    send_ses_notification(recipients=current_app.config.get('SUPPORT_EMAILS', []),
                          subject=f'Account Security: Multiple failed login attempts on {email}',
                          sender=current_app.config['SES_EMAIL_SOURCE'],
                          html=login_failed_notify_admin)
    return None


@celery.task()
def deliver_password_expire_email(user_id, reset_token, username='', message="", **kwargs):
    """
    Send a reset password e-mail to a user.
    :param user_id: The user id
    :type user_id: int
    :param reset_token: The reset token
    :type reset_token: str
    :param username: partnership name
    :type username: str
    :param message: The reset token
    """
    from buyercall.app import create_app
    from buyercall.extensions import db
    # Create a context for the database connection.
    app = create_app()
    db.app = app
    with app.app_context():
        user = User.query.get(user_id)
        if user is None:
            return
        else:
            if user.partnership:
                partner_name = user.partnership.name
                partner_logo = user.partnership.logo
            else:
                from buyercall.blueprints.partnership.models import Partnership
                default_partner = Partnership.query.filter(Partnership.id == 1).first()
                if default_partner:
                    partner_name = default_partner.name
                    partner_logo = default_partner.logo

        ctx = {
            'partner_name': partner_name,
            'reset_token': reset_token,
            'username': username,
            'message': message,
            'logo': partner_logo}

        # Render html template for email
        password_reset_email_template = _try_renderer_template('user/mail/password_expired', ext='html', **ctx)
        send_ses_email(recipients=[user.email],
                       p_id=user.partnership_id,
                       subject='{} - Password reset link'.format(username),
                       html=password_reset_email_template
                       )


@celery.task
def send_new_os_notification(subject=None, username=None, device=None,
                             email=None, location=None, ip=None,
                             partnership_name=None, reset_token=None, partnership_logo=None, **kwargs):
    """ send suspicious email notification
    """

    logo = partnership_logo

    if not partnership_logo:
        from buyercall.blueprints.partnership.models import Partnership
        default_partner = Partnership.query.filter(Partnership.id == 1).first()
        logo = default_partner.logo

    # Get the UTC timezone
    utc_timezone = pytz.timezone('UTC')

    # Get the Eastern Standard Time (EST) timezone
    est_timezone = pytz.timezone('US/Eastern')

    # Get the current time in UTC
    current_utc_time = datetime.now(tz=utc_timezone)

    # Convert to Eastern Standard Time (EST)
    current_est_time = current_utc_time.astimezone(est_timezone)

    formatted_date = current_est_time.strftime("%B %d, %Y")
    formatted_time = current_est_time.strftime("%H:%M")

    ctx = {'device': device,
           "partnership_name": partnership_name,
           "reset_token": reset_token,
           "location": location,
           "email": email,
           'username': username,
           'subject': subject,
           "date": f"{formatted_date}, {formatted_time} EST",
           "ip": ip,
           "logo": logo,
           }
    # Render html template for email
    new_ip_notification = _try_renderer_template('user/mail/login_new_os_notification',
                                                 ext='html', **ctx)
    send_ses_notification(recipients=[email],
                          subject=subject,
                          sender=current_app.config['SES_EMAIL_SOURCE'],
                          html=new_ip_notification)
    return None


@celery.task
def send_support_new_os_notification(subject=None, device=None,
                                     email=None, location=None, ip=None,
                                     partnership_name=None, partnership_logo=None, **kwargs):
    """ send suspicious email notification
    """
    logo = partnership_logo

    if not partnership_logo:
        from buyercall.blueprints.partnership.models import Partnership
        default_partner = Partnership.query.filter(Partnership.id == 1).first()
        logo = default_partner.logo

    # Get the UTC timezone
    utc_timezone = pytz.timezone('UTC')

    # Get the Eastern Standard Time (EST) timezone
    est_timezone = pytz.timezone('US/Eastern')

    # Get the current time in UTC
    current_utc_time = datetime.now(tz=utc_timezone)

    # Convert to Eastern Standard Time (EST)
    current_est_time = current_utc_time.astimezone(est_timezone)

    formatted_date = current_est_time.strftime("%B %d %Y")
    formatted_time = current_est_time.strftime("%H:%M")

    ctx = {'device': device,
           "partnership_name": partnership_name,
           "location": location,
           "email": email,
           'subject': subject,
           "date": f"{formatted_date}, {formatted_time} EST",
           "ip": ip,
           "logo": logo
           }
    # Render html template for email
    new_ip_notification = _try_renderer_template('user/mail/login_support_new_os_notification',
                                                 ext='html', **ctx)
    send_ses_notification(recipients=current_app.config['SUPPORT_EMAILS'],
                          subject=subject,
                          sender=current_app.config['SES_EMAIL_SOURCE'],
                          html=new_ip_notification)
    return None


@celery.task
def send_password_expiry_email():
    """ Send password expiry notification emails """
    # This feature is deactivated for now
    # User().logic_password_expiry()
    pass
    return {}


@celery.task()
def send_2fa_email_verification(user_email, user_name, verification_code, p_id=None, **kwargs):
    """
    Send a 2fa verification email to user.

    :param user_email: The user email
    :type user_email: str
    :param user_name: The user's name
    :type user_name: str
    :param verification_code: The verification code
    :type verification_code: str
    :param p_id: The partner id
    :type p_id: int
    :return: None
    """
    ctx = vars()
    from buyercall.blueprints.partnership.models import Partnership
    default_partner = Partnership.query.filter(Partnership.id == 1).first()
    ctx['logo'] = default_partner.logo
    ctx['partner'] = default_partner.name
    if type(user_email) == str:
        user_email = [user_email]
    if user_name:
        ctx['user'] = user_name
    sender = ''
    p_name = default_partner.name
    if p_id:
        from buyercall.blueprints.partnership.models import Partnership
        partner = Partnership.query.filter(Partnership.id == p_id).first()
        if partner:
            ctx['logo'] = partner.logo
            ctx['partner'] = partner.name
            if partner.email_sender:
                sender = partner.email_sender
            p_name = partner.name
    ctx['verification_code'] = verification_code

    # Render html template for email
    verification_code_template = _try_renderer_template('user/mail/verification_code', ext='html', **ctx)
    subject = 'Your verification code'
    send_ses_email(user_email, p_id, subject=subject, html=verification_code_template,
                   sender=sender, account_name=p_name)

    return None