# File: //home/arjun/projects/buyercall/buyercall/blueprints/sysadmin/utilities/request_log_task_call.py
from buyercall.blueprints.partnership.models import Partnership, PartnershipAccount, ApiToken
from buyercall.blueprints.sysadmin.utilities.generate_curl import GenerateCurl
from flask_login import current_user
from flask import get_flashed_messages
from buyercall.blueprints.sysadmin.utilities.ip_api import IpApi
class LogRequestTaskTrigger:
    """Build request-log payloads and hand them to the sysadmin log tasks.

    ``log_request_task_trigger`` records an incoming request;
    ``update_log_request_task_trigger`` later enriches the same log entry
    (matched through the ``HTTP_X_REQUEST_ID`` environ value) with the
    response outcome. Both resolve which partnership / partnership account
    the request belongs to, using a strategy that depends on the request path.
    """

    # Path prefixes whose partnership is resolved from the API token header.
    _API_PREFIXES = ('/api/v2', '/api/v1')

    @staticmethod
    def _token_from_auth_header(auth_header):
        """Return the second space-separated field of *auth_header*.

        Returns ``None`` when the header is missing or empty, has no second
        field, or the second field is empty, rather than raising
        ``IndexError`` on a malformed value.
        """
        if not auth_header:
            return None
        parts = auth_header.split(" ")
        if len(parts) < 2:
            return None
        return parts[1] or None

    @staticmethod
    def _session_user():
        """Return ``current_user`` when it is authenticated, else ``None``."""
        return None if current_user.is_anonymous else current_user

    @staticmethod
    def _lookup_partnership_context(partnership_id, partnership_account_id):
        """Load ``(partnership, partnership_account)`` for the given ids.

        A falsy id yields ``None`` for that element without issuing a query.
        """
        partnership = (
            Partnership.query.get(partnership_id) if partnership_id else None
        )
        partnership_account = (
            PartnershipAccount.query.get(partnership_account_id)
            if partnership_account_id else None
        )
        return partnership, partnership_account

    @staticmethod
    def _user_partnership_context(user):
        """Return ``(partnership, partnership_account)`` for *user*.

        Returns ``(None, None)`` when *user* is falsy.
        """
        if not user:
            return None, None
        return LogRequestTaskTrigger._lookup_partnership_context(
            user.partnership_id, user.partnership_account_id
        )

    @staticmethod
    def _api_partnership_context(request):
        """Return ``(partnership, partnership_account)`` for an API request.

        The partnership is found through the hashed token taken from the
        ``Authorize`` request header; the partnership account through the
        ``paid`` view argument. Either element is ``None`` when it cannot be
        resolved.
        """
        from buyercall.lib.util_crypto import SHA

        # NOTE(review): the header name is 'Authorize', not the standard
        # 'Authorization' -- kept as-is; confirm it matches the API clients.
        token = LogRequestTaskTrigger._token_from_auth_header(
            request.headers.get('Authorize')
        )
        token_hash = SHA.encrypt(token) if token else None
        api_token = (
            ApiToken.query.filter_by(api_token_hash=token_hash).first()
            if token_hash else None
        )
        partnership = (
            Partnership.query.filter_by(api_token_id=api_token.id).first()
            if api_token else None
        )
        account_id = request.view_args.get('paid') if request.view_args else None
        partnership_account = (
            PartnershipAccount.query.get(account_id) if account_id else None
        )
        return partnership, partnership_account

    @staticmethod
    def log_request_task_trigger(request, request_type):
        """Collect metadata about *request* and pass it to ``create_request_log``.

        Args:
            request: The incoming Flask request.
            request_type: Stored unchanged under the ``request_type`` key.

        The partnership / partnership account are resolved per path:
        API paths use the token header and ``paid`` view argument; ``/login``
        and the password-reset paths use the user identified by the submitted
        form; ``/form_leads`` uses the user matching the submitted e-mail
        field; everything else uses the logged-in user.
        """
        from buyercall.blueprints.sysadmin.tasks import create_request_log
        from buyercall.blueprints.user.models import User

        cls = LogRequestTaskTrigger
        environ = request.environ
        headers = request.headers
        path = request.path

        # Prefer the proxy-supplied address, fall back to the socket peer.
        # NOTE(review): X-Forwarded-For can hold a comma-separated proxy
        # chain; the value is used verbatim for the lookup and the log.
        ip_address = environ.get('HTTP_X_FORWARDED_FOR') or environ.get('REMOTE_ADDR')

        # Geo details for the address; tolerate a lookup that returns nothing.
        details = IpApi.get_request_complete_details(ip_address) or {}

        user = None
        if path.startswith(cls._API_PREFIXES):
            partnership, partnership_account = cls._api_partnership_context(request)
        else:
            partnership, partnership_account = cls._user_partnership_context(
                cls._session_user()
            )

        # Unauthenticated account flows: identify the user from the form.
        if path in ("/login", "/account/begin_password_reset"):
            user = User.find_by_identity(request.form.get('identity'))
            partnership, partnership_account = cls._user_partnership_context(user)

        if path == "/account/password_reset":
            user = User.deserialize_token(request.form.get('reset_token'))
            partnership, partnership_account = cls._user_partnership_context(user)

        if path.startswith("/form_leads"):
            if path == "/form_leads":
                # The submitting form may name its e-mail field either way.
                email_address = ''
                if 'emailfield' in request.form:
                    email_address = request.form.get('emailfield', '')
                elif 'email' in request.form:
                    email_address = request.form.get('email', '')
                # Only the partnership context comes from this user; the
                # logged user_id below still comes from current_user.
                lead_user = User.find_by_identity(email_address)
                partnership, partnership_account = cls._user_partnership_context(
                    lead_user
                )
            else:
                user = cls._session_user()
                partnership, partnership_account = cls._user_partnership_context(user)

        # Undecodable bytes (e.g. binary uploads) are replaced rather than
        # letting a UnicodeDecodeError abort the logging.
        post_data = request.data.decode('utf-8', errors='replace')

        if user:
            user_id = None if user.is_anonymous else user.id
        else:
            user_id = None if current_user.is_anonymous else current_user.id

        request_data = {
            'request_id': environ.get('HTTP_X_REQUEST_ID'),
            'user_id': user_id,
            "request_type": request_type,
            # The body is not stored for any path containing "login".
            'post_data': None if "login" in path else post_data,
            'partnership_id': partnership.id if partnership else None,
            'partnership_account_id': partnership_account.id if partnership_account else None,
            'partnership_name': partnership.name if partnership else None,
            'partnership_account_name': partnership_account.name if partnership_account else None,
            'role': current_user.role if not current_user.is_anonymous else None,
            'current_url': request.url,
            'host': headers.get('Host'),
            'accept': headers.get('Accept'),
            'user_agent': headers.get('User-Agent'),
            # security_client_values
            "security_client_values": {
                'sec_ch_ua_platform': headers.get('Sec-Ch-Ua-Platform'),
                'sec_ch_ua_mobile': headers.get('Sec-Ch-Ua-Mobile'),
                'sec_fetch_site': headers.get('Sec-Fetch-Site'),
                'sec_fetch_mode': headers.get('Sec-Fetch-Mode'),
                'sec_fetch_dest': headers.get('Sec-Fetch-Dest'),
            },
            'referer': headers.get('Referer'),
            'encoding': headers.get('Accept-Encoding'),
            'language': headers.get('Accept-Language'),
            # NOTE(review): the raw Cookie header (and the curl command
            # below) are stored as-is and presumably include session
            # credentials -- confirm this is acceptable for the log store.
            'cookies': headers.get('Cookie'),
            'remote_ip_address': ip_address,
            'remote_port': environ.get('REMOTE_PORT'),
            'method': request.method,
            'path_info': environ.get('PATH_INFO'),
            'query_string': environ.get('QUERY_STRING'),
            'server_name': environ.get('SERVER_NAME'),
            'server_port': environ.get('SERVER_PORT'),
            'is_secure': request.is_secure,
            # Derived from the header directly (stands in for request.is_xhr).
            'is_xhr': headers.get('X-Requested-With') == 'XMLHttpRequest',
            'curl': GenerateCurl().curl_generate(request),
            'content_length': request.content_length,
            'is_json': request.is_json,
            # ip based fetched values
            'country': details.get('country', None),
            'state': details.get('regionName', None),
            'city': details.get('city', None),
            'zip_code': details.get('zip', None),
            'reverse': details.get('reverse', None),
        }

        # Called directly here (not via ``.delay`` as the update task is).
        create_request_log(request_data)

    @staticmethod
    def update_log_request_task_trigger(request, response):
        """Queue an update of the request's log entry with the response outcome.

        Args:
            request: The Flask request that was logged earlier.
            response: The response being returned for it.

        Sends ``update_request_log.delay(request_id, update_data,
        response.status_code)`` where ``update_data`` carries the response
        code, a curl reproduction, the first 750 characters of the response
        body, a success/failed status, an optional JSON error payload and,
        except for ``/bw*`` and ``/form_leads``, the partnership details.
        """
        import json

        from buyercall.blueprints.sysadmin.tasks import update_request_log
        from buyercall.blueprints.user.models import User

        cls = LogRequestTaskTrigger
        path = request.path
        error = None

        if path.startswith(cls._API_PREFIXES):
            partnership, partnership_account = cls._api_partnership_context(request)
            if path.startswith('/api/v2/accounts/'):
                # The account id is taken from the response body here; on any
                # failure the view-argument based account above is kept.
                try:
                    json_obj = response.get_json()
                    # NOTE(review): falls back to account id 1 when the key
                    # is absent -- confirm this default is intended.
                    partnership_account_id = json_obj.get('partnership_account_id', 1)
                    partnership_account = (
                        PartnershipAccount.query.get(partnership_account_id)
                        if partnership_account_id else None
                    )
                except Exception as e:
                    print(f"The Exception in {request.path} for fetching p_acc_id is: {e}")
        else:
            partnership, partnership_account = cls._user_partnership_context(
                cls._session_user()
            )

        # Capture the JSON body of error responses (parsed once).
        try:
            if response.status_code >= 400:
                error_payload = response.get_json()
                if error_payload:
                    error = json.dumps(error_payload)
        except Exception as e:
            print(f"The exception to fetch error is {e}")

        request_id = request.environ.get('HTTP_X_REQUEST_ID')

        if path == "/login":
            # A 200 on /login is recorded as 401 -- presumably the login form
            # was re-rendered after a failed attempt; TODO confirm.
            response_code = 401 if response.status_code == 200 else response.status_code
            user = User.find_by_identity(request.form.get('identity'))
            if user:
                # Unknown identities keep the session-based context above.
                partnership, partnership_account = cls._user_partnership_context(user)
        else:
            response_code = response.status_code

        if path == "/account/begin_password_reset":
            user = User.find_by_identity(request.form.get('identity'))
            partnership, partnership_account = cls._user_partnership_context(user)

        if path == "/account/password_reset":
            user = User.deserialize_token(request.form.get('reset_token'))
            partnership, partnership_account = cls._user_partnership_context(user)

        # Best effort: some responses may not expose their body as text.
        try:
            response_text = response.get_data(as_text=True)[:750]
        except Exception:
            response_text = None

        update_data = {
            "response_code": response_code,
            "curl": GenerateCurl().curl_generate(request),
            "response_text": response_text,
            "status": "success" if response_code < 400 else "failed"
        }
        if error:
            update_data['error'] = error

        if not path.startswith('/bw') and path != '/form_leads':
            if partnership:
                update_data['partnership_id'] = partnership.id
                update_data['partnership_name'] = partnership.name
            if partnership_account:
                update_data['partnership_account_id'] = partnership_account.id
                update_data['partnership_account_name'] = partnership_account.name

        update_request_log.delay(request_id, update_data, response.status_code)