File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/widgets/views.py
from contextlib import closing
from io import StringIO
from datetime import date
from urllib.request import urlopen
import json
import codecs
import csv
import logging as log
import os.path as path
import traceback
import uuid
from datatables import DataTables, ColumnDT
from werkzeug.utils import safe_join
from flask import (
Blueprint,
current_app as app,
redirect,
request,
render_template,
flash,
jsonify,
make_response,
url_for,
# safe_join,
send_file
)
from flask_babel import gettext as _
from flask_login import current_user, login_required
from sqlalchemy import and_, func
from sqlalchemy.orm import load_only, Load
from buyercall.blueprints.billing.decorators import subscription_required
from buyercall.extensions import webpack, db, csrf
from buyercall.blueprints.user.decorators import role_required
from buyercall.lib.flask_mailplus import _try_renderer_template
from buyercall.lib.util_ses_email import send_ses_email
from buyercall.blueprints.leads.models import Lead
from buyercall.blueprints.phonenumbers.models import Phone
from .models import Widget, WidgetFile
from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount
# Blueprint under which every view in this module is registered; endpoints
# are addressed as 'widgets.<function name>' (e.g. url_for('widgets.list')).
widgets = Blueprint('widgets', __name__, template_folder='templates')
# Upper bound, in bytes (2 MiB), applied to uploaded widget images.
MAX_FILE_SIZE = 2 * 1024 * 1024
# Outbound Routing onboarding page
@widgets.route('/outbound_onboarding', methods=['GET', 'POST'])
@csrf.exempt
@login_required
@role_required('admin', 'partner', 'sysadmin')
def onboarding_outbound():
    """Show the outbound routing onboarding page, or record its acceptance.

    A POST marks the current user as onboarded and commits that change.
    Any request made by an onboarded user flashes a success message and
    redirects to the outbound routing list; otherwise the onboarding
    template is rendered.
    """
    accepting = request.method == 'POST'
    if accepting:
        current_user.outbound_onboard = True
        db.session.commit()

    # Guard clause: users who have not accepted yet only see the page.
    if not current_user.outbound_onboard:
        return render_template('widgets/outbound_onboarding.jinja2')

    ready_message = _(
        'Great, you are ready to get started with the Outbound Routing '
        'Wizard feature. Remember to checkout the '
        'support section or FAQ if you have any additional outbound '
        'routing questions.')
    flash(ready_message, 'success')
    return redirect(url_for('widgets.list'))
@widgets.route('/outbound')
@login_required
@role_required('admin', 'partner', 'sysadmin')
def list():
    """Render the list of outbound routings (widgets) with their lead counts.

    Users who have not accepted the outbound onboarding are redirected to
    the onboarding page first. The function name doubles as the endpoint
    name ('widgets.list') used by url_for() calls, so it is kept as is.
    """
    # Check if onboarding was accepted
    if not current_user.outbound_onboard:
        return redirect(url_for('widgets.onboarding_outbound'))

    account_id = current_user.partnership_account_id
    # When the user is viewing another partnership account, list that
    # account's widgets instead of the user's own.
    if current_user.is_viewing_partnership:
        account_id = current_user.get_user_viewing_partnership_account_id

    lead_count = func.count(Widget.leads).label('lead_count')
    base_query = db.session.query(
        Widget.id,
        Widget.name,
        Widget.created_on,
        Widget.updated_on,
        Widget.enabled,
        lead_count
    )
    # Outer join so widgets without any lead are still returned.
    widget_rows = (
        base_query
        .outerjoin(Lead)
        .filter(Widget.partnership_account_id == account_id)
        .group_by(Widget.id)
        .all()
    )

    return render_template('widgets/list.jinja2', widgets=widget_rows)
@widgets.route('/outbound/new')
@login_required
@subscription_required
@role_required('admin')
def new():
    """Render the form for creating a new outbound routing (call widget).

    Group admins must be viewing a partnership account; if they are not,
    they are redirected to the company accounts page. An account without
    any inbound routing is redirected to the inbound list with a warning.
    """
    account_id = current_user.partnership_account_id
    group_admin = current_user.is_admin_user_with_groups
    viewing_account = current_user.is_viewing_partnership

    # Check if being viewed by super partner
    if group_admin:
        if not viewing_account:
            return redirect(url_for('partnership.company_accounts'))
        account_id = current_user.get_user_viewing_partnership_account_id

    inbound_total = Phone.query.filter(
        Phone.partnership_account_id == account_id
    ).count()

    if inbound_total:
        return render_template(
            'widgets/edit.jinja2',
            title='New Call Widget'
        )

    flash(_('You must add an inbound routing before creating an '
            'outbound one.'), 'warning')
    return redirect(url_for('phonenumbers.inbound_list'))
@widgets.route('/outbound/<int:id>')
@login_required
@subscription_required
@role_required('admin', 'partner', 'sysadmin')
def edit(id):
    """Render the edit page for one outbound routing (widget).

    The widget is looked up by ``id`` within the partnership account the
    current user owns or is currently viewing.

    :param id: primary key of the widget, taken from the URL.
    :return: the rendered edit template, or a redirect to the widget list
        (with a flashed message) when the widget does not exist or the page
        cannot be rendered.
    """
    def _not_found():
        # Translate the template first, then substitute the id; formatting
        # before the gettext lookup would make the message untranslatable.
        flash(_('Outbound routing with ID {} not found.').format(id))
        return redirect(url_for('widgets.list'))

    try:
        partnership_account_id = current_user.partnership_account_id

        # Check if being viewed by super partner
        if current_user.is_viewing_partnership:
            partnership_account_id = current_user.get_user_viewing_partnership_account_id

        widget = Widget.query.filter(
            Widget.partnership_account_id == partnership_account_id,
            Widget.id == id
        ).first()

        # An unknown id is an expected case, not an error worth a traceback.
        if widget is None:
            return _not_found()

        return render_template(
            'widgets/edit.jinja2', title=widget.name,
            widget_data=json.dumps(widget.options),
        )
    except Exception:
        # View-level boundary: log the unexpected failure and fall back to
        # the list page, as this view has always done.
        log.exception('Unable to render outbound routing %s', id)
        return _not_found()
@widgets.route('/outbound/csv', methods=['GET'])
@login_required
@role_required('admin')
def outbound_csv():
    """
    Export the outbound routings of the active partnership account as CSV.

    The request's query-string arguments are handed to DataTables (with
    'start' forced to '0'), and the resulting rows are written out under a
    fixed header. The first column is replaced by a running row number.
    The response is served as an attachment named after today's date.
    """
    # TODO: no partner allowed as role
    header = [
        'No', 'Name', 'Lead Count', 'Created On', 'Updated On', 'Enabled'
    ]
    columns = [
        ColumnDT(attribute)
        for attribute in (
            Widget.id,
            Widget.name,
            Widget.lead_count,
            Widget.created_on,
            Widget.updated_on,
            Widget.enabled,
        )
    ]

    account_id = current_user.partnership_account_id
    group_admin = current_user.is_admin_user_with_groups
    viewing_account = current_user.is_viewing_partnership

    # Check if being viewed by super partner
    if group_admin:
        if not viewing_account:
            return redirect(url_for('partnership.company_accounts'))
        account_id = current_user.get_user_viewing_partnership_account_id

    # Flatten the query string: keys with several values keep the list,
    # keys with one value are unwrapped to that value.
    args = {}
    for key in request.args.keys():
        values = request.args.getlist(key)
        args[key] = values if len(values) > 1 else values[0]
    args['start'] = '0'

    query = db.session.query().select_from(Widget).options(
        Load(Widget).load_only('id', 'name', 'created_on', 'updated_on', 'enabled')
    ).group_by(
        Widget.id
    ).filter(
        Widget.partnership_account_id == account_id
    )

    results = DataTables(args, query, columns).output_result()
    rows = results['data']

    # Column '5' (Enabled) arrives as text; turn it into a real boolean.
    for row in rows:
        row['5'] = (row['5'] == 'True')

    # Build the CSV
    with closing(StringIO()) as buffer:
        writer = csv.writer(buffer)
        writer.writerow(header)
        for row_no, row in enumerate(rows, start=1):
            csv_row = [row[key] for key in sorted(row)]
            csv_row[0] = row_no
            writer.writerow(csv_row)
        csv_text = buffer.getvalue()

    filename = 'Buyercall Outbound Routings - {}.csv'.format(
        date.today().strftime('%Y-%m-%d')
    )

    resp = make_response(csv_text)
    resp.headers['Content-Type'] = 'text/csv'
    resp.headers['Content-Disposition'] = 'attachment; filename="{}"'.format(
        filename
    )
    return resp
@widgets.route('/outbound/image', methods=['POST'])
@login_required
@subscription_required
@role_required('admin', 'partner', 'sysadmin')
def upload_image():
    """Accept a widget image upload and register it for the active account.

    The uploaded file (form field ``file``) must have a .jpg, .gif or .png
    extension and must not exceed MAX_FILE_SIZE. It is saved in the
    configured UPLOAD_FOLDER under a freshly generated UUID, handed to
    ``upload_widget_image``, and recorded as a WidgetFile row.

    :return: JSON ``{"filename": <uuid>}`` on success, or a 400/404
        response describing why the upload was rejected.
    """
    # The module only aliases os.path as `path`; os itself is needed to
    # delete a rejected upload.
    import os

    allowed_extensions = ('.jpg', '.gif', '.png')

    file = request.files['file']

    # Check that something was uploaded before touching its filename.
    if not file:
        return make_response('Error uploading file.', 400)

    file_ext = path.splitext(file.filename.lower())[1]
    if file_ext not in allowed_extensions:
        return make_response('File extension not allowed: ' + file_ext, 400)

    # content_length is frequently unset, so this is only a cheap early
    # rejection; the size is checked again once the file is on disk.
    if file.content_length and file.content_length > MAX_FILE_SIZE:
        return make_response('File too large.', 400)

    partnership_account_id = current_user.partnership_account_id

    # Check if being viewed by super partner
    if current_user.is_viewing_partnership:
        partnership_account_id = current_user.get_user_viewing_partnership_account_id

    # Resolve the account before writing anything to disk, so a missing
    # account does not leave an orphaned file behind.
    partner = PartnershipAccount.query.filter(
        PartnershipAccount.id == partnership_account_id
    ).first()
    if partner is None:
        return make_response('Partnership account not found.', 404)
    partner_name = partner.name

    filename = str(uuid.uuid4())
    file_path = path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(file_path)

    if path.getsize(file_path) > MAX_FILE_SIZE:
        # Do not keep rejected uploads around.
        os.remove(file_path)
        return make_response('File too large.', 400)

    from .tasks import upload_widget_image
    image_url = upload_widget_image(partner_name, partnership_account_id, filename, file_path, file_ext)
    log.info('The mms url is {}'.format(image_url))

    widget_image = WidgetFile(guid=filename, partnership_account_id=partnership_account_id, url=image_url)
    db.session.add(widget_image)
    db.session.commit()

    return jsonify(filename=filename)
@widgets.route('/outbound/image/<guid>', methods=['GET'])
def get_image(guid):
    """Serve the widget image identified by ``guid``.

    When the WidgetFile record carries a URL the client is redirected to
    it. Otherwise the image is read from the configured UPLOAD_FOLDER and
    its MIME type is chosen from the file's first four bytes (PNG and GIF
    signatures are recognised; anything else is served as JPEG).

    :param guid: identifier of the uploaded image, taken from the URL.
    :return: a redirect, the image file, or a 404 response when no record
        or no local file exists for ``guid``.
    """
    # Get the widget image url
    widget_image = WidgetFile.query.options(
        load_only("guid", "url")
    ).filter(WidgetFile.guid == guid).first()

    if widget_image is None:
        log.info('There seems to be no widget image file for guid: {} on S3'.format(guid))
        return make_response('Image not found.', 404)

    if widget_image.url is not None:
        return redirect(widget_image.url)

    # safe_join yields None when `guid` would escape the upload folder.
    local_path = safe_join(app.config['UPLOAD_FOLDER'], guid)
    if local_path is None or not path.isfile(local_path):
        log.info('There seems to be no widget image file for guid: {} on S3'.format(guid))
        return make_response('Image not found.', 404)

    with open(local_path, 'rb') as f:
        signature = f.read(4)

    # The file is opened in binary mode, so compare against bytes literals.
    if signature == b'\x89PNG':
        mimetype = 'image/png'
    elif signature == b'GIF8':
        mimetype = 'image/gif'
    else:
        mimetype = 'image/jpeg'

    return send_file(local_path, mimetype=mimetype)
@widgets.route('/outbound/install-instructions/<int:id>')
@login_required
@subscription_required
@role_required('admin')
def install_instructions(id):
    """Render the install instructions page for one widget.

    The page always includes the main widget script; the button and link
    snippets are rendered only when the widget options enable them
    ('typeButton' / 'typeLink').

    :param id: primary key of the widget, taken from the URL.
    :return: the rendered instructions page; a redirect to the company
        accounts page for a group admin who is not viewing an account; or
        a redirect to the widget list (with a flashed message) when the
        widget cannot be loaded or rendered.
    """
    script, button_script, link_script = '', None, None

    try:
        partnership_account_id = current_user.partnership_account_id
        is_admin_in_group = current_user.is_admin_user_with_groups
        viewing_partnership_account = current_user.is_viewing_partnership

        # Check if being viewed by super partner
        if is_admin_in_group and viewing_partnership_account:
            partnership_account_id = current_user.get_user_viewing_partnership_account_id
        elif not viewing_partnership_account and is_admin_in_group:
            return redirect(url_for('partnership.company_accounts'))

        # .one() raises when the widget is missing; handled below.
        widget = Widget.query.filter(
            and_(Widget.id == id, Widget.partnership_account_id == partnership_account_id)
        ).one()

        # TODO: Make widget locale-sensitive?
        script = render_template(
            'widgets/install_script_widget.jinja2',
            script_src=webpack.asset_url_for('call_widget_external.js'),
            widget_guid=widget.guid
        )
        if widget.options.get('typeButton', False):
            button_script = render_template(
                'widgets/install_script_button.jinja2',
                widget_guid=widget.guid
            )
        if widget.options.get('typeLink', False):
            link_script = render_template(
                'widgets/install_script_link.jinja2',
                widget_guid=widget.guid
            )
    except Exception:
        log.exception('Unable to build install instructions for outbound routing %s', id)
        # Translate the template first, then substitute the id.
        flash(_('Outbound routing with ID {} not found.').format(id))
        return redirect(url_for('widgets.list'))

    # Covers both None and the empty string by value rather than identity.
    widgetemail = widget.email or "No email has been generated for this widget."

    return render_template(
        'widgets/install_instructions.jinja2',
        id=id,
        script=script,
        button_script=button_script,
        link_script=link_script,
        email=widgetemail
    )
@widgets.route(
    '/outbound/install-instructions/<int:id>/email',
    methods=['POST']
)
@login_required
@role_required('admin')
def email_instructions(id):
    """Email the install instructions of a widget to the submitted address.

    The address comes from the ``email`` form field. The message contains
    the main widget script plus the button and link snippets when the
    widget options enable them ('typeButton' / 'typeLink').

    :param id: primary key of the widget, taken from the URL.
    :return: a redirect back to the install instructions page; a redirect
        to the company accounts page for a group admin who is not viewing
        an account; or a redirect to the widget list when the widget does
        not exist.
    """
    partnership_account_id = current_user.partnership_account_id
    is_admin_in_group = current_user.is_admin_user_with_groups
    viewing_partnership_account = current_user.is_viewing_partnership

    # Check if being viewed by super partner
    if is_admin_in_group and viewing_partnership_account:
        partnership_account_id = current_user.get_user_viewing_partnership_account_id
    elif not viewing_partnership_account and is_admin_in_group:
        return redirect(url_for('partnership.company_accounts'))

    widget = Widget.query.filter(
        and_(Widget.id == id, Widget.partnership_account_id == partnership_account_id)
    ).first()
    if widget is None:
        # Same outcome as the install instructions page for an unknown id.
        flash(_('Outbound routing with ID {} not found.').format(id))
        return redirect(url_for('widgets.list'))

    email = request.form.get('email', '')
    if not email:
        flash(_('Please provide an email address.'), 'danger')
        return redirect(url_for('.install_instructions', id=id))

    partner_account = PartnershipAccount.query.filter(PartnershipAccount.id == widget.partnership_account_id).first()
    partner = Partnership.query.filter(Partnership.id == partner_account.partnership_id).first()

    # Only render the snippets that will actually be included. Options are
    # read with .get() so a widget saved without these keys does not fail.
    templates = ['script_widget']
    if widget.options.get('typeButton', False):
        templates.append('script_button')
    if widget.options.get('typeLink', False):
        templates.append('script_link')

    ctx = {
        template: render_template(
            'widgets/install_{}.jinja2'.format(template),
            script_src=webpack.asset_url_for('call_widget_external.js'),
            widget_guid=widget.guid
        )
        for template in templates
    }
    ctx['partner_logo'] = partner.logo
    ctx['company'] = partner.name

    app.logger.info('Sending email to <{}>'.format(email))

    # Render the HTML body of the install instructions email
    widget_email_template = _try_renderer_template('widgets/mail/install_instructions', ext='html', **ctx)
    send_ses_email(recipients=[email],
                   p_id=partner.id,
                   subject='Call Widget Install Instructions',
                   html=widget_email_template
                   )
    flash(_('Email successfully sent.'), 'success')

    return redirect(url_for('.install_instructions', id=id))
@widgets.route('/outbound/install-instructions/<int:id>/verify',
               methods=['POST'])
@login_required
@role_required('admin')
def verify_widget(id):
    """Check whether a widget appears to be installed on a submitted site.

    The page at the ``url`` form field is fetched (2 second timeout) and
    searched for the string 'CallWidgetView' and for the widget's guid.
    The outcome is reported through a flashed message.

    :param id: primary key of the widget, taken from the URL.
    :return: a redirect back to the install instructions page, or to the
        widget list when the widget does not exist.
    """
    # Try opening the URL, and look for the string 'CallWidgetView'
    # TODO: A more sophisticated approach, like loading the site in phantomjs
    # and looking for the #buyercall-widget element.
    # TODO: no partner allowed as role
    from urllib.parse import urlparse

    site_url = request.form['url']
    partnership_account_id = current_user.partnership_account_id

    # Check if being viewed by super partner
    if current_user.is_viewing_partnership:
        partnership_account_id = current_user.get_user_viewing_partnership_account_id

    widget = Widget.query.filter(
        and_(Widget.id == id, Widget.partnership_account_id == partnership_account_id)
    ).first()
    if widget is None:
        # Report the real problem instead of blaming the site.
        flash(_('Outbound routing with ID {} not found.').format(id))
        return redirect(url_for('widgets.list'))

    try:
        # The URL is user input: only fetch web pages, never file:// or
        # other schemes urlopen would otherwise accept.
        # NOTE(review): this does not stop requests to internal HTTP hosts.
        if urlparse(site_url).scheme not in ('http', 'https'):
            raise ValueError('Unsupported URL scheme: {}'.format(site_url))

        # The context manager closes the connection once the page is read.
        with urlopen(site_url, None, 2) as site:
            site_data = codecs.decode(site.read(), 'ascii', 'ignore')
    except Exception:
        log.warning(traceback.format_exc())
        flash('Site {} could not be opened.'.format(site_url), 'danger')
        return redirect(url_for('.install_instructions', id=id))

    if 'CallWidgetView' not in site_data:
        flash('Could not find the buyecall widget on ' + site_url, 'danger')
    elif widget.guid not in site_data:
        flash(
            'A different buyercall widget appears'
            ' to be configured on ' + site_url,
            'warning'
        )
    else:
        flash(
            'The buyercall widget appears to be configured on ' + site_url,
            'info'
        )

    return redirect(url_for('.install_instructions', id=id))