File: //proc/thread-self/root/home/arjun/projects/buyercall/buyercall/blueprints/user/endpoints.py
import ast
from buyercall.blueprints.sysadmin.endpoints import partnerships
from buyercall.blueprints.notification.models import NotificationSettings
import json
import logging as log
from buyercall.lib.util_datetime import tza_to_datetime
from flask import Blueprint, jsonify
from flask.globals import current_app, request
from flask_login import current_user
from flask_login.utils import login_required
from buyercall.lib.util_rest import api_jsonify, api_login_required
from buyercall.blueprints.user.decorators import role_required
from buyercall.blueprints.user.models import User, UserExternalApiAutoPostTie
from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount
from buyercall.blueprints.filters import format_phone_number, format_phone_number_bracket
from buyercall.lib.util_boto3_s3 import generate_presigned_file_url
from buyercall.blueprints.notification.utilities import send_notifications
from sqlalchemy.sql import text
from buyercall.extensions import db
from buyercall.blueprints.reports.models import Report, ReportUserTie
from buyercall.blueprints.filters import format_phone_number
from buyercall.blueprints.agents.models import AgentSchedule
from buyercall.blueprints.agents.models import Agent
user_api = Blueprint('userapi', __name__, url_prefix='/api')
logger = log.getLogger(__name__)
@role_required('sysadmin')
def roles():
"""
Get all user roles
"""
try:
status_code = 200
success = True
message = f"User roles fetched successfully!"
data = []
for k, v in current_user.ROLE.items():
_role = {}
_role['label'] = v
_role['value'] = k
data.append(_role)
except Exception as e:
status_code = 500
success = False
message = f"Partnership List Fetching Failed!"
data = []
response = jsonify({
"statusCode": status_code,
"success": success,
"message": message,
"data": data
})
return response
@role_required('partner', 'admin', 'sysadmin', 'agent')
def timezones():
"""
Get all user timezones
"""
try:
status_code = 200
success = True
message = f"Timezones fetched successfully!"
data = []
for k, v in current_app.config['TIMEZONES'].items():
_tz = {}
_tz['label'] = v
_tz['value'] = k
data.append(_tz)
except Exception as e:
status_code = 500
success = False
message = f"Timezones Fetching Failed!"
data = []
response = jsonify({
"statusCode": status_code,
"success": success,
"message": message,
"data": data
})
return response
@role_required('partner', 'admin', 'sysadmin', 'agent')
def languages():
"""
Get all user languages
"""
try:
status_code = 200
success = True
message = f"Languages fetched successfully!"
data = []
for k, v in current_app.config['LANGUAGES'].items():
_ln = {}
_ln['label'] = v
_ln['value'] = k
data.append(_ln)
except Exception as e:
status_code = 500
success = False
message = f"Languages Fetching Failed!"
data = []
response = jsonify({
"statusCode": status_code,
"success": success,
"message": message,
"data": data
})
return response
@login_required
def profile_information():
"""
Save all user profile information
"""
try:
status_code = 200
success = True
message = "Profile information successfully updated!"
data = []
avatar = None
user = current_user
is_image_deleted = False
_data = None
if 'data' in request.form:
try:
_data = json.loads(request.form['data'])
except:
_data = {}
if _data:
first_name = _data.get('firstName', None)
last_name = _data.get('lastName', None)
timezone = _data.get('timeZoneId', None)
job_title = _data.get('jobTitle', '')
department = _data.get('department', '')
locale = _data.get('languageId', None)
is_image_deleted = _data.get('isImageDeleted', False)
if first_name:
user.firstname = first_name
if last_name:
user.lastname = last_name
if locale:
user.locale = locale['value']
if timezone and user.agent:
user.agent.timezone = timezone['label']
# Optional fields
user.title = job_title
user.department = department
user.save()
if 'file' in request.files:
avatar = request.files['file']
if avatar and not is_image_deleted:
from buyercall.lib.util_boto3_s3 import generate_presigned_file_url, upload_file_object
ALLOWED_EXTENSIONS = ['jpg', 'jpeg', 'png']
avatar_ext = avatar.filename.split('.')[-1]
if avatar_ext in ALLOWED_EXTENSIONS:
partnership = Partnership.query.filter(Partnership.id == user.partnership_id).first()
if not partnership:
partnership = Partnership.query.get(1)
key = f'{partnership.name}_{partnership.sid}/avatar_{user.sid}.{avatar_ext}'
key = key.replace(' ', '_')
resp = upload_file_object(avatar, current_app.config['USER_AVATAR_BUCKET'], key)
user.user_avatar = f"{current_app.config['USER_AVATAR_BUCKET']}::{key}"
else:
status_code = 400
success = False
message = "Unsupported image format for profile avatar!"
data = []
elif is_image_deleted:
user.user_avatar = ''
user.save()
partnership = Partnership.query.filter(Partnership.id == user.partnership_id).first()
if not partnership:
partnership = Partnership.query.get(1)
es_data = {
'user_id': user.sid,
'notify_message_type': 'PERSONAL_SETTINGS_EDITED',
'user_related_entities': "You've",
'other_user_related_entities': f'{user.firstname} {user.lastname}',
'hyperlink': f'{partnership.partner_url}/settings'
}
es_response = send_notifications(**es_data)
except Exception as e:
logger.error(f'POST(/profile/information) Error: {e}')
status_code = 500
success = False
message = f"Profile information update failed! - {e}"
data = []
response = jsonify({
"statusCode": status_code,
"success": success,
"message": message,
"data": data
})
return response
@api_login_required
def profile_security_information():
"""
Save profile security information
"""
try:
status_code = 200
success = True
message = "Profile security information successfully updated!"
data = []
is_password_match = False
user = current_user
partnership = Partnership.get_by_id(user.partnership_id)
if not partnership:
partnership = Partnership.get_by_id(1)
form_data = request.get_json(silent=True)
if form_data.get('password', None):
current_password = form_data.get('password')
is_password_match = user.authenticated(with_password=True, password=current_password)
if is_password_match:
if form_data.get('email', None):
existing_user = User.query.filter(User.email == form_data.get('email')).first()
if existing_user != current_user:
status_code = 400
success = False
message = "A user with entered email already exists!"
else:
user.email = form_data.get('email')
if form_data.get('mobileNumber', None):
user.phonenumber = format_phone_number(form_data.get('mobileNumber'))
if form_data.get('newpassword', None):
user.password = User.encrypt_password(form_data.get('newpassword'))
user.save()
else:
status_code = 400
success = False
message = "You've entered a wrong current password!"
es_data = {
'user_id': user.sid,
'notify_message_type': 'SECURITY_INFO_EDITED',
'user_related_entities': "You've",
'other_user_related_entities': f'{user.firstname} {user.lastname}',
'hyperlink': f'{partnership.partner_url}/settings'
}
es_response = send_notifications(**es_data)
except Exception as e:
print('E : ', e)
status_code = 500
success = False
message = f"Profile security information updation failed!"
data = []
response = api_jsonify(
status_code=status_code,
success=success,
message=message,
data=data
)
return response
@login_required
def profile_2fa():
"""
Save the two-factor authentication settings of the user
"""
try:
status_code = 200
success = True
message = "Two factor authentication successfully updated!"
data = []
user = current_user
partnership = Partnership.query.filter(Partnership.id == user.id).first()
if not partnership:
partnership = Partnership.query.get(1)
two_factor_auth = request.get_json(silent=True)
if 'twoFactor' in two_factor_auth:
user.two_factor_auth = two_factor_auth['twoFactor']
user.save()
es_data = {
'user_id': user.sid,
'notify_message_type': '2FA_SETTINGS_EDITED',
'user_related_entities': "You've",
'other_user_related_entities': f'{user.firstname} {user.lastname}',
'hyperlink': f'{partnership.partner_url}/settings'
}
es_response = send_notifications(**es_data)
except Exception as e:
status_code = 500
success = False
message = f"Two factor authentication updation failed!"
data = []
response = jsonify({
"statusCode": status_code,
"success": success,
"message": message,
"data": data
})
return response
@login_required
def profile_theme_settings():
"""
Save the profile theme settings
"""
try:
status_code = 200
success = True
message = "Theme settings successfully updated!"
data = []
is_dark_mode_now = current_user.dark_mode
form_data = request.get_json(silent=True)
partnership = Partnership.query.filter(Partnership.id == current_user.id).first()
if not partnership:
partnership = Partnership.query.get(1)
if form_data:
is_dark_mode = form_data.get('darkMode', False)
if isinstance(is_dark_mode, bool):
if is_dark_mode_now != is_dark_mode:
current_user.dark_mode = is_dark_mode
current_user.save()
es_data = {
'user_id': current_user.sid,
'notify_message_type': 'THEME_SETTINGS_EDITED',
'user_related_entities': "You've",
'other_user_related_entities': f'{current_user.firstname} {current_user.lastname}',
'hyperlink': f'{partnership.partner_url}/settings'
}
es_response = send_notifications(**es_data)
except Exception as e:
status_code = 500
success = False
message = f"Theme settings updation failed!"
data = []
response = jsonify({
"statusCode": status_code,
"success": success,
"message": message,
"data": data
})
return response
@login_required
def profile():
"""
Get the user profile details
"""
try:
status_code = 200
success = True
message = "Profile info fetched successfully!"
data = {}
user = current_user
if user.agent:
if user.agent.timezone:
user_tz = user.agent.timezone
else:
user_tz = 'US/Eastern'
else:
user_tz = 'US/Eastern'
partnership_sid = Partnership.get_sid_from_id(user.partnership_id or 1)
partnership_account_sid = PartnershipAccount.get_sid_from_id(user.partnership_account_id)
data['userSid'] = user.sid
data['partnershipSid'] = partnership_sid
data['partnershipAccountSid'] = partnership_account_sid
data['firstName'] = user.firstname
data['lastName'] = user.lastname
data['timeZoneId'] = user_tz
data['jobTitle'] = user.title
data['department'] = user.department
data['languageId'] = user.locale
try:
bucket_name, key = user.user_avatar.split("::")
profile_image = generate_presigned_file_url(key, bucket_name)
except Exception as e:
profile_image = ''
data['profileImage'] = profile_image
data['email'] = user.email
data['mobileNumber'] = format_phone_number_bracket(user.phonenumber)
data['twoFactor'] = user.two_factor_auth
data['darkMode'] = user.dark_mode
data['isSubscribed'] = user.push_notification_subscription
data['role'] = user.role
loginInfo = {
'signInCount': user.sign_in_count,
'currentSignInOn': tza_to_datetime(user.current_sign_in_on),
'currentSignInIP': user.current_sign_in_ip,
'lastSignInDate': tza_to_datetime(user.last_sign_in_on),
'lastSignInIP': user.last_sign_in_ip
}
data['loginInfo'] = loginInfo
except Exception as e:
status_code = 500
success = False
message = f"Profile info fetching failed! Error: {e}"
data = []
response = jsonify({
"statusCode": status_code,
"success": success,
"message": message,
"data": data
})
return response
def get_user(sid):
try:
status_code = 200
success = True
message = "User info fetched successfully!"
data = []
user = User.query.filter(User.sid == sid).first()
if user:
data = user.to_dict()
else:
status_code = 404
success = False
message = "User not found!"
except Exception as ex:
log.error(f'{ex}')
status_code = 500
success = False
message = f"Error: {ex}"
response = {
'statusCode': status_code,
'success': success,
'message': message,
'data': data
}
return jsonify(response)
@login_required
def in_app_notification_settings():
try:
if request.method == 'GET':
status_code = 200
success = True
message = "In-app notification settings fetched successfully!"
notification_settings = NotificationSettings.get_or_create(user_id=current_user.id)
data = {
"IanNewLead": notification_settings.ian_on_new_lead,
"IanNewTasks": notification_settings.ian_on_task_assign,
"IanAllTasks": notification_settings.ian_all_tasks,
"IanAccountActivity": notification_settings.ian_on_account_activity,
"IanAccountSecurity": notification_settings.ian_on_account_security,
"IanPromotions": notification_settings.ian_on_promo,
"PushNotification": notification_settings.push_notification
}
else:
notification_settings = NotificationSettings.get_or_create(user_id=current_user.id)
partnership = Partnership.query.filter(Partnership.id == current_user.id).first()
if not partnership:
partnership = Partnership.query.get(1)
notification_data = request.get_json()
if notification_data:
if 'IanNewLead' in notification_data:
notification_settings.ian_on_new_lead = notification_data.get('IanNewLead', False)
if 'IanNewTasks' in notification_data:
notification_settings.ian_on_task_assign = notification_data.get('IanNewTasks', False)
if 'IanAllTasks' in notification_data:
notification_settings.ian_all_tasks = notification_data.get('IanAllTasks', False)
if 'IanAccountActivity' in notification_data:
notification_settings.ian_on_account_activity = notification_data.get('IanAccountActivity', False)
if 'IanAccountSecurity' in notification_data:
notification_settings.ian_on_account_security = notification_data.get('IanAccountSecurity', False)
if 'IanPromotions' in notification_data:
notification_settings.ian_on_promo = notification_data.get('IanPromotions', False)
if 'PushNotification' in notification_data:
notification_settings.push_notification = notification_data.get('PushNotification', False)
notification_settings.save()
es_data = {
'user_id': current_user.sid,
'notify_message_type': 'INAPP_NOTIFY_SETTINGS_EDITED',
'user_related_entities': "You've",
'other_user_related_entities': f'{current_user.firstname} {current_user.lastname}',
'hyperlink': f'{partnership.partner_url}/settings'
}
es_response = send_notifications(**es_data)
status_code = 200
success = True
message = "In-app notification settings updated successfully!"
data = []
except Exception as e:
status_code = 500
success = False
message = f"In-app notification settings: {str(e)}"
data = []
response = jsonify({
"statusCode": status_code,
"success": success,
"message": message,
"data": data
})
return response
@login_required
def email_notification_settings():
try:
if request.method == 'GET':
status_code = 200
success = True
message = "Email notification settings fetched successfully!"
notification_settings = NotificationSettings.get_or_create(user_id=current_user.id)
data = {
"EmnNewLead": notification_settings.emn_on_new_lead,
"EmnNewTasks": notification_settings.emn_on_new_task,
"EmnAllTasks": notification_settings.emn_all_tasks,
"EmnAccountActivity": notification_settings.emn_on_account_activity,
"EmnAccountSecurity": notification_settings.emn_on_account_security,
"EmnPromotions": notification_settings.emn_on_promo
}
else:
notification_settings = NotificationSettings.get_or_create(user_id=current_user.id)
partnership = Partnership.query.filter(Partnership.id == current_user.id).first()
if not partnership:
partnership = Partnership.query.get(1)
notification_data = request.get_json()
if notification_data:
if 'EmnNewLead' in notification_data:
notification_settings.emn_on_new_lead = notification_data.get('EmnNewLead', False)
if 'EmnNewTasks' in notification_data:
notification_settings.emn_on_new_task = notification_data.get('EmnNewTasks', False)
if 'EmnAllTasks' in notification_data:
notification_settings.emn_all_tasks = notification_data.get('EmnAllTasks', False)
if 'EmnAccountActivity' in notification_data:
notification_settings.emn_on_account_activity = notification_data.get('EmnAccountActivity', False)
if 'EmnAccountSecurity' in notification_data:
notification_settings.emn_on_account_security = notification_data.get('EmnAccountSecurity', False)
if 'EmnPromotions' in notification_data:
notification_settings.emn_on_promo = notification_data.get('EmnPromotions', False)
notification_settings.save()
es_data = {
'user_id': current_user.sid,
'notify_message_type': 'EMAIL_NOTIFY_SETTINGS_EDITED',
'user_related_entities': "You've",
'other_user_related_entities': f'{current_user.firstname} {current_user.lastname}',
'hyperlink': f'{partnership.partner_url}/settings'
}
es_response = send_notifications(**es_data)
status_code = 200
success = True
message = "Email notification settings updated successfully!"
data = []
except Exception as e:
status_code = 500
success = False
message = f"Email notification settings: {str(e)}"
data = []
response = jsonify({
"statusCode": status_code,
"success": success,
"message": message,
"data": data
})
return response
# test
def get_request_details():
context = {}
context['data'] = str(request.__dict__)
return api_jsonify(context, 200, 'Success', True)
@role_required('admin')
def get_user_management():
try:
partnership_account_id = current_user.partnership_account_id
partnership_account_group_viewing = current_user.is_viewing_partnership
# Check if being viewed by super partner
if partnership_account_group_viewing:
partnership_account_id = current_user.get_user_viewing_partnership_account_id
users = User.query \
.with_entities(User.id, User.firstname, User.email, User.lastname, User.phonenumber, User.last_sign_in_on, User.sign_in_count, User.sid) \
.filter(User.partnership_account_id == partnership_account_id, User.partnership_id != None) \
.filter(User.is_deactivated.is_(False))
users_list = []
user = current_user
value = None
if user.agent:
if user.agent.timezone:
if user.agent.timezone == 'US/Eastern':
value = 'EST'
elif user.agent.timezone == 'US/Central':
value = 'CST'
elif user.agent.timezone == 'US/Mountain':
value = 'MST'
elif user.agent.timezone == 'US/Pacific':
value = 'PST'
else:
value = None
for user in users:
user_data = {
'id': user.id,
'firstname': user.firstname,
'lastname': user.lastname,
'phonenumber': format_phone_number_bracket(user.phonenumber),
'email': user.email,
'sign_in_count': user.sign_in_count,
'sid': user.sid,
# Add more fields as needed
}
if user.last_sign_in_on is not None:
user_data['last_sign_in_on'] = f'{tza_to_datetime(user.last_sign_in_on)} {value}'
users_list.append(user_data)
# Fetching reports data
reports = [{"id": report.id, "name": report.name} for report in Report.query.all()]
# Fetching preferred_languages data
preferred_languages = []
for k, v in current_app.config['LANGUAGES'].items():
_ln = {}
_ln['label'] = v
_ln['value'] = k
preferred_languages.append(_ln)
from ..partnership.models import ExternalApiServiceProvidersPartnershipAccountTie
ams_2000 = False
partnership_account_ams_2000_ties = ExternalApiServiceProvidersPartnershipAccountTie.exists_for_service_provider(
partnership_account_id, 'AMS 2000')
if partnership_account_ams_2000_ties:
ams_2000 = True
ams_evo = False
partnership_account_ams_evo_ties = ExternalApiServiceProvidersPartnershipAccountTie.exists_for_service_provider(
partnership_account_id, 'AMS Evolution')
if partnership_account_ams_evo_ties:
ams_evo = True
neo_verify = False
partnership_account_neo_verify_ties = ExternalApiServiceProvidersPartnershipAccountTie.exists_for_service_provider(
partnership_account_id, 'NEO Verify')
if partnership_account_neo_verify_ties:
neo_verify = True
status_code = 200
success = True
message = "User Management List!"
data = {
"reports": reports,
"preferred_languages": preferred_languages,
"users": users_list,
"ams_2000":ams_2000,
"ams_evo":ams_evo,
"neo_verify":neo_verify
}
except Exception as e:
status_code = 500
success = False
message = f"User Management List: {str(e)}"
data = {}
response = jsonify({
"statusCode": status_code,
"success": success,
"message": message,
"data": data
})
return response
@role_required('admin')
def add_user_management():
try:
data = request.get_json()
required_fields = ['title', 'firstname', 'lastname', 'extension', 'phonenumber', 'email',
'department', 'role', 'languages', 'reports']
for field in required_fields:
if field not in data:
raise ValueError(f"Missing required field: {field}")
input_email = data['email'].lower()
existing_user = User.query.filter(User.email == input_email).first()
if existing_user:
raise ValueError("Email is already in use. Please choose a different email.")
partnership_account_id = current_user.partnership_account_id
is_admin_in_group = current_user.is_admin_user_with_groups
viewing_partnership_account = current_user.is_viewing_partnership
new_user = User(
title=data['title'],
firstname=data['firstname'],
lastname=data['lastname'],
extension=data['extension'],
phonenumber=data['phonenumber'],
email=input_email,
department=data['department'],
role=data['role'],
partnership_account_id=partnership_account_id,
partnership_id=current_user.partnership_id,
locale=data.get('languages', 'en'), # Set the default value to 'en' if not provided
tos_agreement = False,
two_factor_auth = data['two_factor_auth']
)
db.session.add(new_user)
db.session.commit()
# Add reports relationship
reports_data = data.get('reports', [])
for report_id in reports_data:
report_tie = ReportUserTie(
report_id=report_id,
user_id=new_user.id,
partnership_accounts_id=partnership_account_id
)
new_user.reports.append(report_tie)
# Set service provider access states
ams_2000 = data.get('ams_2000')
ams_2000_result = False
if ams_2000 and ams_2000 == 'y':
ams_2000_result = True
UserExternalApiAutoPostTie.set_service_provider_access_state(new_user.id, partnership_account_id, 'AMS 2000', ams_2000_result)
ams_evolution = data.get('ams_evolution')
ams_evolution_result = False
if ams_evolution and ams_evolution == 'y':
ams_evolution_result = True
UserExternalApiAutoPostTie.set_service_provider_access_state(new_user.id, partnership_account_id, 'AMS Evolution', ams_evolution_result)
neo_verify = data.get('neo_verify')
neo_verify_result = False
if neo_verify and neo_verify == 'y':
neo_verify_result = True
UserExternalApiAutoPostTie.set_service_provider_access_state(new_user.id, partnership_account_id, 'NEO Verify', neo_verify_result)
# Only create agent if department is 'admin' or 'agent'
if data['role'] in ['admin', 'agent']:
# Create agent
agent = Agent()
total_agents = Agent.query.filter(current_user.id == Agent.user_id).count()
# Set user timezone
user_tz = 'US/Eastern'
if current_user.agent and current_user.agent.timezone:
user_tz = current_user.agent.timezone
params = {
'user_id': new_user.id,
'firstname': data['firstname'],
'lastname': data['lastname'],
'title': "",
'email': input_email,
'phonenumber': data['phonenumber'],
'mobile': data['phonenumber'],
'extension': data['extension'],
'description': agent.description,
'department': data['department'],
'timezone': user_tz,
'partnership_account_id': partnership_account_id,
'all_hours': True
}
result = Agent.create(params)
# Add schedules for each day of the week
for day in range(7):
# Assuming '-1' should be None for agent_id and partnership_account_id
schedule = AgentSchedule(
day=day,
available_from='08:00 AM',
available_to='17:00 PM',
is_active=(day != 0 and day != 6),
partnership_account_id=partnership_account_id,
agent_id=result
)
db.session.add(schedule)
db.session.commit()
user_email = input_email
u = User.initialize_password_reset(user_email.lower())
log.info('The user email requesting a password reset is: {}'.format(user_email.lower()))
status_code = 201
success = True
message = f"User created successfully!, And a email has been sent to {input_email} to set password."
data = {}
except Exception as e:
print(e)
status_code = 500
db.session.rollback() # Rollback the transaction in case of an exception
success = False
message = f"Failed to create user: {str(e)}"
data = {}
response = jsonify({
"statusCode": status_code,
"success": success,
"message": message,
"data": data
})
return response
@role_required('admin')
def edit_user_management(user_id):
try:
# Retrieve the user to be edited
user_to_edit = User.query.get(user_id)
partnership_account_id = current_user.partnership_account_id
agent_to_edit = Agent.query.filter(
Agent.user_id == user_id, Agent.partnership_account_id == partnership_account_id
).first()
if not user_to_edit:
status_code = 404
success = False
message = "User not found"
data = {}
else:
if request.method == 'GET':
# If it's a GET request, return user details
status_code = 200
success = True
message = "User details retrieved successfully"
data = user_to_edit.to_dict()
data['updated_on'] = user_to_edit.updated_on.strftime("%Y-%m-%d %H:%M:%S") if user_to_edit.updated_on else None
from ..partnership.models import ExternalApiServiceProvidersPartnershipAccountTie
ams_2000_account_tie = False
partnership_account_ams_2000_ties = ExternalApiServiceProvidersPartnershipAccountTie.exists_for_service_provider(
partnership_account_id, 'AMS 2000')
if partnership_account_ams_2000_ties:
ams_2000_account_tie = True
ams_2000_exists = UserExternalApiAutoPostTie.get_service_provider_access_state(
user_to_edit.id, partnership_account_id, 'AMS 2000'
)
if ams_2000_exists and ams_2000_exists.is_allowed:
data.update({'ams_2000': True,})
else:
data.update({'ams_2000': False,})
ams_evo_account_tie = False
partnership_account_ams_evo_ties = ExternalApiServiceProvidersPartnershipAccountTie.exists_for_service_provider(
partnership_account_id, 'AMS Evolution')
if partnership_account_ams_evo_ties:
ams_evo_account_tie = True
ams_evolution_user_tie_exists = UserExternalApiAutoPostTie.get_service_provider_access_state(
user_to_edit.id, partnership_account_id, 'AMS Evolution'
)
if ams_evolution_user_tie_exists and ams_evolution_user_tie_exists.is_allowed:
data.update({'ams_evolution': True,})
else:
data.update({'ams_evolution': False,})
neo_verify_account_tie = False
partnership_account_neo_verify_ties = ExternalApiServiceProvidersPartnershipAccountTie.exists_for_service_provider(
partnership_account_id, 'NEO Verify')
if partnership_account_neo_verify_ties:
neo_verify_account_tie = True
neo_verify_user_tie_exists = UserExternalApiAutoPostTie.get_service_provider_access_state(
user_to_edit.id, partnership_account_id, 'NEO Verify'
)
if neo_verify_user_tie_exists and neo_verify_user_tie_exists.is_allowed:
data.update({'neo_verify': True,})
else:
data.update({'neo_verify': False,})
elif request.method == 'PUT':
# If it's a PUT request, update user details
data = request.get_json()
required_fields = ['title', 'firstname', 'lastname', 'extension', 'phonenumber',
'department', 'role', 'reports']
for field in required_fields:
if field not in data:
raise ValueError(f"Missing required field: {field}")
# Update user details
user_to_edit.title = data['title']
user_to_edit.firstname = data['firstname']
user_to_edit.lastname = data['lastname']
user_to_edit.extension = data['extension']
user_to_edit.phonenumber = data['phonenumber']
user_to_edit.department = data['department']
user_to_edit.role = data['role']
user_to_edit.two_factor_auth = data['two_factor_auth']
user_to_edit.locale = data.get('languages', 'en')
# Clear existing reports for the user
user_to_edit.reports.clear()
# Update agent details only if the department is 'admin' or 'agent'
if data['role'] in ['admin', 'agent']:
if not agent_to_edit:
# If there's no agent record, create a new one
# Set user timezone
user_tz = 'US/Eastern'
if current_user.agent and current_user.agent.timezone:
user_tz = current_user.agent.timezone
agent_to_edit = Agent(
user_id=user_to_edit.id,
firstname=data['firstname'],
email= data['email'].lower(),
lastname=data['lastname'],
extension=data['extension'],
phonenumber=data['phonenumber'],
department=data['department'],
timezone=user_tz,
partnership_account_id=partnership_account_id
)
db.session.add(agent_to_edit)
db.session.commit()
agent_id = agent_to_edit.id
# Add schedules for each day of the week
for day in range(7):
# Assuming '-1' should be None for agent_id and partnership_account_id
schedule = AgentSchedule(
day=day,
available_from='08:00 AM',
available_to='17:00 PM',
is_active=(day != 0 and day != 6),
partnership_account_id=partnership_account_id,
agent_id=agent_id
)
db.session.add(schedule)
# Update agent details
agent_to_edit.firstname = data['firstname']
agent_to_edit.lastname = data['lastname']
agent_to_edit.extension = data['extension']
agent_to_edit.phonenumber = data['phonenumber']
agent_to_edit.department = data['department']
agent_to_edit.is_deactivated = False
else:
# If the department is neither 'admin' nor 'agent', delete the agent record if it exists
if agent_to_edit:
Agent.deactivate(agent_to_edit.id, partnership_account_id)
# Add reports based on the data provided in the request
reports_data = data.get('reports', [])
for report_id in reports_data:
report_tie = ReportUserTie(
report_id=report_id,
user_id=user_to_edit.id,
partnership_accounts_id=user_to_edit.partnership_account_id
)
user_to_edit.reports.append(report_tie)
ams_2000 = data.get('ams_2000')
ams_2000_result = False
if ams_2000 and ams_2000 == 'y':
ams_2000_result = True
UserExternalApiAutoPostTie.set_service_provider_access_state(user_id, partnership_account_id, 'AMS 2000', ams_2000_result)
ams_evolution = data.get('ams_evolution')
ams_evolution_result = False
if ams_evolution and ams_evolution == 'y':
ams_evolution_result = True
UserExternalApiAutoPostTie.set_service_provider_access_state(user_id, partnership_account_id, 'AMS Evolution', ams_evolution_result)
neo_verify = data.get('neo_verify')
neo_verify_result = False
if neo_verify and neo_verify == 'y':
neo_verify_result = True
UserExternalApiAutoPostTie.set_service_provider_access_state(user_id, partnership_account_id, 'NEO Verify', neo_verify_result)
if 'email' in data:
input_email = data['email'].lower()
# If the edited email is different from the existing one or if it's a new email
if input_email != user_to_edit.email.lower():
existing_user = User.query.filter(User.email == input_email).first()
if existing_user:
raise ValueError("Email is already in use. Please choose a different email.")
# Update the email only if it's different and not in use
user_to_edit.email = input_email
db.session.commit()
status_code = 200
success = True
message = "User updated successfully!"
data = {}
else:
# If it's neither GET nor PUT, return an error
status_code = 400
success = False
message = "Invalid request method"
data = {}
except Exception as e:
print(e, "=====")
status_code = 500
success = False
message = f"Failed to edit user: {str(e)}"
data = {}
response = jsonify({
"statusCode": status_code,
"success": success,
"message": message,
"data": data
})
return response
@role_required('admin')
def delete_user_management(user_id):
try:
result = User.deactivate_user(user_id)
partnership_account_id = current_user.partnership_account_id
agent_to_deactivate = Agent.query.filter(
Agent.user_id == user_id, Agent.partnership_account_id == partnership_account_id
).first()
if agent_to_deactivate:
Agent.deactivate(agent_to_deactivate.id, partnership_account_id)
status_code = 201
success = True
message = f"User deactivated successfully"
data = {}
except Exception as e:
status_code = 500
success = False
message = f"Failed to delete user: {str(e)}"
data = {}
response = jsonify({
"statusCode": status_code,
"success": success,
"message": message,
"data": data
})
return response