File: //home/arjun/projects/buyercall_new/buyercall/buyercall/blueprints/form_leads/models.py
import logging as log
from datetime import date, datetime
import traceback
import pytz
from buyercall.lib.util_sqlalchemy import ResourceMixin
from buyercall.extensions import db
from sqlalchemy.dialects import postgresql as pg
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.sql import select, func, and_
from buyercall.lib.util_datetime import tzware_datetime
from buyercall.lib.util_sqlalchemy import AwareDateTime
from sqlalchemy import or_, PrimaryKeyConstraint
class ExternalApiFormLeadPostTie(ResourceMixin, db.Model):
    """Row of `external_api_form_lead_post_tie`.

    Links a form, one of its leads and an external API service provider,
    together with the lead id stored for that provider.
    """

    __tablename__ = 'external_api_form_lead_post_tie'

    created_on = db.Column(db.DateTime(), nullable=False)
    updated_on = db.Column(db.DateTime(), nullable=False)

    id = db.Column(db.Integer, primary_key=True)

    form_id = db.Column(
        db.Integer,
        db.ForeignKey('external_forms.id', name='form_id_form_lead_post_tie_fkey'),
    )
    form_lead_id = db.Column(
        db.Integer,
        db.ForeignKey('form_leads.id', name='form_lead_id_form_lead_post_tie_fkey'),
    )
    external_api_service_provider_id = db.Column(
        db.Integer,
        db.ForeignKey(
            'external_api_service_providers.id',
            name='service_provider_id_form_lead_post_tie_fkey',
        ),
    )
    external_api_lead_id = db.Column(db.String(255), nullable=False)

    @classmethod
    def set_external_api_form_lead_post(cls, form_id, form_lead_id, external_api_service_provider_id, external_api_lead_id):
        """
        Insert and commit a new post row for the given form, lead and provider.

        Both timestamps are set to the current UTC time.
        """
        post_row = ExternalApiFormLeadPostTie(
            form_id=form_id,
            form_lead_id=form_lead_id,
            external_api_service_provider_id=external_api_service_provider_id,
            external_api_lead_id=external_api_lead_id,
            created_on=datetime.now(pytz.utc),
            updated_on=datetime.now(pytz.utc),
        )
        db.session.add(post_row)
        db.session.commit()

    @classmethod
    def existing_posts_for_lead(cls, form_lead_id, external_api_service_provider_id):
        """
        Tell whether a post row exists for the lead / provider pair.

        :return: True when at least one matching row is found, else False
        """
        first_match = (
            ExternalApiFormLeadPostTie.query
            .filter(
                ExternalApiFormLeadPostTie.form_lead_id == form_lead_id,
                ExternalApiFormLeadPostTie.external_api_service_provider_id == external_api_service_provider_id,
            )
            .first()
        )
        return bool(first_match)

    @classmethod
    def existing_posts_for_leads(cls, form_id, form_lead_ids, partnership_account_id):
        """
        Map each lead id to the provider ids it has been posted to.

        Only providers tied to the partnership account through an active tie
        are considered. Leads without a matching post are absent from the map.

        :return: dict of form_lead_id -> list of provider ids (no duplicates)
        """
        from ..partnership.models import ExternalApiServiceProvidersPartnershipAccountTie, ExternalApiServiceProviders

        account_tie = ExternalApiServiceProvidersPartnershipAccountTie
        providers = (
            ExternalApiServiceProviders.query
            .join(account_tie)
            .filter(account_tie.partnership_account_id == partnership_account_id)
            .filter(account_tie.active)
            .with_entities(ExternalApiServiceProviders.id)
            .all()
        )
        if not providers:
            return {}

        posted_rows = (
            ExternalApiFormLeadPostTie.query
            .filter(
                ExternalApiFormLeadPostTie.form_id == form_id,
                ExternalApiFormLeadPostTie.form_lead_id.in_(form_lead_ids),
            )
            .with_entities(
                ExternalApiFormLeadPostTie.form_lead_id,
                ExternalApiFormLeadPostTie.external_api_service_provider_id,
            )
            .all()
        )

        providers_by_lead = {}
        for row in posted_rows:
            for provider in providers:
                if row.external_api_service_provider_id != provider.id:
                    continue
                lead_providers = providers_by_lead.setdefault(row.form_lead_id, [])
                if provider.id not in lead_providers:
                    lead_providers.append(provider.id)
        return providers_by_lead
class ExternalApiFormAutoPostTie(ResourceMixin, db.Model):
    """Row of `external_api_form_auto_post_tie`.

    Keyed by (form, external API service provider); stores the
    `is_automatically_sent` flag for that pair.
    """

    __tablename__ = 'external_api_form_auto_post_tie'

    created_on = db.Column(db.DateTime(), nullable=False)
    updated_on = db.Column(db.DateTime(), nullable=False)

    form_id = db.Column(
        db.Integer,
        db.ForeignKey('external_forms.id', name='form_id_form_auto_post_tie_fkey'),
        primary_key=True,
        index=True,
    )
    external_api_service_provider_id = db.Column(
        db.Integer,
        db.ForeignKey('external_api_service_providers.id', name='service_provider_id_form_auto_post_tie_fkey'),
        primary_key=True,
    )
    is_automatically_sent = db.Column('is_automatically_sent', db.Boolean(), nullable=False, server_default='0')

    @classmethod
    def _find_service_provider(cls, partnership_account_id, service_provider_name):
        """Return the first provider with this name tied to the partnership account, or None."""
        from ..partnership.models import ExternalApiServiceProvidersPartnershipAccountTie, ExternalApiServiceProviders

        return (
            db.session
            .query(ExternalApiServiceProviders)
            .join(ExternalApiServiceProvidersPartnershipAccountTie)
            .filter(ExternalApiServiceProviders.name == service_provider_name)
            .filter(ExternalApiServiceProvidersPartnershipAccountTie.partnership_account_id == partnership_account_id)
            .first()
        )

    @classmethod
    def _find_tie(cls, form_id, service_provider_id):
        """Return the tie row for the form / provider pair, or None."""
        return (
            ExternalApiFormAutoPostTie.query
            .filter(
                ExternalApiFormAutoPostTie.form_id == form_id,
                ExternalApiFormAutoPostTie.external_api_service_provider_id == service_provider_id,
            )
            .first()
        )

    @classmethod
    def get_existing_ties(cls, form_id):
        """
        Fetch every tie row stored for a form.

        :return: list of form/external api service provider tie objects
        """
        return (
            ExternalApiFormAutoPostTie.query
            .filter(ExternalApiFormAutoPostTie.form_id == form_id)
            .all()
        )

    @classmethod
    def get_service_provider_send_state(cls, form_id, partnership_account_id, service_provider_name):
        """
        Look up the tie between a form and a named service provider.

        :return: the tie object, or None when the provider or the tie is not found
        """
        provider = cls._find_service_provider(partnership_account_id, service_provider_name)
        if not provider:
            return None
        return cls._find_tie(form_id, provider.id)

    @classmethod
    def set_service_provider_access_state(cls, form_id, partnership_account_id, service_provider_name, value):
        """
        Store `value` as the `is_automatically_sent` flag for a form / provider pair.

        An existing tie is updated in place; otherwise a new tie is inserted.
        Nothing happens when the provider cannot be found for the account.
        """
        provider = cls._find_service_provider(partnership_account_id, service_provider_name)
        if not provider:
            return

        tie = cls._find_tie(form_id, provider.id)
        if tie:
            tie.is_automatically_sent = value
            tie.updated_on = datetime.now(pytz.utc)
        else:
            tie = ExternalApiFormAutoPostTie()
            tie.form_id = form_id
            tie.external_api_service_provider_id = provider.id
            tie.is_automatically_sent = value
            tie.created_on = datetime.now(pytz.utc)
            tie.updated_on = datetime.now(pytz.utc)
            db.session.add(tie)
        db.session.commit()
class ExternalFormFieldDefinition(db.Model):
    """Row of `external_form_field_definitions`: the definition of one form field.

    `old_field_id` holds a single previous field id or several ids joined
    with commas; the matching helpers below treat both forms.
    """

    __tablename__ = 'external_form_field_definitions'

    field_id = db.Column(db.String(50), primary_key=True)
    old_field_id = db.Column(db.String(10), nullable=False, server_default='0')
    type = db.Column(db.String(10))
    min_length = db.Column(db.Integer, nullable=True)
    max_length = db.Column(db.Integer, nullable=True)
    required = db.Column(db.Boolean, nullable=False, server_default='0')
    display_name = db.Column(db.String(255), nullable=False)
    category = db.Column(db.String(50), nullable=False, server_default='')
    show_in_table = db.Column(db.Boolean, nullable=False, server_default='0')
    position = db.Column(db.Integer, nullable=True)

    @classmethod
    def api_get_field_definitions(cls):
        """
        Return a dictionary containing all the form field definitions.

        Key - field id
        Value - external form field definition object
        :return: dictionary object
        """
        return {
            definition.field_id: definition
            for definition in ExternalFormFieldDefinition.query.all()
        }

    @classmethod
    def api_get_required_fields(cls):
        """
        Return a list of the required fields for a form.

        :return: list of field ids whose definition has `required` set
        """
        # `== True` is intentional: it builds a SQL expression, not a Python bool.
        required_rows = db.session.query(ExternalFormFieldDefinition.field_id)\
            .filter(ExternalFormFieldDefinition.required == True)\
            .all()  # noqa: E712
        return [row.field_id for row in required_rows]

    @classmethod
    def check_field_id_match(cls, field_id, definition_field_id, definition_old_field_id):
        """
        Return whether or not a field id provided matches a field definition.

        A match is either the definition's current id, its single old id, or
        any entry of its comma-separated old ids.

        :return: boolean
        """
        if definition_field_id == field_id:
            return True
        if definition_old_field_id is None:
            return False

        old_ids_text = str(definition_old_field_id)
        if ',' in old_ids_text:
            return any(old_id == field_id for old_id in old_ids_text.split(','))
        return definition_old_field_id == field_id

    @classmethod
    def get_all_related_fields(cls, field_id, field_definitions):
        """
        Return a list of related fields (there can be several fields for the same type).

        Scans `field_definitions` for the first definition whose current id or
        old id(s) equal `field_id` and returns that definition's ids. Empty
        entries in `field_definitions` are skipped.

        :param field_id: field id to look up; a falsy value yields an empty list
        :param field_definitions: iterable of definition objects exposing
            `field_id` and `old_field_id`
        :return: string array
        """
        related = []
        if not field_id:
            return related

        for field in field_definitions:
            if not field:
                # The original guarded only one branch against empty entries
                # and raised AttributeError in the other.
                continue

            old_id = field.old_field_id
            # A list only when several old ids are stored comma-separated.
            old_ids = str(old_id).split(',') if old_id and ',' in str(old_id) else None

            if field.field_id and field.field_id == field_id:
                # Matched on the current id: report it plus every old id.
                related.append(field.field_id)
                if old_ids is not None:
                    for candidate in old_ids:
                        if candidate not in related:
                            related.append(candidate)
                elif old_id:
                    # NOTE(review): the column's server default '0' is truthy and
                    # is therefore reported as a related id - confirm intended.
                    related.append(old_id)
                break

            if old_ids is not None:
                # Matched on one of several old ids.
                if field_id in old_ids:
                    related.append(field.field_id)
                    related.extend(old_ids)
                    break
            elif old_id and old_id == field_id:
                # Matched on the single old id.
                related.append(old_id)
                related.append(field.field_id)
                break

        return related
class ExternalForm(ResourceMixin, db.Model):
    """Row of `external_forms`: a form with its routing and notification settings."""

    __tablename__ = 'external_forms'

    leads = db.relationship('FormLead', backref='form')
    fields = db.relationship('ExternalFormField', backref='form')
    routing = db.relationship('Widget')

    id = db.Column(db.Integer, primary_key=True)
    public_id = db.Column(db.String(50), nullable=False, index=True)
    display_name = db.Column(db.String(50), nullable=False, server_default='')
    partnership_account_id = db.Column(
        db.Integer, db.ForeignKey('partnership_accounts.id'), index=True)
    routing_id = db.Column(
        db.Integer,
        db.ForeignKey('widgets.id', onupdate='CASCADE', ondelete='SET NULL'))
    is_external_api_auto = db.Column(db.Boolean, nullable=False, server_default='0')

    # Both address columns hold a JSON object; this class writes {'emails': <string>}.
    email_addresses = db.Column(pg.JSON, nullable=False, server_default='{}')
    adf_email_addresses = db.Column(pg.JSON, nullable=False, server_default='{"emails": ""}')
    email_subject = db.Column(db.String(64), nullable=False, server_default='')
    partial_data_email = db.Column(db.Boolean, nullable=False, server_default='1')
    full_submit_email = db.Column(db.Boolean, nullable=False, server_default='0')

    send_auto_email = db.Column(db.Boolean, nullable=False, server_default='0')
    send_auto_email_subject = db.Column(db.String(256), nullable=False, server_default='')
    send_auto_email_msg = db.Column(db.String(2048), nullable=False, server_default='')

    send_auto_sms = db.Column(db.Boolean, nullable=False, server_default='0')
    send_auto_sms_inbound_id = db.Column(db.Integer, db.ForeignKey('phonenumbers.id',
                                                                   name='auto_sms_message_inbound_fkey',
                                                                   onupdate='CASCADE', ondelete='SET NULL'),
                                         index=True, nullable=True)
    send_auto_sms_msg = db.Column(db.String(512), nullable=False, server_default='')

    is_conditional_notification = db.Column(db.Boolean(), nullable=False, default=False)
    conditional_criteria = db.Column(pg.JSON, nullable=False, server_default='{}')

    deactivated_on = db.Column(db.DateTime(), nullable=True)
    is_deactivated = db.Column(db.Boolean(), nullable=False, server_default='0', default=False)

    auto_prequalify_credit = db.Column(db.Boolean, nullable=False, server_default='0')

    @staticmethod
    def _emails_payload(addresses):
        """Wrap an address string as {'emails': ...} with spaces removed ('' for None)."""
        return {'emails': addresses.replace(" ", "") if addresses is not None else ''}

    @property
    def created_datetime(self):
        """ Return the date/time this form was created
        """
        return self.created_on.strftime('%Y-%m-%d %H:%M:%S')

    @property
    def updated_datetime(self):
        """ Return the date/time this form was updated
        """
        return self.updated_on.strftime('%Y-%m-%d %H:%M:%S')

    @property
    def deactivated_on_datetime(self):
        """ Return the date/time this form was deactivated on, or '' when unset.
        """
        # Truthiness covers both None and ''; the original compared with
        # `is not ''`, an identity test against a literal.
        if not self.deactivated_on:
            return ''
        return self.deactivated_on.strftime('%Y-%m-%d %H:%M:%S')

    @classmethod
    def api_create(cls, partnership_account_id, display_name, widget_id,
                   is_external_api_auto, email_addresses, adf_email_addresses, email_subject, partial_data_email,
                   full_submit_email, send_auto_email, send_auto_email_subject, send_auto_email_msg, send_auto_sms,
                   send_auto_sms_inbound_id, send_auto_sms_msg, auto_prequalify_credit):
        """
        Create a form and one ExternalFormField per known field definition.

        The form is committed first; its fields are inserted in a second
        commit. Any failure is logged, the session is rolled back and the
        error is reported through the return value instead of being raised.

        :return: tuple (form id, error message); the id is -1 and the message
            is non-empty when creation failed
        """
        result = -1
        error_message = ''

        try:
            new_form = ExternalForm(
                partnership_account_id=partnership_account_id,
                public_id='',
                display_name=display_name,
                routing_id=widget_id,
                is_external_api_auto=is_external_api_auto,
                email_addresses=cls._emails_payload(email_addresses),
                adf_email_addresses=cls._emails_payload(adf_email_addresses),
                email_subject=email_subject,
                partial_data_email=partial_data_email,
                full_submit_email=full_submit_email,
                send_auto_email=send_auto_email,
                send_auto_email_subject=send_auto_email_subject,
                send_auto_email_msg=send_auto_email_msg,
                send_auto_sms=send_auto_sms,
                send_auto_sms_inbound_id=send_auto_sms_inbound_id,
                send_auto_sms_msg=send_auto_sms_msg,
                auto_prequalify_credit=auto_prequalify_credit,
            )
            db.session.add(new_form)
            # Commit now so new_form.id is available for the field rows.
            db.session.commit()

            field_definitions = ExternalFormFieldDefinition.api_get_field_definitions() or {}
            fields_to_insert = [
                ExternalFormField(
                    form_id=new_form.id,
                    position=definition.position,
                    field_id=definition.field_id,
                    show_in_table=definition.show_in_table,
                    # The original test was inverted (`is None`), so the
                    # definition's display name was never copied.
                    display_name=definition.display_name if definition.display_name is not None else '',
                )
                for definition in field_definitions.values()
            ]
            if fields_to_insert:
                db.session.bulk_save_objects(fields_to_insert)
                db.session.commit()

            result = new_form.id
        except Exception:
            # Leave the session usable after a failed flush/commit.
            db.session.rollback()
            error_message = 'Unable to create form.'
            log.error('Error creating form. ' + traceback.format_exc())

        return result, error_message

    @classmethod
    def api_update(cls, partnership_account_id, form_id, display_name, widget_id,
                   is_external_api_auto, email_addresses, adf_email_addresses, email_subject, partial_data_email,
                   full_submit_email, send_auto_email, send_auto_email_subject, send_auto_email_msg, send_auto_sms,
                   send_auto_sms_inbound_id, send_auto_sms_msg, auto_prequalify_credit):
        """
        Update a non-deactivated form of the partnership account.

        Arguments passed as None leave the corresponding column untouched.
        Failures are logged, rolled back and reported through the return value.

        :return: tuple (bool success, error message)
        """
        result = False
        error_message = ''

        try:
            form = ExternalForm.query\
                .filter(and_(ExternalForm.id == form_id,
                             ExternalForm.partnership_account_id == partnership_account_id,
                             ExternalForm.is_deactivated == False))\
                .first()  # noqa: E712 - SQL expression, not a Python comparison

            if form is None:
                error_message = 'Form not found.'
            else:
                if adf_email_addresses is not None:
                    form.adf_email_addresses = cls._emails_payload(adf_email_addresses)
                if email_addresses is not None:
                    form.email_addresses = cls._emails_payload(email_addresses)

                # Column name -> new value; None means "leave unchanged".
                optional_updates = {
                    'display_name': display_name,
                    'routing_id': widget_id,
                    'is_external_api_auto': is_external_api_auto,
                    'email_subject': email_subject,
                    'partial_data_email': partial_data_email,
                    'full_submit_email': full_submit_email,
                    'auto_prequalify_credit': auto_prequalify_credit,
                    'send_auto_email': send_auto_email,
                    'send_auto_email_subject': send_auto_email_subject,
                    'send_auto_email_msg': send_auto_email_msg,
                    'send_auto_sms': send_auto_sms,
                    'send_auto_sms_inbound_id': send_auto_sms_inbound_id,
                    'send_auto_sms_msg': send_auto_sms_msg,
                }
                for column_name, new_value in optional_updates.items():
                    if new_value is not None:
                        setattr(form, column_name, new_value)

                db.session.commit()
                result = True
        except Exception:
            # Leave the session usable after a failed flush/commit.
            db.session.rollback()
            result = False
            error_message = 'Unable to update form.'
            log.error(f'Error updating form. {traceback.format_exc()}')

        return result, error_message

    @classmethod
    def api_deactivate(cls, id, partnership_account_id):
        """
        Flag the form as deactivated and stamp `deactivated_on`.

        :return: bool - False when no form with this id belongs to the account
        """
        form = ExternalForm.query\
            .filter(and_(ExternalForm.id == id, ExternalForm.partnership_account_id == partnership_account_id))\
            .first()
        if form is None:
            return False

        form.is_deactivated = True
        # NOTE(review): naive local time, while other models in this module
        # stamp with datetime.now(pytz.utc) - confirm which is intended.
        form.deactivated_on = datetime.now()
        db.session.commit()
        return True
class ExternalFormField(db.Model):
    """Row of `external_form_fields`: one field of a form, keyed by (form_id, field_id)."""

    __tablename__ = 'external_form_fields'

    form_id = db.Column(db.Integer, db.ForeignKey('external_forms.id'), primary_key=True)
    position = db.Column(db.Integer, nullable=False)
    field_id = db.Column(db.String(50), primary_key=True)
    display_name = db.Column(db.String, nullable=False)
    # Whether the field should be shown in the all-leads datatable.
    # 'True' for fields like first name, last name, phone number, address.
    show_in_table = db.Column(db.Boolean, nullable=False, server_default='0')

    @classmethod
    def get_form_fields(cls, form_id):
        """
        Collect the field ids configured for a form.

        :return: list of field ids
        """
        form_field_rows = ExternalFormField.query.filter(ExternalFormField.form_id == form_id).all()
        return [row.field_id for row in form_field_rows]

    @classmethod
    def get_form_field_max_position(cls, form_id):
        """
        Find the highest `position` among a form's fields.

        :return: integer, 0 when the query yields no (or a falsy) maximum
        """
        highest_position = (
            db.session.query(func.max(ExternalFormField.position))
            .filter(ExternalFormField.form_id == form_id)
            .scalar()
        )
        return highest_position or 0
class FormLog(db.Model):
    """Row of `form_logs`: outcome of posting a form lead to an external provider."""

    __tablename__ = 'form_logs'

    id = db.Column(db.Integer, primary_key=True)
    form_id = db.Column(db.Integer, db.ForeignKey('external_forms.id'), index=True)
    form_lead_id = db.Column(db.Integer, db.ForeignKey('form_leads.id'), index=True)
    external_api_service_provider_id = db.Column(
        db.Integer,
        db.ForeignKey('external_api_service_providers.id'),
        index=True,
    )
    status = db.Column(db.String(32), nullable=False, server_default='failed')
    description = db.Column(db.String(1024), nullable=False, server_default='')
    posted_on = db.Column(AwareDateTime(), nullable=False, default=tzware_datetime)

    @classmethod
    def create(cls, partnership_account_id, form_lead_id, status, description, provider_id, provider):
        """
        Record a log entry for a lead owned by the partnership account.

        Status codes 200, 201 and 202 are stored as 'success', anything else
        as 'failed'. Nothing is written when the lead is not found for the
        account. The `provider` argument is not used.
        """
        lead = FormLead.query.filter(
            and_(FormLead.id == form_lead_id, FormLead.partnership_account_id == partnership_account_id)
        ).first()

        outcome = 'success' if status in (200, 201, 202) else 'failed'

        if lead is None:
            return

        log_entry = FormLog(
            form_id=lead.form_id,
            form_lead_id=form_lead_id,
            status=outcome,
            description=description,
            external_api_service_provider_id=provider_id,
        )
        db.session.add(log_entry)
        db.session.commit()

    @classmethod
    def logs_exist_for_lead(cls, form_lead_id, external_api_service_provider_id):
        """
        Tell whether any log entry exists for the lead / provider pair.

        :return: boolean
        """
        first_entry = (
            FormLog.query
            .filter(and_(FormLog.form_lead_id == form_lead_id,
                         FormLog.external_api_service_provider_id == external_api_service_provider_id))
            .first()
        )
        return bool(first_entry)
class FormLeadField(db.Model):
    """Row of `form_lead_fields`: one field value of a lead, keyed by (lead_id, field_id)."""

    __tablename__ = 'form_lead_fields'

    lead_id = db.Column(db.Integer, db.ForeignKey('form_leads.id'), primary_key=True)
    field_id = db.Column(db.String(50), primary_key=True)
    field_value = db.Column(db.String)

    def __init__(self, field_id, field_value):
        """Store the field id and its value; `lead_id` is not set here."""
        self.field_id, self.field_value = field_id, field_value
class FormLead(ResourceMixin, db.Model):
    """Row of `form_leads`: a lead submitted through an external form."""

    __tablename__ = 'form_leads'

    fields = db.relationship('FormLeadField', backref='lead')

    id = db.Column(db.Integer, primary_key=True)
    form_id = db.Column(
        db.Integer, db.ForeignKey('external_forms.id'), nullable=False)
    partnership_account_id = db.Column(
        db.Integer, db.ForeignKey('partnership_accounts.id'), index=True)
    external_api_lead_id = db.Column(db.String(50), nullable=True)
    email_sent = db.Column(db.Boolean, nullable=False, server_default='0')

    # The contact id associated with the form_lead
    contact_id = db.Column(
        db.Integer,
        db.ForeignKey(
            'contacts.id',
            name='form_lead_contact_fkey',
            onupdate='CASCADE',
            ondelete='CASCADE'
        ),
        index=True,
        nullable=False
    )

    @property
    def tos(self):
        """Return the value of this lead's first privacy-policy field, or ''."""
        tos_field = FormLeadField\
            .query\
            .join(FormLead)\
            .filter(or_(FormLeadField.field_id.like("%privacypolicyfield%"),
                        FormLeadField.field_id.like("%privacypolicy%")),
                    FormLeadField.lead_id == self.id
                    ).first()
        if tos_field is None:
            return ''
        return tos_field.field_value

    @property
    def interaction_time(self):
        """ Return the date/time this lead occurred
        """
        return self.created_on.strftime('%Y-%m-%d %H:%M:%S')

    @property
    def updated_datetime(self):
        """ Return the date/time this lead was updated
        """
        return self.updated_on.strftime('%Y-%m-%d %H:%M:%S')

    @hybrid_property
    def source(self):
        """Display name of the form this lead belongs to, or '' when unavailable."""
        return (
            self.form and self.form.display_name
        ) or ''

    @source.expression
    def source(cls):
        """SQL form of `source`: the display name of the form with this lead's form_id."""
        return select(
            [ExternalForm.display_name]
        ).where(
            ExternalForm.id == cls.form_id
        ).limit(1).label("form")

    @hybrid_property
    def source_id(self):
        """Id of the form this lead belongs to, or '' when no form is attached."""
        return (
            self.form and self.form_id
        ) or ''

    @classmethod
    def api_create(cls, lead):
        """
        Persist a new lead.

        A failure is logged, the session is rolled back and the error is
        reported through the return value instead of being raised.

        :return: tuple (bool success, the lead passed in, error message)
        """
        result = False
        error_message = ''

        try:
            db.session.add(lead)
            db.session.commit()
            result = True
        except Exception:
            # Leave the session usable after a failed flush/commit.
            db.session.rollback()
            result = False
            error_message = 'Unable to create lead.'
            log.error('Error creating lead. ' + traceback.format_exc())

        return result, lead, error_message

    @classmethod
    def api_update(cls, form_id, lead_id, posted_form_lead_fields, form_lead):
        """
        Apply posted field values to the matching fields of `form_lead`.

        Only non-empty posted values are applied. When the 'repfield' field is
        updated, the value is also copied to the `agent_assigned` attribute of
        the lead's contact. All changes are committed once at the end; on
        failure the session is rolled back and the error is logged.

        `form_id` and `lead_id` are accepted but not used.

        :param posted_form_lead_fields: mapping of field id -> posted value
        :param form_lead: the lead whose `fields` are updated
        :return: tuple (bool success, error message)
        """
        result = False
        error_message = ''

        # Sensitive fields are reported by label only; their values are
        # deliberately kept out of the log.
        sensitive_labels = {
            'cumemberfield': 'ssn number',
            'ssnfield': 'ssn number',
            'cosignerssnfield': 'cosigner ssn number',
            'datepicker': 'dob',
            'dobfield': 'dob',
            'cosignerdateofbirthfield': 'cosigner dob',
        }

        for field_id in posted_form_lead_fields:
            field_value = posted_form_lead_fields[field_id]
            if not field_value:
                continue

            for field in form_lead.fields:
                if field.field_id != field_id:
                    continue

                field.field_value = field_value

                label = sensitive_labels.get(field.field_id)
                if label is not None:
                    log.info('The encoded form lead {} was updated'.format(label))

                if field.field_id == 'repfield':
                    # Get the contact associated with the form lead
                    from ..contacts.models import Contact
                    contact = Contact.query.filter(Contact.id == form_lead.contact_id).first()
                    if contact is not None:
                        contact.agent_assigned = field.field_value

        # Single commit for field and contact changes, so a failure cannot
        # leave the lead partially updated.
        try:
            db.session.commit()
            result = True
        except Exception:
            db.session.rollback()
            result = False
            # NOTE(review): error_message stays empty on failure, as in the
            # original - confirm whether callers expect a message here.
            log.error(f'Error updating lead. {traceback.format_exc()}')

        return result, error_message
class FormLeadReporting(ResourceMixin, db.Model):
    """Row of `form_leads_reporting`.

    Declares the same column names as FormLead (form, partnership account,
    external lead id, email flag, contact) with separately named constraints.
    """

    __tablename__ = 'form_leads_reporting'

    id = db.Column(db.Integer, primary_key=True)

    form_id = db.Column(
        db.Integer,
        db.ForeignKey('external_forms.id', name='form_lead_reporting_form_id_fkey'),
        nullable=False,
        index=True,
    )

    partnership_account_id = db.Column(
        db.Integer,
        db.ForeignKey('partnership_accounts.id', name='form_lead_reporting_partnership_account_fkey'),
        index=True,
        nullable=False,
    )

    external_api_lead_id = db.Column(db.String(50), nullable=True)

    email_sent = db.Column(db.Boolean, nullable=True)

    contact_id = db.Column(
        db.Integer,
        db.ForeignKey('contacts.id', name='form_lead_reporting_contact_id_fkey'),
        index=True,
        nullable=False,
    )