File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/user/tasks.py
import pytz
from datetime import datetime
from flask import current_app, url_for
from buyercall.app import create_celery_app
from buyercall.blueprints.user.models import User
from buyercall.lib.flask_mailplus import _try_renderer_template
from buyercall.lib.util_ses_email import send_ses_email, send_ses_notification
celery = create_celery_app(current_app)
@celery.task()
def deliver_password_reset_email(user_id, reset_token, partnership_name='', partnership_logo='', **kwargs):
"""
Send a reset password e-mail to a user.
:param user_id: The user id
:type user_id: int
:param reset_token: The reset token
:type reset_token: str
:param partnership_name: partnership name
:type partnership_name: str
:param partnership_logo: The reset token
:type partnership_logo: str
:return: None if a user was not found
"""
user = User.query.get(user_id)
if user is None:
return
ctx = {'user': user, 'reset_token': reset_token, 'company': partnership_name, 'logo': partnership_logo}
# Render html template for email
password_reset_email_template = _try_renderer_template('user/mail/password_reset', ext='html', **ctx)
send_ses_email(recipients=[user.email],
p_id=user.partnership_id,
subject='{} - Password reset link'.format(partnership_name),
html=password_reset_email_template
)
return None
@celery.task()
def send_agent_invitation_email(user_id, reset_token, partnership_name, partnership_logo, partner_url, **kwargs):
"""
Send an invitation e-mail to an agent.
:param user_id: The user id
:type user_id: int
:param reset_token: The reset token
:type reset_token: str
:param reset_token: The reset token
:type partnership_name: str
:return: partner_name will be used in subject of the email
:type partnership_logo: str
:return: partnership_logo will be used as logo for email
:type partner_url: str
:return: partner_ulr will be used as partner url
"""
user = User.query.get(user_id)
if user is None:
return
if partner_url:
partnership_url = '{}/account/agent_invite?reset_token={}'.format(partner_url, reset_token)
else:
partnership_url = url_for('agents.agent_invite', reset_token=reset_token, _external=True)
ctx = {
'user': user,
'reset_token': reset_token,
'company': partnership_name,
'logo': partnership_logo,
'url': partnership_url
}
# Render html template for email
agent_invite_email_template = _try_renderer_template('agents/mail/invitation', ext='html', **ctx)
send_ses_email(recipients=[user.email],
p_id=user.partnership_id,
subject='Invitation to {}'.format(partnership_name),
html=agent_invite_email_template
)
return None
@celery.task()
def send_partners_invitation_email(user_id, reset_token, partnership_name, **kwargs):
"""
Send an invitation e-mail to an agent.
:param user_id: The user instance id
:type user_id: int
:param reset_token: The reset token
:type reset_token: str
:param reset_token: The reset token
:type partnership_name: str
:return: partnership_name will be used in subject of the email
"""
user = User.query.get(user_id)
if user is None:
return
ctx = {'user': user, 'reset_token': reset_token}
# Render html template for email
partner_invite_email_template = _try_renderer_template('partnership/mail/invitation', ext='html', **ctx)
send_ses_email(recipients=[user.email],
p_id=user.partnership_id,
subject='Invitation to {}'.format(partnership_name),
html=partner_invite_email_template
)
return None
@celery.task()
def send_partners_invitation_email_by_id(user_id, reset_token, partnership_name, **kwargs):
"""
Send an invitation e-mail to an agent.
:param user_id: The user id
:type user_id: int
:param reset_token: The reset token
:type reset_token: str
:param reset_token: The reset token
:type partnership_name: str
:return: partnership_name will be used in subject of the email
"""
user = User.query.get(user_id)
send_partners_invitation_email(user, reset_token, partnership_name)
return None
@celery.task
def send_failed_login_notification(location=None, ip=None, device=None, email=None, partnership_name=None,
partnership_account_name=None, partnership_logo=None, **kwargs):
""" send failed login email notification"""
# Get the UTC timezone
utc_timezone = pytz.timezone('UTC')
# Get the Eastern Standard Time (EST) timezone
est_timezone = pytz.timezone('US/Eastern')
# Get the current time in UTC
current_utc_time = datetime.now(tz=utc_timezone)
# Convert to Eastern Standard Time (EST)
current_est_time = current_utc_time.astimezone(est_timezone)
formatted_date = current_est_time.strftime("%B %d %Y")
formatted_time = current_est_time.strftime("%H:%M:%S")
logo = partnership_logo
if not partnership_logo:
from buyercall.blueprints.partnership.models import Partnership
default_partner = Partnership.query.filter(Partnership.id == 1).first()
logo = default_partner.logo
ctx = {
'location': location,
'ip': ip,
'device': device,
'user': email,
"date": f"{formatted_date}, {formatted_time} EST",
"partnership_name": partnership_name,
"partnership_account_name": partnership_account_name,
"logo": logo
}
# Render html template for email
login_failed_notify_admin = _try_renderer_template('user/mail/login_failed_limit_exceeded_admin',
ext='html', **ctx)
send_ses_notification(recipients=current_app.config.get('SUPPORT_EMAILS', []),
subject=f'Account Security: Multiple failed login attempts on {email}',
sender=current_app.config['SES_EMAIL_SOURCE'],
html=login_failed_notify_admin)
return None
@celery.task()
def deliver_password_expire_email(user_id, reset_token, username='', message="", **kwargs):
"""
Send a reset password e-mail to a user.
:param user_id: The user id
:type user_id: int
:param reset_token: The reset token
:type reset_token: str
:param username: partnership name
:type username: str
:param message: The reset token
"""
from buyercall.app import create_app
from buyercall.extensions import db
# Create a context for the database connection.
app = create_app()
db.app = app
with app.app_context():
user = User.query.get(user_id)
if user is None:
return
else:
if user.partnership:
partner_name = user.partnership.name
partner_logo = user.partnership.logo
else:
from buyercall.blueprints.partnership.models import Partnership
default_partner = Partnership.query.filter(Partnership.id == 1).first()
if default_partner:
partner_name = default_partner.name
partner_logo = default_partner.logo
ctx = {
'partner_name': partner_name,
'reset_token': reset_token,
'username': username,
'message': message,
'logo': partner_logo}
# Render html template for email
password_reset_email_template = _try_renderer_template('user/mail/password_expired', ext='html', **ctx)
send_ses_email(recipients=[user.email],
p_id=user.partnership_id,
subject='{} - Password reset link'.format(username),
html=password_reset_email_template
)
@celery.task
def send_new_os_notification(subject=None, username=None, device=None,
email=None, location=None, ip=None,
partnership_name=None, reset_token=None, partnership_logo=None, **kwargs):
""" send suspicious email notification
"""
logo = partnership_logo
if not partnership_logo:
from buyercall.blueprints.partnership.models import Partnership
default_partner = Partnership.query.filter(Partnership.id == 1).first()
logo = default_partner.logo
# Get the UTC timezone
utc_timezone = pytz.timezone('UTC')
# Get the Eastern Standard Time (EST) timezone
est_timezone = pytz.timezone('US/Eastern')
# Get the current time in UTC
current_utc_time = datetime.now(tz=utc_timezone)
# Convert to Eastern Standard Time (EST)
current_est_time = current_utc_time.astimezone(est_timezone)
formatted_date = current_est_time.strftime("%B %d, %Y")
formatted_time = current_est_time.strftime("%H:%M")
ctx = {'device': device,
"partnership_name": partnership_name,
"reset_token": reset_token,
"location": location,
"email": email,
'username': username,
'subject': subject,
"date": f"{formatted_date}, {formatted_time} EST",
"ip": ip,
"logo": logo,
}
# Render html template for email
new_ip_notification = _try_renderer_template('user/mail/login_new_os_notification',
ext='html', **ctx)
send_ses_notification(recipients=[email],
subject=subject,
sender=current_app.config['SES_EMAIL_SOURCE'],
html=new_ip_notification)
return None
@celery.task
def send_support_new_os_notification(subject=None, device=None,
email=None, location=None, ip=None,
partnership_name=None, partnership_logo=None, **kwargs):
""" send suspicious email notification
"""
logo = partnership_logo
if not partnership_logo:
from buyercall.blueprints.partnership.models import Partnership
default_partner = Partnership.query.filter(Partnership.id == 1).first()
logo = default_partner.logo
# Get the UTC timezone
utc_timezone = pytz.timezone('UTC')
# Get the Eastern Standard Time (EST) timezone
est_timezone = pytz.timezone('US/Eastern')
# Get the current time in UTC
current_utc_time = datetime.now(tz=utc_timezone)
# Convert to Eastern Standard Time (EST)
current_est_time = current_utc_time.astimezone(est_timezone)
formatted_date = current_est_time.strftime("%B %d %Y")
formatted_time = current_est_time.strftime("%H:%M")
ctx = {'device': device,
"partnership_name": partnership_name,
"location": location,
"email": email,
'subject': subject,
"date": f"{formatted_date}, {formatted_time} EST",
"ip": ip,
"logo": logo
}
# Render html template for email
new_ip_notification = _try_renderer_template('user/mail/login_support_new_os_notification',
ext='html', **ctx)
send_ses_notification(recipients=current_app.config['SUPPORT_EMAILS'],
subject=subject,
sender=current_app.config['SES_EMAIL_SOURCE'],
html=new_ip_notification)
return None
@celery.task
def send_password_expiry_email():
""" Send password expiry notification emails """
# This feature is deactivated for now
# User().logic_password_expiry()
pass
return {}
@celery.task()
def send_2fa_email_verification(user_email, user_name, verification_code, p_id=None, **kwargs):
"""
Send a 2fa verification email to user.
:param user_email: The user email
:type user_email: str
:param user_name: The user's name
:type user_name: str
:param verification_code: The verification code
:type verification_code: str
:param p_id: The partner id
:type p_id: int
:return: None
"""
ctx = vars()
from buyercall.blueprints.partnership.models import Partnership
default_partner = Partnership.query.filter(Partnership.id == 1).first()
ctx['logo'] = default_partner.logo
ctx['partner'] = default_partner.name
if type(user_email) == str:
user_email = [user_email]
if user_name:
ctx['user'] = user_name
sender = ''
p_name = default_partner.name
if p_id:
from buyercall.blueprints.partnership.models import Partnership
partner = Partnership.query.filter(Partnership.id == p_id).first()
if partner:
ctx['logo'] = partner.logo
ctx['partner'] = partner.name
if partner.email_sender:
sender = partner.email_sender
p_name = partner.name
ctx['verification_code'] = verification_code
# Render html template for email
verification_code_template = _try_renderer_template('user/mail/verification_code', ext='html', **ctx)
subject = 'Your verification code'
send_ses_email(user_email, p_id, subject=subject, html=verification_code_template,
sender=sender, account_name=p_name)
return None