HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/api2/doc/endpoints/accounts.py
import logging
import uuid
import re
from flask import jsonify, request, current_app
from flask_restx import Resource
from sqlalchemy import and_, func, text
from sqlalchemy.orm import load_only, defer
from buyercall.extensions import db
from buyercall.lib.util_rest import rest_partnership, requires_auth, rest_is_partnership
from buyercall.blueprints.api2.doc import serializers
from buyercall.blueprints.api2.restplus import api
from buyercall.blueprints.partnership.models import PartnershipAccount, PartnershipAccountCreditTie
from buyercall.blueprints.user.models import User
from buyercall.blueprints.agents.models import Agent
from buyercall.blueprints.filters import format_phone_number
import datetime
import pytz

log = logging.getLogger(__name__)
ns = api.namespace('Accounts', description='Operations related to partnership accounts.', path='/accounts')

email_regex = re.compile(r'\S+@\S+\.\S+')
phone_regex = re.compile(r'^\+?[0-9]{10,12}$')


def getboolean(param):
    if param.lower() == 'true':
        return True
    else:
        return False


class TypeSwitcher(object):
    """
    The purpose of this class is to test the lead form field values against the defined requirements.
    """

    def valid(self, type_string, id_string, value_string, min_value, max_value):
        method_name = type_string + "_type"
        method = getattr(self, method_name, lambda: "Invalid")

        return method(id_string, value_string, min_value, max_value)

    def phone_type(self, value_string):
        error_message = ""
        result = True

        if result and not phone_regex.match(value_string):
            error_message = "Value '" \
                            + value_string + "' is not a valid phone number."
            result = False

        return result, error_message

    def email_type(self, value_string):
        error_message = ""
        result = True
        if result and not email_regex.match(value_string):
            error_message = "Value '" \
                            + value_string + "' is not a valid email address."
            result = False

        return result, error_message

@ns.route('/')
@api.doc(responses={200: 'OK',
                    400: 'Error performing operation.',
                    401: 'Unauthorized request.'})
class Account(Resource):

    @api.response(200, 'Partnership account successfully added.')
    @api.expect(serializers.account_add, validate=True)
    @requires_auth
    def post(self):
        """
        Adds a partnership account.
        <p>
        The Accounts POST endpoint should be used to create new partner accounts.
        For example, if you manage multiple dealers,
        you would want to create a separate account for each dealer you manage.
        You will only require a partner authentication token to create a new partner account.
        The Accounts POST endpoint is the first endpoint you will utilize in the BuyerCall API.
        Most other endpoint requests in the BuyerCall Partner API requires an partner account id.
        Therefore, it makes sense to create all of your partner accounts first
        before using any other BuyerCall Partner API endpoint.
        </p>
        <br />
        <p>
        A response will
        be returned, similar to the example below, based
        on a successful request:
        <br />
        <br />
        </p>
         <pre class="code-background" style="color: white">
        {
          "name": "EFF Motors",
          "partner_account_code": "A0034234",
          "partnership_account_id": 6
        }
     </pre>
        """
        if rest_is_partnership and rest_partnership is not None:
            received = request.json
            if received is not None:
                result_is_active = received['is_active']
                result_name = received['name']

                if 'partner_account_code' in received:
                    result_account_code = received['partner_account_code']
                else:
                    result_account_code = ''

                if 'users' in received and len(received["users"]) != 0:
                    admin_role = False
                    for user in received["users"]:
                        if user['role'] == 'admin':
                            admin_role = True
                        email = user['email'].lower()
                        type_switcher = TypeSwitcher()
                        # Validate email address
                        email_check = type_switcher.email_type(email)
                        if not email_check[0]:
                            api.abort(code=400, message=email_check[1])

                        u = User.query.filter(func.lower(User.email) == email).first()
                        if u:
                            if not u.is_deactivated:
                                api.abort(code=400, message="user with email address; " + email + " already exist.")
                        # Validate phone number
                        phone_check = type_switcher.phone_type(str(user["phonenumber"]))
                        if not phone_check[0]:
                            api.abort(code=400, message=phone_check[1])
                    if not admin_role:
                        api.abort(code=400, message="The account requires at least one admin user.")

                else:
                    api.abort(code=400, message="The account requires at least one admin user.")

                partnership_account_result = PartnershipAccount.create_partial(
                    result_name,
                    rest_partnership.id,
                    result_is_active,
                    result_account_code,
                    'automotive'
                )

                if partnership_account_result > 0:
                    # Create account users
                    for user in received["users"]:
                        email = user['email'].lower()
                        u = User.query.filter(func.lower(User.email) == email).first()
                        if u:
                            if u.is_deactivated:
                                u.partnership_account_id = partnership_account_result
                                u.partnership_id = rest_partnership.id
                                u.company = received["name"]
                                u.is_deactivated = False
                                u.deactivated_on = None
                                db.session.commit()
                        else:
                            if 'password' in user:
                                new_password = user['password']
                            else:
                                hex_value = uuid.uuid4().hex
                                new_password = hex_value
                            new_user = User(
                                firstname=user['firstname'],
                                lastname=user['lastname'],
                                email=user['email'].lower(),
                                phonenumber=format_phone_number(str(user['phonenumber'])),
                                password=new_password,
                                role=user['role'],
                                partnership_account_id=partnership_account_result,
                                partnership_id=rest_partnership.id,
                                company=received["name"],
                                tos_agreement=False,
                                password_updated_date=datetime.datetime.now(pytz.utc)
                            )
                            db.session.add(new_user)
                            db.session.flush()
                            db.session.commit()
                            if new_user.id:
                                agent = Agent(
                                    user_id=new_user.id,
                                    firstname=new_user.firstname,
                                    lastname=new_user.lastname,
                                    email=new_user.email,
                                    title=new_user.title or '',
                                    department=new_user.department or '',
                                    phonenumber=new_user.phonenumber,
                                    mobile='',
                                    extension=new_user.extension or None,
                                    partnership_account_id=new_user.partnership_account_id
                                )
                                db.session.add(agent)
                                db.session.flush()
                                db.session.commit()

                    if 'partner_account_credit_service_provider' in received:
                        result_credit_sp = (received['partner_account_credit_service_provider']).lower()
                        new_credit_sp = PartnershipAccountCreditTie(
                            service_provider=result_credit_sp,
                            active=True,
                            partnership_account_id=partnership_account_result,
                            product_type='prequalify',
                            equifax_enabled=True
                        )
                        db.session.add(new_credit_sp)
                        db.session.flush()
                        db.session.commit()

                    return jsonify(
                        partnership_account_id=partnership_account_result,
                        name=result_name,
                        partner_account_code=result_account_code
                    )
                else:
                    return api.abort(code=400, message="Error creating partnership account.")
            else:
                return api.abort(400, message="No data payload received for the request.")
        else:
            return api.abort(401)


@ns.route('/<int:paid>')
@api.doc(responses={200: 'OK',
                    400: 'Error performing operation.',
                    401: 'Unauthorized request.',
                    404: 'Partnership account not found.'},
         params={'paid': 'The partner account Id'})
class AccountDetails(Resource):
    @requires_auth
    def get(self, paid):
        """
        Retrieves partnership account details.

        <p>The Accounts GET endpoint should be used to return information an a specific partner account.</p>
        <br />
        <p>
        You require a partner authentication token and a partner account id to make a successful request.
         A response will be returned, similar to the example below, based
        on a successful request:
        <br />
        <br />
        </p>
         <pre class="code-background" style="color: white">
        {
          "created_on": "2019-04-05 14:49:14",
          "id": 6,
          "is_active": true,
          "name": "EFF Motors INC",
          "partner_account_code": "A0034244",
          "updated_on": "2019-04-05 14:51:32"
        }
     </pre>
        """
        if rest_is_partnership and rest_partnership is not None:

            partnership_account = PartnershipAccount\
                .query\
                .filter(and_(PartnershipAccount.id == paid, PartnershipAccount.partnership_id == rest_partnership.id))\
                .first()

            if partnership_account is not None:
                result_id = partnership_account.id
                result_created_on = partnership_account.created_on
                result_updated_on = partnership_account.updated_on
                result_name = partnership_account.name
                result_is_active = partnership_account.active
                result_partner_account_code = partnership_account.partner_account_code

                return jsonify(
                    id=result_id,
                    name=result_name,
                    created_on=partnership_account.created_datetime,
                    updated_on=partnership_account.updated_datetime,
                    is_active=result_is_active,
                    partner_account_code=result_partner_account_code
                )
            else:
                return api.abort(404, message='Partnership account not found.')
        else:
            return api.abort(401, message='Unauthorized request.')

    @api.response(204, 'Partnership account successfully updated.')
    @api.expect(serializers.account_update, validate=True)
    @requires_auth
    def put(self, paid):
        """
        Update a partnership account.

        <p>The Accounts PUT endpoint should be used to update an existing partner account. For example, you can change
        the partner account code assigned to an existing partner account or add a user to the account.</p>
        <br />
        <p>
        You will require a partner authentication token and a partner account id to make a successful request.
        </p>
        """
        if rest_is_partnership and rest_partnership is not None:
            # Create a context for the database connection.
            app = current_app
            db.app = app
            # Create db connection
            conn = db.engine.connect()
            type_switcher = TypeSwitcher()
            received = request.json

            partnership_account = PartnershipAccount\
                .query\
                .filter(and_(PartnershipAccount.id == paid,
                             PartnershipAccount.partnership_id == rest_partnership.id))\
                .first()

            if received is not None and partnership_account is not None:
                result_id = paid
                result_name = None
                result_active = None
                result_partner_account_code = None
                result_partner_account_credit_service_provider = None
                result_partner_users = []

                if 'is_active' in received:
                    result_active = received['is_active']

                if 'name' in received:
                    result_name = received['name']

                if 'partner_account_code' in received:
                    result_partner_account_code = received['partner_account_code']

                if 'partner_account_credit_service_provider' in received:
                    result_partner_account_credit_service_provider = received['partner_account_credit_service_provider']

                if 'users' in received and len(received["users"]) != 0:
                    for usr in received['users']:
                        # Validate email address
                        email_check = type_switcher.email_type(usr["email"])
                        if not email_check[0]:
                            api.abort(code=400, message=email_check[1])
                        # Check to see if email already exist
                        usr_email = usr["email"].lower()
                        exist_user = conn.execute(
                            text("SELECT * FROM users "
                                 "WHERE email=:user_email"),
                            user_email=usr_email)
                        if exist_user:
                            existing_user_list = 0
                            for eus in exist_user:
                                existing_user_list += 1
                            if existing_user_list > 0:
                                api.abort(code=400, message="User with email address; " + usr_email + " already exist.")
                        # Validate phone number
                        phone_check = type_switcher.phone_type(str(usr["phonenumber"]))
                        if not phone_check[0]:
                            api.abort(code=400, message=phone_check[1])
                        result_partner_users.append(usr)

                if PartnershipAccount.api_update(
                        result_id,
                        rest_partnership.id,
                        result_name,
                        result_active,
                        result_partner_account_code,
                        result_partner_account_credit_service_provider,
                        result_partner_users
                ):
                    return True, 204
                else:
                    api.abort(code=400, message="Error updating partnership account.")
            else:
                return api.abort(400)
        else:
            return api.abort(401)


@ns.route('/search')
@api.doc(responses={200: 'OK',
                    400: 'Error performing operation.',
                    401: 'Unauthorized request.',
                    404: 'Partnership account not found.'},
         params={'name': {'description': 'The name of the partner account', 'type': 'str'},
                 'email': {'description': 'The email address of partner account user', 'type': 'str'}})
class AccountSearch(Resource):
    @requires_auth
    def get(self):
        """
        Search for a partnership account by name or email address

        <p>The Accounts GET endpoint should be used when looking up specific partnership accounts.</p>
        <br />
        <p>
        You require a partner authentication token, a name parameter and optionally an email address
        parameter to make a successful request.
         A response will be returned, similar to the example below, based
        on a successful request:
        <br />
        <br />
        </p>
         <pre class="code-background" style="color: white">
            {
                "results":
                    "accounts":
                        [{
                            "name": "Mac Tyre",
                            "id": 453,
                            "is_active": false,
                            "created_on": "Fri, 11 Sep 2020 23:26:05 GMT"
                        },
                        {
                            "name": "Mac & Tire Shop",
                            "id": 566,
                            "is_active": true,
                            "created_on": "Fri, 11 Sep 2020 23:26:05 GMT"
                        }]
                    "user":
                        {
                            "name": "John Townsend",
                            "email": "john@townsend.com",
                            "is_active": false,
                            "created_on": "Fri, 11 Sep 2020 23:26:05 GMT",
                            "partnership_account_id": 4,
                            "partnership_account_name": "Mac & Tire Shop"
                        }
            }
        </pre>
        """
        if rest_is_partnership and rest_partnership is not None:
            # Create a context for the database connection.
            app = current_app
            db.app = app
            # Create db connection
            conn = db.engine.connect()
            received = request.args
            if received is not None:
                name = received.get('name')
                email = received.get('email')
            else:
                name = ''
                email = ''

            ac_results = []
            if name:
                from thefuzz import fuzz
                partner_accounts = PartnershipAccount.query.filter(PartnershipAccount.partnership_id == rest_partnership.id).all()
                for ac in partner_accounts:
                    src_score = fuzz.partial_ratio(ac.name, name)
                    if src_score > 65:
                        result_dict = dict(
                            account_id=ac.id,
                            account_name=ac.name,
                            partner_account_code=ac.partner_account_code,
                            is_active=ac.active,
                            created_on=ac.created_on
                        )
                        ac_results.append(result_dict)
            user_dict = {}
            if email:
                email_lower = email.lower()
                user = conn.execute(
                    text("SELECT * FROM users "
                         "WHERE email=:entered_email "
                         "AND partnership_id=:partner_id"),
                    entered_email=email_lower,
                    partner_id=rest_partnership.id)
                if user:
                    for usr in user:
                        u_pa = (PartnershipAccount.query
                                .filter(PartnershipAccount.id == usr.partnership_account_id).first())
                        if u_pa:
                            partner_account_name = u_pa.name
                        else:
                            partner_account_name = ''
                        user_dict = dict(
                            name=usr.firstname + " " + usr.lastname,
                            email=usr.email,
                            is_deactivated=usr.is_deactivated,
                            created_on=usr.created_on,
                            partnership_account_id=usr.partnership_account_id,
                            partnership_account_name=partner_account_name
                        )

            if user_dict:
                result_dict = dict(
                    accounts=ac_results,
                    user=user_dict
                )
                return jsonify(result_dict)
            else:
                result_dict = dict(
                    accounts=ac_results
                )
                return jsonify(result_dict)
        else:
            return api.abort(401, message='Unauthorized request.')


"""
# DISABLE ENDPOINT
@ns.route('/<int:paid>/usage')
@api.doc(responses={200: 'OK',
                    400: 'Error performing operation.',
                    401: 'Unauthorized request.',
                    404: 'Partnership account not found.'},
         params={'paid': 'The partner account Id'})
"""
class Usage(Resource):
    @requires_auth
    def get(self, paid):
        """
        Retrieves partnership usage measurements.

        <p>The Accounts Usage GET endpoint should be used to return all usage data, which includes: minutes,
        phonenumbers, message etc., for a specific partner account.</p>
        <br />
        <p>
        You require a partner authentication token and a partner account id to make a successful request.
        A response will be returned, similar to the example below, based
        on a successful request:
        <br />
        <br />
        </p>
         <pre class="code-background" style="color: white">
        {
          "current_month_min_usage": 0,
          "current_month_total_inbound_calls": 0,
          "current_month_total_inbound_messages": 0,
          "current_month_total_outbound_calls": 1,
          "current_month_total_outbound_messages": 1,
          "previous_month_minutes": 6,
          "previous_month_total_inbound_calls": 8,
          "previous_month_total_inbound_messages": 1,
          "previous_month_total_outbound_calls": 8,
          "previous_month_total_outbound_messages": 4,
          "total_active_phonenumbers": 19,
          "total_calls": 424,
          "total_inbound_calls": 396,
          "total_inbound_messages": 1,
          "total_messages": 15,
          "total_min_usage": 19,
          "total_mobile_phonenumbers": 8,
          "total_outbound_calls": 28,
          "total_outbound_messages": 14,
          "total_priority_phonenumbers": 8,
          "total_tracking_phonenumbers": 3
        }
     </pre>

        """
        if rest_is_partnership and rest_partnership is not None:
            partnership_account = PartnershipAccount\
                .query\
                .filter(and_(PartnershipAccount.id == paid, PartnershipAccount.partnership_id == rest_partnership.id))\
                .first()

            if partnership_account is not None:
                result_id = partnership_account.id
                result_name = partnership_account.name

                return jsonify(
                    total_min_usage=partnership_account.total_minute_usage(paid),
                    current_month_min_usage=partnership_account.monthly_minute_usage(paid),
                    previous_month_minutes=partnership_account.previous_month_minute_usage(paid),
                    total_calls=partnership_account.total_calls_count(paid),
                    total_inbound_calls=partnership_account.total_inbound_calls_count(paid),
                    current_month_total_inbound_calls=partnership_account.monthly_total_inbound_calls_count(paid),
                    previous_month_total_inbound_calls=partnership_account.previous_total_inbound_calls_count(paid),
                    total_outbound_calls=partnership_account.total_outbound_calls_count(paid),
                    current_month_total_outbound_calls=partnership_account.monthly_total_outbound_calls_count(paid),
                    previous_month_total_outbound_calls=partnership_account.previous_total_outbound_calls_count(paid),
                    total_active_phonenumbers=partnership_account.total_active_phonenumbers(paid),
                    total_priority_phonenumbers=partnership_account.total_priority_phonenumbers(paid),
                    total_mobile_phonenumbers=partnership_account.total_mobile_phonenumbers(paid),
                    total_tracking_phonenumbers=partnership_account.total_tracking_phonenumbers(paid),
                    total_messages=partnership_account.total_messages(paid),
                    total_inbound_messages=partnership_account.total_inbound_messages(paid),
                    current_month_total_inbound_messages=partnership_account.total_monthly_inbound_messages(paid),
                    previous_month_total_inbound_messages=partnership_account.previous_total_monthly_inbound_messages(paid),
                    total_outbound_messages=partnership_account.total_outbound_messages(paid),
                    current_month_total_outbound_messages=partnership_account.total_monthly_outbound_messages(paid),
                    previous_month_total_outbound_messages=partnership_account.previous_total_monthly_outbound_messages(paid)
                )
            else:
                return api.abort(404, message='Partnership account not found.')

        else:
            return api.abort(401)