HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/user/models.py
import datetime
from collections import OrderedDict
from hashlib import md5
import logging as log

import jwt
import redis

import pytz
from flask import current_app

from flask_login import UserMixin
# from itsdangerous import TimedJSONWebSignatureSerializer
from itsdangerous import URLSafeTimedSerializer

from sqlalchemy import or_, and_
from sqlalchemy.sql import text
from sqlalchemy.ext.hybrid import hybrid_property

from buyercall.lib.util_sqlalchemy import ResourceMixin, AwareDateTime
from buyercall.blueprints.agents.models import Agent
from buyercall.extensions import db, bcrypt
from buyercall.blueprints.reports.models import ReportUserTie

log = log.getLogger(__name__)

DAYS = 86400  # The length of a day in seconds


class UserExternalApiAutoPostTie(ResourceMixin, db.Model):
    __tablename__ = 'user_external_api_access_tie'

    user_id = db.Column(db.Integer, db.ForeignKey('users.id', name='users_id_user_access_tie_fkey'),
                        primary_key=True, index=True)

    external_api_service_provider_id = db.Column(
        db.Integer,
        db.ForeignKey('external_api_service_providers.id',
                      name='service_provider_id_user_access_tie_fkey'
                      ),
        primary_key=True
    )

    is_allowed = db.Column('is_allowed', db.Boolean(), nullable=False, server_default='0')

    @classmethod
    def get_existing_ties(cls, user_id):
        """
        Returns a list of existing user/external service provider ties.

        :return: list of user/external api service provider tie objects
        """
        ties = UserExternalApiAutoPostTie.query.filter(UserExternalApiAutoPostTie.user_id == user_id).all()

        return ties

    @classmethod
    def get_service_provider_access_state(cls, user_id, partnership_account_id, service_provider_name):
        """
        Returns an existing user/external service provider tie.

        :return: a user/external api service provider tie object
        """
        service_provider_tie = None

        from ..partnership.models import ExternalApiServiceProvidersPartnershipAccountTie, ExternalApiServiceProviders
        service_provider = db.session\
            .query(ExternalApiServiceProviders)\
            .join(ExternalApiServiceProvidersPartnershipAccountTie)\
            .filter(ExternalApiServiceProviders.name == service_provider_name)\
            .filter(ExternalApiServiceProvidersPartnershipAccountTie.partnership_account_id == partnership_account_id,
                    ExternalApiServiceProvidersPartnershipAccountTie.active)\
            .first()

        if service_provider:
            service_provider_tie = UserExternalApiAutoPostTie\
                .query\
                .filter(UserExternalApiAutoPostTie.user_id == user_id,
                        UserExternalApiAutoPostTie.external_api_service_provider_id == service_provider.id)\
                .first()

        return service_provider_tie

    @classmethod
    def set_service_provider_access_state(cls, user_id, partnership_account_id, service_provider_name, value):
        """
        Sets a user/external service provider tie.
        """
        from ..partnership.models import ExternalApiServiceProvidersPartnershipAccountTie, ExternalApiServiceProviders
        service_provider = db.session\
            .query(ExternalApiServiceProviders)\
            .join(ExternalApiServiceProvidersPartnershipAccountTie)\
            .filter(ExternalApiServiceProviders.name == service_provider_name)\
            .filter(ExternalApiServiceProvidersPartnershipAccountTie.partnership_account_id == partnership_account_id)\
            .first()

        if service_provider:
            service_provider_tie = UserExternalApiAutoPostTie\
                .query\
                .filter(UserExternalApiAutoPostTie.user_id == user_id,
                        UserExternalApiAutoPostTie.external_api_service_provider_id == service_provider.id)\
                .first()

            if service_provider_tie:
                service_provider_tie.is_allowed = value
                service_provider_tie.updated_on = datetime.datetime.now(pytz.utc)
            else:
                new_tie = UserExternalApiAutoPostTie()
                new_tie.user_id = user_id
                new_tie.external_api_service_provider_id = service_provider.id
                new_tie.is_allowed = value
                new_tie.created_on = datetime.datetime.now(pytz.utc)
                new_tie.updated_on = datetime.datetime.now(pytz.utc)

                db.session.add(new_tie)

            db.session.commit()

    @classmethod
    def get_allowed_service_providers(cls, user_id, partnership_account_id):
        """
        Returns a list of service providers the user has access to.

        :return: a external api service provider list
        """
        from ..partnership.models import ExternalApiServiceProvidersPartnershipAccountTie, ExternalApiServiceProviders
        from buyercall.blueprints.form_leads.models import FormLog

        service_providers = db.session\
            .query(ExternalApiServiceProviders)\
            .join(ExternalApiServiceProvidersPartnershipAccountTie)\
            .filter(ExternalApiServiceProvidersPartnershipAccountTie.partnership_account_id == partnership_account_id,
                    ExternalApiServiceProvidersPartnershipAccountTie.active == True)\
            .join(UserExternalApiAutoPostTie)\
            .filter(UserExternalApiAutoPostTie.user_id == user_id, UserExternalApiAutoPostTie.is_allowed == True)\
            .order_by(ExternalApiServiceProviders.name)\
            .all()

        return service_providers

    @classmethod
    def get_allowed_service_providers_with_form_log_count(cls, user_id, partnership_account_id, form_lead_id):
        """
        Returns a list of service providers the user has access to including if form logs exist.

        :return: a external api service provider list
        """
        from ..partnership.models import ExternalApiServiceProvidersPartnershipAccountTie, ExternalApiServiceProviders
        from buyercall.blueprints.form_leads.models import ExternalApiFormLeadPostTie
        result = []

        service_providers = db.session\
            .query(ExternalApiServiceProviders.id, ExternalApiServiceProviders.name)\
            .join(ExternalApiServiceProvidersPartnershipAccountTie)\
            .filter(ExternalApiServiceProvidersPartnershipAccountTie.partnership_account_id == partnership_account_id,
                    ExternalApiServiceProvidersPartnershipAccountTie.active == True)\
            .join(UserExternalApiAutoPostTie)\
            .filter(UserExternalApiAutoPostTie.user_id == user_id, UserExternalApiAutoPostTie.is_allowed == True).all()

        for provider in service_providers:
            new_entry = []
            new_entry.append(provider[0])
            new_entry.append(provider[1])
            logs_exist = ExternalApiFormLeadPostTie.existing_posts_for_lead(form_lead_id, provider[0])
            new_entry.append(logs_exist)
            result.append(new_entry)

        return result


class User(UserMixin, ResourceMixin, db.Model):
    ROLE = OrderedDict([
        ('guest', 'Guest'),
        ('member', 'Member'),
        ('admin', 'Admin'),
        ('agent', 'Agent'),
        ('partner', 'Partner'),
        ('limitsysadmin', 'Limited System Admin'),
        ('sysadmin', 'System Admin')
    ])

    DEPARTMENT = OrderedDict([
        ('none', 'None'),
        ('sales', 'Sales'),
        ('internet sales', 'Internet Sales'),
        ('bdc', 'BDC'),
        ('service', 'Service'),
        ('management', 'Management'),
        ('client relationship', 'Client Relationship'),
        ('finance', 'Finance'),
        ('other', 'Other')
    ])

    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)

    partnership_id = db.Column(db.Integer, db.ForeignKey('partnerships.id',
                                                         onupdate='CASCADE',
                                                         ondelete='CASCADE'),
                               index=True, nullable=True)

    partnership_account_id = db.Column(db.Integer, db.ForeignKey('partnership_accounts.id',
                                                                 onupdate='CASCADE',
                                                                 ondelete='CASCADE'),
                                       index=True, nullable=True)

    # Relationships.
    agent = db.relationship(Agent, backref='user', uselist=False, passive_deletes=True)

    reports = db.relationship(
        ReportUserTie, back_populates="user", cascade="all, delete-orphan")

    # Authentication.
    role = db.Column(db.Enum(*ROLE, name='role_types'),
                     index=True, nullable=False, server_default='member')
    active = db.Column('is_active', db.Boolean(), nullable=False,
                       server_default='1')
    deactivated_on = db.Column(db.DateTime(), nullable=True)
    is_deactivated = db.Column(db.Boolean(), nullable=False,
                               server_default='0', default=False)
    username = db.Column(db.String(24), unique=True, index=True)
    email = db.Column(db.String(255), unique=True, index=True, nullable=False,
                      server_default='')
    password = db.Column(db.String(128), nullable=False, server_default='')

    # TOS agreement.
    tos_agreement = db.Column('is_tos_agreed', db.Boolean(), nullable=False,
                              server_default='0')

    # Hide certain features and links if set to false for user. Otherwise show everything.
    # Features that will be hidden: Forms and Payment Settings and Progress Status on user details page
    # Note, the link to the feature will be hidden and the feature functionality will still work
    full_feature_access = db.Column('full_feature_access', db.Boolean(), nullable=False, server_default='1')

    # Hide inbound and outbound routing feature links if set to false for user. Otherwise show everything.
    # Features that will be hidden: Inbound routing
    # Note, the link to the feature will be hidden and the feature functionality will still work
    inbound_routing_access = db.Column('inbound_routing_access', db.Boolean(), nullable=False, server_default='1')

    # Hide inbound and outbound routing feature links if set to false for user. Otherwise show everything.
    # Features that will be hidden: Outbound routing
    # Note, the link to the feature will be hidden and the feature functionality will still work
    outbound_routing_access = db.Column('outbound_routing_access', db.Boolean(), nullable=False, server_default='1')

    # Hide forms feature links if set to false for user. Otherwise show everything.
    # Features that will be hidden: Forms
    # Note, the link to the feature will be hidden and the feature functionality will still work
    forms_access = db.Column('forms_access', db.Boolean(), nullable=False, server_default='1')

    # Hide forms feature links if set to false for user. Otherwise show everything.
    # Features that will be hidden: AutoPay Access
    # Note, the link to the feature will be hidden and the feature functionality will still work
    external_api_access = db.Column('external_api_access', db.Boolean(), nullable=False, server_default='0')

    # Activity tracking.
    sign_in_count = db.Column(db.Integer, nullable=False, default=0)
    current_sign_in_on = db.Column(AwareDateTime())
    current_sign_in_ip = db.Column(db.String(45))
    last_sign_in_on = db.Column(AwareDateTime())
    last_sign_in_ip = db.Column(db.String(45))

    # User information
    firstname = db.Column(
        db.String(128), index=True, nullable=True, server_default=''
    )
    lastname = db.Column(
        db.String(128), index=True, nullable=True, server_default=''
    )

    # Has BDC Status
    external_bdc = db.Column(db.Boolean(), nullable=False,
                               server_default='0', default=False)

    is_2fa_email = db.Column(db.Boolean(), nullable=False,
                               server_default='0')

    is_2fa_sms = db.Column(db.Boolean(), nullable=False,
                             server_default='1')

    @hybrid_property
    def is_bdc_user(self):
        """
        Whether or not the user account is a BDC user.
        """
        if self.external_bdc:
            return True
        else:
            return False

    @hybrid_property
    def name(self):
        if not self.firstname and not self.lastname:
            return None
        return "{} {}".format(
            '' if not self.firstname else self.firstname,
            '' if not self.lastname else self.lastname
        )

    @name.expression
    def name(cls):
        if not cls.firstname and not cls.lastname:
            return None
        return '' if not cls.firstname else cls.firstname + '' + '' if not cls.lastname else cls.lastname

    phonenumber = db.Column(db.String(20), nullable=True, server_default='')
    company = db.Column(db.String(256), nullable=True, server_default='')
    extension = db.Column(db.Integer)
    title = db.Column(db.String(128))
    department = db.Column(db.String(128))

    # Onboarding confirmation fields
    leads_onboard = db.Column(
        db.Boolean, nullable=False, default=False, server_default='false'
    )
    agents_onboard = db.Column(
        db.Boolean, nullable=False, default=False, server_default='false'
    )
    outbound_onboard = db.Column(
        db.Boolean, nullable=False, default=False, server_default='false'
    )
    inbound_onboard = db.Column(
        db.Boolean, nullable=False, default=False, server_default='false'
    )
    two_factor_auth_onboard = db.Column(
        db.Boolean, nullable=False, default=False, server_default='false'
    )

    partnership_account_group_id = db.Column(
        db.Integer,
        db.ForeignKey(
            'partnership_account_groups.id',
            onupdate='CASCADE',
            ondelete='CASCADE'
        ),
        index=True,
        nullable=True
    )

    # Locale.
    locale = db.Column(db.String(5), nullable=False, server_default='en')

    # Indicates if the user has two factor authentication enabled. If so sms verification will happen
    two_factor_auth = db.Column(db.Boolean(), nullable=False, server_default='false')

    # field to store password update date
    password_updated_date = db.Column(db.DateTime(), nullable=True)

    # saves the user agent of the user, as comma separated string
    user_agent = db.Column(db.Text(), nullable=True, server_default='')

    # saves ip of the user, as comma separated string
    ip_address = db.Column(db.Text(), nullable=True, server_default='')

    def __init__(self, **kwargs):
        # Call Flask-SQLAlchemy's constructor.
        super(User, self).__init__(**kwargs)

        self.password = User.encrypt_password(kwargs.get('password', ''))

    @classmethod
    def deactivate(cls, id, partnership_account_id):
        """
        Deactivate user. The is_active flag will be set to false.

        :return: bool
        """
        user = User.query.filter(and_(User.id == id, User.partnership_account_id == partnership_account_id)).first()

        if user is not None:
            user.active = False
            user.deactivated_on = datetime.now()
            db.session.commit()
            return True
        else:
            return False

    @classmethod
    def create(cls, title, firstname, lastname, extension, phonenumber, email, company,
               department, role, username, partnership_account_id, password):

        new_password = User.encrypt_password(password)

        if new_password is not None:
            if role not in ['guest', 'member', 'agent', 'partner']:
                role = 'member'

            new_user = User(
                title=title, firstname=firstname, lastname=lastname, extension=extension,
                phonenumber=phonenumber, email=email, company=company, department=department,
                role=role, username=username, partnership_account_id=partnership_account_id,
                password=new_password, tos_agreement=False
            )
            db.session.add(new_user)
            db.session.commit()

            return True
        return False

    @classmethod
    def update(cls, id, partnership_account_id, title, firstname, lastname, extension,
               phonenumber, email, company, department, role):
        """
        Update the user's details.

        :return: bool
        """
        user = User.query.filter(and_(User.id == id, User.partnership_account_id == partnership_account_id)).first()

        if user is not None:
            user.firstname = firstname
            user.lastname = lastname
            user.phonenumber = phonenumber
            user.extension = extension
            user.email = email
            user.department = department
            user.title = title
            user.company = company

            if role not in ['guest', 'member', 'agent', 'partner']:
                role = 'member'

            user.role = role
            db.session.commit()

            return True
        return False

    @classmethod
    def search(cls, query):
        """
        Search a resource by 1 or more fields.

        :param query: Search query
        :type query: str
        :return: SQLAlchemy filter
        """
        if not query:
            return text('')

        search_query = '%{0}%'.format(query)
        search_chain = (User.email.ilike(search_query),
                        User.firstname.ilike(search_query),
                        User.lastname.ilike(search_query),
                        User.company.ilike(search_query))

        return text(or_(*search_chain))

    @classmethod
    def find_by_identity(cls, identity):
        """
        Find a user by their e-mail or username.

        :param identity: Email or username
        :type identity: str
        :return: User instance
        """
        return User.query.filter(
            (User.email == identity) | (User.username == identity)
        ).filter(User.is_deactivated.is_(False)).first()

    @classmethod
    def encrypt_password(cls, plaintext_password):
        """
        Hash a plaintext string using bcrypt.

        :param plaintext_password: Password in plain text
        :type plaintext_password: str
        :return: str
        """
        if plaintext_password:
            return bcrypt.generate_password_hash(plaintext_password, 8).decode('utf-8')

        return None

    @classmethod
    def deserialize_token(cls, token):
        """
        Obtain a user from de-serializing a signed token.

        :param token: Signed token.
        :type token: str
        :return: User instance or None
        """
        # private_key = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
        private_key = current_app.config['SECRET_KEY']
        try:
            # decoded_payload = private_key.loads(token)
            decoded_payload = jwt.decode(token, private_key, algorithms=['HS256'])
            return User.find_by_identity(decoded_payload.get('user_email'))
        except Exception:
            return None

    @classmethod
    def is_last_admin(cls, user, new_role, new_active):
        """
        Determine whether or not this user is the last admin account.

        :param user: User being tested
        :type user: User
        :param new_role: New role being set
        :type new_role: str
        :param new_active: New active status being set
        :type new_active: bool
        :return: bool
        """
        is_changing_roles = user.role == 'admin' and new_role != 'admin'
        is_changing_active = user.active is True and new_active is None

        if is_changing_roles or is_changing_active:
            admin_count = User.query.filter(
                User.role == 'admin',
                User.partnership_id == user.partnership_id,
                User.partnership_account_id == user.partnership_account_id
            ).count()
            # active_count = User.query.filter(User.is_active is True).count()

            if admin_count == 1:  # or active_count == 1:
                return True

        return False

    @classmethod
    def is_last_partner(cls, user, new_role, new_active):
        """
        Determine whether or not this user is the last admin account.

        :param user: User being tested
        :type user: User
        :param new_role: New role being set
        :type new_role: str
        :param new_active: New active status being set
        :type new_active: bool
        :return: bool
        """
        is_changing_roles = user.role == 'partner' and new_role != 'partner'
        is_changing_active = user.active is True and new_active is None

        if is_changing_roles or is_changing_active:
            partner_count = User.query.filter(User.role == 'partner',
                                              User.partnership_id == user.partnership_id).count()
            # active_count = User.query.filter(User.is_active is True).count()

            if partner_count == 1:  # or active_count == 1:
                return True

        return False

    @classmethod
    def bulk_delete(cls, ids):
        """
        Override the general bulk_delete method because we need to delete them
        one at a time while also deleting them on Stripe.

        :param ids: List of ids to be deleted
        :type ids: list
        :return: int
        """
        delete_count = db.session.query(cls) \
            .filter(cls.id.in_(ids)) \
            .update({"active": False}, synchronize_session='fetch')

        db.session.commit()

        return delete_count

    @classmethod
    def deactivate_user(cls, id):
        """
        Deactivates a user. Sets the is_deactivated to TRUE and sets the deactivated_on date.
        :param id: User ID to be deactivated
        :type id: int
        :return: boolean
        """
        result = False
        user = User.query.filter(User.id == id).first()

        if user:
            user.is_deactivated = True
            user.deactivated_on = datetime.datetime.now()
            db.session.commit()
            result = True

        return result

    @classmethod
    def initialize_password_reset(cls, identity):
        """
        Generate a token to reset the password for a specific user.

        :param identity: User e-mail address or username
        :type identity: str
        :return: User instance
        """
        u = User.find_by_identity(identity)
        reset_token = u.serialize_token()

        # This prevents circular imports.
        from buyercall.blueprints.user.tasks import deliver_password_reset_email
        from ..partnership.models import Partnership, PartnershipAccount

        partner_account = PartnershipAccount.query. \
            filter(PartnershipAccount.id == u.partnership_account_id).first()
        if u.role == 'partner':
            partner = Partnership.query.filter(Partnership.id == u.partnership_id).first()
            partner_name = partner.name
        elif u.role in ('admin', 'agent'):
            partner = Partnership.query.filter(Partnership.id == partner_account.partnership_id).first()
        else:
            partner = Partnership.query.filter(Partnership.id == 1).first()
        deliver_password_reset_email.delay(u.id, reset_token, partner.name, partner.logo)

        return u

    def is_active(self):
        """
        Return whether or not the user account is active, this satisfies
        Flask-Login by overwriting the default value.

        :return: bool
        """
        return self.active

    def get_auth_token(self):
        """
        Return the user's auth token. Use their password as part of the token
        because if the user changes their password we will want to invalidate
        all of their logins across devices. It is completely fine to use
        md5 here as nothing leaks.

        This satisfies Flask-Login by providing a means to create a token.

        :return: str
        """
        private_key = current_app.config['SECRET_KEY']

        serializer = URLSafeTimedSerializer(private_key)
        data = [str(self.id), md5(self.password).hexdigest()]

        return serializer.dumps(data).decode('utf-8')

    def authenticated(self, with_password=True, password=''):
        """
        Ensure a user is authenticated, and optionally check their password.

        :param with_password: Optionally check their password
        :type with_password: bool
        :param password: Optionally verify this as their password
        :type password: str
        :return: bool
        """
        if with_password:
            return bcrypt.check_password_hash(self.password, password)

        return True

    def serialize_token(self, expiration=3600):
        """
        Sign and create a token that can be used for things such as resetting
        a password or other tasks that involve a one off token.

        :param expiration: Seconds until it expires, defaults to 1 hour
        :type expiration: int
        :return: JSON
        """
        # private_key = current_app.config['SECRET_KEY']

        # serializer = TimedJSONWebSignatureSerializer(private_key, expiration)
        # return serializer.dumps({'user_email': self.email}).decode('utf-8')
        private_key = current_app.config['SECRET_KEY']

        payload = {
            'user_email': self.email,
            'exp': datetime.datetime.utcnow() + datetime.timedelta(seconds=expiration)
        }

        token = jwt.encode(payload, private_key, algorithm='HS256')

        return token

    def update_activity_tracking(self, ip_address):
        """
        Update various fields on the user that's related to meta data on his
        account, such as the sign in count and ip address, etc..

        :param ip_address: IP address
        :type ip_address: str
        :return: SQLAlchemy commit results
        """
        self.sign_in_count += 1

        self.last_sign_in_on = self.current_sign_in_on
        self.last_sign_in_ip = self.current_sign_in_ip

        self.current_sign_in_on = datetime.datetime.now(pytz.utc)
        self.current_sign_in_ip = ip_address

        return self.save()

    @hybrid_property
    def partnership_or_partnership_account(self):
        dep = 'partnership'

        if self.role == 'admin':
            dep = 'partnership_account'
        return getattr(self, dep)

    @hybrid_property
    def subscription(self):

        if self.partnership_account:
            if self.partnership_account.subscription:
                return self.partnership_account.subscription
            if self.partnership_account.partnership and self.partnership_account.partnership.subscription:
                return self.partnership_account.partnership.subscription

        if self.partnership and self.partnership.subscription:
            return self.partnership.subscription

        return None

    @hybrid_property
    def is_partnership_account_user(self):
        """
        Whether or not the user account is used in a partnership account or not.
        """
        if self.partnership_account_id is not None and self.partnership_account_id > 0:
            return True
        else:
            return False

    @hybrid_property
    def is_partnership_user(self):
        """
        Whether or not the user account is used in a partnership or not.
        """
        if self.partnership_id is not None and self.partnership_id > 0:
            return True
        else:
            return False

    @hybrid_property
    def is_admin_user_with_groups(self):
        """
        Whether or not the user account is used to view/acccess/change partnership account groups.
        Checks: Must be a admin role, must have a partnership account ID and must belong to a group.
        """
        from ..partnership.models import PartnershipAccountGroupTie

        if self.role == 'admin' and self.partnership_account_group_id:
            if self.partnership_account_id and self.partnership_account_id > 0:
                group_tie_exists = PartnershipAccountGroupTie.query.filter(
                    and_(PartnershipAccountGroupTie.partnership_account_id == self.partnership_account_id,
                         PartnershipAccountGroupTie.partnership_account_group_id == self.partnership_account_group_id)
                ).count()

                if group_tie_exists:
                    return True

        return False

    @hybrid_property
    def is_viewing_partnership(self):
        """
        Whether or not the user account is viewing a partnership account.
        """
        # Declare redis config url
        redis_db = redis.StrictRedis(host=current_app.config['REDIS_CONFIG_URL'],
                                     port=current_app.config['REDIS_CONFIG_PORT'])

        from ..partnership.models import PartnershipAccountGroupTie

        try:
            if self.role == 'admin' and self.partnership_account_group_id:
                if self.partnership_account_id and self.partnership_account_id > 0:
                    group_tie_exists = PartnershipAccountGroupTie.query.filter(
                        and_(PartnershipAccountGroupTie.partnership_account_id == self.partnership_account_id,
                             PartnershipAccountGroupTie.partnership_account_group_id ==
                             self.partnership_account_group_id)
                    ).count()

                    if group_tie_exists and group_tie_exists > 0:
                        key = 'partner-user-view:{}'.format(self.id)

                        if redis_db.exists(key) == 1:
                            result = int(redis_db.get(key))
                            if result > 0:
                                return True
            return False

        except Exception as e:
            log.error('Error retrieving viewed partnership account.')
            return False

    @hybrid_property
    def get_user_viewing_partnership_account_id(self):
        """
        Whether or not the user account is currently viewing (acting on behalf) of a partnership account.
        Returns -1 if not viewing, partnership account id if viewing.
        """
        # Declare redis config url
        redis_db = redis.StrictRedis(host=current_app.config['REDIS_CONFIG_URL'],
                                     port=current_app.config['REDIS_CONFIG_PORT'])

        key = 'partner-user-view:{}'.format(self.id)
        result = -1

        if redis_db.exists(key) == 1:
            try:
                result = int(redis_db.get(key))
            except Exception as e:
                log.error('Error retrieving viewed partnership account. Error: {}'.format(e))
                result = -1

        return result

    @hybrid_property
    def get_user_viewing_partnership_account_name(self):
        """
        Returns the viewed partnership account name.
        """
        # Declare redis config url
        redis_db = redis.StrictRedis(host=current_app.config['REDIS_CONFIG_URL'],
                                     port=current_app.config['REDIS_CONFIG_PORT'])

        from ..partnership.models import PartnershipAccount

        try:
            result = ''

            key = 'partner-user-view:{}'.format(self.id)
            if redis_db.exists(key) == 1:
                partnership_account_id = int(redis_db.get(key))

                if partnership_account_id > 0:
                    account = PartnershipAccount.query.filter(PartnershipAccount.id == partnership_account_id).first()

                    if account is not None:
                        result = account.name
        except Exception as e:
            log.error('Error retrieving viewed partnership account name. Error: '.format(e))
            result = ''

        return result

    @hybrid_property
    def get_user_viewing_partnership_id(self):
        """
        Returns the viewed partnership id.
        """
        # Declare redis config url
        redis_db = redis.StrictRedis(host=current_app.config['REDIS_CONFIG_URL'],
                                     port=current_app.config['REDIS_CONFIG_PORT'])

        from ..partnership.models import PartnershipAccount

        try:
            result = -1

            key = 'partner-user-view:{}'.format(self.id)
            if redis_db.exists(key) == 1:
                partnership_account_id = int(redis_db.get(key))

                if partnership_account_id > 0:
                    account = PartnershipAccount.query.filter(PartnershipAccount.id == partnership_account_id).first()

                    if account is not None:
                        result = account.partnership_id
        except Exception as e:
            log.error('Error retrieving viewed partnership id. Error: {}'.format(e))
            result = -1

        return result

    @hybrid_property
    def get_user_viewing_partnership_account_subscription_plan(self):
        """
        Returns the viewed partnership account name.
        """
        # Declare redis config url
        redis_db = redis.StrictRedis(host=current_app.config['REDIS_CONFIG_URL'],
                                     port=current_app.config['REDIS_CONFIG_PORT'])

        from buyercall.blueprints.partnership.models import PartnershipAccount

        try:
            result = ''

            key = 'partner-user-view:{}'.format(self.id)
            if redis_db.exists(key) == 1:
                partnership_account_id = int(redis_db.get(key))

                if partnership_account_id > 0:
                    account = PartnershipAccount\
                        .query\
                        .filter(PartnershipAccount.id == partnership_account_id)\
                        .first()

                    if account is not None:

                        if account.subscription and account.subscription.plan:
                            result = account.subscription.plan
                        elif account.partnership.subscription and account.partnership.subscription.plan:
                            result = account.partnership.subscription.plan
                        elif account.partnership and account.partnership.subscription:
                            result = account.partnership.subscription.plan

        except Exception as e:
            log.error('Error retrieving viewed partnership account name. Error: '.format(e))
            result = ''

        return result

    @classmethod
    def set_user_viewing_partnership_account(cls, user_id, partnership_account_id):
        """
        Sets the partnership account id of the account the user is viewing.
        Set -1 if not viewing, partnership account id if viewing.
        """
        # Declare redis config url
        redis_db = redis.StrictRedis(host=current_app.config['REDIS_CONFIG_URL'],
                                     port=current_app.config['REDIS_CONFIG_PORT'])

        key = 'partner-user-view:{}'.format(user_id)

        try:
            redis_db.set(key, partnership_account_id)
            redis_db.expire(key, DAYS)
            result = True

        except Exception as e:
            log.error('Error setting viewed partnership account. Error: {}'.format(e))
            result = False

        return result

    @classmethod
    def is_in_same_group(cls, user_id, partnership_account_id):
        """
        Whether or not the user account is in the same group as the passed partnership account id.
        """
        from ..partnership.models import PartnershipAccountGroupTie

        result = False
        user = User.query.filter(User.id == user_id).first()

        if user and user.role == 'admin' and user.partnership_account_group_id:
            if user.partnership_account_id and user.partnership_account_id > 0:
                try:
                    group_tie_user = PartnershipAccountGroupTie.query.filter(
                        and_(PartnershipAccountGroupTie.partnership_account_id == user.partnership_account_id,
                             PartnershipAccountGroupTie.partnership_account_group_id ==
                             user.partnership_account_group_id)
                    ).first()

                    group_tie_other = PartnershipAccountGroupTie.query.filter(
                        and_(PartnershipAccountGroupTie.partnership_account_id == partnership_account_id,
                             PartnershipAccountGroupTie.partnership_account_group_id ==
                             user.partnership_account_group_id)
                    ).first()

                    if group_tie_user and group_tie_user.partnership_account_group_id > 0:
                        if group_tie_other and group_tie_other.partnership_account_group_id > 0:
                            if group_tie_other.partnership_account_group_id == \
                                    group_tie_user.partnership_account_group_id:
                                result = True
                except Exception as e:
                    log.error('Error checking if user is in the same partnership account group. Error: {}'.format(e))

        return result

    @hybrid_property
    def business_type(self):
        """
        Retrieve the user business type from partnership account
        """
        if self.partnership_account_id is not None:
            # Import the partnership account model
            from ..partnership.models import PartnershipAccount
            partnership_account = PartnershipAccount \
                .query \
                .filter(PartnershipAccount.id == self.partnership_account_id) \
                .first()

            if partnership_account:
                return partnership_account.business_type
            else:
                if self.partnership_id is not None:
                    # Import the partnership model
                    from ..partnership.models import Partnership
                    partnership = Partnership \
                        .query \
                        .filter(Partnership.id == self.partnership_id) \
                        .first()

                    if partnership:
                        return partnership.business_type

        return ''

    # password expiry functionality
    @staticmethod
    def check_password_match(hashed_password, plaintext_password):
        """
        Verify if a plaintext password matches a hashed password.
        :param plaintext_password: Password in plain text
        :type plaintext_password: str
        :param hashed_password: Hashed password
        :type hashed_password: str
        :return: bool
        """
        return bcrypt.check_password_hash(hashed_password, plaintext_password)

    def save_password_updated_date(self):
        # Set password_updated_date to current date and time
        self.password_updated_date = datetime.datetime.now(pytz.utc)
        db.session.commit()  # Save the changes to the database

    @classmethod
    def logic_password_expiry(cls):

        from buyercall.app import create_app
        from datetime import datetime, timedelta
        # Create a context for the database connection.
        app = create_app()
        db.app = app
        with app.app_context():
            users = cls.query.filter(cls.is_deactivated.is_(False)).all()
            password_expiry_days = current_app.config.get('PASSWORD_EXPIRY_DAYS', 60)
            for user in users:
                if user.password_updated_date:
                    # Calculate the date for sending the emails
                    expiry_date = user.password_updated_date + timedelta(days=password_expiry_days)
                    days_to_expire = (expiry_date - datetime.now()).days

                    if days_to_expire == 10:
                        # Send email 10 days before expiry
                        cls.initialize_password_expiry(
                            user, "Your password will expire in 10 days, please reset your password using the button below.")

                    elif days_to_expire == 1:
                        # Send notification email on the 1 day before expiry
                        cls.initialize_password_expiry(
                            user, "Your password will expire in 1 day, please reset your password using the button below.")

                    elif days_to_expire == 0:
                        # Send email on the day of expiry
                        cls.initialize_password_expiry(
                            user, "Your password has expired, please reset your password using the button below.")
                else:
                    user.password_updated_date = datetime.now() - timedelta(days=password_expiry_days-11)
                    user.save()
        return {}

    @classmethod
    def initialize_password_expiry(cls, user, message):
        """
        Generate a token to reset the password for a specific user.
        :param user: User obj
        :type message: str
        :return: User instance
        """
        reset_token = user.serialize_token()
        # This prevents circular imports.
        from buyercall.blueprints.user.tasks import deliver_password_expire_email

        if user.firstname and user.lastname:
            name = f"{user.firstname} {user.lastname}"
        elif user.firstname:
            name = f"{user.firstname}"
        elif user.username:
            name = f"{user.username}"
        else:
            name = f"{user.email}"

        deliver_password_expire_email.delay(user.id, reset_token, f"{name}", message)
        return user

    # function to get password expire status
    @staticmethod
    def get_password_expire_status(user):
        from datetime import datetime, timedelta
        from flask import current_app as app
        """
        function to get password expire status
        """
        if user.password_updated_date:
            expiry_date = user.password_updated_date + timedelta(days=app.config.get('PASSWORD_EXPIRY_DAYS', 60))
            status = False if datetime.now() >= expiry_date else True
        else:
            status = False
        return status

    # ip save function
    def save_user_agent(self, os_value=None):
        """
        Save the IP address to the os_list field if it doesn't exist
        """
        if self.user_agent:
            # Split the existing IP addresses by commas
            os_lists = self.user_agent.split('<!!>')
            # Check if the IP address already exists
            if os_value in os_lists:
                status = False
            else:
                # Append the new IP address to the existing values
                os_lists.append(os_value)
                self.user_agent = '<!!>'.join(os_lists)
                status = True
        else:
            # First IP address, set it directly
            self.user_agent = os_value
            status = True
        # Commit the changes to the database
        db.session.commit()
        return status

    def save_ip_address(self, ip_address):
        """
        Save the IP address to the ip_address_list field if it doesn't exist
        """
        if self.ip_address:
            # Split the existing IP addresses by commas
            ip_addresses = self.ip_address.split(',')
            # Check if the IP address already exists
            if ip_address in ip_addresses:
                status = False
            else:
                # Append the new IP address to the existing values
                ip_addresses.append(ip_address)
                self.ip_address = ','.join(ip_addresses)
                status = True
        else:
            # First IP address, set it directly
            self.ip_address = ip_address
            status = True
        # Commit the changes to the database
        db.session.commit()
        return status

    @staticmethod
    def get_partnership_name(user):
        from buyercall.blueprints.partnership.models import Partnership
        partnership = Partnership.query.get(user.partnership_id) if user.partnership_id else None
        return partnership.name if partnership else None