HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall/buyercall/integrations/elasticsearch/utilities.py
from uuid import uuid4
from dateutil.tz import UTC
from datetime import datetime, timedelta
from random import randrange
import logging
from elasticsearch_dsl import UpdateByQuery

log = logging.getLogger(__name__)


# def get_error_data(request):
#     current_user = request.user
#     partnership = Partnership.query.get(current_user.partnership_id)
#     if not partnership:
#         partnership = Partnership.query.get(1)
#     partnership_account = PartnershipAccount.query.get(current_user.partnership_account_id)
#     if partnership_account:
#         account_id = partnership_account.sid
#     else:
#         account_id = 'NO-PARTNERSHIP-ACCOUNT'
#     params = {}
#     for k,v in request.args:
#         print(k, v)
#     data = {
#         'environment': 'DEVELOPMENT',
#         'hostname': request.host,
#         'ip': request.remote_addr,
#         'created': datetime.now(tz=UTC),
#         'method': request.method,
#         'is_api': False,
#         'params': {
#             'key': 'name',
#             'value': 'Jay'
#         },
#         'url': request.host_url,
#         'status_code': 500,
#         'status_code_root': 500,
#         'error_detail': traceback.format_exc(),
#         'user_id': current_user.sid,
#         'partnership_id': partnership.sid,
#         'account_id': account_id
#     }
#     return data


def random_date(start=None, end=None, last_n_days=5):
    """
    Generate a random datetime between two datetime objects.
    """
    if not start and not end:
        start = datetime.now(tz=UTC) - timedelta(days=last_n_days)
        end = datetime.now(tz=UTC)

    delta = end - start
    int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
    random_second = randrange(int_delta)
    return start + timedelta(seconds=random_second)


def create_notification_data(partnership, partnership_account, user_ip, user_device, message, payload,
                             notify_type, notify_message_type, sid, hyperlink, is_read=False, is_viewed=False):

    account_id = partnership_account.sid if partnership_account else None

    data = {
        'notification_id': str(uuid4()),
        'notification_type': notify_type,
        'notification_message_type': notify_message_type,

        'user_id': sid,
        'user_ip': user_ip,
        'user_device': user_device,
        'message': message,
        'payload': payload,
        'hyperlink': hyperlink,

        'partnership_id': str(partnership.sid),
        'account_id': account_id,
        'is_read': is_read,
        'is_viewed': is_viewed,
        'created_at': datetime.now(tz=UTC)
    }
    return data


def insert_data_by_index(data, es_client, index, debug=True, *args, **kwargs):
    try:
        r = {'status': True, 'message': 'success', 'meta': {}}
        if index and es_client and data:

            if 'buyercall-error-log' in index:
                from buyercall.integrations.elasticsearch.documents import ErrorLogger
                es_doc = ErrorLogger(**data)
            elif 'buyercall-notification-log' in index:
                from buyercall.integrations.elasticsearch.documents import ActivityLogger
                es_doc = ActivityLogger(**data)
            elif 'buyercall-lead-interaction' in index:
                from buyercall.integrations.elasticsearch.documents import LeadInteraction
                es_doc = LeadInteraction(**data)
            elif 'buyercall-interaction-task' in index:
                from buyercall.integrations.elasticsearch.documents import InteractionTask
                es_doc = InteractionTask(**data)
            else:
                r = {'status': False, 'message': 'Index not found'}

            if r['status']:
                res = es_doc.save(using=es_client.es, index=index, return_doc_meta=True)
                if res:
                    r['meta'] = res
        else:
            r = {'status': False, 'message': 'Invalid parameters', 'meta': {}}
    except Exception as e:
        r = {'status': False, 'message': f'Error: {e}', 'meta': {}}
    return r if debug else r['status']


def search_data(q, es_client, index, count_only=False, limit=10, offset=0, sort=''):

    response = []
    try:
        if es_client:

            search_client = es_client.search

            search_client.index = index
            if q:
                search_client = search_client.query(q)

            # Offset and limit
            search_client = search_client[offset:offset + limit]  # The end obj is offset+limit

            if sort:
                search_client = search_client.sort(sort)

            if count_only:
                response = search_client.count()
            else:
                response = search_client.execute()
    except Exception as e:
        log.error(f'Elastic search error : {str(e)}')
    return response


def update_index_by_query(update_query, es_client, index_name, script):

    response = {'status': True, 'message': {}}
    try:
        if es_client and index_name and update_query:
            update_client = UpdateByQuery(using=es_client.es, index=index_name)
            update_client = update_client.update_from_dict(update_query)
            update_client = update_client.script(source=script)

            resp = update_client.execute()

            if not resp:
                response = {'status': False, 'message': 'Something went wrong'}
            else:
                resp_dict = {}
                for k, v in enumerate(resp):
                    resp_dict[v] = resp[v]
                response['message'] = str(resp_dict)
        else:
            response = {'status': False, 'message': 'Missing parameters'}
    except Exception as e:
        log.error(f'{e}')
        response = {'status': False, 'message': f'{e}'}
    return response