HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_new/buyercall/buyercall/blueprints/sysadmin/views.py
import os

from flask import (
    Blueprint,
    make_response,
    flash,
    jsonify,
    json,
    render_template,
    g, request, redirect, url_for,
    Response)
from flask_babel import gettext as _
from flask_babel import ngettext as _n
from flask_login import login_required, current_user, logout_user, login_user

from buyercall.blueprints.sysadmin.models import RequestLog
from buyercall.lib.supervisor_manager import current_supervisor_user
from buyercall.lib.supervisor_manager import (
    login_user as supervisor_login_user
)
from sqlalchemy.sql.elements import and_
from sqlalchemy.exc import IntegrityError, InvalidRequestError
from sqlalchemy import asc, desc, or_, func
from sqlalchemy.orm import load_only
from buyercall.blueprints.user.decorators import role_required
from buyercall.blueprints.admin.forms import SearchForm, BulkDeleteForm, \
    UserForm, UserCancelSubscriptionForm, IssueForm, IssueContactForm, \
    CouponForm
from buyercall.blueprints.partnership.forms import PartnershipForm
from buyercall.blueprints.reports.models import Report, ReportUserTie
from buyercall.extensions import db
from sqlalchemy.sql import text
from buyercall.blueprints.user.models import User
from buyercall.blueprints.partnership.models import (
    ApiToken,
    Partnership,
    PartnershipAccount,
    BILLING_TYPE)
from buyercall.blueprints.billing.models.subscription import Subscription
import uuid
from wtforms import Label
import logging as log
from buyercall.extensions import db
from datatables import DataTables, ColumnDT

# Blueprint that groups every sysadmin view defined in this module; its
# templates are looked up in the blueprint-local 'templates' folder.
sysadmin = Blueprint('sysadmin', __name__, template_folder='templates')


@sysadmin.before_request
@login_required
@role_required('sysadmin', 'limitsysadmin')
def before_request():
    """Run the access checks in front of every sysadmin endpoint.

    The body is intentionally empty: all of the work is delegated to the
    ``login_required`` and ``role_required`` decorators stacked on this
    before-request hook.
    """
    return None


# Partners  -----------------------------------------------------------------------
@sysadmin.route('/partnerships', defaults={'page': 1})
@sysadmin.route('/partnerships/page/<int:page>')
def partnerships(page):
    """Render one page (20 rows) of partnerships with their partner counts.

    Query-string arguments read here:
      * ``sort`` / ``direction`` -- passed through ``Partnership.sort_by``;
        the special value ``sort=count`` orders by the number of partner
        users instead of a ``partnerships`` column.
      * ``q`` -- handed to ``Partnership.search`` to filter the rows.

    :param page: page number taken from the URL (defaults to 1).
    """
    search_form = SearchForm()
    bulk_form = BulkDeleteForm()

    requested_sort = request.args.get('sort', 'name')
    sort_spec = Partnership.sort_by(requested_sort,
                                    request.args.get('direction', 'asc'))

    # 'count' is an aggregate rather than a table column, so it must not be
    # prefixed with the table name.
    if requested_sort == 'count':
        order_clause = f'count(users.partnership_id) {sort_spec[1]}'
    else:
        order_clause = f'partnerships.{sort_spec[0]} {sort_spec[1]}'

    selected_columns = (
        Partnership.active,
        Partnership.id,
        Partnership.name,
        func.count(User.partnership_id).label('count'),
    )
    # Only users with the 'partner' role contribute to the count.
    partner_join = and_(User.partnership_id == Partnership.id,
                        User.role == 'partner')

    paginated_partnerships = (
        Partnership.query.with_entities(*selected_columns)
        .outerjoin(User, partner_join)
        .filter(Partnership.search(request.args.get('q', '')))
        .order_by(text(order_clause))
        .group_by(Partnership.active, Partnership.id, Partnership.name,
                  User.partnership_id)
        .paginate(page, 20, True)
    )

    return render_template(
        'partnerships/index.jinja2',
        form=search_form,
        bulk_form=bulk_form,
        partnerships=paginated_partnerships,
    )


@sysadmin.route('/all_partners')
def partners():
    """Redirect to the first page of the partnerships listing."""
    first_page_url = url_for('sysadmin.partnerships', page=1)
    return redirect(first_page_url)


@sysadmin.route('/partnership/<int:id>')
def partnership(id):
    """List the users attached directly to one partnership.

    Only users that have no partnership account, belong to partnership
    ``id`` and are not deactivated are returned. The search form is rendered
    but the ``q`` argument is not applied to the query.

    :param id: primary key of the partnership taken from the URL.
    """
    search_form = SearchForm()
    bulk_form = BulkDeleteForm()

    requested_sort = request.args.get('sort', 'name')
    sort_spec = User.sort_by(requested_sort,
                             request.args.get('direction', 'asc'))

    if requested_sort == 'name':
        # Sorting by name goes through the mapped attribute rather than a
        # raw "users.<column>" text clause.
        ordering = desc(User.name) if sort_spec[1] != 'asc' else asc(User.name)
    else:
        ordering = text(f'users.{sort_spec[0]} {sort_spec[1]}')

    partnership_var = Partnership.query.get(id)

    user_filters = (
        User.partnership_account_id == None,  # noqa: E711 (SQL IS NULL)
        User.partnership_id == id,
        User.is_deactivated.is_(False),
    )
    paginated_users = (
        User.query
        .join(User.partnership)
        .filter(*user_filters)
        .order_by(ordering)
        .all()
    )

    return render_template(
        'partners/index.jinja2',
        form=search_form,
        bulk_form=bulk_form,
        users=paginated_users,
        partnership=partnership_var,
    )


# TODO: needs review
@sysadmin.route('/partners/<int:id>/edit', methods=['GET', 'POST'])
def partners_edit(id):
    """Edit a partner user and the reports that user is tied to.

    GET renders the edit form pre-filled from the user. A valid POST saves
    the user, replaces the user's report links with the submitted selection
    and redirects to the partnership page.

    :param id: primary key of the user being edited.
    """
    user = User.query.get(id)
    if user is None:
        # Without this guard an unknown id crashes below on ``user.role``.
        flash(_('This user does not exist.'), 'danger')
        return redirect(url_for('sysadmin.partnerships'))

    form = UserForm(obj=user)
    # The role is shown as a read-only label instead of an editable field.
    form.role = Label('lbl_role', _('%s' % user.role))
    form.role.label = {'text': 'Privileges'}

    # Every report is selectable except the partner channel account report.
    excluded_report = (3, 'Partner Channel Account Report')
    form.reports_list.choices = [
        (report.id, report.name)
        for report in Report.query.all()
        if (report.id, report.name) != excluded_report
    ]

    if form.validate_on_submit():
        if User.is_last_partner(user,
                                'partner',
                                request.form.get('active')):
            flash(_('This is the last partner within partnership, you cannot do that.'),
                  'danger')
            return redirect(url_for('sysadmin.partnership', id=user.partnership_id))

        # Drop the label so populate_obj() does not touch user.role.
        del form.role
        form.populate_obj(user)

        if user.username == '':
            user.username = None
        user.save()

        # Replace the existing report links with the submitted selection.
        user.reports = list()
        for report_id in form.reports_list.data:
            link = ReportUserTie()
            link.report_id = report_id
            link.user_id = user.id
            db.session.add(link)
        db.session.commit()

        flash(_('User has been saved successfully.'), 'success')
        return redirect(url_for('sysadmin.partnership', id=user.partnership_id))
    else:
        form.reports_list.process_data([link.report.id for link in user.reports])

    return render_template('partners/edit.jinja2', form=form, user=user)


@sysadmin.route('/partnerships/<int:id>/edit/login_access', methods=['POST'])
def partnership_edit_login_access(id):
    """Switch the ``active`` flag of a partnership and of its partner users.

    The submitted ``status`` form value is inverted: ``'on'`` stores
    ``active=False`` and anything else stores ``active=True``.
    NOTE(review): presumably the form posts the *current* state so that the
    request acts as a toggle -- confirm against the template.

    :param id: primary key of the partnership.
    """
    listing_url = url_for('sysadmin.partnerships')

    existing = Partnership.query.filter(Partnership.id == id).first()
    if existing is None:
        flash(_('This partnership does not exist.'), 'danger')
        return redirect(listing_url)

    new_active = request.form.get('status', 'off') != 'on'

    partnership_rows = db.session.query(Partnership).filter(Partnership.id == id)
    partnership_rows.update({"active": new_active}, synchronize_session='fetch')

    # Apply the same flag to every non-deactivated partner user.
    partner_rows = db.session.query(User).filter(
        User.partnership_id == id,
        User.role.in_(['partner']),
        User.is_deactivated.is_(False),
    )
    partner_rows.update({"active": new_active}, synchronize_session='fetch')

    db.session.commit()

    flash(_('Partnership settings have been changed successfully.'), 'success')
    return redirect(listing_url)


@sysadmin.route('/partnership/<int:id>/edit/edit_twillio', methods=['POST'])
def partnership_edit_twillio(id):
    """Activate or suspend the Twilio account of a partnership's subscription.

    The submitted ``status`` form value is inverted: ``'on'`` suspends the
    account and anything else activates it.

    :param id: primary key of the partnership.
    """
    listing_url = url_for('sysadmin.partnerships')
    record = Partnership.query.filter(Partnership.id == id).first()

    if record is None or record.subscription is None:
        flash(_('This partnership does not exist.'), 'danger')
        return redirect(listing_url)

    if request.form.get('status', 'off') == 'on':
        record.subscription.suspend_twilio_account()
    else:
        record.subscription.activate_twilio_account()

    flash(_('Partnership settings have been changed successfully.'), 'success')
    return redirect(listing_url)


@sysadmin.route('/partnership/<int:id>/edit/edit_provider', methods=['POST'])
def partnership_edit_provider(id):
    """Set the default provider of a partnership.

    Only ``'twilio'`` and ``'bandwidth'`` are accepted. An unsupported
    provider or an unknown partnership id flashes an error instead of
    reporting success or crashing on a missing row.

    :param id: primary key of the partnership.
    """
    listing_url = url_for('sysadmin.partnerships')
    provider = request.form['provider']

    if provider not in ('twilio', 'bandwidth'):
        flash(_('This provider is not supported.'), 'danger')
        return redirect(listing_url)

    partnership = Partnership.query.filter(Partnership.id == id).first()
    if partnership is None:
        flash(_('This partnership does not exist.'), 'danger')
        return redirect(listing_url)

    partnership.default_provider = provider
    db.session.commit()

    flash(_('Partnership settings have been changed successfully.'), 'success')
    return redirect(listing_url)


@sysadmin.route('/partnership/<int:id>/edit/close_subscription', methods=['POST'])
def partnership_close_subscription(id):
    """Close the Twilio account attached to a partnership's subscription.

    :param id: primary key of the partnership.
    """
    listing_url = url_for('sysadmin.partnerships')
    record = Partnership.query.filter(Partnership.id == id).first()

    has_subscription = record is not None and record.subscription is not None
    if not has_subscription:
        flash(_('This partnership does not exist.'), 'danger')
        return redirect(listing_url)

    record.subscription.close_twilio_account()

    flash(_('Partnership settings have been changed successfully.'), 'success')
    return redirect(listing_url)


@sysadmin.route('/partnership/<int:id>/bulk_delete/', methods=['POST'])
def partners_bulk_delete(id):
    """Deactivate the selected users of one partnership.

    The request is refused when the selection covers every active partner
    of the partnership. The guard counts how many of the submitted ids are
    active partners of this partnership, rather than comparing the raw
    number of submitted ids, so duplicate or unrelated ids cannot bypass it.
    The update is scoped to the partnership named in the URL.

    :param id: primary key of the partnership.
    """
    form = BulkDeleteForm()
    partnership_url = url_for('sysadmin.partnership', id=id)

    if not form.validate_on_submit():
        flash(_('No partnerships were deactivated, something went wrong.'), 'danger')
        return redirect(partnership_url)

    ids = request.form.getlist('bulk_ids')
    if not ids:
        # Nothing selected: nothing to deactivate.
        return redirect(partnership_url)

    active_partners = User.query.filter(
        User.partnership_id == id,
        User.role == 'partner',
        User.active == True,  # noqa: E712 (SQL comparison)
        User.is_deactivated.is_(False),
    )
    total_active = active_partners.count()
    selected_active = active_partners.filter(User.id.in_(ids)).count()

    if total_active and selected_active == total_active:
        flash(_('You cannot deactivate all partners and leave partnership without its manager. Rather '
                'deactivate whole partnership.'), 'danger')
        return redirect(partnership_url)

    db.session.query(User) \
        .filter(User.id.in_(ids),
                User.partnership_id == id,
                User.is_deactivated.is_(False)) \
        .update({"active": False}, synchronize_session='fetch')

    db.session.commit()

    return redirect(partnership_url)


@sysadmin.route('/partnerships/new', methods=['GET', 'POST'])
@sysadmin.route('/partnerships/<int:id>/edit', methods=['GET', 'POST'])
def partnerships_new(id=None):
    """Create a partnership, or edit the one identified by ``id``.

    On a valid POST this view:
      1. copies the form onto the partnership and assigns a fresh
         ``account_invitation_url_token``;
      2. creates a 'partner' user for every entry of the ``invited_emails``
         JSON form field and queues an invitation email for it;
      3. moves the accounts listed in the ``moved_accounts`` JSON form field
         (and their users) to this partnership;
      4. stores an uploaded ``logo`` file when it is non-empty and at most
         1MB.

    Any exception raised while saving rolls the session back, is logged and
    is flashed to the user.

    :param id: primary key of the partnership to edit, or None to create.
    """
    if id is None:
        partnership = Partnership()
    else:
        partnership = Partnership.query.get(id)
        if partnership is None:
            # An unknown id would otherwise crash on the attribute access below.
            flash(_('This partnership does not exist.'), 'danger')
            return redirect(url_for('sysadmin.partnerships'))

    form = PartnershipForm(obj=partnership)

    if id is not None:
        # For an existing partnership the billing type is shown as a label.
        form.default_billing_type = Label('lbl_role', _('%s' % BILLING_TYPE[partnership.default_billing_type]))
        form.default_billing_type.label = {'text': 'Current Default Billing'}

    if form.validate_on_submit():
        with db.session.no_autoflush:
            try:
                # The logo is handled separately below, not by populate_obj().
                del form.logo
                form.populate_obj(partnership)

                # A missing form field is treated as an empty JSON list.
                invited_emails = json.loads(request.form.get('invited_emails') or '[]')
                moved_accounts = [
                    account['id']
                    for account in json.loads(request.form.get('moved_accounts') or '[]')
                ]

                partnership.account_invitation_url_token = str(uuid.uuid4())
                if partnership.default_billing_type == 'invoice':
                    invoice_subscription = Subscription.query.filter(Subscription.plan == 'invoice', Subscription.status == 'active').first()
                    partnership.subscription_id = invoice_subscription.id

                if partnership.id is not None and partnership.id > 0:
                    Partnership.update_business_type(partnership.id, partnership.business_type)

                if partnership.api_token_id is None:
                    partnership.regenerate_api_token(partnership.id)

                db.session.add(partnership)
                db.session.flush()

                for email in invited_emails:
                    # NOTE(review): invited users are created with a fixed,
                    # well-known password; consider a random value since the
                    # invitation carries a reset token anyway.
                    params = {
                        'role': 'partner',
                        'email': email['name'],
                        'password': 'password',
                        'company': partnership.name,
                        'tos_agreement': False,
                        'partnership': partnership
                    }
                    try:
                        partner = User(**params)
                        db.session.add(partner)
                        db.session.flush()
                        # send email invitation
                        reset_token = partner.serialize_token()
                        from buyercall.blueprints.user.tasks import send_partners_invitation_email
                        # TODO: can't call celery's delay() method. says not serializable
                        send_partners_invitation_email.delay(partner.id, reset_token, partnership.name)
                    except (IntegrityError, InvalidRequestError):
                        # '%(email)s' needs the trailing 's' conversion,
                        # otherwise '%(email) a' is parsed as an ascii()
                        # conversion and the message is garbled.
                        flash(_('User with email id :%(email)s already exists', email=email['name']),
                              'error')
                        db.session.rollback()

                if moved_accounts:
                    db.session.query(User)\
                        .filter(User.partnership_account_id.in_(moved_accounts), User.is_deactivated.is_(False))\
                        .update({"partnership_id": partnership.id}, synchronize_session='fetch')

                    db.session.query(PartnershipAccount).with_for_update(of=PartnershipAccount) \
                        .filter(PartnershipAccount.id.in_(moved_accounts)) \
                        .update({"partnership_id": partnership.id}, synchronize_session='fetch')

                db.session.flush()
                db.session.commit()

                # .get() avoids a 400 when other files were posted but no 'logo'.
                logo_file = request.files.get('logo')
                if logo_file is not None:
                    # Measure the upload by seeking to its end.
                    logo_file.seek(0, os.SEEK_END)
                    file_length = logo_file.tell()
                    if file_length > 1 * 1024 * 1024:
                        flash('Logo is too big and was not saved. Max size is 1MB', 'danger')
                        # The partnership itself is already committed; go back
                        # to the listing (url_for() yields a URL, not a
                        # template name, so this must be a redirect).
                        return redirect(url_for('sysadmin.partnerships', page=1))
                    if file_length > 0:
                        logo_file.seek(0)
                        folder_name = '%s_%s' % (partnership.id, partnership.name)
                        file_name = logo_file.filename
                        from buyercall.blueprints.partnership.tasks import upload_partnership_logo
                        upload_partnership_logo(folder_name, logo_file)
                        db.session.query(Partnership) \
                            .filter(Partnership.id == partnership.id) \
                            .update({"logo": '%s/%s' % (folder_name, file_name)}, synchronize_session='fetch')
                        db.session.flush()
                        db.session.commit()

                flash('The partnership has been updated successfully.', 'success')
                return redirect(url_for('sysadmin.partnerships'))
            except Exception as e:
                # Roll back first so the session is usable again, then report.
                # Python 3 exceptions have no ``.message`` attribute, so the
                # text is taken from str(e).
                db.session.rollback()
                log.exception('Failed to save partnership')
                flash(str(e), 'danger')
                return redirect(url_for('sysadmin.partnerships'))

    return render_template('partnerships/new.jinja2', form=form, partnership=partnership)


@sysadmin.route('/partnerships/act_as', methods=['POST'])
def act_as():
    """Log the current sysadmin in as the user given by ``partner_id``.

    The current user is first handed to ``supervisor_login_user`` and only
    then logged out and replaced by the target user.
    NOTE(review): presumably the supervisor login is what allows switching
    back later -- confirm in ``buyercall.lib.supervisor_manager``.
    """
    impersonated = User.query.get(request.form.get('partner_id'))

    if impersonated is None:
        flash(_('Such Partnership does not exist'), 'danger')
        return redirect(url_for('sysadmin.partnerships'))

    supervisor_login_user(current_user, remember=True, force=True)

    logout_user()
    login_user(impersonated, remember=True, force=True)

    return redirect(url_for('user.settings'))


@sysadmin.route('/api/accounts/search', methods=['GET', 'POST'])
def search_account():
    """Return up to 20 partnership accounts whose name contains ``q``.

    The match is case-insensitive and the rows are ordered by account name.
    The response is a JSON array of objects with the keys ``id``, ``name``
    and ``partnership`` (the name of the owning partnership).
    """
    name_pattern = '%{0}%'.format(request.args.get('q', ''))

    matches = (
        PartnershipAccount.query
        .join(PartnershipAccount.partnership)
        .filter(or_(PartnershipAccount.name.ilike(name_pattern)))
        .order_by(asc(PartnershipAccount.name))
        .limit(20)
        .all()
    )

    payload = [
        {
            'id': account.id,
            'name': account.name,
            'partnership': account.partnership.name
        }
        for account in matches
    ]

    return Response(json.dumps(payload),  mimetype='application/json')


# request log view
# The route decorator must be outermost: decorators apply bottom-up, so a
# decorator placed above route() wraps the function only after the unwrapped
# function has already been registered as the view.
@sysadmin.route('/request-log')
@role_required('sysadmin', 'limitsysadmin')
def request_log():
    """Render the request-log page with the data for its filter controls.

    The template receives ``partner_ships`` and ``partnership_accounts``,
    each a list of ``{'id': ..., 'name': ...}`` dicts covering every
    partnership and every partnership account.
    """
    # Get id and name for Partnership objects
    partnership_data = [{'id': partner_ship.id, 'name': partner_ship.name}
                        for partner_ship in Partnership.query.all()]

    # Get id and name for PartnershipAccount objects
    partnership_account_data = [{'id': account.id, 'name': account.name}
                                for account in PartnershipAccount.query.all()]

    return render_template('request_log/index.jinja2', partner_ships=partnership_data,
                           partnership_accounts=partnership_account_data)


# request log list API
# The route decorator must be outermost: decorators apply bottom-up, so the
# access decorators only take part in the view when they sit below route().
@sysadmin.route('/log-requests')
@login_required
@role_required('sysadmin', 'limitsysadmin')
def request_log_api():
    """Return request-log rows as JSON for the server-side data table.

    Query-string arguments read here:
      * ``partnerIdFilter`` / ``accountIdFilter`` / ``statusCodeFilter`` --
        turned into a filter dict for ``RequestLog.apply_filters``; the
        "all" sentinel values add no filter and the "na" sentinel values
        filter on None.
      * ``start_date`` / ``end_date`` -- applied to ``created_on`` only when
        both are present.
      * ``sort`` / ``sort_field`` -- ordering; an unknown ``sort_field``
        falls back to newest-first on ``created_on``.

    The positional keys produced by ``DataTables`` are renamed to field
    names before the result is returned.
    """
    from buyercall.blueprints.form_leads.models import ExternalForm as Form
    from buyercall.blueprints.phonenumbers import Phone
    from buyercall.blueprints.leads.models import Lead
    from sqlalchemy import and_

    partnership_id = request.args.get('partnerIdFilter')
    partnership_account_id = request.args.get('accountIdFilter')
    status = request.args.get('statusCodeFilter')

    columns = [
         ColumnDT(RequestLog.id),
         ColumnDT(RequestLog.partnership_name),
         ColumnDT(RequestLog.partnership_account_name),
         ColumnDT(RequestLog.request_type),
         ColumnDT(RequestLog.response_code),
         ColumnDT(RequestLog.error),
         ColumnDT(RequestLog.method),
         ColumnDT(RequestLog.path_info),
         ColumnDT(RequestLog.remote_ip_address),
         ColumnDT(RequestLog.server_name),
         ColumnDT(RequestLog.created_on),
         ColumnDT(RequestLog.user_agent),
         # ip based details
         ColumnDT(RequestLog.country),
         ColumnDT(RequestLog.state),
         ColumnDT(RequestLog.city),
         ColumnDT(RequestLog.zip_code),
         ColumnDT(RequestLog.reverse),
         # user values
         ColumnDT(User.firstname),
         ColumnDT(User.lastname),
         ColumnDT(User.email),
         # resource
         ColumnDT(Phone.phonenumber),
         ColumnDT(Form.display_name),
         ColumnDT(Lead.phonenumber),

         ]

    # Which table RequestLog.resource points at depends on the request path,
    # hence the path prefix conditions inside the outer joins.
    query = (
        db.session.query()
        .select_from(RequestLog, User, Form, Phone, Lead)
        .join(User, RequestLog.user_id == User.id, isouter=True)
        .join(Form, and_(RequestLog.resource == Form.id, RequestLog.path_info.startswith('/form_leads')), isouter=True)
        .join(Lead, and_(RequestLog.resource == Lead.id, RequestLog.path_info.startswith('/bw/voice/status')), isouter=True)
        .join(
            Phone,
            and_(
                RequestLog.resource == Phone.id,
                or_(
                    RequestLog.path_info.startswith('/bw/sms'),
                    RequestLog.path_info.startswith('/bw/voice')
                )
            ),
            isouter=True
        )
    )

    # date range filter (only when both bounds are supplied)
    start_date = request.args.get('start_date', None)
    end_date = request.args.get('end_date', None)

    if start_date and end_date:
        query = query.filter(RequestLog.created_on >= start_date, RequestLog.created_on <= end_date)

    # partnership id filter
    filters = {}
    if partnership_id not in ["all_partner_ships", "partnership_all_na"]:
        filters["partnership_id"] = partnership_id
    elif partnership_id == "partnership_all_na":
        filters["partnership_id"] = None

    # partnership account id filter
    if partnership_account_id not in ["all_partner_ship_accounts", "partnership_account_all_na"]:
        filters["partnership_account_id"] = partnership_account_id
    elif partnership_account_id == "partnership_account_all_na":
        filters["partnership_account_id"] = None

    if status not in ["all_status"]:
        filters["status"] = status

    query = RequestLog.apply_filters(query, filters)

    # sort query
    sort = request.args.get('sort', "DESC")
    sort_field = request.args.get('sort_field', "created_on")

    sort_column = getattr(RequestLog, sort_field, None) if sort_field else None
    if sort_column is None:
        # Empty or unknown sort field: default to newest first.
        query = query.order_by(desc(RequestLog.created_on))
    else:
        sort_order = desc if sort.upper() == 'DESC' else asc
        query = query.order_by(sort_order(sort_column))

    row_table = DataTables(request.args, query, columns)
    result = row_table.output_result()

    # Field names in the same order as ``columns`` above; the result rows
    # are keyed by the column position as a string.
    field_names = ('id', 'partnership_name',
                   'partnership_account_name', 'request_type',
                   "response_code", "error", "method",
                   "path_info", "remote_ip_address",
                   "server_name", "created_on", "user_agent",
                   "country", "state", "city", "zip_code",
                   "reverse", "firstname", "lastname", "email",
                   "resource", "resource1", "resource2")
    field_mapping = {str(index): name for index, name in enumerate(field_names)}

    my_list = []
    if not result.get('error'):
        for item in result['data']:
            my_dict = {}
            for key, value in item.items():
                name = field_mapping.get(key, key)
                # A non-empty phone number or form name is exposed under the
                # single 'resource' key.
                # NOTE(review): 'resource2' (lead phone number) is not merged
                # into 'resource' -- confirm whether that is intended.
                if name in ('resource', 'resource1') and value:
                    my_dict['resource'] = value
                else:
                    my_dict[str(name)] = value

            my_list.append(my_dict)

    result['data'] = my_list

    return jsonify(result)