File: //home/arjun/projects/buyercall/buyercall/blueprints/agents/webhooks/inbox_interaction.py
import requests
from flask import current_app
class InboxConnector:
def __init__(self, endpoint):
self.endpoint = f'{endpoint}/webhook/interaction'
self.session = requests.Session()
def send_interaction(self, contact_id, source_id, interaction_type, **kwargs):
post_data = {
"lead_id": contact_id,
"source_id": source_id,
"channel": kwargs.get('channel_id', ''),
"interaction_type": interaction_type,
"partnership_id": kwargs.get('partnership_id'),
"partnership_account_id": kwargs.get('partnership_account_id'),
"agent_id": kwargs.get('agent_ids'),
"fromLead": kwargs.get('from_lead'),
"payload": {
"content": kwargs.get('payload'),
}
}
print('post_data : ', post_data)
self.session.auth = (current_app.config.get('CHAT_API_USERNAME'), current_app.config.get('CHAT_API_PASSWORD'))
try:
resp = self.session.post(self.endpoint, json=post_data)
except Exception as e:
print('Error : ', e)
resp = None
if resp:
print('Socket webhook post success', resp.json())
return True
print('Socket webhook post failed', resp.json())
return False