HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/widgets/views.py
from contextlib import closing
from io import StringIO
from datetime import date
from urllib.request import urlopen

import json
import codecs
import csv
import logging as log
import os.path as path
import traceback
import uuid

from datatables import DataTables, ColumnDT
from werkzeug.utils import safe_join
from flask import (
    Blueprint,
    current_app as app,
    redirect,
    request,
    render_template,
    flash,
    jsonify,
    make_response,
    url_for,
    # safe_join,
    send_file
)
from flask_babel import gettext as _
from flask_login import current_user, login_required
from sqlalchemy import and_, func
from sqlalchemy.orm import load_only, Load

from buyercall.blueprints.billing.decorators import subscription_required
from buyercall.extensions import webpack, db, csrf
from buyercall.blueprints.user.decorators import role_required
from buyercall.lib.flask_mailplus import _try_renderer_template
from buyercall.lib.util_ses_email import send_ses_email
from buyercall.blueprints.leads.models import Lead
from buyercall.blueprints.phonenumbers.models import Phone
from .models import Widget, WidgetFile
from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount

widgets = Blueprint('widgets', __name__, template_folder='templates')

MAX_FILE_SIZE = 2 * 1024 * 1024


# Outbound Routing onboarding page
@widgets.route('/outbound_onboarding', methods=['GET', 'POST'])
@csrf.exempt
@login_required
@role_required('admin', 'partner', 'sysadmin')
def onboarding_outbound():
    if request.method == 'POST':
        current_user.outbound_onboard = True
        db.session.commit()

    if current_user.outbound_onboard:
        flash(_(
            'Great, you are ready to get started with the Outbound Routing '
            'Wizard feature. Remember to checkout the '
            'support section or FAQ if you have any additional outbound '
            'routing questions.'), 'success')
        return redirect(url_for('widgets.list'))

    return render_template('widgets/outbound_onboarding.jinja2')


@widgets.route('/outbound')
@login_required
@role_required('admin', 'partner', 'sysadmin')
def list():
    # Check if onboarding was excepted
    if not current_user.outbound_onboard:
        return redirect(url_for('widgets.onboarding_outbound'))

    partnership_account_id = current_user.partnership_account_id
    partnership_account_group_viewing = current_user.is_viewing_partnership

    # Check if being viewed by super partner
    if partnership_account_group_viewing:
        partnership_account_id = current_user.get_user_viewing_partnership_account_id

    widgets = db.session.query(
        Widget.id,
        Widget.name,
        Widget.created_on,
        Widget.updated_on,
        Widget.enabled,
        func.count(Widget.leads).label('lead_count')
    ).outerjoin(Lead).filter(
        Widget.partnership_account_id == partnership_account_id
    ).group_by(Widget.id).all()

    return render_template(
        'widgets/list.jinja2',
        widgets=widgets
    )


@widgets.route('/outbound/new')
@login_required
@subscription_required
@role_required('admin')
def new():
    partnership_account_id = current_user.partnership_account_id
    is_admin_in_group = current_user.is_admin_user_with_groups
    viewing_partnership_account = current_user.is_viewing_partnership

    # Check if being viewed by super partner
    if is_admin_in_group and viewing_partnership_account:
        partnership_account_id = current_user.get_user_viewing_partnership_account_id
    elif not viewing_partnership_account and is_admin_in_group:
        return redirect(url_for('partnership.company_accounts'))

    inbound_count = Phone.query.filter(
        Phone.partnership_account_id == partnership_account_id
    ).count()
    if not inbound_count:
        flash(_('You must add an inbound routing before creating an '
                'outbound one.'), 'warning')
        return redirect(url_for('phonenumbers.inbound_list'))

    return render_template(
        'widgets/edit.jinja2',
        title='New Call Widget'
    )


@widgets.route('/outbound/<int:id>')
@login_required
@subscription_required
@role_required('admin', 'partner', 'sysadmin')
def edit(id):
    try:
        partnership_account_id = current_user.partnership_account_id
        partnership_account_group_viewing = current_user.is_viewing_partnership

        # Check if being viewed by super partner
        if partnership_account_group_viewing:
            partnership_account_id = current_user.get_user_viewing_partnership_account_id

        widget = Widget.query.filter(
            Widget.partnership_account_id == partnership_account_id,
            Widget.id == id
        ).first()
        return render_template(
            'widgets/edit.jinja2', title=widget.name,
            widget_data=json.dumps(widget.options),
        )
    except Exception as e:
        log.error(traceback.format_exc())
        flash(_('Outbound routing with ID {} not found.'.format(id)))
        return redirect(url_for('widgets.list'))


@widgets.route('/outbound/csv', methods=['GET'])
@login_required
@role_required('admin')
def outbound_csv():
    """
    Retrieves all outbound routings for the current user in CSV format.
    """
    # TODO: no partner allowed as role
    header = [
        'No', 'Name', 'Lead Count', 'Created On', 'Updated On', 'Enabled'
    ]
    columns = [
        ColumnDT(Widget.id),
        ColumnDT(Widget.name),
        ColumnDT(Widget.lead_count),
        ColumnDT(Widget.created_on),
        ColumnDT(Widget.updated_on),
        ColumnDT(Widget.enabled)
    ]

    partnership_account_id = current_user.partnership_account_id
    is_admin_in_group = current_user.is_admin_user_with_groups
    viewing_partnership_account = current_user.is_viewing_partnership

    # Check if being viewed by super partner
    if is_admin_in_group and viewing_partnership_account:
        partnership_account_id = current_user.get_user_viewing_partnership_account_id
    elif not viewing_partnership_account and is_admin_in_group:
        return redirect(url_for('partnership.company_accounts'))

    args = dict(
        (key, request.args.getlist(key)
         if len(request.args.getlist(key)) > 1
         else request.args.getlist(key)[0]) for key in request.args.keys()
    )
    args['start'] = '0'

    query = db.session.query().select_from(Widget).options(
        Load(Widget).load_only('id', 'name', 'created_on', 'updated_on', 'enabled')
    ).group_by(
        Widget.id
    ).filter(
        Widget.partnership_account_id == partnership_account_id
    )
    row_table = DataTables(args, query, columns)
    results = row_table.output_result()
    for row in results['data']:
        row['5'] = (row['5'] == 'True')

    # Build the CSV
    row_no = 0
    with closing(StringIO()) as out:
        writer = csv.writer(out)
        writer.writerow(header)
        for row in results['data']:
            csv_row = [row[key] for key in sorted(row.keys())]
            row_no += 1
            csv_row[0] = row_no
            writer.writerow(csv_row)

        filename = 'Buyercall Outbound Routings - {}.csv'.format(
            date.today().strftime('%Y-%m-%d')
        )

        resp = make_response(out.getvalue())
        resp.headers['Content-Type'] = 'text/csv'
        resp.headers['Content-Disposition'] = 'attachment; filename="{}"'.format(
            filename
        )
        return resp


@widgets.route('/outbound/image', methods=['POST'])
@login_required
@subscription_required
@role_required('admin', 'partner', 'sysadmin')
def upload_image():
    def allowed_file(file_ext):
        return file_ext in ['.jpg', '.gif', '.png']

    file = request.files['file']
    file_ext = path.splitext(file.filename.lower())[1]
    if not file:
        return make_response('Error uploading file.', 400)

    if not allowed_file(file_ext):
        return make_response('File extension not allowed: ' + file_ext, 400)

    if file.content_length and file.content_length > MAX_FILE_SIZE:
        return make_response('File too large.', 400)

    partnership_account_id = current_user.partnership_account_id
    partnership_account_group_viewing = current_user.is_viewing_partnership

    # Check if being viewed by super partner
    if partnership_account_group_viewing:
        partnership_account_id = current_user.get_user_viewing_partnership_account_id

    filename = str(uuid.uuid4())
    file_path = path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(file_path)

    if path.getsize(file_path) > MAX_FILE_SIZE:
        return make_response('File too large.', 400)

    from buyercall.blueprints.partnership.models import PartnershipAccount
    partner = PartnershipAccount.query.filter(PartnershipAccount.id == partnership_account_id).first()
    partner_name = partner.name

    from .tasks import upload_widget_image
    image_url = upload_widget_image(partner_name, partnership_account_id, filename, file_path, file_ext)
    log.info('The mms url is {}'.format(image_url))

    widget_image = WidgetFile(guid=filename, partnership_account_id=partnership_account_id, url=image_url)
    db.session.add(widget_image)
    db.session.commit()

    return jsonify(filename=filename)


@widgets.route('/outbound/image/<guid>', methods=['GET'])
def get_image(guid):
    # Get the widget image url
    widget_image_url = WidgetFile.query.options(
        load_only("guid", "url")
    ).filter(WidgetFile.guid == guid).first()
    if widget_image_url.url is not None:
        return redirect(widget_image_url.url)
    else:
        path = safe_join(app.config['UPLOAD_FOLDER'], guid)
        if path is not None:
            mimetype = 'image/jpeg'

            with open(path, 'rb') as f:
                bytes = f.read(4)
                if bytes == '\x89PNG':
                    mimetype = 'image/png'
                elif bytes == 'GIF8':
                    mimetype = 'image/gif'

            return send_file(path, mimetype=mimetype)
        else:
            return log.info('There seems to be no widget image file for guid: {} on S3'.format(guid))


@widgets.route('/outbound/install-instructions/<int:id>')
@login_required
@subscription_required
@role_required('admin')
def install_instructions(id):
    script, button_script, link_script = '', None, None

    try:
        partnership_account_id = current_user.partnership_account_id
        is_admin_in_group = current_user.is_admin_user_with_groups
        viewing_partnership_account = current_user.is_viewing_partnership

        # Check if being viewed by super partner
        if is_admin_in_group and viewing_partnership_account:
            partnership_account_id = current_user.get_user_viewing_partnership_account_id
        elif not viewing_partnership_account and is_admin_in_group:
            return redirect(url_for('partnership.company_accounts'))

        widget = Widget.query.filter(
            and_(Widget.id == id, Widget.partnership_account_id == partnership_account_id)
        ).one()

        # TODO: Make widget locale-sensitive?
        script = render_template(
            'widgets/install_script_widget.jinja2',
            script_src=webpack.asset_url_for('call_widget_external.js'),
            widget_guid=widget.guid
        )

        if widget.options.get('typeButton', False):
            button_script = render_template(
                'widgets/install_script_button.jinja2',
                widget_guid=widget.guid
            )
        if widget.options.get('typeLink', False):
            link_script = render_template(
                'widgets/install_script_link.jinja2',
                widget_guid=widget.guid
            )
    except Exception as e:
        log.error(traceback.format_exc())
        flash(_('Outbound routing with ID {} not found.'.format(id)))
        return redirect(url_for('widgets.list'))

    if widget.email is None or widget.email is '':
        widgetemail = "No email has been generated for this widget."
    else:
        widgetemail = widget.email

    return render_template(
        'widgets/install_instructions.jinja2',
        id=id,
        script=script,
        button_script=button_script,
        link_script=link_script,
        email=widgetemail
    )


@widgets.route(
    '/outbound/install-instructions/<int:id>/email',
    methods=['POST']
)
@login_required
@role_required('admin')
def email_instructions(id):
    partnership_account_id = current_user.partnership_account_id
    is_admin_in_group = current_user.is_admin_user_with_groups
    viewing_partnership_account = current_user.is_viewing_partnership

    # Check if being viewed by super partner
    if is_admin_in_group and viewing_partnership_account:
        partnership_account_id = current_user.get_user_viewing_partnership_account_id
    elif not viewing_partnership_account and is_admin_in_group:
        return redirect(url_for('partnership.company_accounts'))

    widget = Widget.query.filter(
        and_(Widget.id == id, Widget.partnership_account_id == partnership_account_id)
    ).one()

    partner_account = PartnershipAccount.query.filter(PartnershipAccount.id == widget.partnership_account_id).first()
    partner = Partnership.query.filter(Partnership.id == partner_account.partnership_id).first()

    ctx = {
        template: render_template(
            'widgets/install_{}.jinja2'.format(template),
            script_src=webpack.asset_url_for('call_widget_external.js'),
            widget_guid=widget.guid
        )
        for template in ['script_widget', 'script_button', 'script_link']
    }
    if not widget.options['typeButton']:
        del ctx['script_button']
    if not widget.options['typeLink']:
        del ctx['script_link']

    ctx['partner_logo'] = partner.logo
    ctx['company'] = partner.name

    email = request.form.get('email', '')
    app.logger.info('Sending email to <{}>'.format(email))
    if email:
        # Render text template for adf email
        widget_email_template = _try_renderer_template('widgets/mail/install_instructions', ext='html', **ctx)
        send_ses_email(recipients=[email],
                       p_id=partner.id,
                       subject='Call Widget Install Instructions',
                       html=widget_email_template
                       )
        flash(_('Email successfully sent.'), 'success')
    else:
        flash(_('Please provide an email address.'), 'danger')

    return redirect(url_for('.install_instructions', id=id))


@widgets.route('/outbound/install-instructions/<int:id>/verify',
               methods=['POST'])
@login_required
@role_required('admin')
def verify_widget(id):
    # Try opening the URL, and look for the string 'CallWidgetView'
    # TODO: A more sophisticated approach, like loading the site in phantomjs
    # and loooking for the #buyercall-widget element.
    # TODO: no partner allowed as role

    site_url = request.form['url']
    partnership_account_id = current_user.partnership_account_id
    partnership_account_group_viewing = current_user.is_viewing_partnership

    # Check if being viewed by super partner
    if partnership_account_group_viewing:
        partnership_account_id = current_user.get_user_viewing_partnership_account_id

    widget = Widget.query.filter(
        and_(Widget.id == id, Widget.partnership_account_id == partnership_account_id)
    ).first()

    try:
        site = urlopen(site_url, None, 2)
        site_data = codecs.decode(site.read(), 'ascii', 'ignore')
        if site_data.find('CallWidgetView') == -1:
            flash('Could not find the buyecall widget on ' + site_url, 'danger')
        elif site_data.find(widget.guid) == -1:
            flash(
                'A different buyercall widget appears'
                ' to be configured on ' + site_url,
                'warning'
            )
        else:
            flash(
                'The buyercall widget appears to be configured on ' + site_url,
                'info'
            )
    except Exception as e:
        log.warning(traceback.format_exc())
        flash('Site {} could not be opened.'.format(site_url), 'danger')

    return redirect(url_for('.install_instructions', id=id))