HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/block_numbers/views.py
import csv
from contextlib import closing
from flask import (
    Blueprint,
    flash,
    request,
    jsonify,
    make_response,
    redirect,
    url_for,
    render_template)
from io import StringIO
from datetime import date
from flask_login import current_user, login_required
from buyercall.blueprints.user.decorators import role_required
from datatables import DataTables, ColumnDT
from flask_babel import gettext as _
from sqlalchemy import or_
from buyercall.extensions import csrf

from buyercall.blueprints.block_numbers.models import Block
from buyercall.blueprints.block_numbers.forms import BlockForm
from buyercall.extensions import db


block_numbers = Blueprint('block_numbers', __name__, template_folder='templates')


# This function is to list all blocked numbers for a user
@block_numbers.route('/block_numbers')
@login_required
def block_number_list():
    some_text = "hello world this is block numbers"
    return render_template('block_numbers/block_numbers.jinja2',
                           some_text=some_text)


# This function is used to create new block number
@block_numbers.route('/block_numbers/new', methods=['GET', 'POST'])
@login_required
@role_required('admin', 'agent')
def block_new():
    block_number = Block()
    form = BlockForm(obj=block_number)
    partnership_account_id = current_user.partnership_account_id
    is_admin_in_group = current_user.is_admin_user_with_groups
    viewing_partnership_account = current_user.is_viewing_partnership

    # Check if being viewed by super partner
    if is_admin_in_group and viewing_partnership_account:
        partnership_account_id = current_user.get_user_viewing_partnership_account_id
    elif not viewing_partnership_account and is_admin_in_group:
        return redirect(url_for('partnership.company_accounts'))

    if form.validate_on_submit():
        form.populate_obj(block_number)
        params = {
            'partnership_accounts_id': partnership_account_id,
            'phonenumber': block_number.phonenumber,
            'status': block_number.status
        }
        if Block.create(params):
            flash(_('The number has successfully been added to the blocked list.'), 'success')
            return redirect(url_for('block_numbers.block_number_list'))
        else:
            flash(_('The number has successfully been added to the blocked list.'), 'error')
            return redirect(url_for('block_numbers.block_new'))

    return render_template('block_numbers/new.jinja2',
                           block_number=block_number,
                           form=form)


# Edit blocked numbers
@block_numbers.route('/block_numbers/edit/<int:id>', methods=['GET', 'POST'])
@login_required
@role_required('admin', 'agent')
def block_edit(id):
    partnership_account_id = current_user.partnership_account_id
    is_admin_in_group = current_user.is_admin_user_with_groups
    viewing_partnership_account = current_user.is_viewing_partnership

    # Check if being viewed by super partner
    if is_admin_in_group and viewing_partnership_account:
        partnership_account_id = current_user.get_user_viewing_partnership_account_id
    elif not viewing_partnership_account and is_admin_in_group:
        return redirect(url_for('partnership.company_accounts'))

    block_number = Block.query.filter(
        Block.id == id, Block.partnership_accounts_id == partnership_account_id
    ).first()
    if not block_number:
        flash('Blocked number with ID {} not found.'.format(id))
        return redirect(url_for('block_numbers.block_number_list'))

    form = BlockForm(obj=block_number)

    if form.validate_on_submit():
        form.populate_obj(block_number)
        block_number.save()

        if block_number.save():
            flash(_('The blocked number has been updated successfully.'), 'success')
            return redirect(url_for('block_numbers.block_number_list'))
        else:
            flash(_('The blocked number could not be updated.'), 'error')
            return redirect(url_for('block_numbers.block_edit'))

    return render_template('block_numbers/edit.jinja2',
                           block_number=block_number,
                           form=form)


@block_numbers.route('/block_numbers/delete/<int:id>', methods=['DELETE'])
@login_required
@role_required('admin')
def block_delete(id):
    partnership_account_id = current_user.partnership_account_id
    is_admin_in_group = current_user.is_admin_user_with_groups
    viewing_partnership_account = current_user.is_viewing_partnership

    # Check if being viewed by super partner
    if is_admin_in_group and viewing_partnership_account:
        partnership_account_id = current_user.get_user_viewing_partnership_account_id
    elif not viewing_partnership_account and is_admin_in_group:
        return redirect(url_for('partnership.company_accounts'))

    block_number = Block.query\
        .filter(Block.id == id, Block.partnership_accounts_id == partnership_account_id).first()
    if block_number:
        block_number.delete()
    else:
        flash(_('Sorry, something went wrong. We are unable to delete the number.'), 'error')
        return redirect(url_for('block_numbers.block_number_list'))

    return jsonify(success=True)


# return blocked number data into jquery datatables
@block_numbers.route('/blocked-data')
@csrf.exempt
@login_required
def data():
    """Return server side data."""
    partnership_accounts_id = current_user.partnership_account_id
    is_admin_in_group = current_user.is_admin_user_with_groups
    viewing_partnership_account = current_user.is_viewing_partnership

    # Check if being viewed by super partner
    if is_admin_in_group and viewing_partnership_account:
        partnership_accounts_id = current_user.get_user_viewing_partnership_account_id

    # defining columns
    columns = [ColumnDT(Block.id),
               ColumnDT(Block.phonenumber),
               ColumnDT(Block.status),
               ]

    query = db.session.query().select_from(Block).filter(
        Block.partnership_accounts_id == partnership_accounts_id
    )

    # instantiating a DataTable for the query and table needed
    row_table = DataTables(request.args, query, columns)
    return jsonify(row_table.output_result())


# return blocked phone number data into jquery datatables
@block_numbers.route('/block_numbers/csv')
@login_required
def data_csv():
    """Return server side data."""
    header = [
        'No', 'Phone Number', 'Status'
    ]
    partnership_account_id = current_user.partnership_account_id
    is_admin_in_group = current_user.is_admin_user_with_groups
    viewing_partnership_account = current_user.is_viewing_partnership

    # Check if being viewed by super partner
    if is_admin_in_group and viewing_partnership_account:
        partnership_account_id = current_user.get_user_viewing_partnership_account_id

    query = Block.query.filter(partnership_account_id == Block.partnership_accounts_id)

    search = request.args.get('search[value]', '')
    if search:
        pattern = '%{}%'.format(search)
        query = query.filter(or_(
            Block.phonenumber.ilike(pattern)
        ))

    query = query.order_by(Block.id)

    # Build the CSV
    row_no = 0
    with closing(StringIO()) as out:
        writer = csv.writer(out)
        writer.writerow(header)
        for a in query.all():
            row_no += 1
            csv_row = [
                row_no, a.phonenumber, a.status
            ]
            writer.writerow(csv_row)

        filename = 'Blocked Phone Numbers - {}.csv'.format(
            date.today().strftime('%Y-%m-%d')
        )

        resp = make_response(out.getvalue())

        resp.headers['Content-Type'] = 'text/csv'
        resp.headers['Content-Disposition'] = \
            'attachment; filename="{}"'.format(filename)
        return resp