File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/admin/models.py
from sqlalchemy import and_, func
from sqlalchemy.sql import text
from buyercall.blueprints.user.models import db, User
from buyercall.blueprints.issue.models import Issue
from buyercall.blueprints.billing.models.subscription import Subscription
from buyercall.blueprints.partnership.models import PartnershipAccount
class Dashboard(object):
    """
    Read-only aggregate queries used to build dashboard statistics.

    Every method is a classmethod that queries through ``db.session`` or a
    model's ``query`` attribute and returns plain Python containers.
    """

    @classmethod
    def group_and_count_coupons(cls):
        """
        Obtain coupon usage statistics across all subscribers.

        :return: ``(with_coupon, total, percent)`` where ``with_coupon`` is
          the number of subscriptions whose coupon is not NULL, ``total`` is
          the number of subscriptions, and ``percent`` is their ratio as a
          percentage rounded to one decimal (0 when there are none)
        :rtype: tuple
        """
        not_null = db.session.query(Subscription).filter(
            Subscription.coupon.isnot(None)).count()
        total = db.session.query(func.count(Subscription.id)).scalar()

        # Guard the division below: no subscriptions means 0 percent.
        if not total:
            percent = 0
        else:
            percent = round((not_null / float(total)) * 100, 1)

        return not_null, total, percent

    @classmethod
    def group_and_count_plans(cls, current_user):
        """
        Perform a group by/count on the plans of active subscriptions.

        Partners only see subscriptions attached to the active partnership
        accounts of their own partnership; 'sysadmin' and 'limitsysadmin'
        see every active subscription. Any other role gets an empty result
        instead of failing on an unassigned filter.

        :param current_user: User requesting the statistics; ``role`` is
          read, and ``partnership_id`` as well for partners
        :return: dict with the keys 'query' and 'total'
        """
        role = current_user.role

        if role == 'partner':
            rows = (
                PartnershipAccount.query
                .with_entities(PartnershipAccount.id)
                .filter(
                    PartnershipAccount.partnership_id == current_user.partnership_id,
                    PartnershipAccount.active == True,  # noqa: E712 - SQL comparison
                )
                .all()
            )
            # with_entities() yields one-element rows; in_() needs the bare
            # id values rather than the row wrappers.
            ids = [row[0] for row in rows]
            filter_exp = and_(PartnershipAccount.id.in_(ids),
                              PartnershipAccount.subscription_id == Subscription.id,
                              Subscription.status == 'active')
        elif role in ['sysadmin', 'limitsysadmin']:
            # show all
            filter_exp = (Subscription.status == 'active')
        else:
            # Unknown role: match nothing rather than crash or expose every
            # subscription.
            filter_exp = text('1 = 0')

        return cls.group_and_count(Subscription, Subscription.plan, filter_exp)

    @classmethod
    def group_and_count_users(cls):
        """
        Perform a group by/count on all user types.

        :return: dict with the keys 'query' and 'total'
        """
        return cls._group_and_count(User, User.role)

    @classmethod
    def group_and_count_issues(cls, current_user):
        """
        Perform a group by/count on all issue types.

        Partners are limited to issues of their partnership, admins to the
        issues of a single partnership account, and every other role sees
        all issues.

        :param current_user: User requesting the statistics
        :return: dict with the keys 'query' and 'total'
        """
        role = current_user.role

        if role == 'partner':
            filter_exp = Issue.partnership_id == current_user.partnership_id
        elif role == 'admin':
            if current_user.is_viewing_partnership:
                # Read as an attribute, exactly as before; presumably a
                # property on the user model -- TODO confirm.
                account_id = current_user.get_user_viewing_partnership_account_id
            else:
                account_id = current_user.partnership_account_id
            filter_exp = Issue.partnership_account_id == account_id
        else:
            # No restriction for the remaining roles.
            filter_exp = None

        return cls.group_and_count(Issue, Issue.status, filter_exp)

    @classmethod
    def _group_and_count(cls, model, field):
        """
        Group results for a specific model and field, without any filter.

        Kept as a thin wrapper so existing callers keep working; the query
        logic lives in :meth:`group_and_count`.

        :param model: Name of the model
        :type model: SQLAlchemy model
        :param field: Name of the field to group on
        :type field: SQLAlchemy field
        :return: dict with the keys 'query' and 'total'
        """
        return cls.group_and_count(model, field)

    @classmethod
    def group_and_count(cls, model, field, filter_exp=None):
        """
        Group results for a specific model and field.

        :param model: Name of the model
        :type model: SQLAlchemy model
        :param field: Name of the field to group on
        :type field: SQLAlchemy field
        :param filter_exp: SQLAlchemy filter expression applied to both the
          grouped query and the total; None means no filtering
        :type filter_exp: SQLAlchemy filter
        :return: dict where 'query' is the list of ``(count, field value)``
          rows and 'total' is the number of matching ``model`` rows
        """
        grouped = db.session.query(func.count(field), field)
        matching = model.query

        # Compare against None explicitly: SQL expressions must not be
        # evaluated for truthiness.
        if filter_exp is not None:
            grouped = grouped.filter(filter_exp)
            matching = matching.filter(filter_exp)

        return {
            'query': grouped.group_by(field).all(),
            'total': matching.count(),
        }