HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_new/buyercall/buyercall/blueprints/sysadmin/models.py
import logging
from buyercall.extensions import db
from buyercall.lib.util_sqlalchemy import ResourceMixin
from sqlalchemy.dialects import postgresql as pg
from datetime import datetime, timedelta

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)

# Timezone name used as a default; nothing in the visible part of this module reads it.
DEFAULT_TIMEZONE = 'US/Central'  # TODO: Define settings in a central location


class RequestLog(ResourceMixin, db.Model):
    """One row of the ``request_log`` table.

    Each row stores metadata about a request (headers, client address,
    URL parts), the response produced for it, and IP-derived location
    fields.  The class also offers small helpers to create, update, query
    and purge rows.
    """
    # Define the name of the database table
    __tablename__ = 'request_log'

    # Define the columns of the table
    id = db.Column(db.Integer, primary_key=True)
    request_id = db.Column(db.String(255), nullable=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id',
                                                  name='request_headers_user_id_fkey',
                                                  onupdate='CASCADE',
                                                  ondelete='CASCADE'),
                        index=True, nullable=True)
    partnership_id = db.Column(db.Integer, nullable=True)
    partnership_account_id = db.Column(db.Integer, nullable=True)
    partnership_name = db.Column(db.String(255), nullable=True)
    partnership_account_name = db.Column(db.String(255), nullable=True)
    post_data = db.Column(db.Text(), nullable=True, server_default='')
    role = db.Column(db.String(255), nullable=True)
    request_type = db.Column(db.String(255), nullable=True)
    current_url = db.Column(db.String(255), nullable=True)
    host = db.Column(db.String(255), nullable=True)
    accept = db.Column(db.String(255), nullable=True)
    user_agent = db.Column(db.String(255), nullable=True)
    security_client_values = db.Column(pg.JSON, nullable=True, server_default='{}')
    referer = db.Column(db.String(512), nullable=True)
    encoding = db.Column(db.String(255), nullable=True)
    language = db.Column(db.String(255), nullable=True)
    cookies = db.Column(pg.JSON, nullable=True, server_default='{}')
    remote_ip_address = db.Column(db.String(64), nullable=True)
    remote_port = db.Column(db.String(64), nullable=True)
    method = db.Column(db.String(64), nullable=True)
    path_info = db.Column(db.String(255), nullable=True)
    query_string = db.Column(db.Text(), nullable=True, server_default='')
    server_name = db.Column(db.String(255), nullable=True)
    server_port = db.Column(db.Integer, nullable=True)
    is_secure = db.Column(db.Boolean, default=True)
    is_xhr = db.Column(db.Boolean, default=False)
    content_length = db.Column(db.Integer, nullable=True)
    is_json = db.Column(db.Boolean, default=False)
    # response fields
    response_code = db.Column(db.Integer, nullable=True)
    curl = db.Column(db.Text(), nullable=True, server_default='')
    error = db.Column(db.Text(), nullable=True, server_default='')
    response_text = db.Column(db.Text(), nullable=True, server_default='')
    is_authenticated = db.Column(db.Boolean, default=False)
    created_on = db.Column(db.DateTime, nullable=True, default=datetime.utcnow)
    updated_on = db.Column(db.DateTime, nullable=True, default=datetime.utcnow, onupdate=datetime.utcnow)
    # ip based values
    country = db.Column(db.String(64), nullable=True)
    state = db.Column(db.String(64), nullable=True)
    city = db.Column(db.String(64), nullable=True)
    zip_code = db.Column(db.String(64), nullable=True)
    reverse = db.Column(db.String(255), nullable=True)
    # resource save inbound_id or form id
    resource = db.Column(db.Integer, nullable=True)
    status = db.Column(db.String(64), nullable=True)

    @classmethod
    def _column_names(cls):
        """Return the set of keys of the columns on this model's table.

        Used to tell real columns apart from methods and other class
        attributes, which a plain ``hasattr`` check cannot do.
        """
        return set(cls.__table__.columns.keys())

    @classmethod
    def create_record(cls, data):
        """Insert a new row built from ``data`` and commit it.

        :param data: mapping of column name to value, forwarded to the
            model constructor.  If it has no ``created_on`` entry the
            current UTC time is used.
        :return: the newly created instance
        :raises Exception: whatever the constructor or the commit raises;
            the session is rolled back before the error propagates.
        """
        fields = dict(data)
        # Passing created_on both via **data and as an explicit keyword
        # raised TypeError; let a caller-supplied value win instead.
        fields.setdefault('created_on', datetime.utcnow())
        new_record = cls(**fields)
        try:
            db.session.add(new_record)
            db.session.commit()
        except Exception:
            # Leave the shared session usable for the rest of the request.
            db.session.rollback()
            raise
        return new_record

    def update_record(self, request_id, data):
        """Update the first row matching ``request_id`` with ``data``.

        Only keys that name a table column are applied; anything else in
        ``data`` is ignored.  This is a best-effort operation: errors are
        logged and swallowed.

        :param request_id: value of the ``request_id`` column to look up
        :param data: mapping of column name to new value
        :return: the updated instance, or ``None`` when no row matched or
            an error occurred
        """
        try:
            record = self.query.filter_by(request_id=request_id).first()
            if record:
                columns = self._column_names()
                for key, value in data.items():
                    # Restrict to columns so methods/attributes such as
                    # ``hash`` or ``query`` cannot be overwritten.
                    if key in columns:
                        setattr(record, key, value)
                db.session.commit()
        except Exception:
            log.exception("The exception in updating request log %s", request_id)
            # A failed flush/commit leaves the session unusable until
            # it is rolled back.
            db.session.rollback()
            record = None
        return record

    @staticmethod
    def apply_filters(query, filters):
        """Add an equality filter to ``query`` for each known column.

        :param query: query over ``RequestLog`` to narrow down
        :param filters: mapping of column name to required value; keys
            that do not name a table column are ignored
        :return: the narrowed query
        """
        columns = RequestLog._column_names()
        for field, value in filters.items():
            # hasattr() would also accept methods, where ``==`` yields a
            # plain bool rather than a SQL expression.
            if field in columns:
                query = query.filter(getattr(RequestLog, field) == value)
        return query

    @classmethod
    def get_records(cls, filters=None):
        """Return all rows, optionally restricted by equality filters.

        :param filters: optional mapping of column name to required value
            (see :meth:`apply_filters`)
        :return: list of matching instances
        """
        query = cls.query
        if filters:
            query = cls.apply_filters(query, filters)
        return query.all()

    @classmethod
    def delete_old_records(cls, days=120):
        """
        Delete request logs created ``days`` or more days ago.

        Builds its own application and runs inside that application's
        context.

        NOTE(review): this docstring used to say "6 months", but the code
        has always used a 120-day cutoff; the default keeps the 120-day
        behaviour -- confirm the intended retention period.

        :param days: age threshold in days (default 120)
        :return: int, number of rows deleted
        """
        from buyercall.app import create_app
        # Create a context for the database connection.
        app = create_app()
        db.app = app
        with app.app_context():
            cutoff = datetime.utcnow() - timedelta(days=days)
            try:
                # Bulk delete of everything at or before the cutoff.
                deleted_count = cls.query.filter(cls.created_on <= cutoff).delete()
                db.session.commit()
            except Exception:
                db.session.rollback()
                raise

            return deleted_count

    def hash(self):
        """Return an MD5 hex digest of selected request/response fields.

        The digest covers user id, method, user agent, remote IP address,
        response code and current URL.  Note this is a regular method named
        ``hash``, not ``__hash__``.
        """
        import hashlib
        data_string = f"{self.user_id}, {self.method}, {self.user_agent}, {self.remote_ip_address}, {self.response_code}, {self.current_url}"
        return hashlib.md5(data_string.encode('utf-8')).hexdigest()