File: //home/arjun/projects/buyercall_forms/buyercall/buyercall/blueprints/mobile/views.py
import logging
import json
import sys
import uuid
from binascii import unhexlify, crc32
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from datetime import datetime, timedelta
from dateutil import tz
from pytz import timezone
from flask import url_for, redirect, flash, Blueprint, request, Response, render_template, jsonify
from flask_babel import gettext as _
from flask_login import login_required, current_user
from sqlalchemy import and_, desc, or_, case
from sqlalchemy.sql import null, func, union_all
from buyercall.blueprints.agents.models import Agent
from buyercall.blueprints.contacts.models import Contact, ContactNotes, BdcStatuses, Status, MarketingSources
from buyercall.blueprints.filters import format_phone_number_bracket
from buyercall.blueprints.form_leads.models import FormLead
from buyercall.blueprints.mobile.models import Endpoint, Domain
from buyercall.blueprints.mobile.utils import get_sip_endpoint, is_iphone, validate_phonenumber
from buyercall.blueprints.phonenumbers.models import Lead, Phone
from buyercall.blueprints.sms.models import Message
from buyercall.blueprints.user.decorators import role_required
from buyercall.extensions import csrf, db
from buyercall.lib.util_wtforms import choices_from_dict
from buyercall.lib.util_twilio import bw_client
provider = "bandwidth"
mobile = Blueprint('mobile', __name__, template_folder='templates')
log = logging.getLogger(__name__)
time_zone = timezone("US/Eastern")
def update_endpoint(id_, sip_password):
endpoint = Endpoint.query.filter(Endpoint.id == id_).first()
domain = Domain.query.filter(Domain.id == endpoint.domain_id).first()
from buyercall.blueprints.partnership.models import Partnership
partner = Partnership.query.filter(Partnership.id == domain.partnership_id).first()
client = bw_client(partner.id)
# Update the sip endpoint in the BuyerCall sip endpoint table
Endpoint.update(id_, endpoint.partnership_account_id, sip_password, endpoint.description,
endpoint.enabled, endpoint.agent_id)
# Update the sip endpoint in the provider system via API
if endpoint.provider == 'bandwidth':
bw_endpoint = client.domains.update_endpoint(
domain_id=domain.domain_id,
endpoint_id=endpoint.provider_id,
description=endpoint.description,
enabled=endpoint.enabled,
credentials={"password": endpoint.sip_password}
)
return bw_endpoint
else:
log.info('The provider does not have a sip endpoint')
def encrypt_media(media, encrypt_key):
key = unhexlify(encrypt_key)
nonce = b'\0' * len(key)
cipher = Cipher(algorithms.AES(key), modes.CTR(nonce), backend=default_backend())
dec = cipher.decryptor()
# Read the encrypted file
file_in = media.read()
crc = 0
if not file_in:
log.info('There is no encrypted file_in file to decrypt ')
# Decrypt the encrypted file
dec_block = dec.update(file_in)
crc = crc32(dec_block, crc)
# The decrypted file that needs to be return to upload to aws before sending it in message
decrypted_file = dec_block
print("Hash: {}".format(crc))
return decrypted_file
def decrypt_media(media, encrypt_key):
key = unhexlify(encrypt_key)
nonce = b'\0' * len(key)
cipher = Cipher(algorithms.AES(key), modes.CTR(nonce), backend=default_backend())
dec = cipher.decryptor()
# Read the encrypted file
file_in = media.read()
crc = 0
if not file_in:
log.info('There is no encrypted file_in file to decrypt ')
# Decrypt the encrypted file
dec_block = dec.update(file_in)
crc = crc32(dec_block, crc)
# The decrypted file that needs to be return to upload to aws before sending it in message
decrypted_file = dec_block
print("Hash: {}".format(crc))
return decrypted_file
@mobile.route('/mobile/contacts/', methods=['GET', 'POST'])
@csrf.exempt
def contacts():
from buyercall.blueprints.agents.models import Agent
args = request.json
sip_username = args['username']
log.info('The username return by the fetch request is {}'.format(args['username']))
sip_password = args['password']
try:
request_last_date = request.headers['If-Modified-Since']
except Exception as e:
log.info('Something went from with the modified date')
request_last_date = datetime.now().strftime('%a, %d %b %Y %H:%M:%S GMT')
log.info("The last modified date is: {}".format(request_last_date))
try:
sip_endpoint = Endpoint.query.filter(
and_(Endpoint.sip_username == sip_username,
Endpoint.sip_password == sip_password)
).first()
sip_domain = Domain.query.filter(Domain.id == sip_endpoint.domain_id).first()
sip_contacts = Contact.query.filter(
and_(Contact.partnership_account_id == sip_endpoint.partnership_account_id,
Contact.is_deactivated.is_(False))
).filter(
Contact.agent_id == sip_endpoint.agent_id
).order_by(desc(Contact.updated_on)).all()
last_updated = sip_contacts[0].updated_on.strftime('%a, %d %b %Y %H:%M:%S GMT')
contacts_list = list()
total_contacts = 0
for contact in sip_contacts:
display_name = ''
# If its the default BuyerCall domain use the default BuyerCall avatars
if sip_domain.default_domain:
avatar = 'https://buyercall-logo.s3-us-west-2.amazonaws.com/contact_placeholder.png'
large_avatar = 'https://buyercall-logo.s3-us-west-2.amazonaws.com/contact_placeholder_large.png'
if not contact.firstname and contact.caller_id:
first_name = contact.caller_id
display_name = first_name
elif not contact.firstname and not contact.caller_id:
first_name = contact.phonenumber_1
display_name = first_name
else:
first_name = contact.firstname
else:
avatar = contact.avatar
large_avatar = contact.large_avatar
first_name = contact.firstname
display_name = first_name + " " + contact.lastname
total_contacts = total_contacts + 1
if len(first_name) != 0 and len(contact.phonenumber_1) != 0:
contact_entries = [{
"entryId": "tel:0",
"label": "phone 1",
"type": "tel",
"uri": contact.phonenumber_1
},
{
"entryId": "tel:1",
"label": "phone 2",
"type": "tel",
"uri": contact.phonenumber_2
},
{
"entryId": "email:1",
"label": "email",
"type": "email",
"uri": contact.email
}]
key_list = ["avatar", "largeAvatar", "birthday", "checksum", "city",
"contactEntries", "contactId", "country", "countryCode",
"displayName", "fname", "lname", "notes", "state", "street", "zip"
]
value_list = [contact.avatar, contact.large_avatar, contact.birthday, "", contact.city, contact_entries,
contact.id, contact.country, "us", display_name, contact.firstname, contact.lastname, "",
contact.state, contact.address_1, contact.zip]
dicti = {key: value for (key, value) in zip(key_list, value_list)}
contacts_list.append(dicti)
log.info('The contact list looks like: {}'.format(contacts_list))
d = {
"contacts": contacts_list
}
json_data = json.dumps(d, default=str)
log.info('The response body sent to Acrobits are: {}'.format(json_data))
# Get the relevant agent to see if they have a contact count
agent = Agent.query.filter(Agent.id == sip_endpoint.agent_id).first()
# If the agent has no agents and its set to None assign 0 value
if agent.contact_count is None:
contact_count = 0
else:
contact_count = agent.contact_count
if request_last_date == last_updated and total_contacts == contact_count:
status = 304
else:
status = 200
agent.contact_count = total_contacts
db.session.commit()
contact_response = Response(response=json_data, status=status, mimetype='application/json; charset=utf-8')
contact_response.headers['Last-Modified'] = last_updated
log.info('The status is {} for the mobile contact web service'.format(status))
return contact_response
except Exception as e:
log.debug('Something went from with the contacts post heres the exception: {}'.format(e))
# else:
# log.info('There is no BuyerCall endpoint or no contact for the sip username: {}'.format(sip_username))
return ''
@mobile.route('/mobile/voicemail/', methods=['GET', 'POST'])
@csrf.exempt
def voicemail():
username = request.args.get('username', '')
password = request.args.get('password', '')
sip_endpoint, sip_domain = get_sip_endpoint(username, password)
if sip_domain:
call_uri = sip_domain.call_uri
inbound = Phone.query.filter(Phone.id == sip_endpoint.inbound_id).first()
if inbound:
sip_phone_number = format_phone_number_bracket(inbound.phonenumber)
else:
sip_phone_number = 'unavailable'
vm_calls = Lead.query.filter(
and_(Lead.inbound_id == sip_endpoint.inbound_id,
Lead.status == 'missed',
Lead.recording_url != ''
)
).order_by(Lead.created_on.desc()).limit(30).all()
else:
call_uri = ''
sip_phone_number = 'unavailable'
vm_calls = []
from_zone = tz.tzutc()
to_zone = tz.tzlocal()
vm_list_recent = list()
vm_list_older = list()
for i in vm_calls:
i.action_number = i.phonenumber
i.phonenumber = format_phone_number_bracket(i.phonenumber)
utc_date = i.created_on
utc_date = utc_date.replace(tzinfo=from_zone)
local_date = utc_date.astimezone(to_zone)
tz_info = i.created_on.tzinfo
date_now = datetime.now(tz_info) - timedelta(days=7)
if i.recording_url:
from buyercall.lib.util_boto3_s3 import get_recording_url_details
recording_details = get_recording_url_details(i.recording_url)
if recording_details and recording_details['key'] and recording_details['bucket']:
from buyercall.lib.util_boto3_s3 import generate_presigned_aws_url
i.recording_url = generate_presigned_aws_url(recording_details['key'], recording_details['bucket'], True)
if i.created_on > date_now:
i.created_on = local_date.astimezone(time_zone).strftime("%d %b, %Y at %H:%M:%S EST")
vm_list_recent.append(i)
else:
i.created_on = local_date.astimezone(time_zone).strftime("%d %b, %Y at %H:%M:%S EST")
vm_list_older.append(i)
template_name = 'mobile/voicemail_ios.jinja2' if is_iphone(request) \
else 'mobile/voicemail_android.jinja2'
return render_template(template_name,
phone_number=sip_phone_number,
vm=vm_calls,
vm_recent=vm_list_recent,
vm_older=vm_list_older,
call_uri=call_uri,
time_zone=time_zone
)
@mobile.route('/inbound/<int:inbound_id>/mobile-app-email-instructions/<email>', methods=['POST', 'GET'])
@login_required
@role_required('admin')
def send_mobile_app_info(inbound_id, email):
partnership_account_id = current_user.partnership_account_id
# Look up the sip endpoint to get qr code information
sip_endpoint = Endpoint.query.filter(
and_(Endpoint.inbound_id == inbound_id,
Endpoint.partnership_account_id == partnership_account_id,
Endpoint.is_deactivated.is_(False)
)
).first()
reset_password = request.args.get('reset')
new_agent_id = request.args.get('mobile_agent_id')
if reset_password and new_agent_id and sip_endpoint:
new_password = uuid.uuid4().hex[:23].lower().replace('0', 'X').replace('o', 'Y').replace('e', 'E')
new_endpoint = Endpoint.api_update(sip_endpoint.id, partnership_account_id, new_password,
sip_endpoint.description, sip_endpoint.enabled, new_agent_id)
if not new_endpoint:
flash('An error occurred while resetting the password. Please contact support for assistance.', 'danger')
return redirect(url_for('phonenumbers.install_instructions', id=inbound_id))
try:
# Look up the inbound routing to get the new phone number
inbound = Phone.query.filter(and_(Phone.id == inbound_id, Phone.is_deactivated.is_(False))).first()
# Import the celery email sending task and send the email with instructions
from .tasks import send_mobile_app_info_email
send_mobile_app_info_email.delay(sip_endpoint.id, inbound.phonenumber, email, partnership_account_id)
flash(_('The mobile app instructions email was successfully sent to {}'.format(email)),
'success')
return redirect(url_for('phonenumbers.inbound_list'))
except Exception as e:
log.error(f'Something went wrong while trying to send the mobile app instructions email. The error is: {e} ')
flash(_('Something went wrong while trying to send the mobile app instructions email. '
'Please contact support for assistance or please try again.'), 'danger')
return redirect(url_for('phonenumbers.install_instructions', id=inbound_id))
@mobile.route('/mobile/contact_leads/detail', methods=['GET', 'POST'])
@csrf.exempt
def contact_leads():
log.info(f"form data:{request.args}")
username = request.args.get('username', '')
password = request.args.get('password', '')
sip_endpoint, _sip_domain = get_sip_endpoint(username, password)
template_name = 'mobile/contact_ios.jinja2' if is_iphone(request) \
else 'mobile/contact_android.jinja2'
partnership_account_id = sip_endpoint.partnership_account_id if sip_endpoint else None
status_list = Status.get_assigned_status_list(partnership_account_id)
return render_template(
template_name,
status_choices={item.status: item.display_name for item in status_list}
)
@mobile.route('/mobile/contact_leads_list/', methods=['GET'])
@csrf.exempt
def contact_leads_list():
log.info(f"form data:{request.args}")
username = request.args.get('username', '')
password = request.args.get('password', '')
search = request.args.get('search', '')
status = request.args.get('status', '')
agent = request.args.get('agent', '')
start_date = request.args.get('start_date', '')
end_date = request.args.get('end_date', '')
unassigned_agent = request.args.get('unassigned_agent', False)
sip_endpoint, sip_domain = get_sip_endpoint(username, password)
if sip_domain:
call_uri = sip_domain.call_uri
contacts_query = Contact.query.filter(
and_(Contact.partnership_account_id == sip_endpoint.partnership_account_id,
Contact.is_deactivated.is_(False))
)
if unassigned_agent == 'true':
contacts_query = contacts_query.filter(
Contact.agent_id == None
)
else:
contacts_query = contacts_query.filter(
Contact.agent_id == sip_endpoint.agent_id
)
if search:
pattern = f'%{search}%'
contacts_query = contacts_query.filter(or_(
Contact.name.ilike(pattern),
Contact.phonenumber_1.ilike(pattern),
Contact.email.ilike(pattern),
))
if status != 'no status':
contacts_query = contacts_query.filter(
Contact.status == status
)
if agent:
contacts_query = contacts_query.filter(
Contact.agent_id == agent
)
if start_date and end_date:
start_date = datetime.strptime(start_date, "%m/%d/%Y").replace(tzinfo=timezone('UTC'))
end_date = datetime.strptime(end_date, "%m/%d/%Y").replace(tzinfo=timezone('UTC'))
contacts_query = contacts_query.filter(
and_(
func.date(Contact.created_on) >= start_date,
func.date(Contact.created_on) <= end_date
)
)
sip_contacts = contacts_query.order_by(desc(Contact.updated_on)).limit(25).all()
else:
call_uri = ''
sip_contacts = []
log.info(f"sip contacts :{sip_contacts}")
template_name = 'mobile/api/contact_list_ios.jinja2' if is_iphone(request) \
else 'mobile/api/contact_list.jinja2'
return jsonify(
{
'html_content': render_template(
template_name,
contacts=sip_contacts,
today=datetime.today().date(),
time_zone=time_zone
),
'call_uri': call_uri
}
)
@mobile.route('/mobile/contact_notes/<int:contact_id>', methods=['GET', 'POST'])
@csrf.exempt
def contact_notes(contact_id):
template_name = 'mobile/contact_note_ios.jinja2' if is_iphone(request) \
else 'mobile/contact_note_android.jinja2'
return render_template(
template_name
)
@mobile.route('/mobile/contact_notes_list/<int:contact_id>', methods=['GET'])
@csrf.exempt
def contact_note_list(contact_id):
username = request.args.get('username', '')
password = request.args.get('password', '')
sip_endpoint, sip_domain = get_sip_endpoint(username, password)
if sip_domain:
sip_contact_notes = ContactNotes.query.filter(
and_(ContactNotes.partnership_account_id == sip_endpoint.partnership_account_id,
ContactNotes.is_enabled.is_(True),
ContactNotes.contact_id == contact_id),
).order_by(desc(ContactNotes.updated_on)).limit(25).all()
agent = Agent.query.filter(
Agent.id == sip_endpoint.agent_id
).first()
user_id = agent.user_id
else:
sip_contact_notes = []
user_id = None
template_name = 'mobile/api/contact_note_list_ios.jinja2' if is_iphone(request) \
else 'mobile/api/contact_note_list.jinja2'
return jsonify(
{
'html_content': render_template(
template_name,
notes=sip_contact_notes,
user_id=user_id,
time_zone=time_zone
),
}
)
@mobile.route('/mobile/add_note/<int:contact_id>', methods=['POST'])
@csrf.exempt
def add_contact_note(contact_id):
username = request.form.get('username', '')
password = request.form.get('password', '')
note_text = request.form.get('note', '')
log.info(f"form data:{request.form}")
sip_endpoint, _sip_domain = get_sip_endpoint(username, password)
if sip_endpoint:
agent = Agent.query.filter(
Agent.id == sip_endpoint.agent_id
).first()
if agent.user_id and db.session.query(
Contact.query.filter(Contact.id == contact_id).exists()
).scalar():
note = {
'text': note_text,
'contact_id': contact_id,
'created_on': datetime.now(),
'updated_on': datetime.now(),
'user_id': agent.user_id,
'is_enabled': True,
'partnership_account_id': sip_endpoint.partnership_account_id
}
result = ContactNotes.create(note)
if result:
return jsonify({"status": True})
if not agent:
error_message = "No Agent Found."
elif not agent.user_id:
log.info(f"Agent id :{agent.id}")
error_message = "No User Found."
else:
error_message = "Something Went Wrong"
return jsonify({"status": False, "error": error_message})
return jsonify({"status": False, "error": "No Sip Endpoint found."})
@mobile.route('/mobile/edit_note/<int:contact_id>', methods=['POST'])
@csrf.exempt
def edit_contact_note(contact_id):
username = request.form.get('username', '')
password = request.form.get('password', '')
note_text = request.form.get('note', '')
note_id = request.form.get('note_id', '')
sip_endpoint, sip_domain = get_sip_endpoint(username, password)
result = False
if sip_endpoint:
agent = Agent.query.filter(
Agent.id == sip_endpoint.agent_id
).first()
if agent and Contact.query.filter(Contact.id == contact_id).all():
note = [note_id, note_text, datetime.now(), agent.user_id, True]
result = ContactNotes.update(*note)
return jsonify({"status": result, "error": "No Sip EndPoint Found"})
@mobile.route('/mobile/delete_note/<int:note_id>', methods=['POST'])
@csrf.exempt
def delete_contact_note(note_id):
username = request.form.get('username', '')
password = request.form.get('password', '')
sip_endpoint, sip_domain = get_sip_endpoint(username, password)
result = False
if sip_endpoint:
result = ContactNotes.delete(note_id)
return jsonify({"status": result})
@mobile.route('/mobile/contact_add/', methods=['GET', 'POST'])
@csrf.exempt
def contact_add():
username = request.args.get('username', '')
password = request.args.get('password', '')
sip_endpoint, sip_domain = get_sip_endpoint(username, password)
if sip_domain:
current_agent = sip_endpoint.agent_id
if request.method == "POST":
data = request.form.to_dict()
if data.get("agent_assigned"):
agent = Agent.query.filter(Agent.id == data.get("agent_assigned")).first()
else:
agent = None
if not validate_phonenumber(data.get("phonenumber_1")):
return {"status": False, "error": "Enter a valid 10 digit Phonenumber"}
if data.get("phonenumber_2"):
if not validate_phonenumber(data.get("phonenumber_2")):
return {"status": False, "error": "Enter a valid 10 digit Phonenumber"}
data["phonenumber_2"] = validate_phonenumber(data.get("phonenumber_2"))
data["phonenumber_1"] = validate_phonenumber(data.get("phonenumber_1"))
data["partnership_account_id"] = sip_endpoint.partnership_account_id
data["agent_assigned"] = agent.full_name if agent else None
data["agent_id"] = agent.id if agent else None
data["external_source_type"] = "mobile"
result = Contact.api_create(data)
if result:
return {"status": True}
return {"status": False, "error": "something went wrong"}
all_agents = Agent.query.filter(
Agent.partnership_account_id == sip_endpoint.partnership_account_id,
Agent.is_deactivated.is_(False)
).all()
else:
if request.method == "POST":
return {"status": False, "error": "No Sip Endpoint Found."}
all_agents = []
current_agent = None
template_name = 'mobile/add_contact_ios.jinja2' if is_iphone(request) \
else 'mobile/add_contact.jinja2'
# populate status result
partnership_account_id = sip_endpoint.partnership_account_id if sip_endpoint else None
bdc_status_list = BdcStatuses.get_assigned_status_list(partnership_account_id)
status_list = Status.get_assigned_status_list(partnership_account_id)
marketing_list = MarketingSources.get_assigned_source_list(partnership_account_id)
return render_template(
template_name,
agents=all_agents,
current_agent=current_agent,
bdc_status_choices={item.status: item.display_name for item in bdc_status_list},
status_choices={item.status: item.display_name for item in status_list},
sources_choices={item.source: item.display_name for item in marketing_list},
)
@mobile.route('/mobile/contact_detail/<int:contact_id>/', methods=['GET', 'POST'])
@csrf.exempt
def contact_detail(contact_id):
username = request.args.get('username', '')
password = request.args.get('password', '')
sip_endpoint, sip_domain = get_sip_endpoint(username, password)
if sip_domain:
if request.method == "POST":
data = request.form.to_dict()
log.info(f"request data is: {data}")
# validate phone number 1
if not validate_phonenumber(data.get("phonenumber_1")):
return {"status": False, "error": "Enter a valid 10 digit Phonenumber"}
# validate phone number 2, if exists
if data.get("phonenumber_2"):
if not validate_phonenumber(data.get("phonenumber_2")):
return {"status": False, "error": "Enter a valid 10 digit Phonenumber"}
data["phonenumber_2"] = validate_phonenumber(data.get("phonenumber_2"))
data["phonenumber_1"] = validate_phonenumber(data.get("phonenumber_1"))
# Assign Agent
if not data.get("agent_assigned"):
data['agent_id'] = None
data['agent_assigned'] = None
else:
agent = Agent.query.filter(
Agent.id == data.get("agent_assigned")
).first()
data['agent_id'] = agent.id if agent else None
data['agent_assigned'] = agent.full_name if agent else None
result = Contact.mobile_api_update(contact_id, sip_endpoint.partnership_account_id, **data)
if result:
return {"status": True}
return {"status": False, "error": "Something Went Wrong"}
call_uri = sip_domain.call_uri
contact = Contact.query.filter(
and_(Contact.id == contact_id,
Contact.partnership_account_id == sip_endpoint.partnership_account_id,
Contact.is_deactivated.is_(False)),
).first()
all_agents = Agent.query.filter(
Agent.partnership_account_id == sip_endpoint.partnership_account_id,
Agent.is_deactivated.is_(False)
).all()
else:
call_uri = ''
contact = None
all_agents = []
template_name = 'mobile/contact_detail_ios.jinja2' if is_iphone(request) \
else 'mobile/contact_detail.jinja2'
partnership_account_id = sip_endpoint.partnership_account_id if sip_endpoint else None
bdc_status_list = BdcStatuses.get_assigned_status_list(partnership_account_id)
status_list = Status.get_assigned_status_list(partnership_account_id)
# populate marketing result
marketing_list = MarketingSources.get_assigned_source_list(partnership_account_id)
return render_template(
template_name,
contact=contact,
call_uri=call_uri,
agents=all_agents,
bdc_status_choices={item.status: item.display_name for item in bdc_status_list},
status_choices={item.status: item.display_name for item in status_list},
sources_choices={item.source: item.display_name for item in marketing_list},
time_zone=time_zone
)
@mobile.route('/mobile/timeline/<int:contact_id>', methods=['GET'])
@csrf.exempt
def contact_timeline(contact_id):
username = request.args.get('username', '')
password = request.args.get('password', '')
sip_endpoint, sip_domain = get_sip_endpoint(username, password)
if sip_domain:
# query expression for notes
notes = db.session.query(
func.concat("note").label("type"),
func.concat("Note added").label("title"),
null().label("status"),
ContactNotes.created_on.label('created_on'),
func.concat("Added by ", ContactNotes.name).label("user"),
ContactNotes.text.label('content'),
null().label('content_url'),
).filter(
and_(ContactNotes.partnership_account_id == sip_endpoint.partnership_account_id,
ContactNotes.is_enabled.is_(True),
ContactNotes.contact_id == contact_id)
)
# query expression for Messages
messages = db.session.query(
case(
[
(Message.type == 'mms', 'mms'),
],
else_='message'
).label("type"),
# func.concat("message").label("type"),
case(
[
(Message.status == 'received', 'Message received'),
(Message.status == 'sent', 'Messsage sent'),
],
else_=''
).label("title"),
# func.concat("A Message was ", Message.status).label("title"),
Message.status.label("status"),
Message.created_on.label('created_on'),
func.concat(Message.status, " by ", Message.message_source).label("user"),
Message.body_text.label("content"),
case(
[
(Message.type == 'mms', Message.media_url),
],
else_=null()
).label("content_url"),
).filter(
Message.contact_id == contact_id,
Message.partnership_account_id == sip_endpoint.partnership_account_id
)
# query expression for Leads
calls = db.session.query(
func.concat("call").label("type"),
case(
[
(Lead.call_type == 'inbound', 'Call received'),
(Lead.call_type == 'outbound', 'Call made'),
],
else_=''
).label("title"),
# func.concat("A call was ", Lead.status).label("title"),
Lead.status.label("status"),
Lead.created_on.label('created_on'),
case(
[
(
and_(Lead.agent == None, Lead.call_type == 'inbound'),
func.concat("Received by ", Lead.source_name)
),
(
and_(Lead.agent != None, Lead.call_type == 'inbound'),
func.concat("Received by ", Lead.agent_full_name)
),
(
and_(Lead.agent != None, Lead.call_type == 'outbound'),
func.concat("Called by ", Lead.agent_full_name)
),
(
and_(Lead.agent == None, Lead.call_type == 'outbound'),
func.concat("Called by ", Lead.source_name)
),
],
else_=''
).label("user"),
# func.concat(Lead.status, " by ", Message.source).label("user"),
null().label('content'),
Lead.recording_url.label("content_url")
).filter(
Lead.partnership_account_id == sip_endpoint.partnership_account_id,
Lead.contact_id == contact_id,
or_(Lead.status == 'missed', Lead.status == 'completed')
)
# query expression for FormLeads
form_leads = db.session.query(
func.concat("form").label("type"),
func.concat("Form lead submitted").label("title"),
null().label("status"),
FormLead.created_on.label('created_on'),
func.concat("Submitted using ", FormLead.source).label("user"),
null().label('content'),
null().label("content_url")
).filter(
FormLead.partnership_account_id == sip_endpoint.partnership_account_id,
FormLead.contact_id == contact_id,
)
# Combining all data using union
time_lines = notes.union_all(calls, messages, form_leads).order_by(
desc('created_on')
).limit(25).all()
else:
time_lines = []
template_name = 'mobile/timeline_ios.jinja2' if is_iphone(request) \
else 'mobile/timeline.jinja2'
time_lines = [row._asdict() for row in time_lines]
char_list = '",{,},[,]'.split(",")
for timeline in time_lines:
if timeline['type'] == "mms":
media_string = timeline.get("content_url", "")
for char in char_list:
media_string = media_string.replace(char, "")
timeline["content_url"] = media_string.split(",")
return render_template(
template_name,
timelines=time_lines,
time_zone=time_zone
)