File: //home/arjun/projects/buyercall_new/buyercall/buyercall/blueprints/sysadmin/models.py
import logging
from buyercall.extensions import db
from buyercall.lib.util_sqlalchemy import ResourceMixin
from sqlalchemy.dialects import postgresql as pg
from datetime import datetime, timedelta
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
# Default timezone name. It is not referenced in the visible part of this
# file; presumably other modules import it -- TODO confirm before moving it.
DEFAULT_TIMEZONE = 'US/Central' # TODO: Define settings in a central location
class RequestLog(ResourceMixin, db.Model):
    """ORM model for the ``request_log`` table.

    Each row stores the metadata of one logged request (headers, client
    address, URL parts), the fields describing the response that was
    produced, and IP-derived location columns.
    """

    # Define the name of the database table
    __tablename__ = 'request_log'

    # Define the columns of the table
    id = db.Column(db.Integer, primary_key=True)
    request_id = db.Column(db.String(255), nullable=True)
    user_id = db.Column(
        db.Integer,
        db.ForeignKey(
            'users.id',
            name='request_headers_user_id_fkey',
            onupdate='CASCADE',
            ondelete='CASCADE',
        ),
        index=True,
        nullable=True,
    )
    partnership_id = db.Column(db.Integer, nullable=True)
    partnership_account_id = db.Column(db.Integer, nullable=True)
    partnership_name = db.Column(db.String(255), nullable=True)
    partnership_account_name = db.Column(db.String(255), nullable=True)
    post_data = db.Column(db.Text(), nullable=True, server_default='')
    role = db.Column(db.String(255), nullable=True)
    request_type = db.Column(db.String(255), nullable=True)
    current_url = db.Column(db.String(255), nullable=True)
    host = db.Column(db.String(255), nullable=True)
    accept = db.Column(db.String(255), nullable=True)
    user_agent = db.Column(db.String(255), nullable=True)
    security_client_values = db.Column(pg.JSON, nullable=True, server_default='{}')
    referer = db.Column(db.String(512), nullable=True)
    encoding = db.Column(db.String(255), nullable=True)
    language = db.Column(db.String(255), nullable=True)
    cookies = db.Column(pg.JSON, nullable=True, server_default='{}')
    remote_ip_address = db.Column(db.String(64), nullable=True)
    remote_port = db.Column(db.String(64), nullable=True)
    method = db.Column(db.String(64), nullable=True)
    path_info = db.Column(db.String(255), nullable=True)
    query_string = db.Column(db.Text(), nullable=True, server_default='')
    server_name = db.Column(db.String(255), nullable=True)
    server_port = db.Column(db.Integer, nullable=True)
    is_secure = db.Column(db.Boolean, default=True)
    is_xhr = db.Column(db.Boolean, default=False)
    content_length = db.Column(db.Integer, nullable=True)
    is_json = db.Column(db.Boolean, default=False)

    # response fields
    response_code = db.Column(db.Integer, nullable=True)
    curl = db.Column(db.Text(), nullable=True, server_default='')
    error = db.Column(db.Text(), nullable=True, server_default='')
    response_text = db.Column(db.Text(), nullable=True, server_default='')
    is_authenticated = db.Column(db.Boolean, default=False)
    created_on = db.Column(db.DateTime, nullable=True, default=datetime.utcnow)
    updated_on = db.Column(db.DateTime, nullable=True, default=datetime.utcnow,
                           onupdate=datetime.utcnow)

    # ip based values
    country = db.Column(db.String(64), nullable=True)
    state = db.Column(db.String(64), nullable=True)
    city = db.Column(db.String(64), nullable=True)
    zip_code = db.Column(db.String(64), nullable=True)
    reverse = db.Column(db.String(255), nullable=True)

    # ID of the related resource (inbound id or form id)
    resource = db.Column(db.Integer, nullable=True)
    status = db.Column(db.String(64), nullable=True)

    @classmethod
    def create_record(cls, data):
        """Create a row from ``data`` and commit it.

        :param data: mapping of column name -> value passed to the model
            constructor. ``created_on`` is stamped with the current naive
            UTC time unless the caller supplies it.
        :return: the newly persisted ``RequestLog`` instance.
        :raises Exception: whatever the constructor or the commit raises;
            the session is rolled back before the error propagates.
        """
        fields = dict(data or {})
        # Merging into one dict avoids the "multiple values for keyword
        # argument 'created_on'" TypeError when data already carries the key.
        fields.setdefault('created_on', datetime.utcnow())

        new_record = cls(**fields)
        db.session.add(new_record)
        try:
            db.session.commit()
        except Exception:
            # Leave the shared session usable for the rest of the request.
            db.session.rollback()
            raise
        return new_record

    def update_record(self, request_id, data):
        """Update the first row matching ``request_id`` with ``data``.

        Only keys that already exist as attributes on the row are applied;
        unknown keys are ignored. This is best-effort: any failure is
        logged, the session is rolled back, and ``None`` is returned.

        :param request_id: value matched against the ``request_id`` column.
        :param data: mapping of attribute name -> new value.
        :return: the updated row, or ``None`` when no row matched or the
            update failed.
        """
        try:
            record = self.query.filter_by(request_id=request_id).first()
            if record:
                for key, value in data.items():
                    if hasattr(record, key):
                        setattr(record, key, value)
                db.session.commit()
        except Exception:
            # A failed flush/commit leaves the session unusable until it is
            # rolled back, so do that before reporting the problem.
            db.session.rollback()
            log.exception(
                "Failed to update request log with request_id=%s", request_id
            )
            record = None
        return record

    @staticmethod
    def apply_filters(query, filters):
        """Add one equality criterion per recognised key in ``filters``.

        :param query: the query to narrow.
        :param filters: mapping of column name -> required value. Keys that
            do not name a column of this table are ignored. ``None`` or an
            empty mapping returns ``query`` unchanged.
        :return: the filtered query.
        """
        if not filters:
            return query

        # Restrict to real table columns: a plain hasattr() check also
        # matches methods and helpers (e.g. ``hash``, ``query``), whose
        # ``== value`` comparison is a Python bool, not a SQL expression.
        column_names = set(RequestLog.__table__.columns.keys())
        for field, value in filters.items():
            if field in column_names:
                query = query.filter(getattr(RequestLog, field) == value)
        return query

    @classmethod
    def get_records(cls, filters=None):
        """Return all rows, optionally narrowed by equality ``filters``.

        :param filters: optional mapping of column name -> required value.
        :return: list of matching ``RequestLog`` rows.
        """
        query = cls.query
        if filters:
            query = cls.apply_filters(query, filters)
        return query.all()

    @classmethod
    def delete_old_records(cls, days=120):
        """Delete request logs created ``days`` or more days ago.

        Builds its own application and runs inside that app's context.

        :param days: retention window in days. The default of 120 (about
            four months at 30 days per month) is the value this method has
            always used.
        :return: int, the number of rows deleted.
        :raises ValueError: if ``days`` is negative, which would place the
            cutoff in the future and delete every row.
        """
        if days < 0:
            raise ValueError("days must be non-negative")

        from buyercall.app import create_app

        # Create a context for the database connection.
        app = create_app()
        db.app = app

        with app.app_context():
            cutoff = datetime.utcnow() - timedelta(days=days)
            try:
                # Bulk delete of everything at or before the cutoff.
                deleted_count = cls.query.filter(
                    cls.created_on <= cutoff
                ).delete()
                db.session.commit()
            except Exception:
                db.session.rollback()
                raise
            return deleted_count

    def hash(self):
        """Return an MD5 hex digest of selected fields of this row.

        The digest covers ``user_id``, ``method``, ``user_agent``,
        ``remote_ip_address``, ``response_code`` and ``current_url``, in
        that order, formatted into one comma-separated string.

        :return: 32-character hexadecimal string.
        """
        import hashlib

        data_string = f"{self.user_id}, {self.method}, {self.user_agent}, {self.remote_ip_address}, {self.response_code}, {self.current_url}"
        return hashlib.md5(data_string.encode('utf-8')).hexdigest()