File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/block_numbers/views.py
import csv
from contextlib import closing
from flask import (
Blueprint,
flash,
request,
jsonify,
make_response,
redirect,
url_for,
render_template)
from io import StringIO
from datetime import date
from flask_login import current_user, login_required
from buyercall.blueprints.user.decorators import role_required
from datatables import DataTables, ColumnDT
from flask_babel import gettext as _
from sqlalchemy import or_
from buyercall.extensions import csrf
from buyercall.blueprints.block_numbers.models import Block
from buyercall.blueprints.block_numbers.forms import BlockForm
from buyercall.extensions import db
block_numbers = Blueprint('block_numbers', __name__, template_folder='templates')
# This function is to list all blocked numbers for a user
@block_numbers.route('/block_numbers')
@login_required
def block_number_list():
some_text = "hello world this is block numbers"
return render_template('block_numbers/block_numbers.jinja2',
some_text=some_text)
# This function is used to create new block number
@block_numbers.route('/block_numbers/new', methods=['GET', 'POST'])
@login_required
@role_required('admin', 'agent')
def block_new():
block_number = Block()
form = BlockForm(obj=block_number)
partnership_account_id = current_user.partnership_account_id
is_admin_in_group = current_user.is_admin_user_with_groups
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if is_admin_in_group and viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
elif not viewing_partnership_account and is_admin_in_group:
return redirect(url_for('partnership.company_accounts'))
if form.validate_on_submit():
form.populate_obj(block_number)
params = {
'partnership_accounts_id': partnership_account_id,
'phonenumber': block_number.phonenumber,
'status': block_number.status
}
if Block.create(params):
flash(_('The number has successfully been added to the blocked list.'), 'success')
return redirect(url_for('block_numbers.block_number_list'))
else:
flash(_('The number has successfully been added to the blocked list.'), 'error')
return redirect(url_for('block_numbers.block_new'))
return render_template('block_numbers/new.jinja2',
block_number=block_number,
form=form)
# Edit blocked numbers
@block_numbers.route('/block_numbers/edit/<int:id>', methods=['GET', 'POST'])
@login_required
@role_required('admin', 'agent')
def block_edit(id):
partnership_account_id = current_user.partnership_account_id
is_admin_in_group = current_user.is_admin_user_with_groups
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if is_admin_in_group and viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
elif not viewing_partnership_account and is_admin_in_group:
return redirect(url_for('partnership.company_accounts'))
block_number = Block.query.filter(
Block.id == id, Block.partnership_accounts_id == partnership_account_id
).first()
if not block_number:
flash('Blocked number with ID {} not found.'.format(id))
return redirect(url_for('block_numbers.block_number_list'))
form = BlockForm(obj=block_number)
if form.validate_on_submit():
form.populate_obj(block_number)
block_number.save()
if block_number.save():
flash(_('The blocked number has been updated successfully.'), 'success')
return redirect(url_for('block_numbers.block_number_list'))
else:
flash(_('The blocked number could not be updated.'), 'error')
return redirect(url_for('block_numbers.block_edit'))
return render_template('block_numbers/edit.jinja2',
block_number=block_number,
form=form)
@block_numbers.route('/block_numbers/delete/<int:id>', methods=['DELETE'])
@login_required
@role_required('admin')
def block_delete(id):
partnership_account_id = current_user.partnership_account_id
is_admin_in_group = current_user.is_admin_user_with_groups
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if is_admin_in_group and viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
elif not viewing_partnership_account and is_admin_in_group:
return redirect(url_for('partnership.company_accounts'))
block_number = Block.query\
.filter(Block.id == id, Block.partnership_accounts_id == partnership_account_id).first()
if block_number:
block_number.delete()
else:
flash(_('Sorry, something went wrong. We are unable to delete the number.'), 'error')
return redirect(url_for('block_numbers.block_number_list'))
return jsonify(success=True)
# return blocked number data into jquery datatables
@block_numbers.route('/blocked-data')
@csrf.exempt
@login_required
def data():
"""Return server side data."""
partnership_accounts_id = current_user.partnership_account_id
is_admin_in_group = current_user.is_admin_user_with_groups
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if is_admin_in_group and viewing_partnership_account:
partnership_accounts_id = current_user.get_user_viewing_partnership_account_id
# defining columns
columns = [ColumnDT(Block.id),
ColumnDT(Block.phonenumber),
ColumnDT(Block.status),
]
query = db.session.query().select_from(Block).filter(
Block.partnership_accounts_id == partnership_accounts_id
)
# instantiating a DataTable for the query and table needed
row_table = DataTables(request.args, query, columns)
return jsonify(row_table.output_result())
# return blocked phone number data into jquery datatables
@block_numbers.route('/block_numbers/csv')
@login_required
def data_csv():
"""Return server side data."""
header = [
'No', 'Phone Number', 'Status'
]
partnership_account_id = current_user.partnership_account_id
is_admin_in_group = current_user.is_admin_user_with_groups
viewing_partnership_account = current_user.is_viewing_partnership
# Check if being viewed by super partner
if is_admin_in_group and viewing_partnership_account:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
query = Block.query.filter(partnership_account_id == Block.partnership_accounts_id)
search = request.args.get('search[value]', '')
if search:
pattern = '%{}%'.format(search)
query = query.filter(or_(
Block.phonenumber.ilike(pattern)
))
query = query.order_by(Block.id)
# Build the CSV
row_no = 0
with closing(StringIO()) as out:
writer = csv.writer(out)
writer.writerow(header)
for a in query.all():
row_no += 1
csv_row = [
row_no, a.phonenumber, a.status
]
writer.writerow(csv_row)
filename = 'Blocked Phone Numbers - {}.csv'.format(
date.today().strftime('%Y-%m-%d')
)
resp = make_response(out.getvalue())
resp.headers['Content-Type'] = 'text/csv'
resp.headers['Content-Disposition'] = \
'attachment; filename="{}"'.format(filename)
return resp