HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall/buyercall/blueprints/sysadmin/utilities/request_log_task_call.py
from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount, ApiToken
from buyercall.blueprints.sysadmin.utilities.generate_curl import GenerateCurl
from flask_login import current_user
from flask import get_flashed_messages

from buyercall.blueprints.sysadmin.utilities.ip_api import IpApi


class LogRequestTaskTrigger:
    @staticmethod
    def log_request_task_trigger(request, request_type):
        from buyercall.blueprints.sysadmin.tasks import create_request_log
        from buyercall.lib.util_crypto import SHA
        from buyercall.blueprints.user.models import User

        # fetch ip-address
        ip_address = request.environ.get('HTTP_X_FORWARDED_FOR')\
            if request.environ.get('HTTP_X_FORWARDED_FOR') else request.environ.get('REMOTE_ADDR')

        # fetch details based on ip
        details = IpApi.get_request_complete_details(ip_address)
        user = None

        if request.path.startswith('/api/v2') or request.path.startswith('/api/v1'):
            auth_header = request.headers['Authorize'] if 'Authorize' in request.headers else None
            token = auth_header.split(" ")[1] if auth_header else None
            token_shah = SHA.encrypt(token) if token else None
            api_token = ApiToken.query.filter_by(api_token_hash=token_shah).first() if token_shah else None

            partnership = Partnership.query.filter_by(api_token_id=api_token.id).first() if api_token else None
            partnership_account_id = request.view_args.get('paid') if request.view_args else None
            partnership_account = PartnershipAccount.query.get(partnership_account_id) if partnership_account_id else None
            
        else:
            partnership_id = current_user.partnership_id if not current_user.is_anonymous else None
            partnership_account_id = current_user.partnership_account_id if not current_user.is_anonymous else None
            partnership = Partnership.query.get(partnership_id) if partnership_id else None
            partnership_account = PartnershipAccount.query.get(partnership_account_id) if partnership_account_id else None

            if request.path == "/login" or request.path == "/account/begin_password_reset":
                user = User.find_by_identity(request.form.get('identity'))
                partnership_id = user.partnership_id if user else None
                partnership_account_id = user.partnership_account_id if user else None
                partnership = Partnership.query.get(partnership_id) if partnership_id else None
                partnership_account = PartnershipAccount.query.get(partnership_account_id) if partnership_account_id else None

            if request.path == "/account/password_reset":
                user = User.deserialize_token(request.form.get('reset_token'))
                partnership_id = user.partnership_id if user else None
                partnership_account_id = user.partnership_account_id if user else None
                partnership = Partnership.query.get(partnership_id) if partnership_id else None
                partnership_account = PartnershipAccount.query.get(partnership_account_id) if partnership_account_id else None
                
            if request.path.startswith("/form_leads"):
                email_address = ''
                if request.path == "/form_leads":
                    if 'emailfield' in request.form:
                        email_address = request.form.get('emailfield', '')
                    elif 'email' in request.form:
                        email_address = request.form.get('email', '')
                    u = User.find_by_identity(email_address)
                    partnership_id = u.partnership_id if u else None
                    partnership_account_id = u.partnership_account_id if u else None
                    partnership = Partnership.query.get(partnership_id) if partnership_id else None
                    partnership_account = PartnershipAccount.query.get(partnership_account_id) if partnership_account_id else None
                else:
                    user = current_user if not current_user.is_anonymous else None
                    partnership_id = user.partnership_id if user else None
                    partnership_account_id = user.partnership_account_id if user else None
                    partnership = Partnership.query.get(partnership_id) if partnership_id else None
                    partnership_account = PartnershipAccount.query.get(partnership_account_id) if partnership_account_id else None
        post_data = request.data.decode('utf-8')

        # request log
        request_data = {
            'request_id': request.environ.get('HTTP_X_REQUEST_ID'),
            'user_id': (current_user.id if not current_user.is_anonymous else None) if not user else (user.id if not user.is_anonymous else None),
            "request_type": request_type,
            'post_data': None if "login" in request.path else post_data,
            'partnership_id': partnership.id if partnership else None,
            'partnership_account_id': partnership_account.id if partnership_account else None,

            'partnership_name': partnership.name if partnership else None,
            'partnership_account_name': partnership_account.name if partnership_account else None,

            'role': current_user.role if not current_user.is_anonymous else None,
            'current_url': request.url,
            'host': request.headers.get('Host'),
            'accept': request.headers.get('Accept'),
            'user_agent': request.headers.get('User-Agent'),
            # security_client_values
            "security_client_values":
                {
                    'sec_ch_ua_platform': request.headers.get('Sec-Ch-Ua-Platform'),
                    'sec_ch_ua_mobile': request.headers.get('Sec-Ch-Ua-Mobile'),
                    'sec_fetch_site': request.headers.get('Sec-Fetch-Site'),
                    'sec_fetch_mode': request.headers.get('Sec-Fetch-Mode'),
                    'sec_fetch_dest': request.headers.get('Sec-Fetch-Dest')
                },
            'referer': request.headers.get('Referer'),
            'encoding': request.headers.get('Accept-Encoding'),
            'language': request.headers.get('Accept-Language'),
            'cookies': request.headers.get('Cookie'),
            'remote_ip_address': ip_address,
            'remote_port': request.environ.get('REMOTE_PORT'),
            'method': request.method,
            'path_info': request.environ.get('PATH_INFO'),
            'query_string': request.environ.get('QUERY_STRING'),
            'server_name': request.environ.get('SERVER_NAME'),
            'server_port': request.environ.get('SERVER_PORT'),
            'is_secure': request.is_secure,
            # 'is_xhr': request.is_xhr,
            'is_xhr': request.headers.get('X-Requested-With') == 'XMLHttpRequest',
            'curl': GenerateCurl().curl_generate(request),
            'content_length': request.content_length,
            'is_json': request.is_json,
            # ip based fetched values
            'country': details.get('country', None),
            'state': details.get('regionName', None),
            'city': details.get('city', None),
            'zip_code': details.get('zip', None),
            'reverse': details.get('reverse', None),
        }
        # request_type
        create_request_log(request_data)

    @staticmethod
    def update_log_request_task_trigger(request, response):
        from buyercall.blueprints.sysadmin.tasks import update_request_log
        from buyercall.blueprints.user.models import User
        import json
        from buyercall.lib.util_crypto import SHA

        error = None
        user = None
        # update request log
        if request.path.startswith("/form_leads"):
            partnership_id = current_user.partnership_id if not current_user.is_anonymous else None
            partnership_account_id = current_user.partnership_account_id if not current_user.is_anonymous else None
            partnership = Partnership.query.get(partnership_id) if partnership_id else None
            partnership_account = PartnershipAccount.query.get(partnership_account_id) if partnership_account_id else None

        if request.path.startswith('/api/v2') or request.path.startswith('/api/v1'):
            
            auth_header = request.headers['Authorize'] if 'Authorize' in request.headers else None
            token = auth_header.split(" ")[1] if auth_header else None
            token_shah = SHA.encrypt(token) if token else None
            api_token = ApiToken.query.filter_by(api_token_hash=token_shah).first() if token_shah else None

            partnership = Partnership.query.filter_by(api_token_id=api_token.id).first() if api_token else None
            partnership_account_id = request.view_args.get('paid') if request.view_args else None
            partnership_account = PartnershipAccount.query.get(partnership_account_id) if partnership_account_id else None
            
            if request.path.startswith('/api/v2/accounts/'):
                # fetch partnership account id
                try:
                    json_obj = response.get_json()
                    partnership_account_id = json_obj.get('partnership_account_id', 1)
                    partnership_account = PartnershipAccount.query.get(partnership_account_id) \
                        if partnership_account_id else None
                except Exception as e:
                    print(f"The Exception in {request.path} for fetching p_acc_id is: {e}")

        else:
            if request.path.startswith('/bw'):
                print(type(request.data))
            partnership_id = current_user.partnership_id if not current_user.is_anonymous else None
            partnership_account_id = current_user.partnership_account_id if not current_user.is_anonymous else None

            partnership = Partnership.query.get(partnership_id) if partnership_id else None
            partnership_account = PartnershipAccount.query.get(partnership_account_id) if partnership_account_id else None

        # fetch api error
        try:
            # if request.path.startswith('/verification') and request.method == "POST"\
            #         and response.status_code >= 302:
            #     # error = get_flashed_messages()
            #     pass

            if response.status_code >= 400 and response and response.get_json():
                error = json.dumps(response.get_json())
                # if response.get_json().get('errors') else None
        except Exception as e:
            print(f"The exception to fetch error is {e}")

        request_id = request.environ.get('HTTP_X_REQUEST_ID')

        if request.path == "/login":
            response_code = 401 if response.status_code == 200 else response.status_code
            user = User.find_by_identity(request.form.get('identity'))
            if user:
                partnership_id = user.partnership_id if user else None
                partnership_account_id = user.partnership_account_id if user else None
                partnership = Partnership.query.get(partnership_id) if partnership_id else None
                partnership_account = PartnershipAccount.query.get(partnership_account_id) if partnership_account_id else None
        else:
            response_code = response.status_code

        if request.path == "/account/begin_password_reset":
            user = User.find_by_identity(request.form.get('identity'))
            partnership_id = user.partnership_id if user else None
            partnership_account_id = user.partnership_account_id if user else None
            partnership = Partnership.query.get(partnership_id) if partnership_id else None
            partnership_account = PartnershipAccount.query.get(partnership_account_id) if partnership_account_id else None

        if request.path == "/account/password_reset":
            user = User.deserialize_token(request.form.get('reset_token'))
            partnership_id = user.partnership_id if user else None
            partnership_account_id = user.partnership_account_id if user else None
            partnership = Partnership.query.get(partnership_id) if partnership_id else None
            partnership_account = PartnershipAccount.query.get(partnership_account_id) if partnership_account_id else None

        try:
            response_text = response.get_data(as_text=True)[:750]
        except Exception as e:
            response_text = None
        update_data = {
            "response_code": response_code,
            "curl": GenerateCurl().curl_generate(request),
            "response_text": response_text,
            "status": "success" if response_code < 400 else "failed"
        }

        if error:
            update_data['error'] = error

        if not request.path.startswith('/bw') and request.path != '/form_leads':
            if partnership:
                update_data['partnership_id'] = partnership.id if partnership else None
                update_data['partnership_name'] = partnership.name if partnership else None
            if partnership_account:
                update_data['partnership_account_id'] = partnership_account.id if partnership_account else None
                update_data['partnership_account_name'] = partnership_account.name if partnership_account else None

        # if current_user:
        #     update_data['user_id'] = current_user.id if not current_user.is_anonymous else None
        #     update_data['role'] = current_user.role if not current_user.is_anonymous else None
        update_request_log.delay(request_id, update_data, response.status_code)