File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/user/views.py
import copy
import uuid
import logging
import traceback
import requests
from sqlalchemy import func
from flask import (
Blueprint,
redirect,
request,
flash,
url_for,
render_template,
current_app,
session)
from buyercall.blueprints.activity.models import ActivityType, ActivityName, ActivityLogs
from buyercall.blueprints.user.decorators import role_required
from flask_login import (
login_required,
login_user,
current_user,
logout_user)
from flask_babel import gettext as _
from buyercall.lib.safe_next_url import safe_next_url
from buyercall.lib.util_two_factor_auth import send_verification_code
from buyercall.blueprints.user.decorators import anonymous_required
from buyercall.blueprints.user.forms import (
LoginForm,
TwoFactorAuthForm,
BeginPasswordResetForm,
PasswordResetForm,
SignupForm,
UpdateCredentials,
UpdatePersonalDetails,
UpdateLocale,
UpdatePhoneNumber,
UpdateSecurity)
from buyercall.extensions import bcrypt, db
from buyercall.blueprints.agents.models import Agent
from buyercall.blueprints.widgets.models import Widget
from buyercall.blueprints.filters import format_phone_number
from buyercall.blueprints.user.models import User
from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount
from buyercall.lib.supervisor_manager import (
current_supervisor_user,
login_user as supervisor_login_user,
logout_user as supervisor_logout_user,
login_required as supervisor_login_required)
from buyercall.blueprints.filters import format_phone_number_bracket
from buyercall.blueprints.user.utilities.failed_login_notify import FailedLoginNotifier
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Blueprint on which every view below registers its route; its templates live
# in the 'templates' folder next to this module.
user = Blueprint('user', __name__, template_folder='templates')
@user.route('/login', methods=['GET', 'POST'])
@anonymous_required()
def login():
"""Render the login page and process a submitted login form.

The view resolves white-label branding from the request host, requires a
reCAPTCHA response that verifies successfully before any credential check,
and then authenticates the user. An authenticated user may be redirected to
a password reset, the two-factor on-boarding page, the two-factor
verification page or the terms-of-service page before finally being logged
in and sent to a role-specific landing page.

NOTE(review): this copy of the file carries no indentation, so the comments
below describe individual statements and do not assert exact nesting.
"""
# Request host with every 'www' substring removed; compared with partner URLs below.
domain = request.host.replace('www', '')
white_label = False
# Branding defaults come from the partnership with id 1.
primary_partner = Partnership.query.filter(Partnership.id == 1).first()
partner_logo = primary_partner.logo
partner_name = primary_partner.name
partner_url = primary_partner.partner_url
partner_custom_styles = primary_partner.custom_styles
# Look for an active partner (other than id 1) whose URL, stripped of
# 'https://' and 'www', equals the request domain, and use its branding.
# There is no break, so a later matching partner overrides an earlier one.
partners = Partnership.query.filter(Partnership.active.is_(True)).all()
for partner in partners:
if partner.partner_url and partner.id != 1:
partner_domain = partner.partner_url.replace('https://', '').replace('www', '')
if partner_domain == domain:
partner_url = partner_domain
white_label = True
if partner.logo:
partner_logo = partner.logo
if partner.custom_styles:
partner_custom_styles = partner.custom_styles
partner_name = partner.name
# The form is pre-populated with the 'next' query argument; the posted value
# is read back from the form data.
form = LoginForm(next=request.args.get('next'))
next_url = request.form.get('next')
# The posted 'remember' value is turned into a boolean: only 'y' means True.
remember = request.form.get('remember', False)
if remember == 'y':
remember = True
else:
remember = False
# Without a reCAPTCHA response (for example on a plain GET) only the login
# page is rendered.
recaptcha_response = request.form.get('g-recaptcha-response', None)
if not recaptcha_response:
return render_template('user/login.jinja2', white_label=white_label, partner_logo=partner_logo,
partner_name=partner_name, partner_url=partner_url,
partner_custom_styles=partner_custom_styles, form=form,
site_key=current_app.config.get('RC_SITE_KEY_V3'))
# Ask Google to verify the reCAPTCHA response.
# NOTE(review): the secret key and the response are interpolated into the URL
# query string and the request has no timeout - consider sending them as POST
# data with an explicit timeout.
verify_url = 'https://www.google.com/recaptcha/api/siteverify'
response = requests.post(
f"{verify_url}?secret={current_app.config.get('RC_SECRET_KEY_V3')}&response={recaptcha_response}").json()
# A failed verification is logged and the login page is rendered again
# without a flash message.
if not response.get('success', False):
log.info('Please add a valid Google ReCaptcha keys, or add your domain to the ReCaptcha admin console.')
return render_template('user/login.jinja2', white_label=white_label, partner_logo=partner_logo,
partner_name=partner_name, partner_url=partner_url,
partner_custom_styles=partner_custom_styles, form=form,
site_key=current_app.config.get('RC_SITE_KEY_V3'))
if form.validate_on_submit():
# Users are looked up by the lower-cased identity that was posted.
user_email = request.form.get('identity')
u = User.find_by_identity(user_email.lower())
if u is None:
# NOTE(review): this message tells the caller that the address is not
# registered, which allows account enumeration.
flash(_(f'The email address {user_email} does not exist. Please try again or contact support.'), 'danger')
return render_template('user/login.jinja2', white_label=white_label, partner_logo=partner_logo,
partner_name=partner_name, partner_url=partner_url,
partner_custom_styles=partner_custom_styles, form=form,
site_key=current_app.config.get('RC_SITE_KEY_V3'))
# A blocked user gets the login page back; no message is flashed here.
if FailedLoginNotifier(u).is_blocked():
return render_template('user/login.jinja2', white_label=white_label, partner_logo=partner_logo,
partner_name=partner_name, partner_url=partner_url,
partner_custom_styles=partner_custom_styles, form=form,
site_key=current_app.config.get('RC_SITE_KEY_V3'))
try:
if u and u.authenticated(password=request.form.get('password')):
# The 'remember' flag computed from the posted form above is what is passed
# to login_user() and forwarded to the follow-up views further down.
# Set session variable to be identified during 2FA and TOS
session["user_identify"] = u.email.lower()
# logic to save ip address
u.save_ip_address(ip_address=request.remote_addr)
# start logic to save user-agent
# A truthy return value triggers the new-login notification below.
new_os_status = u.save_user_agent(os_value=request.user_agent.string)
if new_os_status:
from buyercall.blueprints.user.tasks import send_new_os_notification, send_support_new_os_notification
from user_agents import parse
from buyercall.blueprints.user.utilities.get_request_location import UserUtilities
# fetch user-agent details
user_agent = parse(request.user_agent.string)
# get partnership name from user
partnership_name = u.get_partnership_name(u) or u.email
# get partnership logo
if u.partnership_id:
partnership_logo = u.partnership.logo
else:
partnership_logo = None
# get location details based on ip
# The X-Forwarded-For header value is preferred over remote_addr when present.
user_ip = request.environ.get('HTTP_X_FORWARDED_FOR') \
if request.environ.get('HTTP_X_FORWARDED_FOR') else request.remote_addr
data = UserUtilities().get_request_complete_details(user_ip)
# password reset url
reset_token = u.serialize_token()
# send email for new user-agent to user
# NOTE(review): 'location' falls back to None when data is falsy, but
# 'ip' calls data.get() unconditionally - confirm data can never be None.
send_new_os_notification.delay(username=u.name or u.email,
partnership_name=partnership_name,
reset_token=reset_token,
location=f"{data.get('city')},"
f" {data.get('regionName')},"
f" {data.get('country')}" if data else None,
subject=f"New login alert to {partnership_name},"
f" from {user_agent.browser.family} on {user_agent.os.family}",
device=f"{user_agent.browser.family}{user_agent.browser.version_string},"
f" {user_agent.os.family}{user_agent.os.version_string}",
email=u.email,
ip=data.get('query'),
partnership_logo=partnership_logo)
# The string literal below is disabled code (notification to support); it
# is a bare expression statement and has no effect at runtime.
"""
# send email for new user-agent to support
send_support_new_os_notification.delay(partnership_name=partnership_name,
location=f"{data.get('city')},"
f" {data.get('regionName')},"
f" {data.get('country')}" if data else None,
subject=f"New login alert to {partnership_name},"
f" from {user_agent.browser.family} on {user_agent.os.family} ",
device=f"{user_agent.browser.family}{user_agent.browser.version_string},"
f" {user_agent.os.family}{user_agent.os.version_string}",
email=u.email,
ip=data.get('query'),
partnership_logo=partnership_logo)
"""
# end start logic to save user-agent
# start password expiry check
# A reset token is serialized again here and used for the reset URL below.
reset_token = u.serialize_token()
password_status = u.get_password_expire_status(u)
if not password_status:
# Construct the URL with the reset_token as a query parameter
reset_url = url_for('user.password_reset') + f"?reset_token={reset_token}"
# Redirect to the reset URL
flash(_('Your password has expired, please reset your password'), 'danger')
return redirect(reset_url)
# end password expiry check
# Check to see if the user has done security on-boarding yet
if not u.two_factor_auth_onboard:
return redirect(url_for('user.additional_user_security_onboarding',
user_id=u.id,
url=next_url,
remember=remember))
# Has the user verified their account with two-factor authentication
# if two_factor_auth is set to True
# LOGIN_SUCCESS is logged here, before the verification code is sent.
if u.two_factor_auth:
ActivityLogs.add_log(u.id, ActivityType.AUTHORIZATION, ActivityName.LOGIN_SUCCESS, request)
send_verification_code(u)
return redirect(url_for('user.two_factor_auth',
user_id=u.id,
url=next_url,
remember=remember))
# Has the user agreed to tos
if not u.tos_agreement:
ActivityLogs.add_log(u.id, ActivityType.AUTHORIZATION, ActivityName.LOGIN_SUCCESS, request)
return redirect(url_for('user.tos_get',
user_id=u.id,
url=next_url,
remember=remember))
# Login the user and redirect them to the appropriate page
if u.active and login_user(u, remember=remember):
ActivityLogs.add_log(u.id, ActivityType.AUTHORIZATION, ActivityName.LOGIN_SUCCESS, request)
u.update_activity_tracking(request.remote_addr)
# Reset failed login attempts on successful login
FailedLoginNotifier(u).reset_attempt()
# Handle optionally redirecting to the next URL safely
if next_url:
return redirect(safe_next_url(next_url))
# Landing page chosen from the role, the admin-with-groups flag and the
# subscription plan of the logged-in user.
if current_user.role in ['sysadmin', 'limitsysadmin',
'partner'] and not current_user.is_admin_user_with_groups:
return redirect(url_for('admin.dashboard'))
elif current_user.role in ['admin'] and current_user.is_admin_user_with_groups \
and current_user.is_viewing_partnership \
and current_user.get_user_viewing_partnership_account_subscription_plan != 'partnershipsingle':
return redirect(url_for('dashboard.user_dashboard'))
elif current_user.role in ['admin'] and current_user.is_admin_user_with_groups \
and current_user.is_viewing_partnership \
and current_user.get_user_viewing_partnership_account_subscription_plan == 'partnershipsingle':
return redirect(url_for('contacts.contact_list'))
elif current_user.is_admin_user_with_groups:
return redirect(url_for('partnership.company_accounts'))
# NOTE(review): the next branch reads current_user.subscription.plan without
# the None check that the branch after it performs.
elif current_user.role in ['agent'] and current_user.subscription.plan != 'partnershipsingle':
return redirect(url_for('contacts.contact_list'))
elif current_user.subscription and current_user.subscription.plan == 'partnershipsingle':
return redirect(url_for('user.settings'))
else:
return redirect(url_for('dashboard.user_dashboard'))
else:
ActivityLogs.add_log(u.id, ActivityType.AUTHORIZATION, ActivityName.LOGIN_FAILED, request)
if FailedLoginNotifier(u).notify():
flash(_("We’ve detected 3 failed attempts to login. The account has been temporarily locked. "
"Please try again in 15 minutes."), 'danger')
else:
flash(_('This account has been disabled.'), 'danger')
else:
if u:
ActivityLogs.add_log(u.id, ActivityType.AUTHORIZATION, ActivityName.LOGIN_FAILED, request)
# Set user login failed attempts
if FailedLoginNotifier(u).notify():
flash(_("We’ve detected 3 failed attempts to login. The account has been temporarily locked. "
"Please try again in 15 minutes."), 'danger')
else:
flash(_('Email address or password is incorrect.'), 'danger')
# save email id posted to the request log error
# NOTE(review): json is imported here but not referenced in the visible part
# of this function.
from buyercall.blueprints.sysadmin.models import RequestLog
import json
request_id = request.environ.get('HTTP_X_REQUEST_ID')
# updating the request log with the error
RequestLog().update_record(
request_id, {"error": f"Login Failed with email: {request.form.get('identity').lower()}"})
# NOTE(review): which branch this flash belongs to cannot be determined
# without the original indentation.
flash(_('Identity or password is incorrect.'), 'danger')
except Exception as e:
# Any exception raised while authenticating is reported to the user as a
# password problem; the traceback goes to the module logger.
# NOTE(review): print() writes to stdout in addition to the log.error call.
print(f"The exception in login is {e}")
log.error(traceback.format_exc())
if u:
log.info('There is an issue with the password for user: {}'.format(u.email))
ActivityLogs.add_log(u.id, ActivityType.AUTHORIZATION, ActivityName.LOGIN_FAILED, request)
# Set user login failed attempts
if FailedLoginNotifier(u).notify():
flash(_("We’ve detected 3 failed attempts to login. The account has been temporarily locked. "
"Please try again in 15 minutes."), 'danger')
else:
flash(_("There is a problem with your password. Please reset your password by clicking on "
"the 'forget your password?' link below the submit button."), 'danger')
else:
flash(_("There is a problem with your password. Please reset your password by clicking on "
"the 'forget your password?' link below the submit button."), 'danger')
# NOTE(review): same text as the two flashes above; whether it is shown in
# addition to them depends on indentation that is not visible here.
flash(_("There is a problem with your password. Please reset your password by clicking on "
"the 'forget your password?' link below the submit button."), 'danger')
# Fallback for every path that did not return earlier: render the login page.
return render_template('user/login.jinja2', white_label=white_label, partner_logo=partner_logo,
partner_name=partner_name, partner_url=partner_url,
partner_custom_styles=partner_custom_styles, form=form,
site_key=current_app.config.get('RC_SITE_KEY_V3'))
@user.route('/additional_user_security/<int:user_id>', methods=['GET', 'POST'])
def additional_user_security_onboarding(user_id):
    """Show and process the two-factor authentication on-boarding page.

    The page offers the verification methods (SMS and/or email) that are
    enabled for the user's role. A valid submission stores the chosen method,
    marks on-boarding as done, enables two-factor authentication and sends the
    user back to the login page.

    :param user_id: id of the user being on-boarded; the user must not be
        deactivated and their email must match the 'user_identify' session value.
    :return: the rendered on-boarding page, or a redirect to the login page.
    """
    account = User.query.filter(User.id == user_id, User.is_deactivated.is_(False)).first()
    if not account or account.email.lower() != session.get('user_identify'):
        # Session check failed: send the visitor back to the login page.
        flash(_('We were unable to identify your account. Please try again or contact support.'), 'danger')
        return redirect(url_for('user.login'))

    # Branding defaults come from the partnership with id 1 and are replaced
    # by an active partner whose URL matches the request host.
    domain = request.host.replace('www', '')
    primary_partner = Partnership.query.filter(Partnership.id == 1).first()
    partner_url = primary_partner.partner_url
    partner_custom_styles = primary_partner.custom_styles
    partner_name = primary_partner.name
    for candidate in Partnership.query.filter(Partnership.active.is_(True)).all():
        if not candidate.partner_url or candidate.id == 1:
            continue
        candidate_domain = candidate.partner_url.replace('https://', '').replace('www', '')
        if candidate_domain != domain:
            continue
        partner_url = candidate_domain
        if candidate.custom_styles:
            partner_custom_styles = candidate.custom_styles
        partner_name = candidate.name

    form = UpdateSecurity()
    sms_label = 'SMS - ' + format_phone_number_bracket(account.phonenumber)
    email_label = 'Email - ' + account.email

    # Work out which verification methods the user's role may choose from.
    if account.role in ('admin', 'agent'):
        sms_allowed = (account.partnership.is_2fa_sms_enabled
                       and account.partnership_account.is_2fa_sms_enabled)
        email_allowed = (account.partnership.is_2fa_email_enabled
                         and account.partnership_account.is_2fa_email_enabled)
    elif account.role == 'partner':
        sms_allowed = account.partnership.is_2fa_sms_enabled
        email_allowed = account.partnership.is_2fa_email_enabled
    elif account.role in ('sysadmin', 'limitsysadmin'):
        sms_allowed = email_allowed = True
    else:
        sms_allowed = email_allowed = False

    # The first available method is keyed 'field1'; email becomes 'field2'
    # only when SMS is offered as well.
    choices = []
    if sms_allowed:
        choices.append(('field1', sms_label))
    if email_allowed:
        choices.append(('field2' if sms_allowed else 'field1', email_label))
    form.choice.choices = choices
    choice_len = 2 if len(choices) > 1 else 0

    # URL the user wanted to visit and the 'remember me' flag, both carried
    # over from the login view through the query string.
    next_url = request.args.get('url')
    remember = request.args.get('remember', 'False') == 'True'

    if form.validate_on_submit():
        selected_method = form.auth_method.data
        if selected_method == 'SMS':
            account.is_2fa_sms = True
            account.is_2fa_email = False
        elif selected_method == 'EMAIL':
            account.is_2fa_sms = False
            account.is_2fa_email = True
        # On-boarding is shown only once; two-factor authentication is
        # switched on from now on.
        account.two_factor_auth_onboard = True
        account.two_factor_auth = True
        db.session.commit()
        flash(_('Great. Two step verification has been enabled on your account. After'
                ' logging in you will be asked to verify your account through a verification code.'), 'success')
        # The user has to log in again to go through the two-step verification.
        return redirect(url_for('user.login'))

    return render_template('user/two_factor_auth_onboarding.jinja2', form=form,
                           user=account,
                           next_url=next_url,
                           remember=remember,
                           partner_custom_styles=partner_custom_styles,
                           partner_url=partner_url,
                           partner_name=partner_name,
                           choice_len=choice_len
                           )
# This function gets called when the user decides to decline two-factor authentication
# on the on-boarding page
@user.route('/decline_additional_securty/<int:user_id>', methods=['GET', 'POST'])
def decline_additional_security(user_id):
"""Record that the user declined two-factor authentication.

The user is looked up by id among non-deactivated users and must match the
'user_identify' value stored in the session by the login view. The function
does not log the user in; it redirects either to the terms-of-service page
or back to the login page.

NOTE(review): this copy of the file carries no indentation, so the extent of
the `if request.method == 'POST':` body and the `if` that the final `else`
belongs to cannot be confirmed from here.

:param user_id: id of the user declining two-factor authentication.
"""
active_user = User.query.filter(User.id == user_id, User.is_deactivated.is_(False)).first()
if active_user and active_user.email.lower() == session.get('user_identify'):
# Target URL and 'remember me' flag carried over in the query string; only
# the literal 'True' enables remember.
next_url = request.args.get('url')
remember = request.args.get('remember', 'False')
if remember == 'True':
remember = True
else:
remember = False
if request.method == 'POST':
# On-boarding is marked as done with two-factor authentication turned off.
active_user.two_factor_auth_onboard = True
active_user.two_factor_auth = False
db.session.commit()
# Has the user agreed to the terms and conditions
if active_user.active and active_user.tos_agreement is False:
return redirect(url_for('user.tos_get',
user_id=active_user.id,
url=next_url,
remember=remember))
# Login the user and redirect them to the appropriate page
# The string literal below is disabled code; it is a bare expression
# statement and has no effect at runtime.
"""
if active_user.active and login_user(active_user, remember=remember):
ActivityLogs.add_log(current_user.id, ActivityType.AUTHORIZATION, ActivityName.LOGIN_SUCCESS, request)
active_user.update_activity_tracking(request.remote_addr)
if next_url:
return redirect(safe_next_url(next_url))
if active_user.role in ['sysadmin', 'limitsysadmin', 'partner']:
return redirect(url_for('admin.dashboard'))
elif active_user.role in ['agent'] and current_user.subscription.plan != 'partnershipsingle':
return redirect(url_for('contacts.contact_list'))
elif active_user.subscription and active_user.subscription.plan == 'partnershipsingle':
return redirect(url_for('user.settings'))
else:
return redirect(url_for('dashboard.user_dashboard'))
else:
ActivityLogs.add_log(current_user.id, ActivityType.AUTHORIZATION, ActivityName.LOGIN_FAILED, request)
flash(_('This account has been disabled.'), 'danger')
return redirect(url_for('user.login'))
"""
# The user is told to log in again instead of being logged in here.
flash(_('You have opted out of two step verification. You can enable it at any time under '
'your account settings, which we highly recommend. Please login to continue.'), 'warning')
return redirect(url_for('user.login'))
else:
flash(_('We were unable to decline your actions. Please try again or contact support.'), 'danger')
# Redirect the user back to log in due to failing session check
return redirect(url_for('user.login'))
"""
# This function allows the user to skip the two factor authentication on-boarding
# until the next time they log in.
@user.route('/skip_additional_security/<int:user_id>', methods=['GET', 'POST'])
def skip_additional_security(user_id):
active_user = User.query.filter(User.id == user_id, User.is_deactivated.is_(False)).first()
next_url = request.args.get('url')
remember = request.args.get('remember', 'False')
if remember == 'True':
remember = True
else:
remember = False
active_user.two_factor_auth_onboard = False
active_user.two_factor_auth = False
db.session.commit()
flash(_('You have opted to skip the two step verification setup for now. We will ask you again next '
'time you login.'), 'warning')
# Has the user agreed to the terms and conditions
#if active_user.active and active_user.tos_agreement is False:
# return redirect(url_for('user.tos_get',
# user_id=active_user.id,
# url=next_url,
# remember=remember))
# Login the user and redirect them to the appropriate page
if active_user.active and login_user(active_user, remember=remember):
ActivityLogs.add_log(current_user.id, ActivityType.AUTHORIZATION, ActivityName.LOGIN_SUCCESS, request)
active_user.update_activity_tracking(request.remote_addr)
if next_url:
return redirect(safe_next_url(next_url))
if active_user.role in ['sysadmin', 'limitsysadmin', 'partner']:
return redirect(url_for('admin.dashboard'))
elif active_user.role in ['agent'] and current_user.subscription.plan != 'partnershipsingle':
return redirect(url_for('contacts.contact_list'))
elif active_user.subscription.plan == 'partnershipsingle':
return redirect(url_for('user.settings'))
else:
return redirect(url_for('dashboard.user_dashboard'))
else:
ActivityLogs.add_log(current_user.id, ActivityType.AUTHORIZATION, ActivityName.LOGIN_FAILED, request)
flash(_('This account has been disabled.'), 'danger')
return redirect(url_for('user.login'))
"""
@user.route('/verification/<int:user_id>', methods=['POST', 'GET'])
def two_factor_auth(user_id):
    """Verify the two-factor code of a user who has passed the password check.

    Called when two-factor authentication is enabled for the user. The code
    posted by the user is compared with the one stored in the session; on a
    match the user is sent to the terms-of-service page (if not yet agreed)
    or logged in and redirected to a landing page. The same page also lets
    the user request a new verification code.

    :param user_id: id of the user being verified; the user must not be
        deactivated and their email must match the 'user_identify' session value.
    :return: the rendered verification page or a redirect response.
    """
    active_user = User.query.filter(User.id == user_id, User.is_deactivated.is_(False)).first()
    if not active_user or active_user.email.lower() != session.get('user_identify'):
        # Session check failed: send the visitor back to the login page.
        flash(_('We were unable to complete your actions. Please try again or contact support.'), 'danger')
        return redirect(url_for('user.login'))

    # Branding defaults come from the partnership with id 1 and are replaced
    # by an active partner whose URL matches the request host.
    domain = request.host.replace('www', '')
    primary_partner = Partnership.query.filter(Partnership.id == 1).first()
    partner_url = primary_partner.partner_url
    partner_custom_styles = primary_partner.custom_styles
    partner_name = primary_partner.name
    # NOTE(review): the template previously received the id of whichever
    # partner the loop visited last (and a NameError with no active partners).
    # It now receives the host-matched partner, falling back to the primary
    # partner - confirm this is what the template expects.
    partner_id = primary_partner.id
    for partner in Partnership.query.filter(Partnership.active.is_(True)).all():
        if partner.partner_url and partner.id != 1:
            partner_domain = partner.partner_url.replace('https://', '').replace('www', '')
            if partner_domain == domain:
                partner_url = partner_domain
                partner_id = partner.id
                if partner.custom_styles:
                    partner_custom_styles = partner.custom_styles

    # The displayed name is the user's own partnership when there is one.
    user_partnership = Partnership.query.filter(Partnership.id == active_user.partnership_id).first()
    if user_partnership is not None:
        partner_name = user_partnership.name

    next_url = request.args.get('url')
    remember = request.args.get('remember', 'False') == 'True'
    user_number = format_phone_number_bracket(active_user.phonenumber)
    form = TwoFactorAuthForm()

    if form.validate_on_submit():
        # Compare the code sent to the user and the one saved in the session.
        # A missing session code counts as a mismatch instead of raising.
        expected_code = session.get('verification_code')
        if expected_code is None or request.form.get('code') != expected_code:
            ActivityLogs.add_log(active_user.id, ActivityType.AUTHORIZATION,
                                 ActivityName.TWOFACTAUTH_FAILED, request)
            flash(_('You have entered an incorrect verification code. Please try again.'), 'danger')
            return redirect(url_for('user.two_factor_auth',
                                    user_id=active_user.id,
                                    url=next_url,
                                    remember=remember))

        ActivityLogs.add_log(active_user.id, ActivityType.AUTHORIZATION,
                             ActivityName.TWOFACTAUTH_SUCCESS, request)
        # Has the user agreed to the terms and conditions
        if active_user.active and active_user.tos_agreement is False:
            return redirect(url_for('user.tos_get',
                                    user_id=active_user.id,
                                    url=next_url,
                                    remember=remember))
        # Login the user and redirect them to the appropriate page
        if not (active_user.active and login_user(active_user, remember=remember)):
            flash(_('This account has been disabled.'), 'danger')
            return redirect(url_for('user.login'))

        active_user.update_activity_tracking(request.remote_addr)
        # Handle optionally redirecting to the next URL safely.
        if next_url:
            return redirect(safe_next_url(next_url))
        if current_user.role in ['sysadmin', 'limitsysadmin', 'partner']:
            return redirect(url_for('admin.dashboard'))
        # A user without a subscription has no plan; do not dereference None.
        subscription = current_user.subscription
        plan = subscription.plan if subscription else None
        if current_user.role in ['agent'] and plan != 'partnershipsingle':
            return redirect(url_for('contacts.contact_list'))
        if plan == 'partnershipsingle':
            return redirect(url_for('user.settings'))
        return redirect(url_for('dashboard.user_dashboard'))

    # Allow the user to resend the verification code if they didn't receive
    # it first time round. `.get` keeps a POST without the resend button
    # (e.g. a form that failed validation) from aborting with a 400.
    if request.method == 'POST' and request.form.get('btn_resend_code') == 'btn_resend_code':
        try:
            send_verification_code(active_user)
        except Exception:
            # Best effort: keep the page usable, but record the failure with
            # its traceback instead of hiding it at debug level.
            log.exception('There was a problem sending a verification code for user id: {}'.format(active_user.id))
        else:
            log.info('The remember me value is {}'.format(remember))
            return redirect(url_for('user.two_factor_auth',
                                    user_id=active_user.id,
                                    url=next_url,
                                    remember=remember))

    return render_template('user/two_factor_auth.jinja2',
                           partner_name=partner_name,
                           form=form,
                           user=active_user,
                           user_number=user_number,
                           next_url=next_url,
                           remember=remember,
                           partner_custom_styles=partner_custom_styles,
                           partner_url=partner_url,
                           partner_id=partner_id
                           )
@user.route('/tos/<int:user_id>', methods=['GET'])
def tos_get(user_id):
    """Render the terms-of-service page for a user who is part-way through login.

    Deactivated users are excluded by the lookup query itself, so no separate
    deactivation check is needed afterwards.

    :param user_id: id of the user; their email must match the
        'user_identify' session value set by the login view.
    :return: the rendered terms-of-service page, or a redirect to the login
        page when the session check fails.
    """
    active_user = User.query.filter(User.id == user_id, User.is_deactivated.is_(False)).first()
    if not active_user or active_user.email.lower() != session.get('user_identify'):
        # Session check failed: send the visitor back to the login page.
        flash(_('We were unable to complete your TOS action. Please try again or contact support.'), 'danger')
        return redirect(url_for('user.login'))

    # Branding defaults come from the partnership with id 1 and are replaced
    # by an active partner whose URL matches the request host.
    domain = request.host.replace('www', '')
    primary_partner = Partnership.query.filter(Partnership.id == 1).first()
    partner_url = primary_partner.partner_url
    partner_custom_styles = primary_partner.custom_styles
    partner_name = primary_partner.name
    # NOTE(review): the template previously received the id of whichever
    # partner the loop visited last (and a NameError with no active partners).
    # It now receives the host-matched partner, falling back to the primary
    # partner - confirm this is what the template expects.
    partner_id = primary_partner.id
    for partner in Partnership.query.filter(Partnership.active.is_(True)).all():
        if partner.partner_url and partner.id != 1:
            partner_domain = partner.partner_url.replace('https://', '').replace('www', '')
            if partner_domain == domain:
                partner_url = partner_domain
                partner_id = partner.id
                if partner.custom_styles:
                    partner_custom_styles = partner.custom_styles

    # The displayed name is the user's own partnership when there is one.
    user_partnership = Partnership.query.filter(Partnership.id == active_user.partnership_id).first()
    if user_partnership is not None:
        partner_name = user_partnership.name

    return render_template('user/tos.jinja2',
                           partner_name=partner_name,
                           user=active_user,
                           partner_url=partner_url,
                           partner_custom_styles=partner_custom_styles,
                           partner_id=partner_id)
@user.route('/tos_agree/<int:user_id>', methods=['POST'])
def tos_agree_post(user_id):
    """Record acceptance of the terms of service and log the user in.

    :param user_id: id of the user; the user must not be deactivated and
        their email must match the 'user_identify' session value.
    :return: a redirect to the requested URL or a role-specific landing page
        on success, otherwise a redirect to the login page.
    """
    active_user = User.query.filter(User.id == user_id, User.is_deactivated.is_(False)).first()
    if not active_user or active_user.email.lower() != session.get('user_identify'):
        # Session check failed: send the visitor back to the login page.
        flash(_('We were unable to complete your TOS confirmation. Please try again or contact support.'), 'danger')
        return redirect(url_for('user.login'))

    next_url = request.args.get('url')
    remember = request.args.get('remember', 'False') == 'True'

    active_user.tos_agreement = True
    ActivityLogs.add_log(active_user.id, ActivityType.AUTHORIZATION, ActivityName.LOGIN_ACCEPTTERMS, request)
    db.session.commit()

    # Login the user and redirect them to the appropriate page
    if not (active_user.active and login_user(active_user, remember=remember)):
        flash(_('This account has been disabled.'), 'danger')
        ActivityLogs.add_log(active_user.id, ActivityType.AUTHORIZATION, ActivityName.LOGIN_FAILED, request)
        return redirect(url_for('user.login'))

    active_user.update_activity_tracking(request.remote_addr)
    # Every successful outcome shows the same confirmation message.
    flash(_('Thank you for accepting our Terms of Service.'), 'success')

    # Handle optionally redirecting to the next URL safely.
    if next_url:
        return redirect(safe_next_url(next_url))
    if current_user.role in ['sysadmin', 'limitsysadmin', 'partner']:
        return redirect(url_for('admin.dashboard'))
    # A user without a subscription has no plan; do not dereference None
    # (the agreement has already been committed at this point).
    subscription = current_user.subscription
    plan = subscription.plan if subscription else None
    if current_user.role in ['agent'] and plan != 'partnershipsingle':
        return redirect(url_for('contacts.contact_list'))
    if plan == 'partnershipsingle':
        return redirect(url_for('user.settings'))
    return redirect(url_for('dashboard.user_dashboard'))
@user.route('/tos_disagree/<int:user_id>', methods=['POST'])
def tos_disagree_post(user_id):
    """Record that the user declined the terms of service.

    :param user_id: id of the user; the user must not be deactivated and
        their email must match the 'user_identify' session value.
    :return: a redirect to the login page in every case.
    """
    declining_user = User.query.filter(User.id == user_id, User.is_deactivated.is_(False)).first()
    session_matches = bool(declining_user) and declining_user.email.lower() == session.get('user_identify')

    if not session_matches:
        flash(_('We were unable to complete your TOS declaration. Please try again or contact support.'), 'danger')
        return redirect(url_for('user.login'))

    # Persist the refusal and leave an audit entry before sending the user back.
    declining_user.tos_agreement = False
    ActivityLogs.add_log(declining_user.id, ActivityType.AUTHORIZATION, ActivityName.LOGIN_DECLINETERMS, request)
    db.session.commit()
    flash(_('We require acceptance of our Terms of Service to continue. '
            'Please contact support if you have any questions.'), 'danger')
    return redirect(url_for('user.login'))
@user.route('/logout')
@login_required
def logout():
    """Log the current user out and return to the login page.

    A supervisor session, when one is authenticated, is ended as well. The
    logout is written to the activity log before the user session is cleared.
    """
    supervisor_active = current_supervisor_user.is_authenticated
    if supervisor_active:
        # End the supervisor session too.
        supervisor_logout_user()

    # The log entry needs current_user.id, so it is written before logout_user().
    ActivityLogs.add_log(current_user.id, ActivityType.AUTHORIZATION, ActivityName.LOGOUT, request)
    logout_user()

    flash(_('You have been logged out.'), 'success')
    login_page = url_for('user.login')
    return redirect(login_page)
@user.route('/account/begin_password_reset', methods=['GET', 'POST'])
@anonymous_required()
def begin_password_reset():
    """Render the "forgot password" page and start a password reset.

    On a valid submission a password reset is initialized for the posted
    identity and the visitor is redirected to the login page.

    :return: the rendered page, or a redirect to the login page.
    """
    # Branding defaults come from the partnership with id 1 and are replaced
    # by an active partner whose URL matches the request host.
    domain = request.host.replace('www', '')
    white_label = False
    primary_partner = Partnership.query.filter(Partnership.id == 1).first()
    partner_name = primary_partner.name
    partner_url = primary_partner.partner_url
    partner_custom_styles = primary_partner.custom_styles
    for candidate in Partnership.query.filter(Partnership.active.is_(True)).all():
        if not candidate.partner_url or candidate.id == 1:
            continue
        candidate_domain = candidate.partner_url.replace('https://', '').replace('www', '')
        if candidate_domain != domain:
            continue
        partner_url = candidate_domain
        white_label = True
        if candidate.custom_styles:
            partner_custom_styles = candidate.custom_styles
        partner_name = candidate.name

    form = BeginPasswordResetForm()
    if not form.validate_on_submit():
        return render_template('user/begin_password_reset.jinja2', form=form, partner_name=partner_name,
                               white_label=white_label, partner_url=partner_url,
                               partner_custom_styles=partner_custom_styles)

    # The identity is handled in lower case, both for the reset and the log.
    user_email = request.form.get('identity')
    u = User.initialize_password_reset(user_email.lower())
    log.info('The user email requesting a password reset is: {}'.format(user_email.lower()))
    flash(_('An email has been sent to %(email)s.',
            email=u.email), 'success')
    return redirect(url_for('user.login'))
@user.route('/account/password_reset', methods=['GET', 'POST'])
@anonymous_required()
def password_reset():
    """Render the password reset page and apply a new password.

    The reset token arrives as a query argument, is placed in the form and is
    posted back with the new password. A password equal to the current one is
    rejected and the visitor is returned to this page with the same token.

    :return: the rendered page or a redirect response.
    """
    # Branding defaults come from the partnership with id 1 and are replaced
    # by an active partner whose URL matches the request host.
    domain = request.host.replace('www', '')
    white_label = False
    primary_partner = Partnership.query.filter(Partnership.id == 1).first()
    partner_name = primary_partner.name
    partner_url = primary_partner.partner_url
    partner_custom_styles = primary_partner.custom_styles
    partners = Partnership.query.filter(Partnership.active.is_(True)).all()
    for partner in partners:
        if partner.partner_url and partner.id != 1:
            partner_domain = partner.partner_url.replace('https://', '').replace('www', '')
            if partner_domain == domain:
                partner_url = partner_domain
                white_label = True
                if partner.custom_styles:
                    partner_custom_styles = partner.custom_styles
                partner_name = partner.name

    form = PasswordResetForm(reset_token=request.args.get('reset_token'))
    if form.validate_on_submit():
        reset_token = request.form.get('reset_token')
        u = User.deserialize_token(reset_token)
        if u is None:
            flash(_('Your reset token has expired or was tampered with.'),
                  'danger')
            return redirect(url_for('user.begin_password_reset'))

        new_password = request.form.get('password', None)
        # Reject a new password that matches the current one.
        if User.check_password_match(u.password, new_password):
            ActivityLogs.add_log(u.id, ActivityType.AUTHORIZATION, ActivityName.PASSWORDRESET_FAILED, request)
            flash(_('your password is same as old password, please try a new one.'), 'danger')
            # Let url_for build the query string so the user-supplied token
            # is URL-encoded instead of being concatenated verbatim.
            return redirect(url_for('user.password_reset', reset_token=reset_token))

        form.populate_obj(u)
        # populate_obj copies the form fields onto the user; the password is
        # then replaced with its encrypted form before saving.
        u.password = User.encrypt_password(new_password)
        u.save()
        # save password updated date
        u.save_password_updated_date()
        ActivityLogs.add_log(u.id, ActivityType.AUTHORIZATION, ActivityName.PASSWORDRESET_SUCCESS, request)
        # redirect to login page
        flash(_('Your password has been reset.'), 'success')
        return redirect(url_for('user.login'))

    return render_template('user/password_reset.jinja2', form=form, white_label=white_label,
                           partner_name=partner_name, partner_url=partner_url,
                           partner_custom_styles=partner_custom_styles)
@user.route('/signup', methods=['GET', 'POST'])
@anonymous_required()
# TODO: review
def signup():
    """Register a new account under the partnership named by the invite token.

    The ``partnership`` query parameter must match a partnership's
    ``account_invitation_url_token``; otherwise the visitor is redirected to
    ``page.home`` (there is no fallback to a default partnership).

    On submit, the email is looked up case-insensitively:

    * existing, active user -> the form is re-rendered with an
      "Already exists" error;
    * existing, deactivated user -> the user is reactivated;
    * unknown email -> a partnership account, an admin user, an agent and a
      default widget are created.

    If the user can then be logged in, the response depends on whether the
    user was reactivated and on the account's billing type. In every other
    case the signup form is rendered with white-label branding.
    """
    domain = request.host.replace('www', '')
    white_label = False

    # Partnership 1 supplies the default branding.
    primary_partner = Partnership.query.filter(Partnership.id == 1).first()
    partner_name = primary_partner.name
    partner_url = primary_partner.partner_url
    partner_custom_styles = primary_partner.custom_styles

    partners = Partnership.query.filter(Partnership.active.is_(True)).all()
    for partner in partners:
        if partner.partner_url and partner.id != 1:
            # 'www' is stripped from both sides so they compare alike.
            partner_domain = partner.partner_url.replace('https://', '').replace('www', '')
            if partner_domain == domain:
                partner_url = partner_domain
                white_label = True
                if partner.custom_styles:
                    partner_custom_styles = partner.custom_styles
                partner_name = partner.name

    # Copy the reCAPTCHA keys to the config names the form reads them from.
    current_app.config['RECAPTCHA_PUBLIC_KEY'] = current_app.config['RC_SITE_KEY']
    current_app.config['RECAPTCHA_PRIVATE_KEY'] = current_app.config['RC_SECRET_KEY']

    from buyercall.blueprints.widgets.default_settings import default

    form = SignupForm()
    partnership_token = request.args.get('partnership', '')

    partnership = Partnership.query \
        .filter(Partnership.account_invitation_url_token == partnership_token) \
        .first()

    if not partnership:
        log.error('No partnership found. Most likely because no partnership token was provided.')
        return redirect(url_for('page.home'))

    # One context for every render of the signup page, so the error re-render
    # keeps the same branding as the initial page.
    template_context = dict(
        form=form,
        partnership=partnership,
        white_label=white_label,
        partner_name=partner_name,
        partner_url=partner_url,
        partner_custom_styles=partner_custom_styles,
    )

    # NOTE(review): is_submitted() only checks the HTTP method; the form's
    # validators are not run here. Confirm this is intentional.
    if form.is_submitted():
        form_email = request.form.get('email').lower()
        u = User.query \
            .filter(func.lower(User.email) == form_email) \
            .first()

        account = None
        was_deactivated = False

        if u:
            account = u.partnership_account
            if not u.is_deactivated:
                form.email.errors = {'Already exists'}
                return render_template('user/signup.jinja2', **template_context)

            # Existing but deactivated: bring the user back.
            was_deactivated = True
            u.is_deactivated = False
            u.deactivated_on = None
            db.session.commit()
        else:
            partnership_id = partnership.id
            u = User()

            account = PartnershipAccount()
            account.business_type = partnership.business_type
            # Name the account after the company, or the email when no
            # company was entered. (The user object is not populated yet, so
            # the email has to come from the submitted form.)
            account.name = form.company.data or form_email
            account.partnership_id = partnership_id
            account.billing_type = partnership.default_billing_type
            if account.billing_type in ['invoice', 'partnership']:
                account.subscription_id = partnership.subscription_id
            db.session.add(account)
            db.session.commit()  # assigns account.id
            partnership_account_id = account.id

            form.populate_obj(u)
            # populate_obj copies the raw form value; overwrite it with the hash.
            u.password = User.encrypt_password(request.form.get('password', None))
            u.partnership_id = partnership_id
            u.email = form_email
            u.partnership_account_id = partnership_account_id
            u.role = 'admin'
            u.save()

            # save password updated date
            u.save_password_updated_date()

            agent = Agent(
                user_id=u.id,
                firstname=u.firstname,
                lastname=u.lastname,
                email=u.email,
                title=u.title or '',
                department=u.department or '',
                phonenumber=u.phonenumber,
                mobile='',
                extension=u.extension or None,
                partnership_account_id=partnership_account_id
            )
            widget = Widget(
                guid=str(uuid.uuid4()),
                partnership_account_id=partnership_account_id,
                name='Default',
                options=default
            )
            db.session.add(agent)
            db.session.add(widget)
            db.session.commit()

        if login_user(u):
            if was_deactivated:
                flash(_('Your account has been reactivated!'), 'success')
                return render_template('user/settings.jinja2')
            elif account.billing_type in ['invoice', 'partnership']:
                if current_user.role == 'partner':
                    return redirect(url_for('user.settings'))
                else:
                    return redirect(url_for('user.welcome'))
            else:
                flash(_('Awesome, thanks for signing up!'), 'success')
                return redirect(url_for('billing.pricing'))

    return render_template('user/signup.jinja2', **template_context)
@user.route('/welcome', methods=['GET', 'POST'])
@login_required
def welcome():
    """Show the post-signup welcome page for the logged-in user.

    Looks up the first agent belonging to the user's partnership account.
    Without one, the settings page is rendered instead; otherwise a success
    message is flashed and the welcome page is rendered with that agent and
    the user's formatted phone number.
    """
    account_agents = Agent.query.filter(
        Agent.partnership_account_id == current_user.partnership_account_id
    )
    first_agent = account_agents.first()

    if not first_agent:
        return render_template('user/settings.jinja2')

    display_number = format_phone_number(current_user.phonenumber)
    flash(_("Great, you have signed up successfully!"), 'success')
    return render_template(
        'user/welcome.jinja2',
        agent=first_agent,
        phonenumber=display_number,
    )
@user.route('/settings')
@login_required
def settings():
    """Render the settings landing page that fits the current user's role.

    Sysadmin roles get the sysadmin settings template. An admin who is
    currently viewing a partnership is redirected to the user dashboard.
    Everyone else gets the regular user settings template.
    """
    if current_user.role in ['sysadmin', 'limitsysadmin']:
        return render_template('sysadmin/settings.jinja2')

    admin_viewing_partnership = (
        current_user.role == 'admin' and current_user.is_viewing_partnership
    )
    if admin_viewing_partnership:
        return redirect(url_for('dashboard.user_dashboard'))

    return render_template('user/settings.jinja2')
@user.route('/settings/update_credentials', methods=['GET', 'POST'])
@login_required
def update_credentials():
    """Update the logged-in user's email and, optionally, password.

    On a valid submit the email is always replaced; the password is replaced
    only when a new one was entered. The password-updated date is refreshed
    only when the new password differs from the previous one. The user is
    then logged out (recorded as a LOGOUT activity) and redirected to the
    login page.
    """
    form = UpdateCredentials(current_user, uid=current_user.id)

    if not form.validate_on_submit():
        return render_template('user/update_credentials.jinja2', form=form)

    # Keep the stored password so it can be compared after the save.
    previous_password = current_user.password

    # The form is not applied with populate_obj() because the password
    # field is optional.
    submitted_password = request.form.get('password', '')
    current_user.email = request.form.get('email')
    if submitted_password:
        current_user.password = User.encrypt_password(submitted_password)
    current_user.save()

    # Only stamp the update date when the password actually changed.
    password_changed = (
        previous_password
        and submitted_password
        and not User.check_password_match(previous_password, submitted_password)
    )
    if password_changed:
        current_user.save_password_updated_date()

    ActivityLogs.add_log(current_user.id, ActivityType.AUTHORIZATION, ActivityName.LOGOUT, request)
    logout_user()

    flash(_('Your sign in settings have been updated. Please log in again with your new credentials.'), 'success')
    return redirect(url_for('user.login'))
@user.route('/settings/update_personaldetails', methods=['GET', 'POST'])
@login_required
def update_personaldetails():
    """Edit the logged-in user's personal details.

    Renders the form until it validates; then copies the form fields onto the
    current user, saves, flashes a confirmation and returns to the settings
    page.
    """
    details_form = UpdatePersonalDetails(current_user, uid=current_user.id)

    if not details_form.validate_on_submit():
        return render_template('user/update_personaldetails.jinja2', form=details_form)

    details_form.populate_obj(current_user)
    current_user.save()

    flash(_('Your personal details have been updated.'), 'success')
    return redirect(url_for('user.settings'))
@user.route('/settings/update_locale', methods=['GET', 'POST'])
@login_required
def update_locale():
    """Edit the logged-in user's locale.

    The form starts from the user's current locale. Once it validates, its
    fields are copied onto the current user, the user is saved, a
    confirmation is flashed and the browser is sent back to the settings
    page.
    """
    locale_form = UpdateLocale(locale=current_user.locale)

    if not locale_form.validate_on_submit():
        return render_template('user/update_locale.jinja2', form=locale_form)

    locale_form.populate_obj(current_user)
    current_user.save()

    flash(_('Your locale settings have been updated.'), 'success')
    return redirect(url_for('user.settings'))
# View: edit the logged-in user's two-factor-authentication (2FA) settings.
# Builds the list of delivery choices (SMS and/or email) for the form, and on
# a valid submit stores the 2FA flags on the current user and redirects to the
# settings page. Otherwise the form is rendered.
@user.route('/settings/update_security', methods=['GET', 'POST'])
@login_required
def update_security():
form = UpdateSecurity(two_factor_auth=current_user.two_factor_auth)
# Re-read the user row by id instead of using the current_user proxy.
crt_user = User.query.filter(User.id == current_user.id).first()
if crt_user:
# Labels shown for each choice.
# NOTE(review): the string concatenation assumes both the formatted phone
# number and the email are str (not None) -- confirm.
crt_user_phone = format_phone_number_bracket(crt_user.phonenumber)
crt_user_email = crt_user.email
sms_label = 'SMS - ' + crt_user_phone
email_label = 'Email - ' + crt_user_email
# Build the choices from the 2FA flags of the partnership / account.
# SMS, when offered, is 'field1'. Email is 'field2' when SMS is also offered
# and 'field1' when it is the only option.
choices = []
# admin/agent: a method must be enabled on BOTH the partnership and the
# partnership account.
if crt_user.role in ('admin', 'agent'):
if crt_user.partnership.is_2fa_sms_enabled and crt_user.partnership_account.is_2fa_sms_enabled:
choices.append(('field1', sms_label))
if crt_user.partnership.is_2fa_email_enabled and crt_user.partnership_account.is_2fa_email_enabled:
if crt_user.partnership.is_2fa_sms_enabled and crt_user.partnership_account.is_2fa_sms_enabled:
choices.append(('field2', email_label))
else:
choices.append(('field1', email_label))
# partner: only the partnership-level flags are consulted.
elif crt_user.role == 'partner':
if crt_user.partnership.is_2fa_sms_enabled:
choices.append(('field1', sms_label))
if crt_user.partnership.is_2fa_email_enabled:
if crt_user.partnership.is_2fa_sms_enabled:
choices.append(('field2', email_label))
else:
choices.append(('field1', email_label))
# sysadmin roles: both methods are offered without checking any flag.
elif crt_user.role in ('sysadmin', 'limitsysadmin'):
choices.append(('field1', sms_label))
choices.append(('field2', email_label))
form.choice.choices = choices
# Preselect the user's stored method, only when there is more than one choice.
if len(choices) > 1:
if crt_user.is_2fa_sms:
form.choice.data = 'field1'
elif crt_user.is_2fa_email:
form.choice.data = 'field2'
if form.validate_on_submit():
form.populate_obj(current_user)
# A partnership that enforces 2FA overrides whatever the form submitted.
if current_user.partnership:
if current_user.partnership.is_2fa_enforced:
current_user.two_factor_auth = True
current_user.two_factor_auth_onboard = True
# Persist the chosen delivery method as a pair of mutually exclusive flags.
# NOTE(review): the method is read from form.auth_method ('SMS'/'EMAIL'),
# not from form.choice ('field1'/'field2') populated above -- confirm how the
# template ties the two fields together.
if form.auth_method.data == 'SMS':
current_user.is_2fa_sms = True
current_user.is_2fa_email = False
elif form.auth_method.data == 'EMAIL':
current_user.is_2fa_sms = False
current_user.is_2fa_email = True
current_user.save()
flash(_('Your security settings have been updated.'), 'success')
return redirect(url_for('user.settings'))
return render_template('user/update_security.jinja2', form=form)
@user.route('/settings/rest_api', methods=['GET'])
@login_required
@role_required('admin', 'partner')
def rest_api_view():
    """Render the REST API settings page.

    The template receives ``generated``: ``True`` when an API token hash is
    already stored for the user's partnership account (partnership-account
    users) or partnership (partnership users), ``False`` otherwise.
    """
    # Must be a real boolean: the previous initial value was the tuple
    # ``(False, '')``, which is truthy and made the page report a generated
    # token even when none existed.
    generated = False

    if current_user.is_partnership_account_user:
        generated = bool(current_user.partnership_account.api_token_hash)
    elif current_user.is_partnership_user:
        generated = bool(current_user.partnership.api_token_hash)

    return render_template('user/rest_api.jinja2', generated=generated)
@user.route('/settings/rest_api', methods=['POST'])
@login_required
@role_required('admin', 'partner')
def rest_api_update():
    """Regenerate the REST API token and show it on the API settings page.

    Partnership-account users regenerate the token through their partnership
    account; when they are viewing a partnership, the id of the viewed
    account is passed instead of their own. Partnership users regenerate the
    token through their partnership. Any other user gets an empty token.
    """
    new_token = ''

    if current_user.is_partnership_account_user:
        account_id = current_user.partnership_account_id
        # A super partner viewing another account targets that account's id.
        if current_user.is_viewing_partnership:
            account_id = current_user.get_user_viewing_partnership_account_id
        new_token = current_user.partnership_account.regenerate_api_token(account_id)
    elif current_user.is_partnership_user:
        owning_partnership = current_user.partnership
        new_token = owning_partnership.regenerate_api_token(current_user.partnership.id)

    return render_template('user/rest_api.jinja2', generated=True, token=new_token)
@user.route('/user/switch_back', methods=['GET'])
# @role_required('admin', 'partner', 'sysadmin')
@login_required
@supervisor_login_required
def switch_back():
    """Switch the session back to the supervisor user.

    Logs out the currently logged-in user, logs the supervisor user back in
    (without "remember me", forced), ends the supervisor session and
    redirects to the settings page.
    """
    # Copy the supervisor user before any logout call.
    # NOTE(review): presumably the proxy no longer resolves to the supervisor
    # once the sessions are torn down -- confirm.
    target_user = copy.deepcopy(current_supervisor_user)

    logout_user()
    login_user(target_user, remember=False, force=True)
    supervisor_logout_user()

    return redirect(url_for('user.settings'))