File: //home/arjun/projects/buyercall_forms/buyercall/cli/commands/cmd_add.py
import logging
import random
from datetime import datetime
from buyercall.blueprints.filters import format_phone_number
import click
from faker import Faker
# Seed values are read from instance/settings.py when it defines them and
# fall back to config/settings.py for whichever ones it does not.
SEED_ADMIN_EMAIL = None
ACCEPT_LANGUAGES = None

try:
    from instance import settings

    SEED_ADMIN_EMAIL = settings.SEED_ADMIN_EMAIL
    ACCEPT_LANGUAGES = settings.ACCEPT_LANGUAGES
except ImportError:
    logging.error('Ensure __init__.py and settings.py both exist in instance/')
    # Raise SystemExit directly instead of calling exit(): that helper is
    # installed by the site module for interactive use and is not available
    # when the interpreter is started with -S.
    raise SystemExit(1)
except AttributeError:
    # instance/settings.py is importable but is missing at least one of the
    # two values. Only the ones still unset are taken from the defaults, so
    # a value that was already read from instance/ is kept.
    from config import settings

    if SEED_ADMIN_EMAIL is None:
        SEED_ADMIN_EMAIL = settings.SEED_ADMIN_EMAIL

    if ACCEPT_LANGUAGES is None:
        ACCEPT_LANGUAGES = settings.ACCEPT_LANGUAGES
from buyercall.app import create_app
from buyercall.extensions import db
from buyercall.blueprints.issue.models import Issue
from buyercall.blueprints.leads.models import Lead
from buyercall.blueprints.agents.models import Agent
from buyercall.blueprints.user.models import User
from buyercall.blueprints.partnership.models import PartnershipAccount
from buyercall.blueprints.phonenumbers.models import Phone
from buyercall.blueprints.contacts.models import Contact
from buyercall.blueprints.billing.models.invoice import Invoice
from buyercall.blueprints.billing.models.coupon import Coupon
from buyercall.blueprints.billing.gateways.stripecom import \
Coupon as PaymentCoupon
# Module-level objects shared by every command in this file: the Faker
# generator used for random field values and the application instance.
fake = Faker()
app = create_app()
# NOTE(review): presumably binds the app to the extension so db.session can
# be used by the commands outside an explicit app context -- confirm against
# the Flask-SQLAlchemy version in use.
db.app = app
def _log_status(count, model_label):
    """
    Print a one-line summary of how many records exist for a model.

    :param count: Number of records to report
    :type count: int
    :param model_label: Human-readable name of the model
    :type model_label: str
    :return: None
    """
    message = 'Created {0} {1}'.format(count, model_label)
    click.echo(message)
def _bulk_insert(model, data, label):
    """
    Bulk insert rows into a model's table and log the table's row count.

    Rows already in the table are left in place, so the number that gets
    logged is the table total, not just the rows added by this call.

    :param model: Model being affected
    :type model: SQLAlchemy
    :param data: Rows to insert, one dict of column values per row
    :type data: list
    :param label: Label for the output
    :type label: str
    :return: None
    """
    with app.app_context():
        # Commit whatever the session still has pending before writing
        # through the engine directly.
        db.session.commit()

        # Passing an empty parameter list is not a no-op: the statement would
        # still be executed once, without any column values. Skip the insert
        # when a command produced no rows (e.g. no users or accounts exist
        # yet to attach records to).
        if data:
            db.engine.execute(model.__table__.insert(), data)

        _log_status(model.query.count(), label)

    return None
# Command group only: it has no behavior of its own, the sub-commands are
# attached to it with cli.add_command() at the bottom of this module.
@click.group()
def cli():
    """ Populate your db with generated data. """
@click.command()
def users():
    """
    Create random users.
    """
    # random.choice() needs an indexable sequence; a dict keys view is not
    # one on Python 3, so materialize the role names once up front.
    roles = list(User.ROLE.keys())

    # 49 generated addresses plus the seeded admin address. Going through a
    # set drops duplicates, so slightly fewer than 50 users may be created.
    candidate_emails = [fake.email() for _ in range(49)]
    candidate_emails.append(SEED_ADMIN_EMAIL)

    data = []

    for email in set(candidate_emails):
        params = {
            'role': random.choice(roles),
            'email': email,
            'password': User.encrypt_password('password'),
            'firstname': fake.first_name(),
            'lastname': fake.last_name(),
            'locale': random.choice(ACCEPT_LANGUAGES),
            'company': fake.company(),
            'phonenumber': fake.phone_number(),
            'extension': fake.year(),
            'is_tos_agreed': '1'
        }

        # Ensure the seeded admin is always an admin.
        if email == SEED_ADMIN_EMAIL:
            params['role'] = 'admin'
            params['locale'] = 'en'

        data.append(params)

    return _bulk_insert(User, data, 'users')
@click.command()
def contacts():
    """
    Create random contacts.
    """
    # Generate 8000 formatted phone numbers and drop the duplicates, so every
    # contact row gets a distinct number (the final count may be below 8000).
    generated = [format_phone_number(fake.phone_number())
                 for _ in range(8000)]
    unique_phones = list(set(generated))

    rows = []

    # Walk the list back to front, one contact per remaining phone number.
    for phone in reversed(unique_phones):
        rows.append({
            'firstname': fake.first_name(),
            'lastname': fake.last_name(),
            'phonenumber_1': phone,
            'address_1': fake.street_name(),
            'city': fake.city(),
            'state': fake.state(),
            'zip': fake.zipcode(),
            'email': fake.email(),
            # Birthday and owning account are fixed values for every contact.
            'birthday': '04/03/1984',
            'partnership_account_id': 1
        })

    return _bulk_insert(Contact, rows, 'contacts')
@click.command()
def issues():
    """
    Create random issues.
    """
    # random.choice() needs an indexable sequence; dict key views are not
    # indexable on Python 3, so build the candidate lists once.
    statuses = list(Issue.STATUS.keys())
    labels = list(Issue.LABEL.keys())

    data = []

    for _ in range(50):
        data.append({
            'status': random.choice(statuses),
            'label': random.choice(labels),
            'email': fake.email(),
            'question': fake.paragraph()
        })

    return _bulk_insert(Issue, data, 'issues')
@click.command()
def leads():
    """
    Create random leads.
    """
    rows = []

    # 500 leads are generated for every partnership account in the database.
    for account in db.session.query(PartnershipAccount).all():
        for _ in range(500):
            # Random moment within the past year, round-tripped through a
            # unix timestamp string.
            epoch_text = fake.date_time_between(
                start_date='-1y', end_date='now').strftime('%s')
            moment = datetime.utcfromtimestamp(float(epoch_text))
            # NOTE(review): the year is hard-coded to 2017 in the format
            # string; only month, day and time come from the random value.
            # Confirm this is intentional.
            stamp = moment.strftime('2017-%m-%dT%H:%M:%S Z')

            call_seconds = random.randint(1, 300)

            rows.append({
                'created_on': stamp,
                'updated_on': stamp,
                'partnership_account_id': account.id,
                'status': random.choice(['completed', 'missed']),
                'firstname': fake.first_name(),
                'lastname': fake.last_name(),
                'email': fake.email(),
                'phonenumber': fake.phone_number(),
                'duration': call_seconds,
                # Start and end are independent random datetimes; they are
                # not derived from the duration above.
                'starttime': fake.date_time(),
                'endtime': fake.date_time(),
                'call_sid': fake.md5(),
                'notes': fake.text(),
                'call_type': random.choice(['inbound', 'outbound'])
            })

    return _bulk_insert(Lead, rows, 'leads')
@click.command()
def agents():
    """
    Create random agents.
    """
    # random.choice() needs an indexable sequence; a dict keys view is not
    # one on Python 3, so materialize the department names once.
    departments = list(Agent.DEPARTMENT.keys())

    data = []

    # 50 agents are generated for every user in the database.
    for user in db.session.query(User).all():
        for _ in range(50):
            # Random moment within the past year, round-tripped through a
            # unix timestamp string before being formatted for the database.
            created_on = fake.date_time_between(
                start_date='-1y', end_date='now').strftime('%s')
            created_on = datetime.utcfromtimestamp(
                float(created_on)).strftime('%Y-%m-%dT%H:%M:%S Z')

            data.append({
                'created_on': created_on,
                'updated_on': created_on,
                'user_id': user.id,
                'firstname': fake.first_name(),
                'lastname': fake.last_name(),
                'title': fake.last_name(),
                'email': fake.email(),
                'phonenumber': fake.phone_number(),
                'extension': fake.year(),
                'department': random.choice(departments),
            })

    return _bulk_insert(Agent, data, 'agents')
@click.command()
def phonenumbers():
    """
    Create random phonenumbers.
    """
    accounts = db.session.query(PartnershipAccount).all()
    rows = []

    for _ in range(50):
        # Random moment within the past year, round-tripped through a unix
        # timestamp string before being formatted for the database.
        epoch_text = fake.date_time_between(
            start_date='-1y', end_date='now').strftime('%s')
        moment = datetime.utcfromtimestamp(float(epoch_text))
        stamp = moment.strftime('%Y-%m-%dT%H:%M:%S Z')

        # Each number is owned by a randomly picked partnership account.
        owner_id = random.choice(accounts).id

        # A single coin flip is shared by all three flags below, so they
        # always carry the same value within one row.
        flag = bool(random.randint(0, 1))

        rows.append({
            'created_on': stamp,
            'updated_on': stamp,
            'partnership_account_id': owner_id,
            'phonenumber': fake.phone_number(),
            'is_local': flag,
            'is_tollfree': flag,
            'is_active': flag,
            'friendly_name': fake.first_name(),
        })

    return _bulk_insert(Phone, rows, 'phonenumbers')
@click.command()
def coupons():
    """
    Create random coupons.
    """
    # random.choice() needs an indexable sequence; a dict keys view is not
    # one on Python 3, so materialize the duration names once.
    durations = list(Coupon.DURATION.keys())

    data = []

    for _ in range(5):
        random_pct = random.random()
        duration = random.choice(durations)

        # Unix timestamp (as a string) somewhere within the next year.
        redeem_by = fake.date_time_between(start_date='now',
                                           end_date='+1y').strftime('%s')

        # Bulk inserts need the same columns, so let's setup a few nulls.
        params = {
            'code': Coupon.random_coupon_code(),
            'duration': duration,
            'percent_off': None,
            'amount_off': None,
            'currency': None,
            'redeem_by': None,
            'max_redemptions': None,
            'duration_in_months': None,
        }

        # Half of the coupons are percentage based, the rest a fixed amount.
        if random_pct >= 0.5:
            params['percent_off'] = random.randint(1, 100)
            params['max_redemptions'] = random.randint(15, 50)
        else:
            params['amount_off'] = random.randint(1, 1337)
            params['currency'] = 'usd'

        # Only about a quarter of the coupons get an expiry.
        if random_pct >= 0.75:
            params['redeem_by'] = redeem_by

        if duration == 'repeating':
            params['duration_in_months'] = random.randint(1, 12)

        # Create the coupon at the payment gateway with the raw values.
        PaymentCoupon.create(**params)

        # Our database requires a formatted date, not a unix timestamp.
        # Convert only when an expiry was actually set above, so the local
        # row matches what was sent to the payment gateway instead of always
        # receiving a redeem_by value.
        if params['redeem_by'] is not None:
            params['redeem_by'] = datetime.utcfromtimestamp(
                float(params['redeem_by'])).strftime('%Y-%m-%dT%H:%M:%S Z')

        data.append(params)

    return _bulk_insert(Coupon, data, 'coupons')
@click.command()
def invoices():
    """
    Create random invoices.
    """
    def _reformat(epoch_text, layout):
        # Turn a unix timestamp string into a UTC date string.
        return datetime.utcfromtimestamp(float(epoch_text)).strftime(layout)

    plans = ['BRONZE', 'GOLD', 'PLATINUM']
    cards = ['Visa', 'Mastercard', 'AMEX',
             'J.C.B', "Diner's Club"]

    rows = []

    # Every user gets between 1 and 12 invoices.
    for user in db.session.query(User).all():
        for _ in range(random.randint(1, 12)):
            # All four values are first drawn as unix timestamp strings.
            created_epoch = fake.date_time_between(
                start_date='-1y', end_date='now').strftime('%s')
            start_epoch = fake.date_time_between(
                start_date='now', end_date='+1y').strftime('%s')
            # NOTE(review): start_date receives the unix timestamp string
            # from the line above; how Faker interprets that value depends
            # on the installed version -- confirm the intended period range.
            end_epoch = fake.date_time_between(
                start_date=start_epoch, end_date='+14d').strftime('%s')
            expiry_epoch = fake.date_time_between(
                start_date='now', end_date='+2y').strftime('%s')

            # ...and then converted to the string formats stored in the db.
            created_on = _reformat(created_epoch, '%Y-%m-%dT%H:%M:%S Z')
            period_start_on = _reformat(start_epoch, '%Y-%m-%d')
            period_end_on = _reformat(end_epoch, '%Y-%m-%d')
            exp_date = _reformat(expiry_epoch, '%Y-%m-%d')

            rows.append({
                'created_on': created_on,
                'updated_on': created_on,
                'user_id': user.id,
                'receipt_number': fake.md5(),
                'description': '{0} MONTHLY'.format(random.choice(plans)),
                'period_start_on': period_start_on,
                'period_end_on': period_end_on,
                'currency': 'usd',
                'tax': random.random() * 100,
                'tax_percent': random.random() * 10,
                'total': random.random() * 1000,
                'brand': random.choice(cards),
                'last4': random.randint(1000, 9000),
                'exp_date': exp_date
            })

    return _bulk_insert(Invoice, rows, 'invoices')
@click.command()
@click.pass_context
def all(ctx):
    """
    Populate everything at once.
    :param ctx:
    :return: None
    """
    # The commands run one after another in exactly this order.
    ordered_commands = (
        users,
        issues,
        coupons,
        invoices,
        contacts,
        leads,
        agents,
        phonenumbers,
    )

    for command in ordered_commands:
        ctx.invoke(command)
# Register every command defined above on the `cli` group so each one can be
# invoked as a sub-command.
cli.add_command(users)
cli.add_command(contacts)
cli.add_command(issues)
cli.add_command(coupons)
cli.add_command(invoices)
cli.add_command(leads)
cli.add_command(agents)
cli.add_command(phonenumbers)
cli.add_command(all)