File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/user/models.py
import datetime
from collections import OrderedDict
from hashlib import md5
import logging as log
import jwt
import redis
import pytz
from flask import current_app
from flask_login import UserMixin
# from itsdangerous import TimedJSONWebSignatureSerializer
from itsdangerous import URLSafeTimedSerializer
from sqlalchemy import or_, and_
from sqlalchemy.sql import text
from sqlalchemy.ext.hybrid import hybrid_property
from buyercall.lib.util_sqlalchemy import ResourceMixin, AwareDateTime
from buyercall.blueprints.agents.models import Agent
from buyercall.extensions import db, bcrypt
from buyercall.blueprints.reports.models import ReportUserTie
# Module-level logger. Note that this rebinds the name ``log``, which until
# this line referred to the ``logging`` module alias imported above.
log = log.getLogger(__name__)
# Used below as the expiry (in seconds) of the Redis "partner-user-view" key.
DAYS = 86400 # The length of a day in seconds
class UserExternalApiAutoPostTie(ResourceMixin, db.Model):
"""
Tie between a user and an external API service provider, carrying an
``is_allowed`` flag for that pairing.
"""
__tablename__ = 'user_external_api_access_tie'
# First half of the composite primary key: foreign key to users.id.
user_id = db.Column(db.Integer, db.ForeignKey('users.id', name='users_id_user_access_tie_fkey'),
primary_key=True, index=True)
# Second half of the composite primary key: foreign key to
# external_api_service_providers.id.
external_api_service_provider_id = db.Column(
db.Integer,
db.ForeignKey('external_api_service_providers.id',
name='service_provider_id_user_access_tie_fkey'
),
primary_key=True
)
# Flag stored on the tie; the database-side default is '0'.
is_allowed = db.Column('is_allowed', db.Boolean(), nullable=False, server_default='0')
@classmethod
def get_existing_ties(cls, user_id):
    """
    Fetch every user/external service provider tie stored for a user.

    :param user_id: Id of the user whose ties are looked up
    :return: list of user/external api service provider tie objects
    """
    belongs_to_user = UserExternalApiAutoPostTie.user_id == user_id
    return UserExternalApiAutoPostTie.query.filter(belongs_to_user).all()
@classmethod
def get_service_provider_access_state(cls, user_id, partnership_account_id, service_provider_name):
    """
    Look up the tie between a user and a named external service provider.

    The provider is resolved by name and must have an active tie to the
    given partnership account; only then is the user's tie queried.

    :param user_id: Id of the user
    :param partnership_account_id: Id of the partnership account
    :param service_provider_name: Name of the external service provider
    :return: the user/external api service provider tie object, or None
    """
    from ..partnership.models import ExternalApiServiceProvidersPartnershipAccountTie, ExternalApiServiceProviders

    account_tie = ExternalApiServiceProvidersPartnershipAccountTie
    provider = (
        db.session.query(ExternalApiServiceProviders)
        .join(account_tie)
        .filter(ExternalApiServiceProviders.name == service_provider_name)
        .filter(account_tie.partnership_account_id == partnership_account_id,
                account_tie.active)
        .first()
    )
    if not provider:
        return None

    return (
        UserExternalApiAutoPostTie.query
        .filter(UserExternalApiAutoPostTie.user_id == user_id,
                UserExternalApiAutoPostTie.external_api_service_provider_id == provider.id)
        .first()
    )
@classmethod
def set_service_provider_access_state(cls, user_id, partnership_account_id, service_provider_name, value):
"""
Create or update the tie between a user and a named external service provider.

The provider is resolved by name among the providers tied to the given
partnership account. If the user already has a tie to that provider its
``is_allowed`` flag and ``updated_on`` timestamp are overwritten; otherwise
a new tie is added to the session.

:param user_id: Id of the user
:param partnership_account_id: Id of the partnership account
:param service_provider_name: Name of the external service provider
:param value: New value for the tie's ``is_allowed`` flag
:return: None
"""
# Imported inside the method rather than at module level.
from ..partnership.models import ExternalApiServiceProvidersPartnershipAccountTie, ExternalApiServiceProviders
# Unlike get_service_provider_access_state, this lookup does not filter on
# the partnership account tie's ``active`` flag.
service_provider = db.session\
.query(ExternalApiServiceProviders)\
.join(ExternalApiServiceProvidersPartnershipAccountTie)\
.filter(ExternalApiServiceProviders.name == service_provider_name)\
.filter(ExternalApiServiceProvidersPartnershipAccountTie.partnership_account_id == partnership_account_id)\
.first()
if service_provider:
service_provider_tie = UserExternalApiAutoPostTie\
.query\
.filter(UserExternalApiAutoPostTie.user_id == user_id,
UserExternalApiAutoPostTie.external_api_service_provider_id == service_provider.id)\
.first()
if service_provider_tie:
# Existing tie: update in place.
service_provider_tie.is_allowed = value
service_provider_tie.updated_on = datetime.datetime.now(pytz.utc)
else:
# No tie yet: build one with timezone-aware UTC timestamps.
new_tie = UserExternalApiAutoPostTie()
new_tie.user_id = user_id
new_tie.external_api_service_provider_id = service_provider.id
new_tie.is_allowed = value
new_tie.created_on = datetime.datetime.now(pytz.utc)
new_tie.updated_on = datetime.datetime.now(pytz.utc)
db.session.add(new_tie)
# NOTE(review): presumably this commit covers both the update and the insert
# branch above -- confirm its nesting level against the real file.
db.session.commit()
@classmethod
def get_allowed_service_providers(cls, user_id, partnership_account_id):
    """
    List the external service providers a user may use for an account.

    A provider qualifies when it has an active tie to the partnership
    account and the user's own tie to it has ``is_allowed`` set.

    :param user_id: Id of the user
    :param partnership_account_id: Id of the partnership account
    :return: list of external api service providers, ordered by name
    """
    from ..partnership.models import ExternalApiServiceProvidersPartnershipAccountTie, ExternalApiServiceProviders
    from buyercall.blueprints.form_leads.models import FormLog

    account_tie = ExternalApiServiceProvidersPartnershipAccountTie
    user_tie = UserExternalApiAutoPostTie

    query = db.session.query(ExternalApiServiceProviders)
    query = query.join(account_tie).filter(
        account_tie.partnership_account_id == partnership_account_id,
        account_tie.active == True)
    query = query.join(user_tie).filter(
        user_tie.user_id == user_id,
        user_tie.is_allowed == True)
    return query.order_by(ExternalApiServiceProviders.name).all()
@classmethod
def get_allowed_service_providers_with_form_log_count(cls, user_id, partnership_account_id, form_lead_id):
    """
    List the allowed service providers together with their post state for a lead.

    Each entry is a three-item list: provider id, provider name, and the
    value returned by ``ExternalApiFormLeadPostTie.existing_posts_for_lead``
    for the given form lead and provider.

    :param user_id: Id of the user
    :param partnership_account_id: Id of the partnership account
    :param form_lead_id: Id of the form lead to check for existing posts
    :return: list of ``[id, name, logs_exist]`` lists
    """
    from ..partnership.models import ExternalApiServiceProvidersPartnershipAccountTie, ExternalApiServiceProviders
    from buyercall.blueprints.form_leads.models import ExternalApiFormLeadPostTie

    account_tie = ExternalApiServiceProvidersPartnershipAccountTie
    user_tie = UserExternalApiAutoPostTie

    rows = (
        db.session.query(ExternalApiServiceProviders.id, ExternalApiServiceProviders.name)
        .join(account_tie)
        .filter(account_tie.partnership_account_id == partnership_account_id,
                account_tie.active == True)
        .join(user_tie)
        .filter(user_tie.user_id == user_id, user_tie.is_allowed == True)
        .all()
    )

    return [
        [row[0], row[1], ExternalApiFormLeadPostTie.existing_posts_for_lead(form_lead_id, row[0])]
        for row in rows
    ]
class User(UserMixin, ResourceMixin, db.Model):
"""
Application user stored in the ``users`` table.

Holds authentication data, role, partnership / partnership account
membership, per-user feature visibility flags and sign-in tracking fields.
"""
# Role key -> display label. The keys are unpacked into the ``role`` Enum
# column defined below.
ROLE = OrderedDict([
('guest', 'Guest'),
('member', 'Member'),
('admin', 'Admin'),
('agent', 'Agent'),
('partner', 'Partner'),
('limitsysadmin', 'Limited System Admin'),
('sysadmin', 'System Admin')
])
# Department key -> display label.
DEPARTMENT = OrderedDict([
('none', 'None'),
('sales', 'Sales'),
('internet sales', 'Internet Sales'),
('bdc', 'BDC'),
('service', 'Service'),
('management', 'Management'),
('client relationship', 'Client Relationship'),
('finance', 'Finance'),
('other', 'Other')
])
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
# Optional link to a partnership (cascades on update/delete).
partnership_id = db.Column(db.Integer, db.ForeignKey('partnerships.id',
onupdate='CASCADE',
ondelete='CASCADE'),
index=True, nullable=True)
# Optional link to a partnership account (cascades on update/delete).
partnership_account_id = db.Column(db.Integer, db.ForeignKey('partnership_accounts.id',
onupdate='CASCADE',
ondelete='CASCADE'),
index=True, nullable=True)
# Relationships.
agent = db.relationship(Agent, backref='user', uselist=False, passive_deletes=True)
reports = db.relationship(
ReportUserTie, back_populates="user", cascade="all, delete-orphan")
# Authentication.
role = db.Column(db.Enum(*ROLE, name='role_types'),
index=True, nullable=False, server_default='member')
# Python attribute ``active`` is stored in the ``is_active`` column.
active = db.Column('is_active', db.Boolean(), nullable=False,
server_default='1')
deactivated_on = db.Column(db.DateTime(), nullable=True)
is_deactivated = db.Column(db.Boolean(), nullable=False,
server_default='0', default=False)
username = db.Column(db.String(24), unique=True, index=True)
email = db.Column(db.String(255), unique=True, index=True, nullable=False,
server_default='')
password = db.Column(db.String(128), nullable=False, server_default='')
# TOS agreement.
tos_agreement = db.Column('is_tos_agreed', db.Boolean(), nullable=False,
server_default='0')
# Hide certain features and links if set to false for the user. Otherwise show everything.
# Features that will be hidden: Forms, Payment Settings and Progress Status on the user details page.
# Note: only the link to the feature is hidden; the feature functionality still works.
full_feature_access = db.Column('full_feature_access', db.Boolean(), nullable=False, server_default='1')
# Hide inbound routing feature links if set to false for the user. Otherwise show everything.
# Features that will be hidden: Inbound routing.
# Note: only the link to the feature is hidden; the feature functionality still works.
inbound_routing_access = db.Column('inbound_routing_access', db.Boolean(), nullable=False, server_default='1')
# Hide outbound routing feature links if set to false for the user. Otherwise show everything.
# Features that will be hidden: Outbound routing.
# Note: only the link to the feature is hidden; the feature functionality still works.
outbound_routing_access = db.Column('outbound_routing_access', db.Boolean(), nullable=False, server_default='1')
# Hide forms feature links if set to false for the user. Otherwise show everything.
# Features that will be hidden: Forms.
# Note: only the link to the feature is hidden; the feature functionality still works.
forms_access = db.Column('forms_access', db.Boolean(), nullable=False, server_default='1')
# Hide external API feature links if set to false for the user (the database default is '0').
# Features that will be hidden: listed as "AutoPay Access" in the original note -- TODO confirm the exact feature.
# Note: only the link to the feature is hidden; the feature functionality still works.
external_api_access = db.Column('external_api_access', db.Boolean(), nullable=False, server_default='0')
# Activity tracking.
sign_in_count = db.Column(db.Integer, nullable=False, default=0)
current_sign_in_on = db.Column(AwareDateTime())
current_sign_in_ip = db.Column(db.String(45))
last_sign_in_on = db.Column(AwareDateTime())
last_sign_in_ip = db.Column(db.String(45))
# User information
firstname = db.Column(
db.String(128), index=True, nullable=True, server_default=''
)
lastname = db.Column(
db.String(128), index=True, nullable=True, server_default=''
)
# Has BDC Status
external_bdc = db.Column(db.Boolean(), nullable=False,
server_default='0', default=False)
# Two-factor delivery flags; e-mail defaults to '0' and SMS to '1' at the database level.
is_2fa_email = db.Column(db.Boolean(), nullable=False,
server_default='0')
is_2fa_sms = db.Column(db.Boolean(), nullable=False,
server_default='1')
@hybrid_property
def is_bdc_user(self):
    """
    Report whether the ``external_bdc`` flag is set on this user.

    :return: bool
    """
    return True if self.external_bdc else False
@hybrid_property
def name(self):
    """
    Build the display name from first and last name.

    :return: "<firstname> <lastname>" with a missing part replaced by an
             empty string, or None when both parts are empty
    """
    if self.firstname or self.lastname:
        return "{} {}".format(self.firstname or '', self.lastname or '')
    return None
@name.expression
def name(cls):
    """
    SQL expression used when ``User.name`` appears in a query.

    The previous body applied Python truthiness to class-level column
    attributes, which does not look at row values, and its chained
    conditional expression reduced to ``cls.lastname`` alone. This version
    builds the concatenation in SQL instead.

    Note: unlike the instance-level getter, which returns None when both
    parts are empty, this yields a single space for such rows.

    :return: SQL expression ``coalesce(firstname, '') || ' ' || coalesce(lastname, '')``
    """
    # Local import: nothing else in this module needs ``func``.
    from sqlalchemy import func

    # coalesce() keeps a NULL first or last name from turning the whole
    # concatenation into NULL.
    return func.coalesce(cls.firstname, '') + ' ' + func.coalesce(cls.lastname, '')
# Contact and organisation details.
phonenumber = db.Column(db.String(20), nullable=True, server_default='')
company = db.Column(db.String(256), nullable=True, server_default='')
extension = db.Column(db.Integer)
title = db.Column(db.String(128))
department = db.Column(db.String(128))
# Onboarding confirmation fields (all default to false).
leads_onboard = db.Column(
db.Boolean, nullable=False, default=False, server_default='false'
)
agents_onboard = db.Column(
db.Boolean, nullable=False, default=False, server_default='false'
)
outbound_onboard = db.Column(
db.Boolean, nullable=False, default=False, server_default='false'
)
inbound_onboard = db.Column(
db.Boolean, nullable=False, default=False, server_default='false'
)
two_factor_auth_onboard = db.Column(
db.Boolean, nullable=False, default=False, server_default='false'
)
# Optional link to a partnership account group (cascades on update/delete).
partnership_account_group_id = db.Column(
db.Integer,
db.ForeignKey(
'partnership_account_groups.id',
onupdate='CASCADE',
ondelete='CASCADE'
),
index=True,
nullable=True
)
# Locale.
locale = db.Column(db.String(5), nullable=False, server_default='en')
# Indicates whether the user has two factor authentication enabled. If so, SMS verification will happen.
two_factor_auth = db.Column(db.Boolean(), nullable=False, server_default='false')
# Date the password was last updated; read by the password expiry logic below.
password_updated_date = db.Column(db.DateTime(), nullable=True)
# User agents seen for this user, stored as one string joined with '<!!>' (see save_user_agent).
user_agent = db.Column(db.Text(), nullable=True, server_default='')
# IP addresses seen for this user, stored as a comma separated string (see save_ip_address).
ip_address = db.Column(db.Text(), nullable=True, server_default='')
def __init__(self, **kwargs):
    """
    Build a user from keyword arguments and hash the supplied password.

    The ``password`` keyword is treated as plain text: after the base
    constructor has run, the attribute is replaced by the result of
    ``User.encrypt_password`` (None when no password was given).
    """
    # Let Flask-SQLAlchemy's constructor assign the column values first.
    super(User, self).__init__(**kwargs)
    plaintext = kwargs.get('password', '')
    self.password = User.encrypt_password(plaintext)
@classmethod
def deactivate(cls, id, partnership_account_id):
    """
    Deactivate a user within a partnership account.

    Sets the ``active`` attribute (``is_active`` column) to False and
    stamps ``deactivated_on``.

    :param id: Id of the user to deactivate
    :param partnership_account_id: Partnership account the user must belong to
    :return: bool -- True when a matching user was found and updated
    """
    user = User.query.filter(
        and_(User.id == id, User.partnership_account_id == partnership_account_id)
    ).first()
    if user is None:
        return False

    user.active = False
    # ``datetime`` is imported as a module in this file, so the class has to
    # be reached as ``datetime.datetime``; the bare ``datetime.now()`` used
    # before raised AttributeError.
    user.deactivated_on = datetime.datetime.now()
    db.session.commit()
    return True
@classmethod
def create(cls, title, firstname, lastname, extension, phonenumber, email, company,
           department, role, username, partnership_account_id, password):
    """
    Create and persist a new user for a partnership account.

    :param password: Plain text password; an empty value aborts the creation
    :param role: Requested role; anything other than guest, member, agent
                 or partner is replaced by 'member'
    :return: bool -- True when the user was created, False when no usable
             password was supplied
    """
    # Same rule as before: encrypt_password() yields None for an empty
    # password, in which case no user is created.
    if not password:
        return False

    if role not in ['guest', 'member', 'agent', 'partner']:
        role = 'member'

    # Pass the plain text on: User.__init__ hashes the ``password`` keyword
    # itself. Hashing here as well stored a hash of the hash, which the
    # real password could never match.
    new_user = User(
        title=title, firstname=firstname, lastname=lastname, extension=extension,
        phonenumber=phonenumber, email=email, company=company, department=department,
        role=role, username=username, partnership_account_id=partnership_account_id,
        password=password, tos_agreement=False
    )
    db.session.add(new_user)
    db.session.commit()
    return True
@classmethod
def update(cls, id, partnership_account_id, title, firstname, lastname, extension,
           phonenumber, email, company, department, role):
    """
    Overwrite the editable details of a user in a partnership account.

    A role outside guest, member, agent and partner is replaced by 'member'.

    :return: bool -- True when a matching user was found and saved
    """
    same_user = and_(User.id == id, User.partnership_account_id == partnership_account_id)
    user = User.query.filter(same_user).first()
    if user is None:
        return False

    allowed_roles = ['guest', 'member', 'agent', 'partner']
    new_values = (
        ('firstname', firstname),
        ('lastname', lastname),
        ('phonenumber', phonenumber),
        ('extension', extension),
        ('email', email),
        ('department', department),
        ('title', title),
        ('company', company),
        ('role', role if role in allowed_roles else 'member'),
    )
    for attribute, new_value in new_values:
        setattr(user, attribute, new_value)

    db.session.commit()
    return True
@classmethod
def search(cls, query):
    """
    Search a resource by 1 or more fields.

    Matches the query case-insensitively as a substring of the e-mail,
    first name, last name or company.

    :param query: Search query
    :type query: str
    :return: SQLAlchemy filter (an empty ``text('')`` clause when no query
             is given)
    """
    if not query:
        return text('')

    search_query = '%{0}%'.format(query)
    search_chain = (User.email.ilike(search_query),
                    User.firstname.ilike(search_query),
                    User.lastname.ilike(search_query),
                    User.company.ilike(search_query))
    # or_() already yields a filterable clause; wrapping it in text(),
    # which expects a SQL string, fails.
    return or_(*search_chain)
@classmethod
def find_by_identity(cls, identity):
    """
    Look up a non-deactivated user whose e-mail or username equals the identity.

    :param identity: Email or username
    :type identity: str
    :return: User instance, or None when nothing matches
    """
    matches_identity = or_(User.email == identity, User.username == identity)
    candidates = User.query.filter(matches_identity)
    return candidates.filter(User.is_deactivated.is_(False)).first()
@classmethod
def encrypt_password(cls, plaintext_password):
    """
    Produce a bcrypt hash (8 rounds) of a plain text password.

    :param plaintext_password: Password in plain text
    :type plaintext_password: str
    :return: the hash as str, or None when the password is empty
    """
    if not plaintext_password:
        return None

    hashed = bcrypt.generate_password_hash(plaintext_password, 8)
    return hashed.decode('utf-8')
@classmethod
def deserialize_token(cls, token):
    """
    Resolve a signed HS256 JWT back to the user named in its payload.

    :param token: Signed token.
    :type token: str
    :return: User instance, or None when decoding or the lookup fails
    """
    secret = current_app.config['SECRET_KEY']

    try:
        payload = jwt.decode(token, secret, algorithms=['HS256'])
        identity = payload.get('user_email')
        return User.find_by_identity(identity)
    except Exception:
        # Any failure (bad signature, expired token, ...) means "no user".
        return None
@classmethod
def is_last_admin(cls, user, new_role, new_active):
    """
    Tell whether a pending change would affect the only admin of an account.

    Only checked when the user is losing the admin role, or is active and
    ``new_active`` is None. Admins are counted within the user's own
    partnership and partnership account.

    :param user: User being tested
    :type user: User
    :param new_role: New role being set
    :type new_role: str
    :param new_active: New active status being set
    :type new_active: bool
    :return: bool
    """
    losing_admin_role = user.role == 'admin' and new_role != 'admin'
    being_deactivated = user.active is True and new_active is None
    if not (losing_admin_role or being_deactivated):
        return False

    admin_count = User.query.filter(
        User.role == 'admin',
        User.partnership_id == user.partnership_id,
        User.partnership_account_id == user.partnership_account_id
    ).count()
    return admin_count == 1
@classmethod
def is_last_partner(cls, user, new_role, new_active):
    """
    Tell whether a pending change would affect the only partner user of a partnership.

    Only checked when the user is losing the partner role, or is active
    and ``new_active`` is None. Partner users are counted within the
    user's own partnership.

    :param user: User being tested
    :type user: User
    :param new_role: New role being set
    :type new_role: str
    :param new_active: New active status being set
    :type new_active: bool
    :return: bool
    """
    losing_partner_role = user.role == 'partner' and new_role != 'partner'
    being_deactivated = user.active is True and new_active is None
    if not (losing_partner_role or being_deactivated):
        return False

    partner_count = User.query.filter(
        User.role == 'partner',
        User.partnership_id == user.partnership_id
    ).count()
    return partner_count == 1
@classmethod
def bulk_delete(cls, ids):
    """
    Soft-delete users in bulk: the rows are kept and their ``active``
    attribute is set to False in a single UPDATE.

    :param ids: List of ids to be deleted
    :type ids: list
    :return: int -- number of rows matched by the update
    """
    targeted = db.session.query(cls).filter(cls.id.in_(ids))
    delete_count = targeted.update({"active": False}, synchronize_session='fetch')
    db.session.commit()

    return delete_count
@classmethod
def deactivate_user(cls, id):
    """
    Mark a user as deactivated: sets ``is_deactivated`` to True and stamps
    ``deactivated_on`` with the current local time.

    :param id: User ID to be deactivated
    :type id: int
    :return: boolean -- True when the user exists and was updated
    """
    user = User.query.filter(User.id == id).first()
    if not user:
        return False

    user.is_deactivated = True
    user.deactivated_on = datetime.datetime.now()
    db.session.commit()
    return True
@classmethod
def initialize_password_reset(cls, identity):
    """
    Generate a password reset token for a user and queue the reset e-mail.

    The e-mail carries the name and logo of a partnership chosen by role:
    the user's own partnership for partners, the partnership of the user's
    partnership account for admins and agents, and the partnership with
    id 1 for every other role.

    :param identity: User e-mail address or username
    :type identity: str
    :return: User instance, or None when no user matches the identity
    """
    user = User.find_by_identity(identity)
    if user is None:
        # Previously this fell through to an AttributeError on None.
        return None

    reset_token = user.serialize_token()

    # This prevents circular imports.
    from buyercall.blueprints.user.tasks import deliver_password_reset_email
    from ..partnership.models import Partnership, PartnershipAccount

    if user.role == 'partner':
        partnership_id = user.partnership_id
    elif user.role in ('admin', 'agent'):
        # The partnership account is only needed for this branch, so it is
        # only queried here.
        partner_account = PartnershipAccount.query. \
            filter(PartnershipAccount.id == user.partnership_account_id).first()
        partnership_id = partner_account.partnership_id
    else:
        partnership_id = 1

    partner = Partnership.query.filter(Partnership.id == partnership_id).first()
    deliver_password_reset_email.delay(user.id, reset_token, partner.name, partner.logo)

    return user
def is_active(self):
"""
Return whether or not the user account is active, this satisfies
Flask-Login by overwriting the default value.

NOTE(review): this is a plain method, so ``user.is_active`` without a call
is a bound method and always truthy. Newer Flask-Login releases appear to
read ``is_active`` as a property -- confirm which form the installed
version and the callers expect.

:return: bool
"""
# ``active`` is the attribute mapped to the ``is_active`` column.
return self.active
def get_auth_token(self):
    """
    Return the user's auth token. Use their password as part of the token
    because if the user changes their password we will want to invalidate
    all of their logins across devices. md5 is only used here to
    fingerprint the already-hashed password.

    This satisfies Flask-Login by providing a means to create a token.

    :return: str
    """
    private_key = current_app.config['SECRET_KEY']
    serializer = URLSafeTimedSerializer(private_key)

    # md5() needs bytes; the stored password hash is a str.
    password_hash = self.password or ''
    if isinstance(password_hash, str):
        password_hash = password_hash.encode('utf-8')
    data = [str(self.id), md5(password_hash).hexdigest()]

    token = serializer.dumps(data)
    # dumps() may hand back str or bytes depending on the serializer;
    # only bytes need decoding.
    if isinstance(token, bytes):
        token = token.decode('utf-8')
    return token
def authenticated(self, with_password=True, password=''):
    """
    Confirm the user is authenticated, verifying the password unless told not to.

    :param with_password: Optionally check their password
    :type with_password: bool
    :param password: Optionally verify this as their password
    :type password: str
    :return: bool
    """
    if not with_password:
        return True

    return bcrypt.check_password_hash(self.password, password)
def serialize_token(self, expiration=3600):
    """
    Create a signed HS256 JWT carrying the user's e-mail, for one-off
    tasks such as resetting a password.

    :param expiration: Seconds until it expires, defaults to 1 hour
    :type expiration: int
    :return: the encoded token as returned by ``jwt.encode``
    """
    secret = current_app.config['SECRET_KEY']
    expires_at = datetime.datetime.utcnow() + datetime.timedelta(seconds=expiration)
    claims = {'user_email': self.email, 'exp': expires_at}

    return jwt.encode(claims, secret, algorithm='HS256')
def update_activity_tracking(self, ip_address):
    """
    Record a sign in: bump the counter, move the current sign-in time and
    IP into the "last" fields, then store the new time (UTC) and IP.

    :param ip_address: IP address
    :type ip_address: str
    :return: SQLAlchemy commit results
    """
    self.sign_in_count += 1

    # Shift the previous sign-in into the "last" slots before overwriting.
    self.last_sign_in_on, self.last_sign_in_ip = self.current_sign_in_on, self.current_sign_in_ip

    self.current_sign_in_on = datetime.datetime.now(pytz.utc)
    self.current_sign_in_ip = ip_address

    return self.save()
@hybrid_property
def partnership_or_partnership_account(self):
    """
    Return ``partnership_account`` for admins and ``partnership`` for
    every other role.
    """
    attribute = 'partnership_account' if self.role == 'admin' else 'partnership'
    return getattr(self, attribute)
@hybrid_property
def subscription(self):
    """
    Resolve the subscription that applies to this user.

    Lookup order: the partnership account's own subscription, then the
    subscription of that account's partnership, then the subscription of
    the user's partnership.

    :return: the first subscription found, or None
    """
    account = self.partnership_account
    if account:
        if account.subscription:
            return account.subscription
        if account.partnership and account.partnership.subscription:
            return account.partnership.subscription

    partnership = self.partnership
    if partnership and partnership.subscription:
        return partnership.subscription

    return None
@hybrid_property
def is_partnership_account_user(self):
    """
    Report whether the user has a positive partnership account id.

    :return: bool
    """
    has_account = self.partnership_account_id is not None and self.partnership_account_id > 0
    return True if has_account else False
@hybrid_property
def is_partnership_user(self):
    """
    Report whether the user has a positive partnership id.

    :return: bool
    """
    has_partnership = self.partnership_id is not None and self.partnership_id > 0
    return True if has_partnership else False
@hybrid_property
def is_admin_user_with_groups(self):
    """
    Report whether the user may view/access/change partnership account groups.

    Requires the admin role, a partnership account group id, a positive
    partnership account id, and a group tie linking that account to that
    group.

    :return: bool
    """
    from ..partnership.models import PartnershipAccountGroupTie

    if not (self.role == 'admin' and self.partnership_account_group_id):
        return False
    if not (self.partnership_account_id and self.partnership_account_id > 0):
        return False

    tie_count = PartnershipAccountGroupTie.query.filter(
        and_(PartnershipAccountGroupTie.partnership_account_id == self.partnership_account_id,
             PartnershipAccountGroupTie.partnership_account_group_id == self.partnership_account_group_id)
    ).count()
    return True if tie_count else False
@hybrid_property
def is_viewing_partnership(self):
    """
    Report whether the user is currently viewing a partnership account.

    True only for an admin with a group id and a positive account id,
    whose account is tied to that group, and for whom the Redis key
    ``partner-user-view:<user id>`` holds a positive integer. Any error
    is logged and reported as False.

    :return: bool
    """
    redis_db = redis.StrictRedis(host=current_app.config['REDIS_CONFIG_URL'],
                                 port=current_app.config['REDIS_CONFIG_PORT'])
    from ..partnership.models import PartnershipAccountGroupTie

    try:
        if not (self.role == 'admin' and self.partnership_account_group_id):
            return False
        if not (self.partnership_account_id and self.partnership_account_id > 0):
            return False

        tie_count = PartnershipAccountGroupTie.query.filter(
            and_(PartnershipAccountGroupTie.partnership_account_id == self.partnership_account_id,
                 PartnershipAccountGroupTie.partnership_account_group_id ==
                 self.partnership_account_group_id)
        ).count()
        if not (tie_count and tie_count > 0):
            return False

        key = 'partner-user-view:{}'.format(self.id)
        if redis_db.exists(key) != 1:
            return False

        return int(redis_db.get(key)) > 0
    except Exception:
        log.error('Error retrieving viewed partnership account.')
        return False
@hybrid_property
def get_user_viewing_partnership_account_id(self):
    """
    Return the id of the partnership account the user is viewing (acting
    on behalf of), read from the Redis key ``partner-user-view:<user id>``.

    :return: the stored id as int, or -1 when the key is absent or its
             value cannot be read as an integer
    """
    redis_db = redis.StrictRedis(host=current_app.config['REDIS_CONFIG_URL'],
                                 port=current_app.config['REDIS_CONFIG_PORT'])
    key = 'partner-user-view:{}'.format(self.id)

    if redis_db.exists(key) != 1:
        return -1

    try:
        return int(redis_db.get(key))
    except Exception as e:
        log.error('Error retrieving viewed partnership account. Error: {}'.format(e))
        return -1
@hybrid_property
def get_user_viewing_partnership_account_name(self):
    """
    Return the name of the partnership account the user is viewing.

    The viewed account id is read from the Redis key
    ``partner-user-view:<user id>``.

    :return: the account name, or '' when nothing is being viewed, the
             account does not exist, or an error occurs
    """
    # Declare redis config url
    redis_db = redis.StrictRedis(host=current_app.config['REDIS_CONFIG_URL'],
                                 port=current_app.config['REDIS_CONFIG_PORT'])
    from ..partnership.models import PartnershipAccount

    result = ''
    try:
        key = 'partner-user-view:{}'.format(self.id)
        if redis_db.exists(key) == 1:
            partnership_account_id = int(redis_db.get(key))
            if partnership_account_id > 0:
                account = PartnershipAccount.query.filter(PartnershipAccount.id == partnership_account_id).first()
                if account is not None:
                    result = account.name
    except Exception as e:
        # The message previously had no placeholder, so the error text was
        # silently dropped from the log.
        log.error('Error retrieving viewed partnership account name. Error: {}'.format(e))
        result = ''
    return result
@hybrid_property
def get_user_viewing_partnership_id(self):
    """
    Return the partnership id of the partnership account the user is viewing.

    The viewed account id is read from the Redis key
    ``partner-user-view:<user id>``.

    :return: the partnership id, or -1 when nothing is being viewed, the
             account does not exist, or an error occurs
    """
    redis_db = redis.StrictRedis(host=current_app.config['REDIS_CONFIG_URL'],
                                 port=current_app.config['REDIS_CONFIG_PORT'])
    from ..partnership.models import PartnershipAccount

    try:
        key = 'partner-user-view:{}'.format(self.id)
        if redis_db.exists(key) == 1:
            viewed_account_id = int(redis_db.get(key))
            if viewed_account_id > 0:
                account = PartnershipAccount.query.filter(PartnershipAccount.id == viewed_account_id).first()
                if account is not None:
                    return account.partnership_id
        return -1
    except Exception as e:
        log.error('Error retrieving viewed partnership id. Error: {}'.format(e))
        return -1
@hybrid_property
def get_user_viewing_partnership_account_subscription_plan(self):
    """
    Return the subscription plan of the partnership account the user is viewing.

    The viewed account id is read from the Redis key
    ``partner-user-view:<user id>``. The account's own subscription plan
    is preferred; otherwise the plan of the account's partnership
    subscription is used.

    :return: the plan, or '' when nothing is being viewed, no subscription
             is found, or an error occurs
    """
    # Declare redis config url
    redis_db = redis.StrictRedis(host=current_app.config['REDIS_CONFIG_URL'],
                                 port=current_app.config['REDIS_CONFIG_PORT'])
    from buyercall.blueprints.partnership.models import PartnershipAccount

    result = ''
    try:
        key = 'partner-user-view:{}'.format(self.id)
        if redis_db.exists(key) == 1:
            partnership_account_id = int(redis_db.get(key))
            if partnership_account_id > 0:
                account = PartnershipAccount\
                    .query\
                    .filter(PartnershipAccount.id == partnership_account_id)\
                    .first()
                if account is not None:
                    if account.subscription and account.subscription.plan:
                        result = account.subscription.plan
                    # Check the partnership itself before touching its
                    # subscription; the old code dereferenced it first and
                    # relied on the except clause for accounts without one.
                    elif account.partnership and account.partnership.subscription:
                        result = account.partnership.subscription.plan
    except Exception as e:
        # The message previously had no placeholder (dropping the error)
        # and referred to the account name instead of the plan.
        log.error('Error retrieving viewed partnership account subscription plan. Error: {}'.format(e))
        result = ''
    return result
@classmethod
def set_user_viewing_partnership_account(cls, user_id, partnership_account_id):
    """
    Store which partnership account a user is viewing.

    Writes the id to the Redis key ``partner-user-view:<user id>`` and
    gives the key an expiry of ``DAYS`` seconds. Pass -1 when the user is
    not viewing an account.

    :return: bool -- False when the Redis write raised an error
    """
    redis_db = redis.StrictRedis(host=current_app.config['REDIS_CONFIG_URL'],
                                 port=current_app.config['REDIS_CONFIG_PORT'])
    key = 'partner-user-view:{}'.format(user_id)

    try:
        redis_db.set(key, partnership_account_id)
        redis_db.expire(key, DAYS)
    except Exception as e:
        log.error('Error setting viewed partnership account. Error: {}'.format(e))
        return False
    return True
@classmethod
def is_in_same_group(cls, user_id, partnership_account_id):
    """
    Report whether a user's partnership account and another partnership
    account are tied to the user's partnership account group.

    Only admins with a group id and a positive account id can qualify.
    Errors raised while querying the ties are logged and reported as False.

    :param user_id: Id of the user
    :param partnership_account_id: Id of the other partnership account
    :return: bool
    """
    from ..partnership.models import PartnershipAccountGroupTie

    user = User.query.filter(User.id == user_id).first()
    if not (user and user.role == 'admin' and user.partnership_account_group_id):
        return False
    if not (user.partnership_account_id and user.partnership_account_id > 0):
        return False

    def tie_for(account_id):
        # Tie of the given account to the user's group, if any.
        return PartnershipAccountGroupTie.query.filter(
            and_(PartnershipAccountGroupTie.partnership_account_id == account_id,
                 PartnershipAccountGroupTie.partnership_account_group_id ==
                 user.partnership_account_group_id)
        ).first()

    try:
        own_tie = tie_for(user.partnership_account_id)
        other_tie = tie_for(partnership_account_id)

        if not (own_tie and own_tie.partnership_account_group_id > 0):
            return False
        if not (other_tie and other_tie.partnership_account_group_id > 0):
            return False
        if other_tie.partnership_account_group_id == own_tie.partnership_account_group_id:
            return True
        return False
    except Exception as e:
        log.error('Error checking if user is in the same partnership account group. Error: {}'.format(e))
        return False
@hybrid_property
def business_type(self):
    """
    Retrieve the user's business type.

    Users with a partnership account id take it from that partnership
    account; users without one take it from their partnership.

    :return: the business type, or '' when no matching record is found
    """
    if self.partnership_account_id is not None:
        # Import the partnership account model
        from ..partnership.models import PartnershipAccount
        account = PartnershipAccount.query.filter(
            PartnershipAccount.id == self.partnership_account_id
        ).first()
        if account:
            return account.business_type
    elif self.partnership_id is not None:
        # Import the partnership model
        from ..partnership.models import Partnership
        partnership = Partnership.query.filter(
            Partnership.id == self.partnership_id
        ).first()
        if partnership:
            return partnership.business_type

    return ''
# password expiry functionality
@staticmethod
def check_password_match(hashed_password, plaintext_password):
    """
    Check a plain text password against a bcrypt hash.

    :param hashed_password: Hashed password
    :type hashed_password: str
    :param plaintext_password: Password in plain text
    :type plaintext_password: str
    :return: bool
    """
    is_match = bcrypt.check_password_hash(hashed_password, plaintext_password)
    return is_match
def save_password_updated_date(self):
    """Stamp ``password_updated_date`` with the current UTC time and commit the session."""
    now_utc = datetime.datetime.now(pytz.utc)
    self.password_updated_date = now_utc
    # Persist the change.
    db.session.commit()
@classmethod
def logic_password_expiry(cls):
"""
Send password expiry notices to every user that is not deactivated.

Creates its own application and runs inside that application's context.
For each user with a ``password_updated_date`` the expiry date is that
date plus ``PASSWORD_EXPIRY_DAYS`` (default 60); a notice is queued when
the whole days remaining equal 10, 1 or 0. Users without a
``password_updated_date`` get one assigned so that their password expires
in about 11 days.

:return: empty dict
"""
from buyercall.app import create_app
# These names shadow the module-level ``datetime`` module inside this method.
from datetime import datetime, timedelta
# Create a context for the database connection.
app = create_app()
db.app = app
with app.app_context():
users = cls.query.filter(cls.is_deactivated.is_(False)).all()
password_expiry_days = current_app.config.get('PASSWORD_EXPIRY_DAYS', 60)
for user in users:
if user.password_updated_date:
# Calculate the date for sending the emails
expiry_date = user.password_updated_date + timedelta(days=password_expiry_days)
# ``.days`` is the whole-day part of the difference to the naive local time.
days_to_expire = (expiry_date - datetime.now()).days
if days_to_expire == 10:
# Send email 10 days before expiry
cls.initialize_password_expiry(
user, "Your password will expire in 10 days, please reset your password using the button below.")
elif days_to_expire == 1:
# Send notification email on the 1 day before expiry
cls.initialize_password_expiry(
user, "Your password will expire in 1 day, please reset your password using the button below.")
elif days_to_expire == 0:
# Send email on the day of expiry
cls.initialize_password_expiry(
user, "Your password has expired, please reset your password using the button below.")
else:
# No date recorded yet: back-date it so that expiry falls 11 days from now.
user.password_updated_date = datetime.now() - timedelta(days=password_expiry_days-11)
# NOTE(review): presumably this save only runs for users that were just
# given a date -- confirm its nesting level against the real file.
user.save()
return {}
@classmethod
def initialize_password_expiry(cls, user, message):
    """
    Generate a reset token for a user and queue the password expiry e-mail.

    The display name passed to the e-mail task is "first last" when both
    are set, otherwise the first of first name, username and e-mail that
    is set.

    :param user: User obj
    :param message: Text of the notice to send
    :type message: str
    :return: User instance
    """
    reset_token = user.serialize_token()

    # This prevents circular imports.
    from buyercall.blueprints.user.tasks import deliver_password_expire_email

    if user.firstname and user.lastname:
        display_name = f"{user.firstname} {user.lastname}"
    else:
        display_name = f"{user.firstname or user.username or user.email}"

    deliver_password_expire_email.delay(user.id, reset_token, f"{display_name}", message)

    return user
# function to get password expire status
@staticmethod
def get_password_expire_status(user):
    """
    Report whether a user's password is still within its validity period.

    The period is ``PASSWORD_EXPIRY_DAYS`` days (default 60) counted from
    ``password_updated_date``.

    :param user: User obj
    :return: bool -- True while the password has not expired; False once
             it has, or when no update date is recorded
    """
    from datetime import datetime, timedelta
    from flask import current_app as app

    if not user.password_updated_date:
        return False

    validity = timedelta(days=app.config.get('PASSWORD_EXPIRY_DAYS', 60))
    expiry_date = user.password_updated_date + validity
    return datetime.now() < expiry_date
# ip save function
def save_user_agent(self, os_value=None):
    """
    Remember a user agent string for this user if it is not stored yet.

    The known user agents live in ``user_agent`` as a single string joined
    with the ``<!!>`` separator.

    :param os_value: User agent string to record
    :type os_value: str
    :return: bool -- True when the value was newly stored, False when it
             was already known or no value was given
    """
    if not os_value:
        # Nothing to record. Without this guard the default None was
        # appended to the list and the join below raised TypeError.
        return False

    if self.user_agent:
        # Split the stored user agents on the separator
        known_agents = self.user_agent.split('<!!>')
        if os_value in known_agents:
            status = False
        else:
            # Append the new user agent to the existing values
            known_agents.append(os_value)
            self.user_agent = '<!!>'.join(known_agents)
            status = True
    else:
        # First user agent, set it directly
        self.user_agent = os_value
        status = True

    # Commit the changes to the database
    db.session.commit()
    return status
def save_ip_address(self, ip_address):
    """
    Remember an IP address for this user if it is not stored yet.

    The known addresses live in ``ip_address`` as a comma separated string.

    :param ip_address: IP address to record
    :type ip_address: str
    :return: bool -- True when the address was newly stored, False when it
             was already known
    """
    if not self.ip_address:
        # First IP address, set it directly
        self.ip_address = ip_address
        is_new = True
    else:
        known_addresses = self.ip_address.split(',')
        is_new = ip_address not in known_addresses
        if is_new:
            known_addresses.append(ip_address)
            self.ip_address = ','.join(known_addresses)

    # Commit the changes to the database
    db.session.commit()
    return is_new
@staticmethod
def get_partnership_name(user):
    """
    Return the name of the partnership a user belongs to.

    :param user: User obj
    :return: the partnership name, or None when the user has no
             partnership id or the partnership does not exist
    """
    from buyercall.blueprints.partnership.models import Partnership

    if not user.partnership_id:
        return None

    partnership = Partnership.query.get(user.partnership_id)
    if partnership:
        return partnership.name
    return None