HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/widgets/models.py
import re
import logging as log  # noqa

from flask import jsonify
from buyercall.lib.util_sqlalchemy import ResourceMixin
from sqlalchemy.orm import backref, column_property, load_only
from sqlalchemy.dialects import postgresql
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.schema import Index
from sqlalchemy import and_, func, select
from buyercall.extensions import db
from buyercall.blueprints.user.models import User
from buyercall.blueprints.leads.models import Lead
from buyercall.blueprints.agents.models import Agent


class Widget(ResourceMixin, db.Model):
    """ A widget record, stored in the ``widgets`` table.

    Each widget belongs to a partnership account, may reference an inbound
    phone number, and keeps its configuration as a JSON document in the
    ``options`` database column (mapped here as ``_options``).
    """
    __tablename__ = 'widgets'

    # NOTE(review): ``guid`` is also declared with ``index=True, unique=True``
    # on the column itself, so this named unique index looks redundant --
    # confirm against the migrations before changing either declaration.
    __table_args__ = (
        Index(
            'uq_widgets_guid',  # Index name
            'guid',  # Columns which are part of the index
            unique=True
            ),
        )

    # Internal integer primary key.
    id = db.Column(db.Integer, primary_key=True)

    # The external (non-sequential) widget ID. Sufficiently random
    # so that retrieving the widget by this key doesn't require
    # authentication.
    guid = db.Column(db.String, index=True, nullable=False, unique=True)

    # Leads attached to this widget; each Lead gets a ``widget`` back-reference.
    leads = db.relationship(Lead, backref='widget')

    # Proxies the ``agent`` attribute of every object in ``assignments``.
    agents = association_proxy('assignments', 'agent')

    # Owning partnership account; cascades on update and delete of the account.
    partnership_account_id = db.Column(
        db.Integer,
        db.ForeignKey('partnership_accounts.id', onupdate='CASCADE', ondelete='CASCADE'),
        index=True,
        nullable=False
    )

    # The widget name, as displayed on 'My Widgets' page
    name = db.Column(db.String(128), index=True, nullable=False)

    # Widget type (required string of up to 50 characters).
    type = db.Column(db.String(50), index=True, nullable=False)

    # Stored in the ``phone_number_id`` database column. References
    # ``phonenumbers.id`` and is set to NULL when that phone number is deleted.
    inbound_id = db.Column(
        'phone_number_id',
        db.Integer,
        db.ForeignKey(
            'phonenumbers.id', onupdate='CASCADE', ondelete='SET NULL'))

    # The widget configuration, in JSON format.
    # Python does not need to parse this.
    # Fields described in assets/components/widgets/default_settings.js
    # Only a server-side default ('{}') is declared, so the attribute is not
    # populated on a new instance until the database supplies it.
    _options = db.Column(
        'options', postgresql.JSON, nullable=False, server_default='{}')

    # Optional e-mail address; must be unique across widgets when set.
    email = db.Column(db.String(255), unique=True, index=True, nullable=True)

    # Whether the widget is enabled; defaults to true both in Python and in SQL.
    enabled = db.Column(
        db.Boolean, nullable=False, default=True, server_default='true'
    )

    # SQL-side count of Lead rows whose ``widget_guid`` equals this widget's guid.
    lead_count = column_property(select([func.count(Lead.id)]).where(Lead.widget_guid == guid))

    @classmethod
    def api_create(cls, params, phonenumber_id):
        """
        Create a widget from ``params`` and return its id.

        The phone number identified by ``phonenumber_id`` is attached as the
        widget's inbound number only when it belongs to the same partnership
        account as the widget; otherwise the widget is saved without one.

        :param params: keyword arguments passed to the ``Widget`` constructor
        :param phonenumber_id: id of the phone number to attach
        :return: widget id, or -1 when the widget could not be saved
        """
        from buyercall.blueprints.phonenumbers.models import Phone
        widget_id = -1

        try:
            widget = Widget(**params)

            phone = Phone.query.filter(
                Phone.id == phonenumber_id,
                Phone.partnership_account_id == widget.partnership_account_id
            ).first()

            if phone:
                widget.inbound = phone

            db.session.add(widget)
            # Flush so the primary key is assigned, but only report it once
            # the commit has gone through: a failed commit must yield -1, not
            # the id of a row that was never persisted.
            db.session.flush()
            new_id = widget.id
            db.session.commit()
            widget_id = new_id

        except Exception as e:
            log.error('Error creating widget. Error: {}'.format(e))
            # Leave the session usable for the rest of the request.
            db.session.rollback()

        return widget_id

    @classmethod
    def api_update(cls, partnership_account_id, guid, name, type, phonenumber_id, enabled,
                   add_hidden_information, language, hidden_information, record_calls, voicemail,
                   voicemail_message, agent_id, agent_contact_using, agent_full_name,
                   route_randomly, route_in_sequence, route_simultaneously, retry_routing):
        """
        Update the widget identified by ``guid`` within a partnership account.

        Only arguments with a truthy value are applied (``enabled`` is applied
        whenever it equals True or False); everything else is left as stored.
        The inbound phone number is changed only when ``phonenumber_id``
        belongs to the same partnership account as the widget.

        :param partnership_account_id: account the widget must belong to
        :param guid: external id of the widget to update
        :return: True if the widget was found and the update was committed,
                 False otherwise
        """
        from buyercall.blueprints.phonenumbers.models import Phone
        result = False

        try:
            widget = Widget.query\
                .filter(and_(Widget.partnership_account_id == partnership_account_id, Widget.guid == guid))\
                .first()

            if not widget:
                return result

            if name:
                widget.name = name

            if type:
                widget.type = type

            if enabled in [True, False]:
                widget.enabled = enabled

            # Read the options once; the getter rebuilds part of the dict on
            # every access.
            result_options = widget.options

            if result_options:
                # Diagnostic dump only; this is not an error condition.
                log.debug(result_options)

                result_option_agents = result_options['agents']

                if result_option_agents:
                    # NOTE(review): this edits the 'agents' entry of the JSON
                    # document only. The ``options`` getter regenerates that
                    # entry from ``assignments``, so the assignments
                    # themselves are not changed here -- confirm intent.
                    first_agent = result_option_agents[0]
                    agent_updates = (
                        ('id', agent_id),
                        ('contactUsing', agent_contact_using),
                        ('fullName', agent_full_name),
                    )
                    for key, value in agent_updates:
                        if value:
                            first_agent[key] = value

                # NOTE(review): the truthiness test means a flag can never be
                # switched back to False through this method -- confirm
                # whether callers rely on that before changing it.
                option_updates = (
                    ('addHiddenInformation', add_hidden_information),
                    ('hiddenInformation', hidden_information),
                    ('language', language),
                    ('recordCalls', record_calls),
                    ('voicemail', voicemail),
                    ('voicemailMessage', voicemail_message),
                    ('retryRouting', retry_routing),
                    ('routeRandomly', route_randomly),
                    ('routeInSequence', route_in_sequence),
                    ('routeSimultaneously', route_simultaneously),
                )
                for key, value in option_updates:
                    if value:
                        result_options[key] = value

                # In-place changes to a JSON column are not detected
                # automatically, so mark the attribute as dirty.
                flag_modified(widget, '_options')

            phone = Phone.query.filter(
                Phone.id == phonenumber_id,
                Phone.partnership_account_id == widget.partnership_account_id
            ).first()

            if phone:
                widget.inbound_id = phonenumber_id

            db.session.commit()
            result = True

        except Exception as e:
            log.error('Error updating widget. Error: {}'.format(e))
            # Discard the partial changes so the session stays usable.
            db.session.rollback()

        return result

    @property
    def agents_available(self):
        """ Tell whether at least one of the widget's agents is available now.

        :return: True as soon as an agent reports ``available_now``,
                 False when none does (or there are no agents).
        """
        for candidate in self.agents:
            if candidate.available_now:
                return True
        return False

    # @property
    # def lead_count(self):
    #     return len(self.leads)

    @property
    def options(self):
        """ Return the widget configuration dict, with derived keys refreshed.

        Three keys are recomputed on every access and written into the stored
        dict before it is returned:

        * ``agents`` -- one entry per assignment, sorted by ``order``
        * ``widgetType`` -- the current ``type``
        * ``enableNotifications`` -- the negation of ``notifyNone``, or False
          when ``notifyNone`` is absent

        :return: the (mutated) ``_options`` dict
        """
        agents = [
            dict(
                id=asg.agent_id,
                fullName=asg.agent.full_name,
                contactUsing=asg.contact_using
            )
            for asg in sorted(self.assignments, key=lambda x: x.order)
        ]

        # The column only has a server-side default, so a widget that has not
        # been loaded from the database yet may still hold None here.
        if self._options is None:
            self._options = {}
        opts = self._options

        opts['agents'] = agents
        opts['widgetType'] = self.type
        opts['enableNotifications'] = (
            'notifyNone' in opts and not opts['notifyNone']
        )
        return opts

    @options.setter
    def options(self, opt):
        """ Replace the widget configuration and rebuild its agent assignments.

        A falsy ``opt`` is ignored. Otherwise every current assignment is
        scheduled for deletion, one new assignment is created per entry in
        ``opt['agents']`` (keeping the list order), ``type`` is taken from
        ``opt['widgetType']`` (default ``'widget'``), and ``notifyNone`` is
        derived from ``opt['enableNotifications']``, which must be present.
        Note that ``opt`` itself is modified and stored.
        """
        if not opt:
            return

        # Drop every existing assignment; iterate over a copy of the list.
        for stale in list(self.assignments):
            db.session.delete(stale)

        # Recreate the assignments in the order given by the caller.
        requested_agents = opt.get('agents', [])
        for position, entry in enumerate(requested_agents):
            agent_pk = entry['id']
            channel = entry['contactUsing']
            assignment = AgentAssignment(
                widget=self,
                agent_id=agent_pk,
                contact_using=channel,
                order=position
            )
            self.assignments.append(assignment)

        self.type = opt.get('widgetType', 'widget')
        opt['notifyNone'] = not opt['enableNotifications']

        self._options = opt

    @property
    def emails(self):
        """ Return a list of notification emails for the widget.

        Nothing is returned when notifications are switched off
        (``notifyNone`` is set or absent, which the ``options`` getter also
        treats as "notifications disabled"). Otherwise the recipients of the
        "all leads" and "missed leads" groups are collected: the account's
        active admin, the widget's agents, and any custom addresses.

        Missing option keys count as "off"/empty instead of raising KeyError,
        and each address is returned once, in first-seen order.

        :return: list of e-mail address strings
        """
        # Evaluate the property once; every access rebuilds part of the dict.
        options = self.options
        if options.get('notifyNone', True):
            return []

        admin = User.query.filter(
            User.role == 'admin',
            User.is_deactivated.is_(False),
            User.partnership_account_id == self.partnership_account_id
        ).first()
        my_email = admin.email if admin else None
        agents_emails = [a.email for a in self.agents]

        # (group switch, notify admin, notify agents, custom address list)
        groups = (
            ('notifyAllLeads', 'notifyAllMe',
             'notifyAllAgents', 'notifyAllCustom'),
            ('notifyMissedLeads', 'notifyMissedMe',
             'notifyMissedAgents', 'notifyMissedCustom'),
        )

        collected = []
        for enabled_key, me_key, agents_key, custom_key in groups:
            if not options.get(enabled_key):
                continue
            if options.get(me_key) and my_email:
                collected.append(my_email)
            if options.get(agents_key):
                collected.extend(agents_emails)
            collected.extend(split_emails(options.get(custom_key) or ''))

        # Both groups frequently name the same people; drop repeats and
        # unusable (None/empty) entries while keeping the original order.
        result = []
        seen = set()
        for address in collected:
            if address and address not in seen:
                seen.add(address)
                result.append(address)

        return result

    @property
    def is_adf(self):
        """ Tell whether ADF notifications are switched on for this widget.

        The ``notifyAdf`` option counts as enabled when it is the boolean
        True or the string ``'true'``; a missing key counts as disabled.

        :return: bool
        """
        flag = self.options.get('notifyAdf')
        # Compare strings by value: ``is 'true'`` tests object identity and
        # does not reliably match a string loaded from the database.
        return flag is True or flag == 'true'

    @property
    def phone_number_id(self):
        """ Return the id of the widget's inbound phone number.

        Read-only alias for ``inbound_id``, which is stored in the
        ``phone_number_id`` database column.
        """
        inbound_phone_id = self.inbound_id
        return inbound_phone_id

    @property
    def adf_notification_emails(self):
        """ Return a list of adf notification emails for the widget.

        The addresses come from the ``notifyAdfCustom`` option and are only
        returned when ``notifyAdf`` is enabled, using the same rule as
        ``is_adf`` (boolean True or the string ``'true'``). Missing keys
        yield an empty list instead of raising KeyError.

        :return: list of e-mail address strings
        """
        options = self.options
        flag = options.get('notifyAdf')

        # A stored string such as 'false' is truthy, so test the accepted
        # values explicitly rather than relying on truthiness.
        if flag is True or flag == 'true':
            return split_emails(options.get('notifyAdfCustom') or '')

        return []

    @classmethod
    def delete(cls, id, guid, partnership_account_id):
        """ Delete a widget belonging to the given partnership account.

        The widget is looked up by ``id`` when one is given, otherwise by
        ``guid``; pass None for whichever does not apply.

        :return: True if a widget was found and deleted, False otherwise
        """
        if id:
            key_name, key_match = 'id', Widget.id == id
        elif guid:
            key_name, key_match = 'guid', Widget.guid == guid
        else:
            # Neither identifier supplied: nothing to look up.
            return False

        target = Widget.query.options(load_only(key_name)).filter(
            key_match,
            Widget.partnership_account_id == partnership_account_id
        ).first()

        if not target:
            return False

        db.session.delete(target)
        db.session.commit()
        return True

    @classmethod
    def disable(cls, id, guid, partnership_account_id):
        """ Mark a widget of the given partnership account as disabled.

        The widget is looked up by ``id`` when one is given, otherwise by
        ``guid``; pass None for whichever does not apply.

        :return: True if a widget was found and updated, False otherwise
        """
        if id:
            key_name, key_match = 'id', Widget.id == id
        elif guid:
            key_name, key_match = 'guid', Widget.guid == guid
        else:
            # Neither identifier supplied: nothing to look up.
            return False

        target = Widget.query.options(load_only(key_name, 'enabled')).filter(
            key_match,
            Widget.partnership_account_id == partnership_account_id
        ).first()

        if not target:
            return False

        target.enabled = False
        db.session.commit()
        return True

    @classmethod
    def enable(cls, id, guid, partnership_account_id):
        """ Mark a widget of the given partnership account as enabled.

        The widget is looked up by ``id`` when one is given, otherwise by
        ``guid``; pass None for whichever does not apply.

        :return: True if a widget was found and updated, False otherwise
        """
        if id:
            key_name, key_match = 'id', Widget.id == id
        elif guid:
            key_name, key_match = 'guid', Widget.guid == guid
        else:
            # Neither identifier supplied: nothing to look up.
            return False

        target = Widget.query.options(load_only(key_name, 'enabled')).filter(
            key_match,
            Widget.partnership_account_id == partnership_account_id
        ).first()

        if not target:
            return False

        target.enabled = True
        db.session.commit()
        return True

    @property
    def created_datetime(self):
        """ Return ``created_on`` formatted as ``YYYY-MM-DD HH:MM:SS``.
        """
        created = self.created_on
        return created.strftime('%Y-%m-%d %H:%M:%S')

    @property
    def updated_datetime(self):
        """ Return ``updated_on`` formatted as ``YYYY-MM-DD HH:MM:SS``.
        """
        updated = self.updated_on
        return updated.strftime('%Y-%m-%d %H:%M:%S')

    def jsonify(self):
        """ Build a Flask JSON response describing this widget.

        The payload carries the id, name, user id, creation and update
        timestamps, and the widget options.
        """
        # NOTE(review): no ``user_id`` attribute is declared on Widget in
        # this file; presumably it comes from a mixin or backref -- confirm,
        # otherwise this raises AttributeError.
        payload = {
            'id': self.id,
            'name': self.name,
            'user_id': self.user_id,
            'created_on': self.created_on,
            'updated_on': self.updated_on,
            'options': self.options,
        }
        return jsonify(**payload)


class AgentAssignment(db.Model):
    """ Link between a widget and one of its agents (``widget_agents`` table).

    Each row pairs one widget with one agent and records how the agent is to
    be contacted and the agent's position in the widget's agent list.
    """
    __tablename__ = 'widget_agents'

    # Both sides expose the link rows as ``assignments``.
    widget = db.relationship(Widget, backref=backref('assignments', cascade='all'))
    agent = db.relationship(Agent, backref=backref('assignments', cascade='all'))

    # Composite primary key: the (widget, agent) pair.
    widget_id = db.Column(
        db.Integer,
        db.ForeignKey('widgets.id', onupdate='CASCADE', ondelete='CASCADE'),
        primary_key=True,
    )
    agent_id = db.Column(
        db.Integer,
        db.ForeignKey('agents.id', onupdate='CASCADE', ondelete='CASCADE'),
        primary_key=True,
    )

    # Possible values: phone, mobile, extension
    contact_using = db.Column(db.String(10), nullable=False, server_default='phone')

    # Position of the agent within the widget's agent list.
    order = db.Column(db.Integer())


class WidgetFile(ResourceMixin, db.Model):
    """ A file record in the ``widget_files`` table, keyed by a 36-character
    guid and owned by a partnership account.
    """
    __tablename__ = 'widget_files'

    # Primary key.
    guid = db.Column(db.String(36), primary_key=True)

    # Owning partnership account; cascades on update and delete of the account.
    partnership_account_id = db.Column(
        db.Integer,
        db.ForeignKey(
            'partnership_accounts.id',
            onupdate='CASCADE',
            ondelete='CASCADE',
        ),
        index=True,
        nullable=False,
    )

    # Optional URL associated with the file.
    url = db.Column(db.String, nullable=True)


def split_emails(email_str):
    """ Split a string of email addresses into individual emails.

    Surrounding whitespace is stripped from every address, and segments that
    are empty or contain only whitespace are dropped. ``None`` or an empty
    string yields an empty list.

    :param email_str: A comma- or semicolon-separated list of emails.
    :type email_str: str
    :return: A list of emails
    :rtype: list
    """
    if not email_str:
        return []

    # Strip first, then filter, so that "a@x.com, " does not produce a
    # trailing empty entry.
    stripped = (part.strip() for part in re.split(r'[,;]', email_str))
    return [address for address in stripped if address]