File: //proc/self/root/home/arjun/projects/buyercall/buyercall/blueprints/sysadmin/endpoints.py
import json
import logging as log
import math
import uuid
from datetime import datetime
from flask_babel import lazy_gettext as _
from flask import Blueprint, jsonify, request, current_app
from flask_login import current_user, logout_user, login_user
from buyercall.blueprints.contacts.models import Contact
from buyercall.blueprints.email.models import EmailIdentity
from buyercall.blueprints.form_leads.models import FormLead
from buyercall.blueprints.issue.models import Issue
from buyercall.blueprints.leads.models import Lead
from buyercall.blueprints.notification.utilities import send_notifications
from buyercall.blueprints.partnership.models import Partnership, BILLING_TYPE, BUSINESS_TYPE
from buyercall.blueprints.partnership.models import PartnershipAccount
from buyercall.blueprints.phonenumbers.models import Phone
from buyercall.blueprints.sms.models import Message
from buyercall.blueprints.user.decorators import api_role_required
from buyercall.blueprints.user.models import User
from buyercall.extensions import ses_client, sns_client
from buyercall.lib.flask_mailplus import _try_renderer_template
from buyercall.lib.supervisor_manager import login_user as supervisor_login_user
from buyercall.lib.util_boto3_s3 import generate_presigned_file_url
from buyercall.lib.util_crypto import AESCipherDome
from buyercall.lib.util_datetime import tza_to_date
from buyercall.lib.util_query import get_by_partnership_account_expr
from buyercall.lib.util_rest import api_jsonify
# Blueprint that groups the sysadmin API handlers; registered with the '/api' URL prefix.
sysadmin_api = Blueprint('sysadminapi', __name__, url_prefix='/api')
# Module-level logger named after this module.
logger = log.getLogger(__name__)
@api_role_required('sysadmin')
def act_as(pid, uid):
    """
    Switch the current session to another user ("remote access").

    The current user is first handed to ``supervisor_login_user``, then logged
    out, and finally the target user is logged in.

    :param pid: Not used by this handler.
    :param uid: ``sid`` of the user to log in as.
    :return: JSON response with ``statusCode``, ``success``, ``message`` and
        ``data``. ``statusCode`` is 205 on success, 404 when no user matches
        ``uid`` and 500 on any unexpected error.
    """
    status_code = 205
    success = True
    message = "Remote Access successfully established!"
    data = []
    try:
        target_user = User.query.filter(User.sid == uid).first()
        if not target_user:
            status_code = 404
            success = False
            message = "User not found!"
        else:
            # Order matters: the current user must be recorded before the
            # session is dropped and replaced with the target user.
            supervisor_login_user(current_user, remember=True, force=True)
            logout_user()
            login_user(target_user, remember=True, force=True)
    except Exception:
        # Keep the generic client-facing message but leave a traceback behind.
        logger.exception("Remote access to user %s failed", uid)
        status_code = 500
        success = False
        message = "Remote Access failed!"
        data = []
    return jsonify({
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    })
@api_role_required('sysadmin')
def restrict_login_access():
    """
    Enable or disable login access for a whole partnership.

    Reads a JSON body with ``partnerID`` (partnership sid) and
    ``restrictLoginAccess``. Every partnership account of the partnership and
    every user with role partner/agent/admin/member gets ``active`` set to the
    negation of ``restrictLoginAccess``.

    :return: JSON response with ``statusCode``, ``success``, ``message`` and
        ``data``. ``statusCode`` is 400 when ``partnerID`` is missing, 404 when
        the partnership does not exist and 500 on any unexpected error.
    """
    status_code = 200
    success = True
    message = "Partnership settings have been changed successfully!"
    data = []
    try:
        form_data = request.get_json() or {}
        partner_id = form_data.get('partnerID')
        if not partner_id:
            # Nothing can be changed without a target; do not report success.
            status_code = 400
            success = False
            message = "Partnership id is required!"
        else:
            # Restricting access means deactivating, hence the negation.
            status = not form_data.get('restrictLoginAccess')
            partnership = Partnership.query.filter(Partnership.sid == partner_id).first()
            if not partnership:
                status_code = 404
                success = False
                message = "Partnership not found!"
            else:
                partnership_accounts = PartnershipAccount.query.filter(
                    PartnershipAccount.partnership_id == partnership.id)
                for pa in partnership_accounts:
                    pa.active = status
                    pa.save()
                users = User.query.filter(
                    User.partnership_id == partnership.id,
                    User.role.in_(['partner', 'agent', 'admin', 'member']))
                for user in users:
                    user.active = status
                    user.save()
    except Exception:
        logger.exception("Updating login restriction for a partnership failed")
        status_code = 500
        success = False
        message = "Partnership settings updation failed!"
        data = []
    return jsonify({
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    })
@api_role_required('sysadmin')
def close_partnership_account(pid):
    """
    Close the partnership identified by ``pid``.

    :param pid: ``sid`` of the partnership to close.
    :return: JSON response with ``statusCode``, ``success``, ``message`` and
        ``data``. ``statusCode`` is 400 (with ``success`` false) when the
        partnership is unknown or ``Partnership.close_account`` returns a
        falsy value, and 500 on any unexpected error.
    """
    status_code = 200
    success = True
    message = "Partnership account closed successfully!"
    data = []
    try:
        partnership = Partnership.query.filter(Partnership.sid == pid).first()
        closed = Partnership.close_account(pid) if partnership else None
        if not closed:
            status_code = 400
            # The original assigned ``status`` here, leaving ``success`` true
            # on a failed close.
            success = False
            message = "Partnership account closing failed!"
    except Exception:
        logger.exception("Closing partnership %s failed", pid)
        status_code = 500
        success = False
        message = "Partnership account closing failed!"
        data = []
    return jsonify({
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    })
def create_domain_records(dkim_tokens, domain, region):
    """
    Build the DNS records to publish for a domain.

    :param dkim_tokens: Iterable of DKIM tokens; each yields one CNAME record.
    :param domain: Domain name the records are created for.
    :param region: Region name interpolated into the MX host name.
    :return: List of ``{"Type", "Name", "Value"}`` dicts: the CNAME records in
        token order, followed by a single MX record.
    """
    cname_records = [
        {
            "Type": "CNAME",
            "Name": f"{dkim}._domainkey.{domain}",
            "Value": f"{dkim}.dkim.amazonses.com",
        }
        for dkim in dkim_tokens
    ]
    inbound_mx = {
        "Type": "MX",
        "Name": domain,
        "Value": f"10 inbound-smtp.{region}.amazonaws.com",
    }
    return cname_records + [inbound_mx]
@api_role_required('sysadmin')
def verify_email(email):
    """
    Submit ``email`` to ``ses_client.verify_email_identity`` and return the
    client's result wrapped by ``api_jsonify``.

    :param email: Email address to verify.
    """
    verification_result = ses_client.verify_email_identity(email)
    return api_jsonify(verification_result)
@api_role_required('sysadmin')
def create_partnership():
    """
    Create a partnership from a multipart form submission.

    The form field ``data`` carries a JSON document with the partnership
    settings; the files ``file``, ``fileAlt``, ``fileSmall`` and
    ``fileSmallAlt`` carry the optional logos. Steps, in order:

    1. Reject the request (401) when the partner domain or the system email
       is already a verified SES identity (skipped in DEBUG mode).
    2. Create and save the ``Partnership`` row.
    3. Upload each supplied logo and store it as ``bucket::key``.
    4. When a system email is given: register the domain and email with SES,
       create the email config set, store the encrypted verification/DKIM
       tokens, and invite a new partner user by email.

    :return: JSON response with ``statusCode``, ``success``, ``message`` and
        ``data``. On success with DKIM enabled, ``data`` is
        ``{'dns_records': [...]}``; otherwise it is an empty list.
    """
    import secrets

    is_debug = current_app.config.get('DEBUG', False)
    try:
        status_code = 200
        success = True
        message = "Partnership created successfully!"
        data = []

        avatar_large = request.files.get('file', None)
        logo_large_alt = request.files.get('fileAlt', None)
        avatar_small = request.files.get('fileSmall', None)
        logo_small_alt = request.files.get('fileSmallAlt', None)

        partnership_data = request.form.to_dict(flat=False)
        if 'data' in partnership_data:
            partnership_data = json.loads(partnership_data['data'][0])
        partner_url = partnership_data.get('partnerUrl', None)
        partner_email = partnership_data.get('partneruseremail', None)
        system_email = partnership_data.get('systemEmail', None)

        from buyercall.extensions import ses_client

        # An identity that SES already reports as verified belongs to an
        # existing setup; refuse to register it twice (except in DEBUG mode).
        for identity, taken_message in (
                (partner_url, "Partner domain already registered"),
                (system_email, "System email already registered"),
        ):
            if not identity:
                continue
            identity_status = ses_client.check_identity_verify_status(identity)
            verification = identity_status['data'].get('VerificationAttributes', {}).get(
                identity, {}).get('VerificationStatus', '')
            if verification == 'Success' and not is_debug:
                return api_jsonify(message=taken_message, status_code=401, success=False)

        partnership = Partnership()
        if partnership_data:
            # Plain values copied straight from the payload.
            for field, attr in (
                    ('partnershipName', 'name'),
                    ('partnerUrl', 'partner_url'),
                    ('systemEmail', 'email_sender'),
                    ('sidebarColor', 'primary_color'),
            ):
                if partnership_data.get(field, None):
                    setattr(partnership, attr, partnership_data.get(field))
            # Select-box values arrive as objects; only their 'value' is stored.
            for field, attr in (
                    ('defaultBilling', 'default_billing_type'),
                    ('businessType', 'business_type'),
                    ('partnerType', 'partner_type'),
            ):
                if partnership_data.get(field, None):
                    setattr(partnership, attr, partnership_data.get(field).get('value'))
            if 'expandedMenu' in partnership_data:
                partnership.is_expanded_menu = partnership_data.get('expandedMenu', False)
            partnership.account_invitation_url_token = str(uuid.uuid4())
            partnership.save()

        from buyercall.lib.util_boto3_s3 import upload_file_object

        # (uploaded file, key folder, key suffix, model attribute, label used in messages)
        logo_uploads = (
            (avatar_large, 'large_logos', 'lg', 'logo', 'large'),
            (logo_large_alt, 'large_logos', 'lg_alt', 'alternative_logo', 'alternative large'),
            (avatar_small, 'small_logos', 'sm', 'logo_small', 'small'),
            (logo_small_alt, 'small_logos', 'sm_alt', 'alternative_logo_small', 'alternative small'),
        )
        for upload, folder, suffix, attr, label in logo_uploads:
            if not (upload and partnership):
                continue
            bucket = current_app.config['PARTNERSHIP_BUCKET']
            extension = upload.filename.split('.')[-1]
            key = f'{folder}/partnership_logo_{partnership.sid}_{suffix}.{extension}'
            if upload_file_object(upload, bucket, key):
                setattr(partnership, attr, f"{bucket}::{key}")
                partnership.save()
            else:
                logger.error("Error in uploading the %s logo.!", label)
                status_code = 200
                success = False
                message = f"Partnership {label} image uploading failed!"
                data = []

        if system_email:
            is_dkim = partnership_data.get('isDKIM', True)
            cipher = AESCipherDome(current_app.config['CRYPTO_SECRET_KEY'])

            if partner_url and partnership:
                domain_records = ses_client.verify_domain(partner_url)
                set_email_config(partnership)

                username, domain = system_email.split('@')
                email_identity = EmailIdentity.get_by_username(username, partnership.id)
                ses_client.verify_email_identity(system_email)
                if not email_identity:
                    EmailIdentity.create(
                        username=username, domain=domain, partnership_id=partnership.id)

                records = domain_records['data']
                domain_verify_token = records.get('domain_identity', {}).get('VerificationToken')
                if domain_verify_token:
                    partnership.domain_record = cipher.encrypt(domain_verify_token)
                    partnership.save()

                if is_dkim and 'DkimTokens' in records['domain_dkim']:
                    dkim_tokens = records['domain_dkim']['DkimTokens']
                    aws_region = current_app.config.get('SES_REGION_NAME', '')
                    # Returned to the caller so the DNS records can be published.
                    data = {'dns_records': create_domain_records(dkim_tokens, partner_url, aws_region)}
                    if dkim_tokens:
                        partnership.dkim_records = [cipher.encrypt(token) for token in dkim_tokens]
                        partnership.save()

            # Inviting the partner user is best-effort: a failure here is
            # logged and must not undo the partnership creation.
            # NOTE(review): the account uses system_email while the invitation
            # goes to partner_email - confirm this is intended.
            try:
                user = User.query.filter(User.email == system_email).first()
                from buyercall.blueprints.user.tasks import send_generic_mail
                if not user:
                    user = User(
                        role='partner',
                        email=system_email,
                        # Random throw-away password; the invitee sets a real
                        # one through the reset token in the email.
                        password=secrets.token_urlsafe(32),
                        company=partnership.name,
                        tos_agreement=False,
                        partnership_id=partnership.id,
                    )
                    user.save()
                    reset_token = user.serialize_token()
                    subject = 'Invitation to {}'.format(partnership.name)
                    ctx = {'user': user, 'reset_token': reset_token}
                    # Kept in its own variable: the original assigned the
                    # rendered mail to ``data`` and so leaked it into the
                    # response, replacing the DNS records.
                    invitation_body = _try_renderer_template(
                        template_path='partnership/mail/invitation', ext='html', **ctx)
                    send_generic_mail.delay(
                        subject=subject, recipients=[partner_email],
                        text=invitation_body, html=invitation_body,
                        sender=current_app.config.get('SES_EMAIL_SOURCE'))
            except Exception:
                logger.exception("Inviting the partner user failed")
    except Exception:
        logger.exception("Partnership creation failed")
        status_code = 500
        success = False
        message = "Partnership creation failed!"
        data = []
    return jsonify({
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    })
def email_configuration():
    """
    Run ``EmailConfiguration.configure()`` with settings from the app config.

    The SNS endpoint, email bucket and mail config prefix are read from
    ``SNS_ENDPOINT``, ``EMAIL_BUCKET`` and ``MAIL_CONFIG_PREFIX``.

    :return: ``api_jsonify`` response carrying the configure() result, or a
        500 response whose message is the text of the raised exception.
    """
    try:
        from buyercall.blueprints.email.utils.create_configuration import EmailConfiguration
        settings = current_app.config
        configurator = EmailConfiguration(
            settings.get('SNS_ENDPOINT'),
            settings.get('EMAIL_BUCKET'),
            settings.get('MAIL_CONFIG_PREFIX'),
        )
        outcome = configurator.configure()
    except Exception as err:
        return api_jsonify(status_code=500, message=str(err), success=False)
    else:
        return api_jsonify(data=outcome)
# @api_api_role_required('sysadmin')
def set_email_config(partnership):
    """
    Create the per-partnership email resources.

    Using the partnership ``sid`` as a name prefix this creates, in order: an
    SES configuration set, an SNS topic, an SES event destination that
    publishes to that topic, an SES receipt rule, and a subscription of the
    ``SNS_ENDPOINT`` setting to the topic.

    :param partnership: Partnership whose ``sid`` prefixes the resource names.
    :return: ``api_jsonify`` response; status 500 with the error text as the
        message when any step raises or no topic ARN is returned.
    """
    status_code = 200
    success = True
    message = "Create config set successful!"
    data = []
    pid = partnership.sid
    try:
        # SES configuration set.
        config_set_name = f"{pid}-config_set"
        config_set = ses_client.create_configuration_set(config_set_name)
        logger.info('config set : %s', config_set)

        # SNS topic that receives the SES events.
        sns_topic = sns_client.create_topic(f"{pid}-topic")
        logger.info('sns topic : %s', sns_topic)
        sns_topic_arn = None
        if sns_topic.get('status', False):
            sns_topic_arn = sns_topic.get('data')['TopicArn']
        if not sns_topic_arn:
            # Every later step needs the ARN; fail with a clear message
            # instead of tripping over an unbound variable.
            raise RuntimeError("SNS topic creation failed; no TopicArn returned")

        destination_config = {
            'Name': f"{pid}-event_destination",
            'Enabled': True,
            'MatchingEventTypes': [
                'send', 'reject', 'bounce', 'complaint', 'delivery', 'open', 'click', 'renderingFailure'
            ],
            'SNSDestination': {
                'TopicARN': sns_topic_arn
            },
        }
        destination = ses_client.create_event_destination(config_set_name, destination_config)
        logger.info('destination : %s', destination)

        # Receipt rule inside the rule set named by the SES_RULESET_NAME setting.
        ruleset_name = current_app.config.get('SES_RULESET_NAME')
        rule_name = f"{pid}-rule"
        bucket_name = current_app.config.get("EMAIL_BUCKET", None)
        key_prefix = f"emails/{pid}/"
        recipients = []
        receipt_rule = ses_client.create_receipt_rule(
            ruleset_name, sns_topic_arn, rule_name, bucket_name, recipients, key_prefix)
        logger.info('receipt_rule : %s', receipt_rule)

        # Subscribe the configured endpoint to the topic.
        sns_endpoint = current_app.config.get('SNS_ENDPOINT')
        subscription = sns_client.subscribe(sns_topic_arn, sns_endpoint)
        logger.info('subscription : %s', subscription)
    except Exception as e:
        logger.exception("Email configuration for partnership %s failed", pid)
        success = False
        status_code = 500
        message = str(e)
    return api_jsonify(data, status_code, message, success)
# @api_api_role_required('sysadmin')
# OneTime / manual creation
# Put the receipt rule name in settings.py
def create_receipt_rule():
    """
    Create an SES receipt rule set named ``<uuid4>-rule_set``.

    Despite the function name, only ``ses_client.create_receipt_rule_set`` is
    called; its result is printed.

    :return: ``api_jsonify`` response; status 500 with the error text as the
        message when the call raises.
    """
    status_code, success = 200, True
    message = "Create receipt successful!"
    data = []
    try:
        new_ruleset_name = "{}-rule_set".format(uuid.uuid4())
        print(ses_client.create_receipt_rule_set(new_ruleset_name))
    except Exception as err:
        status_code, success = 500, False
        message = str(err)
    return api_jsonify(data, status_code, message, success)
@api_role_required('sysadmin')
def resend_invite(pid, uid):
    """
    Re-send the partner invitation email to a user.

    :param pid: ``sid`` of the partnership named in the invitation.
    :param uid: ``sid`` of the user to invite.
    :return: JSON response with ``statusCode``, ``success``, ``message`` and
        ``data``. ``statusCode`` is 404 when the user or the partnership does
        not exist and 500 on any unexpected error.
    """
    status_code = 200
    success = True
    message = "Resend invite successful!"
    data = []
    try:
        partner = User.query.filter(User.sid == uid).first()
        partnership = Partnership.query.filter(Partnership.sid == pid).first()
        if partner and partnership:
            reset_token = partner.serialize_token()
            from buyercall.blueprints.user.tasks import send_partners_invitation_email
            # Only plain values (id, token, name) are handed to the task.
            send_partners_invitation_email.delay(partner.id, reset_token, partnership.name)
        else:
            status_code = 404
            success = False
            message = "Resend invite failed!"
    except Exception:
        logger.exception("Resending invite to user %s failed", uid)
        status_code = 500
        success = False
        message = "Resend invite failed!"
        data = []
    return jsonify({
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    })
@api_role_required('sysadmin')
def partnerships():
    """
    Retrieves all partnerships, most recently updated first.

    :return: JSON response with ``statusCode``, ``success``, ``message`` and
        ``data``; ``data`` is a list with one summary dict per partnership and
        is empty when the lookup fails (statusCode 500).
    """
    try:
        rows = Partnership.query.order_by(Partnership.updated_on.desc()).all()
        data = [
            {
                "id": p.id,
                "sid": p.sid,
                "isActive": p.active,
                "partnershipName": p.name,
                "created_on": tza_to_date(p.created_on),
                "updated_on": tza_to_date(p.updated_on)
            }
            for p in rows
        ]
        status_code = 200
        success = True
        message = "Partnership List Fetched Successfully!"
    except Exception:
        logger.exception("Fetching the partnership list failed")
        status_code = 500
        success = False
        message = "Partnership List Fetching Failed!"
        data = []
    return jsonify({
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    })
def _partnership_logo_url(stored_logo):
    """Return a presigned URL for a ``bucket::key`` logo reference, or '' when there is none."""
    if stored_logo and '::' in stored_logo:
        bucket_name, key = stored_logo.split("::", 1)
        return generate_presigned_file_url(key, bucket_name)
    return ''


def _partnership_usage_entry(label, this_month, last_month, total):
    """Build one row of the ``usage`` list in the partnership payload."""
    return {
        'label': label,
        'currentMonthTotal': this_month,
        'lastMonthTotal': last_month,
        'total': total,
    }


def _partnership_model_counts(model, accounts):
    """Sum the this-month, last-month and overall row counts of ``model`` over ``accounts``."""
    this_month = last_month = total = 0
    for ac in accounts:
        this_month += get_by_partnership_account_expr(
            model, ac.id, count_only=True, date_range_expr='this_month')
        last_month += get_by_partnership_account_expr(
            model, ac.id, count_only=True, date_range_expr='last_month')
        total += get_by_partnership_account_expr(model, ac.id, count_only=True)
    return this_month, last_month, total


def _partnership_call_minutes(accounts, date_range_expr=None):
    """Sum ``Lead.duration`` (seconds) over ``accounts`` and round up to whole minutes."""
    # Omit the keyword entirely for the overall total, as the original call did.
    range_kwargs = {'date_range_expr': date_range_expr} if date_range_expr else {}
    seconds = 0
    for ac in accounts:
        leads = get_by_partnership_account_expr(Lead, ac.id, **range_kwargs)
        seconds += sum(lead.duration for lead in leads if lead.duration)
    return int(math.ceil(seconds / 60))


@api_role_required('sysadmin')
def partnership_by_id(pid):
    """
    Retrieves a single partnership by sid.

    The payload contains the partnership settings, presigned logo URLs, the
    DKIM CNAME records (decrypted), the SES verification status of the sender
    email and partner URL, the active non-sysadmin users, and usage counters.
    A PARTNERSHIP_SETTINGS_VIEWED notification is sent for the current user.

    :param pid: ``sid`` of the partnership.
    :return: JSON response with ``statusCode``, ``success``, ``message`` and
        ``data``. ``statusCode`` is 404 when no partnership matches ``pid``
        and 500 on any unexpected error.
    """
    try:
        partnership = Partnership.query.filter(Partnership.sid == pid).first()
        if not partnership:
            # Report an unknown sid explicitly instead of failing later on
            # attribute access and answering with a generic 500.
            return jsonify({
                "statusCode": 404,
                "success": False,
                "message": "Partnership not found!",
                "data": []
            })

        status_code = 200
        message = 'Partnership fetched successfully!'
        success = True
        login_roles = ['partner', 'agent', 'admin', 'member']

        # Login counts as restricted only when no account and no login-capable
        # user is active.
        is_restricted = not (
            any(pa.active for pa in partnership.partnership_accounts)
            or any(pu.active and pu.role in login_roles for pu in partnership.users)
        )

        cipher = AESCipherDome(current_app.config['CRYPTO_SECRET_KEY'])
        dkim_records = []
        for encrypted_token in partnership.dkim_records or []:
            token = cipher.decrypt(encrypted_token)
            dkim_records.append({
                "type": "CNAME",
                "name": f"{token}._domainkey.{partnership.partner_url}",
                "value": f"{token}.dkim.amazonses.com"
            })

        # Get domain and email (SES identities) verification status.
        ses_identities = [partnership.email_sender, partnership.partner_url]
        identity_status = ses_client.check_identity_verify_status(ses_identities)
        ses_identity_status = []
        if 'VerificationAttributes' in identity_status['data']:
            for idt, val in identity_status['data']['VerificationAttributes'].items():
                ses_identity_status.append({
                    'identityName': idt,
                    'identityStatus': val['VerificationStatus'],
                    'identityType': 'email' if '@' in idt else 'domain',
                })

        data = {
            'name': partnership.name,
            'sid': partnership.sid,
            'isActive': partnership.active,
            'createdOn': tza_to_date(partnership.created_on),
            'updateOn': tza_to_date(partnership.updated_on),
            'billingType': partnership.default_billing_type,
            'businessType': partnership.business_type,
            'partnerType': partnership.partner_type,
            'partnerUrl': partnership.partner_url,
            'email': partnership.email_sender,
            'sidebarColor': partnership.primary_color,
            'partnerImage': _partnership_logo_url(partnership.logo),
            'partnerImageAlt': _partnership_logo_url(partnership.alternative_logo),
            'partnerImageSmall': _partnership_logo_url(partnership.logo_small),
            'partnerImageSmallAlt': _partnership_logo_url(partnership.alternative_logo_small),
            'restrictLoginAccess': is_restricted,
            'suspendSubscription': False,
            'isExpandedMenu': partnership.is_expanded_menu,
            'signUpLink': f"{partnership.partner_url}/signup?partnership={partnership.account_invitation_url_token}",
            'dkimRecords': dkim_records,
            'sesIdentityStatus': ses_identity_status
        }

        data['users'] = [
            {
                'id': u.id,
                'sid': u.sid,
                'firstName': u.firstname,
                'lastName': u.lastname,
                'email': u.email,
                'isActive': u.active,
                'phoneNumber': u.phonenumber,
                'roleId': u.role,
                'signInCount': u.sign_in_count,
                'signInDate': tza_to_date(u.last_sign_in_on),
                'signInIp': u.last_sign_in_ip
            }
            for u in partnership.users
            if u.active and u.email and u.role not in ['sysadmin']
        ]

        accounts = partnership.partnership_accounts

        # Counters kept per partnership.
        users_counts = (
            User.get_by_partnership_datetime_expr(
                partnership.id, count_only=True, date_range_expr='this_month'),
            User.get_by_partnership_datetime_expr(
                partnership.id, count_only=True, date_range_expr='last_month'),
            User.get_by_partnership_datetime_expr(partnership.id, count_only=True),
        )
        issues_counts = (
            Issue.get_by_partnership_datetime_expr(
                partnership.id, count_only=True, date_range_expr='this_month'),
            Issue.get_by_partnership_datetime_expr(
                partnership.id, count_only=True, date_range_expr='last_month'),
            Issue.get_by_partnership_datetime_expr(partnership.id, count_only=True),
        )

        # Counters summed over all partnership accounts.
        leads_counts = _partnership_model_counts(Contact, accounts)
        phone_nums_counts = _partnership_model_counts(Phone, accounts)
        web_form_subs_counts = _partnership_model_counts(FormLead, accounts)
        text_msgs_counts = _partnership_model_counts(Message, accounts)
        call_minutes = (
            _partnership_call_minutes(accounts, 'this_month'),
            _partnership_call_minutes(accounts, 'last_month'),
            _partnership_call_minutes(accounts),
        )

        # Rows reported as zero are placeholders: nothing is counted for them yet.
        data['usage'] = [
            _partnership_usage_entry("Leads", *leads_counts),
            _partnership_usage_entry("Accounts", 0, 0, len(accounts)),
            _partnership_usage_entry("Users", *users_counts),
            _partnership_usage_entry("Campaigns", 0, 0, 0),
            _partnership_usage_entry("SupportTickets", *issues_counts),
            _partnership_usage_entry("PhoneNumbers", *phone_nums_counts),
            _partnership_usage_entry("WidgetInteractions", 0, 0, 0),
            _partnership_usage_entry("WebFormSubmissions", *web_form_subs_counts),
            _partnership_usage_entry("LandingPageVisitors", 0, 0, 0),
            _partnership_usage_entry("CallMinutes", *call_minutes),
            _partnership_usage_entry("TextMessages", *text_msgs_counts),
            _partnership_usage_entry("ThirdPartySubmissions", 0, 0, 0),
        ]

        send_notifications(
            user_id=current_user.sid,
            notify_message_type='PARTNERSHIP_SETTINGS_VIEWED',
            user_related_entities=["You've", partnership.name],
            other_user_related_entities=[
                f'{current_user.firstname} {current_user.lastname}', partnership.name],
            hyperlink=f'{partnership.partner_url}/settings',
        )
    except Exception:
        logger.exception("Fetching partnership %s failed", pid)
        status_code = 500
        success = False
        message = "Partnership Fetching Failed!"
        data = []
    return jsonify({
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    })
@api_role_required('sysadmin')
def partnership_billing_types():
    """
    Retrieves all billing types.

    Each entry of ``BILLING_TYPE`` becomes ``{'label': <value>, 'value': <key>}``.
    """
    try:
        data = [
            {'label': display_name, 'value': type_key}
            for type_key, display_name in BILLING_TYPE.items()
        ]
        message = 'Partnership billing types fetched successfully!'
        status_code, success = 200, True
    except Exception:
        status_code, success = 500, False
        message = "Partnership billing types fetching failed!"
        data = []
    payload = {
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    }
    return jsonify(payload)
@api_role_required('sysadmin')
def partnership_business_types():
    """
    Retrieves all business types.

    Each entry of ``BUSINESS_TYPE`` becomes ``{'label': <value>, 'value': <key>}``.
    """
    try:
        data = [
            {'label': display_name, 'value': type_key}
            for type_key, display_name in BUSINESS_TYPE.items()
        ]
        message = 'Partnership business types fetched successfully!'
        status_code, success = 200, True
    except Exception:
        status_code, success = 500, False
        message = "Partnership business types fetching failed!"
        data = []
    payload = {
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    }
    return jsonify(payload)
@api_role_required('sysadmin')
def deactivate_user(pid, uid):
    """
    Deactivate a user and send a USER_DEACTIVATED notification.

    :param pid: Not used by this handler.
    :param uid: ``sid`` of the user to deactivate.
    :return: JSON response with ``statusCode``, ``success``, ``message`` and
        ``data``. ``statusCode`` is 204 on success, 404 when no user matches
        ``uid`` and 500 when the deactivation itself fails.
    """
    status_code = 204
    success = True
    message = "User deactivation successful!"
    data = []
    try:
        user = User.query.filter(User.sid == uid).first()
        if not user:
            status_code = 404
            success = False
            message = "User not found."
        else:
            user.active = False
            user.deactivated_on = datetime.now()
            user.save()
            # The deactivation is already saved at this point, so a failing
            # notification is logged instead of being reported as a failed
            # deactivation.
            try:
                partnership = Partnership.query.filter(Partnership.id == user.partnership_id).first()
                if not partnership:
                    # Fall back to the partnership with primary key 1.
                    partnership = Partnership.query.get(1)
                send_notifications(
                    user_id=user.sid,
                    notify_message_type='USER_DEACTIVATED',
                    user_related_entities=f"{user.email}",
                    other_user_related_entities=f'{user.email}',
                    hyperlink=f'{partnership.partner_url}/dashboard',
                )
            except Exception:
                logger.exception("Sending the deactivation notification for user %s failed", uid)
    except Exception:
        logger.exception("Deactivating user %s failed", uid)
        status_code = 500
        success = False
        message = "User deactivation Failed!"
        data = []
    return jsonify({
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    })
@api_role_required('sysadmin')
def partnership_types():
    """
    Get the list of partnership types.

    The list is fixed: "General" and "API".
    """
    available_types = [
        {"label": label, "value": value}
        for label, value in (("General", "general"), ("API", "api"))
    ]
    return jsonify({
        "statusCode": 200,
        "success": True,
        "message": "Partnership types fetched successfully!",
        "data": available_types
    })
@api_role_required('sysadmin')
def update_partnership_settings(pid):
    """
    Updates partnership.

    The form field ``data`` carries a JSON document with the settings to
    change (name, billing/business/partner type, logo deletion flags); the
    files ``file``, ``fileAlt``, ``fileSmall`` and ``fileSmallAlt`` carry
    optional replacement logos. A PARTNERSHIP_SETTINGS_EDITED notification is
    sent for the current user afterwards.

    :param pid: ``sid`` of the partnership to update.
    :return: JSON response with ``statusCode``, ``success``, ``message`` and
        ``data``. ``statusCode`` is 404 when no partnership matches ``pid``
        and 500 on any unexpected error. A failed logo upload keeps
        statusCode 200 but sets ``success`` to false.
    """
    try:
        status_code = 200
        success = True
        message = "Partnership details updated successfully!"
        data = []

        logo_large = request.files.get('file', None)
        logo_large_alt = request.files.get('fileAlt', None)
        logo_small = request.files.get('fileSmall', None)
        logo_small_alt = request.files.get('fileSmallAlt', None)

        partnership_data = request.form.get('data', None)
        if partnership_data:
            try:
                partnership_data = json.loads(partnership_data)
            except (TypeError, ValueError):
                # Malformed JSON is treated as "no settings supplied".
                partnership_data = {}

        partnership = Partnership.query.filter(Partnership.sid == pid).first()
        if not partnership:
            # Without this guard the notification below fails on ``None`` and
            # the caller only sees a generic 500.
            return jsonify({
                "statusCode": 404,
                "success": False,
                "message": "Partnership not found!",
                "data": []
            })

        if partnership_data:
            if partnership_data.get('partnershipName', None):
                partnership.name = partnership_data.get('partnershipName')
            # Select-box values arrive as objects; only their 'value' is stored.
            for field, attr in (
                    ('defaultBilling', 'default_billing_type'),
                    ('businessType', 'business_type'),
                    ('partnerType', 'partner_type'),
            ):
                if partnership_data.get(field, None):
                    setattr(partnership, attr, partnership_data.get(field).get('value'))
            if partnership_data.get('isLargeImageDeleted', None):
                partnership.logo = ''
            if partnership_data.get('isSmallImageDeleted', None):
                partnership.logo_small = ''
            partnership.save()

        from buyercall.lib.util_boto3_s3 import upload_file_object

        # (uploaded file, key folder, key suffix, model attribute, label used in messages)
        logo_uploads = (
            (logo_large, 'large_logos', 'lg', 'logo', 'large'),
            (logo_large_alt, 'large_logos', 'lg_alt', 'alternative_logo', 'alternative large'),
            (logo_small, 'small_logos', 'sm', 'logo_small', 'small'),
            (logo_small_alt, 'small_logos', 'sm_alt', 'alternative_logo_small', 'alternative small'),
        )
        for upload, folder, suffix, attr, label in logo_uploads:
            if not upload:
                continue
            bucket = current_app.config['PARTNERSHIP_BUCKET']
            extension = upload.filename.split('.')[-1]
            key = f'{folder}/partnership_logo_{partnership.sid}_{suffix}.{extension}'
            if upload_file_object(upload, bucket, key):
                setattr(partnership, attr, f"{bucket}::{key}")
                partnership.save()
            else:
                logger.error("Error in uploading the %s logo.!", label)
                status_code = 200
                success = False
                message = f"Partnership {label} image uploading failed!"
                data = []

        send_notifications(
            user_id=current_user.sid,
            notify_message_type='PARTNERSHIP_SETTINGS_EDITED',
            user_related_entities=["You've", partnership.name],
            other_user_related_entities=[
                f'{current_user.firstname} {current_user.lastname}', partnership.name],
            hyperlink=f'{partnership.partner_url}/settings',
        )
    except Exception:
        logger.exception("Updating partnership %s failed", pid)
        status_code = 500
        success = False
        message = "Partnership details updating failed!"
        data = []
    return jsonify({
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    })
def _invite_partner_user(partnership, partner_email):
    """Create a partner user for ``partner_email`` (if none exists) and e-mail an invitation.

    An account that already exists for the address is left untouched and is not
    re-invited.

    Args:
        partnership: The ``Partnership`` the new user is attached to.
        partner_email: E-mail address of the partner user to invite.
    """
    if User.query.filter(User.email == partner_email).first():
        return

    # Function-scope import keeps this block self-contained.
    import secrets

    user = User(
        role='partner',
        email=partner_email,
        # Never provision accounts with a guessable placeholder password; the
        # invitee gets a token in the invitation e-mail instead.
        password=secrets.token_urlsafe(32),
        company=partnership.name,
        tos_agreement=False,
        partnership_id=partnership.id,
    )
    user.save()

    reset_token = user.serialize_token()
    # The text and HTML parts use the same template, so render it only once.
    invitation = _try_renderer_template(
        template_path='partnership/mail/invitation', ext='html', user=user, reset_token=reset_token)
    subject = 'Invitation to {}'.format(partnership.name)

    # Imported lazily, as in the original code.
    from buyercall.blueprints.user.tasks import send_generic_mail
    send_generic_mail.delay(subject=subject, recipients=[partner_email], text=invitation, html=invitation,
                            sender=current_app.config.get('SES_EMAIL_SOURCE'))


@api_role_required('sysadmin')
def update_white_label_config(sid):
    """Update a partnership's white-label settings and optionally invite a partner user.

    The partnership is looked up by ``sid``; when there is no match, the
    partnership with primary key 1 is used instead. Recognised keys of the JSON
    request body:

    * ``partnerUrl``       -> ``partnership.partner_url``
    * ``systemEmail``      -> ``partnership.email_sender``
    * ``sidebarColor``     -> ``partnership.primary_color``
    * ``isExpandedMenu``   -> ``partnership.is_expanded_menu`` (applied only for real booleans)
    * ``partneruseremail`` -> address of a partner user to create and invite

    Empty/falsy values are ignored, so a field cannot be cleared through this
    endpoint.

    Args:
        sid: ``sid`` of the partnership to update.

    Returns:
        A JSON response with ``statusCode``, ``success``, ``message`` and ``data``.
    """
    status_code = 200
    success = True
    message = "Partnership details updated successfully!"
    data = []
    try:
        partnership_data = request.get_json()
        logger.debug("White-label update payload for partnership %s: %s", sid, partnership_data)

        partnership = Partnership.query.filter(Partnership.sid == sid).first()
        if not partnership:
            partnership = Partnership.query.get(1)

        if not partnership:
            status_code = 404
            success = False
            message = "Partnership not found!"
        elif partnership_data:
            field_map = (
                ('partnerUrl', 'partner_url'),
                ('systemEmail', 'email_sender'),
                ('sidebarColor', 'primary_color'),
            )
            for payload_key, attribute in field_map:
                value = partnership_data.get(payload_key)
                if value:
                    setattr(partnership, attribute, value)

            expanded_menu = partnership_data.get('isExpandedMenu')
            if isinstance(expanded_menu, bool):
                partnership.is_expanded_menu = expanded_menu
            partnership.save()

            partner_email = partnership_data.get('partneruseremail')
            if partner_email:
                try:
                    _invite_partner_user(partnership, partner_email)
                except Exception:
                    # Inviting is best effort: the settings update has already
                    # been saved, so only record the failure.
                    logger.exception("Unable to invite partner user for partnership %s", partnership.sid)

            send_notifications(
                user_id=current_user.sid,
                notify_message_type='PARTNERSHIP_SETTINGS_EDITED',
                user_related_entities=["You've", partnership.name],
                other_user_related_entities=[f'{current_user.firstname} {current_user.lastname}',
                                             partnership.name],
                hyperlink=f'{partnership.partner_url}/settings',
            )
    except Exception as e:
        logger.exception("Updating white-label config failed for partnership %s", sid)
        status_code = 500
        success = False
        message = f"Partnership details updation failed!: {str(e)}"
        data = []

    return jsonify({
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    })
def _extract_ses_domain_tokens(domain_records, is_dkim):
    """Return ``(verification_token, dkim_tokens)`` read from a ``verify_domain`` result.

    Missing keys yield ``None`` instead of raising. DKIM tokens are read only
    when ``is_dkim`` is truthy.
    """
    records = (domain_records or {}).get('data') or {}
    verification_token = (records.get('domain_identity') or {}).get('VerificationToken')
    dkim_tokens = None
    if is_dkim:
        dkim_tokens = (records.get('domain_dkim') or {}).get('DkimTokens')
    return verification_token, dkim_tokens


@api_role_required('sysadmin')
def verify_ses_domain(pid):
    """Request SES verification of a partnership's domain and store the returned tokens.

    The partnership is looked up by ``pid``; when there is no match, the
    partnership with primary key 1 is used instead. Recognised keys of the
    optional JSON request body:

    * ``partnerUrl``    -> domain to verify; also assigned to ``partnership.partner_url``.
      Falls back to the stored ``partner_url`` when absent.
    * ``isDkimEnabled`` -> whether DKIM tokens are read from the result (default ``True``).

    Tokens are stored on the partnership encrypted with ``CRYPTO_SECRET_KEY`` and
    returned to the caller in plain form under ``VerificationToken`` / ``DkimTokens``.

    Args:
        pid: ``sid`` of the partnership whose domain is verified.

    Returns:
        A JSON response with ``statusCode``, ``success``, ``message`` and ``data``.
    """
    status_code = 200
    success = True
    message = "Domain verification request sent successfully!"
    data = {}
    try:
        partnership = Partnership.query.filter(Partnership.sid == pid).first()
        if not partnership:
            partnership = Partnership.query.get(1)

        if not partnership:
            status_code = 404
            success = False
            message = "Partnership not found!"
        else:
            domain_verify_data = request.get_json() or {}
            is_dkim = domain_verify_data.get('isDkimEnabled', True)
            partner_url = domain_verify_data.get('partnerUrl')
            if partner_url:
                partnership.partner_url = partner_url
            else:
                partner_url = partnership.partner_url

            if not partner_url:
                status_code = 400
                success = False
                message = "Domain verification request failed!: no domain provided"
            else:
                # The call used to be stubbed out with an empty dict, which made
                # every request fail with KeyError('data').
                domain_records = ses_client.verify_domain(partner_url, is_dkim)
                domain_verify_token, dkim_tokens = _extract_ses_domain_tokens(domain_records, is_dkim)

                if domain_verify_token or dkim_tokens:
                    cipher = AESCipherDome(current_app.config['CRYPTO_SECRET_KEY'])
                    if domain_verify_token:
                        data['VerificationToken'] = domain_verify_token
                        partnership.domain_record = cipher.encrypt(domain_verify_token)
                    if dkim_tokens:
                        data['DkimTokens'] = dkim_tokens
                        partnership.dkim_records = [cipher.encrypt(token) for token in dkim_tokens]
                    partnership.save()
                else:
                    status_code = 500
                    success = False
                    message = "Domain verification request failed!"
    except Exception as e:
        logger.exception("SES domain verification failed for partnership %s", pid)
        status_code = 500
        success = False
        message = f"Domain verification request failed!: {str(e)}"
        data = []

    return jsonify({
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    })
@api_role_required('sysadmin')
def verify_ses_email(pid):
    """Request SES verification of a partnership's system (sender) e-mail address.

    The partnership is looked up by ``pid``; when there is no match, the
    partnership with primary key 1 is used instead. If the JSON request body
    carries ``systemEmail`` it is saved as ``partnership.email_sender`` and
    verified; otherwise the stored ``email_sender`` is verified.

    Args:
        pid: ``sid`` of the partnership whose sender address is verified.

    Returns:
        A JSON response with ``statusCode``, ``success``, ``message`` and ``data``.
    """
    status_code = 200
    success = True
    message = "Email verification request sent successfully!"
    data = {}
    try:
        partnership = Partnership.query.filter(Partnership.sid == pid).first()
        if not partnership:
            partnership = Partnership.query.get(1)

        if not partnership:
            status_code = 404
            success = False
            message = "Partnership not found!"
        else:
            email_verify_data = request.get_json() or {}
            email_sender = email_verify_data.get('systemEmail')
            if email_sender:
                partnership.email_sender = email_sender
                partnership.save()
            else:
                email_sender = partnership.email_sender

            if not email_sender:
                # Nothing to verify: do not call SES with an empty identity.
                status_code = 400
                success = False
                message = "Email verification request failed!: no email address provided"
            else:
                email_verify_response = ses_client.verify_email(email_sender) or {}
                if not email_verify_response.get('status', False):
                    status_code = 500
                    success = False
                    message = "Email verification request failed!"
    except Exception as e:
        logger.exception("SES email verification failed for partnership %s", pid)
        status_code = 500
        success = False
        message = f"Email verification request failed!: {str(e)}"
        data = []

    return jsonify({
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    })
@api_role_required('sysadmin')
def check_ses_identity_verify_status(pid):
    """Report the SES verification status of a partnership's sender e-mail and domain.

    The partnership is looked up by ``pid``; when there is no match, the
    partnership with primary key 1 is used instead. Its ``email_sender`` and
    ``partner_url`` are sent to SES as the identities to check; unset values
    are skipped.

    Args:
        pid: ``sid`` of the partnership whose identities are checked.

    Returns:
        A JSON response with ``statusCode``, ``success``, ``message`` and ``data``.
        On success ``data`` is a list of dicts with ``identityName``,
        ``identityStatus`` and ``identityType`` (``'email'`` when the name
        contains ``@``, otherwise ``'domain'``).
    """
    status_code = 200
    success = True
    message = "Identity verification status fetched succesfully!"
    data = {}
    try:
        partnership = Partnership.query.filter(Partnership.sid == pid).first()
        if not partnership:
            partnership = Partnership.query.get(1)

        if not partnership:
            status_code = 404
            success = False
            message = "Partnership not found!"
        else:
            # Drop identities that are not configured so SES is never asked
            # about ``None``.
            ses_identities = [
                identity for identity in (partnership.email_sender, partnership.partner_url) if identity
            ]
            verification_attributes = {}
            if ses_identities:
                identity_status = ses_client.check_identity_verify_status(ses_identities) or {}
                verification_attributes = (
                    (identity_status.get('data') or {}).get('VerificationAttributes') or {}
                )
            data = [
                {
                    'identityName': identity,
                    'identityStatus': attributes['VerificationStatus'],
                    'identityType': 'email' if '@' in identity else 'domain',
                }
                for identity, attributes in verification_attributes.items()
            ]
    except Exception as e:
        logger.exception("Fetching SES identity verification status failed for partnership %s", pid)
        status_code = 500
        success = False
        # Previously reported as an "Email verification request" failure,
        # a copy-paste from the e-mail verification endpoint.
        message = f"Identity verification status fetching failed!: {str(e)}"
        data = []

    return jsonify({
        "statusCode": status_code,
        "success": success,
        "message": message,
        "data": data
    })