HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall/cli/commands/cmd_add.py
import logging
import random
from datetime import datetime
from buyercall.blueprints.filters import format_phone_number

import click
from faker import Faker

SEED_ADMIN_EMAIL = None
ACCEPT_LANGUAGES = None

try:
    from instance import settings

    SEED_ADMIN_EMAIL = settings.SEED_ADMIN_EMAIL
    ACCEPT_LANGUAGES = settings.ACCEPT_LANGUAGES
except ImportError:
    logging.error('Ensure __init__.py and settings.py both exist in instance/')
    exit(1)
except AttributeError:
    from config import settings

    if SEED_ADMIN_EMAIL is None:
        SEED_ADMIN_EMAIL = settings.SEED_ADMIN_EMAIL

    if ACCEPT_LANGUAGES is None:
        ACCEPT_LANGUAGES = settings.ACCEPT_LANGUAGES

from buyercall.app import create_app
from buyercall.extensions import db
from buyercall.blueprints.issue.models import Issue
from buyercall.blueprints.leads.models import Lead
from buyercall.blueprints.agents.models import Agent
from buyercall.blueprints.user.models import User
from buyercall.blueprints.partnership.models import PartnershipAccount
from buyercall.blueprints.phonenumbers.models import Phone
from buyercall.blueprints.contacts.models import Contact
from buyercall.blueprints.billing.models.invoice import Invoice
from buyercall.blueprints.billing.models.coupon import Coupon
from buyercall.blueprints.billing.gateways.stripecom import \
    Coupon as PaymentCoupon

fake = Faker()
app = create_app()
db.app = app


def _log_status(count, model_label):
    """
    Log the output of how many records were created.

    :param count: Amount created
    :type count: int
    :param model_label: Name of the model
    :type model_label: str
    :return: None
    """
    click.echo('Created {0} {1}'.format(count, model_label))

    return None


def _bulk_insert(model, data, label):
    """
    Bulk insert data to a specific model and log it.

    :param model: Model being affected
    :type model: SQLAlchemy
    :param data: Data to be saved
    :type data: list
    :param label: Label for the output
    :type label: str
    :return: None
    """
    with app.app_context():
        #model.query.delete()
        db.session.commit()
        db.engine.execute(model.__table__.insert(), data)

        _log_status(model.query.count(), label)

    return None


@click.group()
def cli():
    """ Populate your db with generated data. """
    pass


@click.command()
def users():
    """
    Create random users.
    """
    random_emails = []
    data = []

    # Ensure we get about 50 unique random emails, +1 due to the seeded email.
    for i in range(0, 49):
        random_emails.append(fake.email())

    random_emails.append(SEED_ADMIN_EMAIL)
    random_emails = list(set(random_emails))

    while True:
        if len(random_emails) == 0:
            break

        email = random_emails.pop()

        params = {
            'role': random.choice(User.ROLE.keys()),
            'email': email,
            'password': User.encrypt_password('password'),
            'firstname': fake.first_name(),
            'lastname': fake.last_name(),
            'locale': random.choice(ACCEPT_LANGUAGES),
            'company': fake.company(),
            'phonenumber': fake.phone_number(),
            'extension': fake.year(),
            'is_tos_agreed': '1'
        }

        # Ensure the seeded admin is always an admin.
        if email == SEED_ADMIN_EMAIL:
            params['role'] = 'admin'
            params['locale'] = 'en'

        data.append(params)

    return _bulk_insert(User, data, 'users')


@click.command()
def contacts():
    """
    Create random contacts.
    """
    random_phone = []
    data = []

    # Ensure we get about 50 unique random emails, +1 due to the seeded email.
    for i in range(0, 8000):
        random_phone.append(format_phone_number(fake.phone_number()))

    random_phone = list(set(random_phone))

    while True:
        if len(random_phone) == 0:
            break

        phone = random_phone.pop()

        params = {
            'firstname': fake.first_name(),
            'lastname': fake.last_name(),
            'phonenumber_1': phone,
            'address_1': fake.street_name(),
            'city': fake.city(),
            'state': fake.state(),
            'zip': fake.zipcode(),
            'email': fake.email(),
            'birthday': '04/03/1984',
            'partnership_account_id': 1
        }

        data.append(params)

    return _bulk_insert(Contact, data, 'contacts')


@click.command()
def issues():
    """
    Create random issues.
    """
    data = []

    for i in range(0, 50):
        params = {
            'status': random.choice(Issue.STATUS.keys()),
            'label': random.choice(Issue.LABEL.keys()),
            'email': fake.email(),
            'question': fake.paragraph()
        }

        data.append(params)

    return _bulk_insert(Issue, data, 'issues')


@click.command()
def leads():
    """
    Create random leads.
    """
    data = []

    partnership_accounts = db.session.query(PartnershipAccount).all()

    for partnership_account in partnership_accounts:
        for i in range(0, 500):
            # Create a fake unix timestamp in the future.
            created_on = fake.date_time_between(
                start_date='-1y', end_date='now').strftime('%s')

            created_on = datetime.utcfromtimestamp(
                float(created_on)).strftime('2017-%m-%dT%H:%M:%S Z')

            duration = random.randint(1, 300)
            # Use dev@localhost user id for all leads
            partnership_account_id = partnership_account.id

            params = {
                'created_on': created_on,
                'updated_on': created_on,
                'partnership_account_id': partnership_account_id,
                'status': random.choice(['completed', 'missed']),
                'firstname': fake.first_name(),
                'lastname': fake.last_name(),
                'email': fake.email(),
                'phonenumber': fake.phone_number(),
                'duration': duration,
                'starttime': fake.date_time(),
                'endtime': fake.date_time(),
                'call_sid': fake.md5(),
                'notes': fake.text(),
                'call_type': random.choice(['inbound', 'outbound'])
            }

            data.append(params)

        return _bulk_insert(Lead, data, 'leads')


@click.command()
def agents():
    """
    Create random agents.
    """
    data = []

    users = db.session.query(User).all()

    for user in users:
        for i in range(0, 50):
            # Create a fake unix timestamp in the future.
            created_on = fake.date_time_between(
                start_date='-1y', end_date='now').strftime('%s')

            created_on = datetime.utcfromtimestamp(
                float(created_on)).strftime('%Y-%m-%dT%H:%M:%S Z')

            # Use dev@localhost user id for all phonenumbers
            agent_user = user.id

            params = {
                'created_on': created_on,
                'updated_on': created_on,
                'user_id': agent_user,
                'firstname': fake.first_name(),
                'lastname': fake.last_name(),
                'title': fake.last_name(),
                'email': fake.email(),
                'phonenumber': fake.phone_number(),
                'extension': fake.year(),
                'department': random.choice(Agent.DEPARTMENT.keys()),
            }

            data.append(params)

        return _bulk_insert(Agent, data, 'agents')


@click.command()
def phonenumbers():
    """
    Create random phonenumbers.
    """
    data = []

    users = db.session.query(PartnershipAccount).all()

    for i in range(0, 50):
        # Create a fake unix timestamp in the future.
        created_on = fake.date_time_between(
            start_date='-1y', end_date='now').strftime('%s')

        created_on = datetime.utcfromtimestamp(
            float(created_on)).strftime('%Y-%m-%dT%H:%M:%S Z')

        # Use dev@localhost user id for all phonenumbers
        phone_user = random.choice(users).id
        # Get random number between 0 and 1 for boolean fields
        random_boolean = bool(random.randint(0, 1))

        params = {
            'created_on': created_on,
            'updated_on': created_on,
            'partnership_account_id': phone_user,
            'phonenumber': fake.phone_number(),
            'is_local': random_boolean,
            'is_tollfree': random_boolean,
            'is_active': random_boolean,
            'friendly_name': fake.first_name(),


        }

        data.append(params)

    return _bulk_insert(Phone, data, 'phonenumbers')


@click.command()
def coupons():
    """
    Create random coupons.
    """
    data = []

    for i in range(0, 5):
        random_pct = random.random()
        duration = random.choice(Coupon.DURATION.keys())

        # Create a fake unix timestamp in the future.
        redeem_by = fake.date_time_between(start_date='now',
                                           end_date='+1y').strftime('%s')

        # Bulk inserts need the same columns, so let's setup a few nulls.
        params = {
            'code': Coupon.random_coupon_code(),
            'duration': duration,
            'percent_off': None,
            'amount_off': None,
            'currency': None,
            'redeem_by': None,
            'max_redemptions': None,
            'duration_in_months': None,
        }

        if random_pct >= 0.5:
            params['percent_off'] = random.randint(1, 100)
            params['max_redemptions'] = random.randint(15, 50)
        else:
            params['amount_off'] = random.randint(1, 1337)
            params['currency'] = 'usd'

        if random_pct >= 0.75:
            params['redeem_by'] = redeem_by

        if duration == 'repeating':
            duration_in_months = random.randint(1, 12)
            params['duration_in_months'] = duration_in_months

        PaymentCoupon.create(**params)

        # Our database requires a Date object, not a unix timestamp.
        if redeem_by:
            params['redeem_by'] = datetime.utcfromtimestamp(float(redeem_by)) \
                .strftime('%Y-%m-%dT%H:%M:%S Z')

        if 'id' in params:
            params['code'] = params.get('id')
            del params['id']

        data.append(params)

    return _bulk_insert(Coupon, data, 'coupons')


@click.command()
def invoices():
    """
    Create random invoices.
    """
    data = []

    users = db.session.query(User).all()

    for user in users:
        for i in range(0, random.randint(1, 12)):
            # Create a fake unix timestamp in the future.
            created_on = fake.date_time_between(
                start_date='-1y', end_date='now').strftime('%s')
            period_start_on = fake.date_time_between(
                start_date='now', end_date='+1y').strftime('%s')
            period_end_on = fake.date_time_between(
                start_date=period_start_on, end_date='+14d').strftime('%s')
            exp_date = fake.date_time_between(
                start_date='now', end_date='+2y').strftime('%s')

            created_on = datetime.utcfromtimestamp(
                float(created_on)).strftime('%Y-%m-%dT%H:%M:%S Z')
            period_start_on = datetime.utcfromtimestamp(
                float(period_start_on)).strftime('%Y-%m-%d')
            period_end_on = datetime.utcfromtimestamp(
                float(period_end_on)).strftime('%Y-%m-%d')
            exp_date = datetime.utcfromtimestamp(
                float(exp_date)).strftime('%Y-%m-%d')

            plans = ['BRONZE', 'GOLD', 'PLATINUM']
            cards = ['Visa', 'Mastercard', 'AMEX',
                     'J.C.B', "Diner's Club"]

            params = {
                'created_on': created_on,
                'updated_on': created_on,
                'user_id': user.id,
                'receipt_number': fake.md5(),
                'description': '{0} MONTHLY'.format(random.choice(plans)),
                'period_start_on': period_start_on,
                'period_end_on': period_end_on,
                'currency': 'usd',
                'tax': random.random() * 100,
                'tax_percent': random.random() * 10,
                'total': random.random() * 1000,
                'brand': random.choice(cards),
                'last4': random.randint(1000, 9000),
                'exp_date': exp_date
            }

            data.append(params)

    return _bulk_insert(Invoice, data, 'invoices')


@click.command()
@click.pass_context
def all(ctx):
    """
    Populate everything at once.

    :param ctx:
    :return: None
    """
    ctx.invoke(users)
    ctx.invoke(issues)
    ctx.invoke(coupons)
    ctx.invoke(invoices)
    ctx.invoke(contacts)
    ctx.invoke(leads)
    ctx.invoke(agents)
    ctx.invoke(phonenumbers)

    return None


cli.add_command(users)
cli.add_command(contacts)
cli.add_command(issues)
cli.add_command(coupons)
cli.add_command(invoices)
cli.add_command(leads)
cli.add_command(agents)
cli.add_command(phonenumbers)
cli.add_command(all)