File: //home/arjun/projects/buyercall/buyercall/integrations/elasticsearch/utilities.py
from uuid import uuid4
from dateutil.tz import UTC
from datetime import datetime, timedelta
from random import randrange
import logging
from elasticsearch_dsl import UpdateByQuery
log = logging.getLogger(__name__)
# def get_error_data(request):
# current_user = request.user
# partnership = Partnership.query.get(current_user.partnership_id)
# if not partnership:
# partnership = Partnership.query.get(1)
# partnership_account = PartnershipAccount.query.get(current_user.partnership_account_id)
# if partnership_account:
# account_id = partnership_account.sid
# else:
# account_id = 'NO-PARTNERSHIP-ACCOUNT'
# params = {}
# for k,v in request.args:
# print(k, v)
# data = {
# 'environment': 'DEVELOPMENT',
# 'hostname': request.host,
# 'ip': request.remote_addr,
# 'created': datetime.now(tz=UTC),
# 'method': request.method,
# 'is_api': False,
# 'params': {
# 'key': 'name',
# 'value': 'Jay'
# },
# 'url': request.host_url,
# 'status_code': 500,
# 'status_code_root': 500,
# 'error_detail': traceback.format_exc(),
# 'user_id': current_user.sid,
# 'partnership_id': partnership.sid,
# 'account_id': account_id
# }
# return data
def random_date(start=None, end=None, last_n_days=5):
"""
Generate a random datetime between two datetime objects.
"""
if not start and not end:
start = datetime.now(tz=UTC) - timedelta(days=last_n_days)
end = datetime.now(tz=UTC)
delta = end - start
int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
random_second = randrange(int_delta)
return start + timedelta(seconds=random_second)
def create_notification_data(partnership, partnership_account, user_ip, user_device, message, payload,
notify_type, notify_message_type, sid, hyperlink, is_read=False, is_viewed=False):
account_id = partnership_account.sid if partnership_account else None
data = {
'notification_id': str(uuid4()),
'notification_type': notify_type,
'notification_message_type': notify_message_type,
'user_id': sid,
'user_ip': user_ip,
'user_device': user_device,
'message': message,
'payload': payload,
'hyperlink': hyperlink,
'partnership_id': str(partnership.sid),
'account_id': account_id,
'is_read': is_read,
'is_viewed': is_viewed,
'created_at': datetime.now(tz=UTC)
}
return data
def insert_data_by_index(data, es_client, index, debug=True, *args, **kwargs):
try:
r = {'status': True, 'message': 'success', 'meta': {}}
if index and es_client and data:
if 'buyercall-error-log' in index:
from buyercall.integrations.elasticsearch.documents import ErrorLogger
es_doc = ErrorLogger(**data)
elif 'buyercall-notification-log' in index:
from buyercall.integrations.elasticsearch.documents import ActivityLogger
es_doc = ActivityLogger(**data)
elif 'buyercall-lead-interaction' in index:
from buyercall.integrations.elasticsearch.documents import LeadInteraction
es_doc = LeadInteraction(**data)
elif 'buyercall-interaction-task' in index:
from buyercall.integrations.elasticsearch.documents import InteractionTask
es_doc = InteractionTask(**data)
else:
r = {'status': False, 'message': 'Index not found'}
if r['status']:
res = es_doc.save(using=es_client.es, index=index, return_doc_meta=True)
if res:
r['meta'] = res
else:
r = {'status': False, 'message': 'Invalid parameters', 'meta': {}}
except Exception as e:
r = {'status': False, 'message': f'Error: {e}', 'meta': {}}
return r if debug else r['status']
def search_data(q, es_client, index, count_only=False, limit=10, offset=0, sort=''):
response = []
try:
if es_client:
search_client = es_client.search
search_client.index = index
if q:
search_client = search_client.query(q)
# Offset and limit
search_client = search_client[offset:offset + limit] # The end obj is offset+limit
if sort:
search_client = search_client.sort(sort)
if count_only:
response = search_client.count()
else:
response = search_client.execute()
except Exception as e:
log.error(f'Elastic search error : {str(e)}')
return response
def update_index_by_query(update_query, es_client, index_name, script):
response = {'status': True, 'message': {}}
try:
if es_client and index_name and update_query:
update_client = UpdateByQuery(using=es_client.es, index=index_name)
update_client = update_client.update_from_dict(update_query)
update_client = update_client.script(source=script)
resp = update_client.execute()
if not resp:
response = {'status': False, 'message': 'Something went wrong'}
else:
resp_dict = {}
for k, v in enumerate(resp):
resp_dict[v] = resp[v]
response['message'] = str(resp_dict)
else:
response = {'status': False, 'message': 'Missing parameters'}
except Exception as e:
log.error(f'{e}')
response = {'status': False, 'message': f'{e}'}
return response