HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //proc/thread-self/root/home/arjun/projects/buyercall/buyercall/blueprints/appointments/models.py
import logging
import traceback
from buyercall.lib.util_sqlalchemy import ResourceMixin
from buyercall.extensions import db
from sqlalchemy import and_, func
import uuid
from datetime import timedelta, datetime
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.dialects.postgresql import UUID

# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)


class Appointment(ResourceMixin, db.Model):
    """
    Appointment booked for a contact within a partnership account.

    An appointment may be assigned to a user (``user_id``); when it is not, it
    is booked against the partnership account itself. The start/end time of
    the booking is stored on the related ``AppointmentSlot`` rows.
    """

    __tablename__ = 'appointments'

    id = db.Column(db.Integer, primary_key=True)

    # Generated UUID; this is the value create() returns to its caller.
    sid = db.Column(UUID(as_uuid=True), unique=True, default=uuid.uuid4, index=True)

    partnership_account_id = db.Column(db.Integer, db.ForeignKey('partnership_accounts.id',
                                                                 onupdate='CASCADE',
                                                                 ondelete='CASCADE'),
                                       index=True, nullable=False)
    # Nullable: account-level appointments have no user.
    user_id = db.Column(db.Integer, db.ForeignKey('users.id',
                                                  onupdate='CASCADE',
                                                  ondelete='CASCADE'),
                        index=True, nullable=True)
    contact_id = db.Column(db.Integer, db.ForeignKey('contacts.id',
                                                     onupdate='CASCADE',
                                                     ondelete='CASCADE'),
                           index=True, nullable=False)
    # Relationships.
    # Collection of AppointmentSlot rows, eagerly joined. Note that the backref
    # placed on AppointmentSlot is also named 'appointment_slot'.
    appointment_slot = db.relationship('AppointmentSlot',
                                       backref='appointment_slot', lazy='joined')

    appointment_date = db.Column(db.DateTime(), nullable=False)
    appointment_type = db.Column(db.String(128), nullable=False,
                                 default='general')
    deactivated_on = db.Column(db.DateTime(), nullable=True)
    is_deactivated = db.Column(db.Boolean(), nullable=False,
                               server_default='0', default=False)

    @staticmethod
    def _slot_end_time(appointment_start_time, interval_minutes):
        """
        Return the slot end time for a start time and an interval in minutes.

        The start time is converted with ``convert_to_deltatime``, the interval
        is added, and the last three characters of the resulting string are
        dropped.
        NOTE(review): presumably this strips a trailing ':SS' from an
        'H:MM:SS' string -- confirm against convert_to_deltatime.
        """
        # Imported lazily, like every other project import used by this class.
        from buyercall.lib.util_datetime import convert_to_deltatime
        end_time = str(convert_to_deltatime(appointment_start_time)
                       + timedelta(minutes=interval_minutes))
        return end_time[:-3]

    @classmethod
    def _book(cls, partnership_account_id, contact_id, appointment_date,
              appointment_start_time, appointment_type, interval_minutes, user_id=None):
        """Persist a new appointment with a single slot and return its sid."""
        appointment = cls(
            partnership_account_id=partnership_account_id,
            user_id=user_id,
            contact_id=contact_id,
            appointment_date=appointment_date,
            appointment_type=appointment_type,
        )
        slot = AppointmentSlot(
            start_time=appointment_start_time,
            end_time=cls._slot_end_time(appointment_start_time, interval_minutes))
        # The slot is saved together with the appointment through the
        # relationship, so no intermediate flush and re-query is needed.
        appointment.appointment_slot.append(slot)
        db.session.add(appointment)
        db.session.commit()
        return appointment.sid

    @classmethod
    def appointment_count(cls, partnership_account_id, appointment_date,
                          appointment_start_time, appointment_type, user_id=None):
        """
        Return the number of appointment slots already booked for a date and start time.

        When ``user_id`` is truthy, the user's appointments in the account are
        counted (``appointment_type`` is not part of that filter). Otherwise
        the account's appointments of ``appointment_type`` are counted.

        :param partnership_account_id: Integer
        :param appointment_date: value compared against the appointment_date column
        :param appointment_start_time: value compared against the slot start_time column
        :param appointment_type: String
        :param user_id: Integer
        :return: Integer, or None when the lookup raised an error
        """
        try:
            criteria = [cls.appointment_date == appointment_date,
                        cls.partnership_account_id == partnership_account_id]
            if user_id:
                # User-level count: restrict to the user's own appointments.
                criteria.append(cls.user_id == user_id)
            else:
                # Account-level count: restrict to the requested appointment type.
                criteria.append(cls.appointment_type == appointment_type)

            appointment_subquery = db.session.query(cls.id).filter(and_(*criteria)).subquery()
            return db.session.query(func.count(AppointmentSlot.id)) \
                .filter(and_(AppointmentSlot.appointment_id.in_(appointment_subquery),
                             AppointmentSlot.start_time == appointment_start_time)).scalar()
        except Exception:
            log.error('Error returning appointment count for partnership account: {}. Error: {}'
                      .format(partnership_account_id, traceback.format_exc()))
            return None

    @classmethod
    def create(cls, partnership_account_id, contact_id,
               appointment_date, appointment_start_time, appointment_type, user_id=None):
        """
        Create a new appointment, for a user when ``user_id`` is truthy and for
        the partnership account otherwise.

        :return: the new appointment's sid on success, otherwise a negative code:
                 -1 unexpected error (the session is rolled back),
                 -2 the user is not available at the requested date/time,
                 -3 no active, appointment-enabled user with that id,
                 -4 the partnership account is not available at the requested date/time,
                 -5 no active, appointment-enabled partnership account with that id
        """
        try:
            # Use case 1: a user id is provided, so book the appointment for that user.
            if user_id:
                from buyercall.blueprints.user.models import User
                if not User.user_appointment_availability(user_id, appointment_date,
                                                          appointment_start_time):
                    log.info('The user id: {} is already booked for this {} at {}'
                             .format(user_id, appointment_date, appointment_start_time))
                    return -2

                user = User.query.filter(and_(User.id == user_id,
                                              User.is_deactivated.is_(False),
                                              User.appointment_enabled.is_(True))).first()
                if not user:
                    log.info('User: {} is not found to create an appointment for'.format(user_id))
                    return -3

                # The slot length comes from the user's own appointment interval.
                return cls._book(partnership_account_id, contact_id, appointment_date,
                                 appointment_start_time, appointment_type,
                                 user.appointment_interval, user_id=user_id)

            # Use case 2: no user id, so book the appointment for the account itself.
            from buyercall.blueprints.partnership.models import PartnershipAccount
            if not PartnershipAccount.partnership_account_appointment_availability(
                    partnership_account_id, appointment_type, appointment_date, appointment_start_time):
                log.info('The partnership account id: {} is already booked for this {} at {}'
                         .format(partnership_account_id, appointment_date, appointment_start_time))
                return -4

            account = PartnershipAccount.query \
                .filter(and_(PartnershipAccount.id == partnership_account_id,
                             PartnershipAccount.active.is_(True),
                             PartnershipAccount.appointment_enabled.is_(True))).first()
            if not account:
                log.info('Partnership account id: {} is not found to create '
                         'an appointment for'.format(partnership_account_id))
                return -5

            # The slot length comes from the account's appointment interval.
            return cls._book(partnership_account_id, contact_id, appointment_date,
                             appointment_start_time, appointment_type,
                             account.appointment_interval)
        except Exception:
            log.error('Error creating an appointment for partnership account: {}. Error: {}'
                      .format(partnership_account_id, traceback.format_exc()))
            # Discard the failed transaction so the session stays usable and
            # no partial appointment is flushed by a later commit.
            db.session.rollback()
            return -1

    @classmethod
    def update(cls, app_id, app_slot_id, partnership_account_id, appointment_date, appointment_start_time,
               appointment_type, user_id):
        """
        Move an existing appointment (and its slot) to a new date, time, type and user.

        The appointment must belong to ``partnership_account_id`` and the slot
        must belong to the appointment. A ``user_id`` of None means the
        appointment is held by the account; its interval is then used for the
        slot end time.

        :return: True when the appointment was updated,
                 -1 when the requested date/time is not available
                 (note: -1 is truthy, so compare explicitly),
                 False when the appointment, slot or owner was not found or an
                 error occurred (the session is rolled back)
        """
        try:
            # Decide once whether this is a user-level or account-level
            # appointment; the same decision drives both the availability
            # check and the interval lookup.
            if user_id is not None:
                from buyercall.blueprints.user.models import User
                availability = User.user_appointment_availability(user_id, appointment_date,
                                                                  appointment_start_time)
                owner_query = User.query.filter(User.id == user_id)
            else:
                from buyercall.blueprints.partnership.models import PartnershipAccount
                availability = PartnershipAccount.partnership_account_appointment_availability(
                    partnership_account_id, appointment_type, appointment_date, appointment_start_time)
                owner_query = PartnershipAccount.query \
                    .filter(PartnershipAccount.id == partnership_account_id)

            if not availability:
                return -1

            # Scope both lookups so one account cannot modify another
            # account's appointment, nor a slot of a different appointment.
            appointment = cls.query \
                .filter(and_(cls.id == app_id,
                             cls.partnership_account_id == partnership_account_id)).first()
            appointment_slot = AppointmentSlot.query \
                .filter(and_(AppointmentSlot.id == app_slot_id,
                             AppointmentSlot.appointment_id == app_id)).first()
            if appointment is None or appointment_slot is None:
                log.info('Appointment: {} with slot: {} is not found for partnership account: {}'
                         .format(app_id, app_slot_id, partnership_account_id))
                return False

            owner = owner_query.first()
            if owner is None:
                log.info('No user or partnership account found to update appointment: {}'
                         .format(app_id))
                return False

            # Resolve the end time before touching any row, so a failure here
            # leaves nothing half-modified in the session.
            end_time = cls._slot_end_time(appointment_start_time, owner.appointment_interval)

            appointment.appointment_date = appointment_date
            appointment.user_id = user_id
            appointment.appointment_type = appointment_type
            appointment_slot.start_time = appointment_start_time
            appointment_slot.end_time = end_time
            db.session.commit()
            return True
        except Exception:
            log.error('Error updating the appointment for partnership account: {}. Error: {}'
                      .format(partnership_account_id, traceback.format_exc()))
            # Drop any pending changes so they cannot be committed later by
            # unrelated code sharing this session.
            db.session.rollback()
            return False

    @classmethod
    def deactivate(cls, app_id, partnership_account_id):
        """
        Deactivate an appointment of the given partnership account.

        Sets the ``is_deactivated`` flag and stamps ``deactivated_on`` with the
        current local time, then commits.

        :return: bool - True when the appointment was found and deactivated
        """
        appointment = cls.query \
            .filter(and_(cls.id == app_id,
                         cls.partnership_account_id == partnership_account_id)).first()
        if not appointment:
            return False

        appointment.is_deactivated = True
        appointment.deactivated_on = datetime.now()
        db.session.commit()
        return True

    @hybrid_property
    def appointment_timezone(self):
        """
        Return the timezone that applies to this appointment.

        Uses the timezone of the assigned user's agent when ``user_id`` is set,
        otherwise the partnership account's timezone. Each access runs a query.
        NOTE(review): no SQL expression is defined for this hybrid, so it is
        presumably only meant to be read on instances -- confirm with callers.
        """
        if self.user_id:
            from buyercall.blueprints.user.models import User
            user = User.query.filter(User.id == self.user_id).first()
            return user.agent.timezone

        from buyercall.blueprints.partnership.models import PartnershipAccount
        account = PartnershipAccount.query \
            .filter(PartnershipAccount.id == self.partnership_account_id).first()
        return account.timezone

    @property
    def created_datetime(self):
        """Return ``created_on`` formatted as 'YYYY-MM-DD HH:MM:SS'."""
        return self.created_on.strftime('%Y-%m-%d %H:%M:%S')

    @property
    def updated_datetime(self):
        """Return ``updated_on`` formatted as 'YYYY-MM-DD HH:MM:SS'."""
        return self.updated_on.strftime('%Y-%m-%d %H:%M:%S')


class AppointmentSlot(ResourceMixin, db.Model):
    """
    Time window of an appointment, stored in the ``appointment_slots`` table.

    Each slot references its appointment through ``appointment_id`` and keeps
    its start and end time as strings of at most five characters.
    """

    __tablename__ = 'appointment_slots'

    id = db.Column(db.Integer, primary_key=True)

    # Generated UUID for the slot.
    sid = db.Column(UUID(as_uuid=True), unique=True, default=uuid.uuid4)

    # Owning appointment; updates and deletes cascade from the appointments table.
    appointment_id = db.Column(
        db.Integer,
        db.ForeignKey('appointments.id', onupdate='CASCADE', ondelete='CASCADE'),
        index=True,
        nullable=False,
    )

    # Slot boundaries; the database defaults are '08:00' and '17:00'.
    start_time = db.Column(db.String(5), nullable=False, server_default='08:00')
    end_time = db.Column(db.String(5), nullable=False, server_default='17:00')