HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall_new/buyercall/buyercall/blueprints/sms/tasks.py
import os
import logging as log
import traceback

from buyercall.app import create_celery_app
from buyercall.lib.flask_mailplus import _try_renderer_template
from buyercall.lib.util_ses_email import send_ses_email
from boto.s3.key import Key
import boto
import uuid
from flask import current_app
from flask_babel import lazy_gettext as _
from buyercall.blueprints.partnership.models import PartnershipAccount, Partnership
from buyercall.blueprints.contacts.models import Contact
from buyercall.blueprints.filters import format_phone_number

celery = create_celery_app(current_app)


@celery.task
def upload_mms_image(partner_name, partnership_account_id, file_name, file_path, **kwargs):
    """ Upload the given MMS Image file to S3.
    """
    file_extension = file_name.rsplit('.', 1)[1]
    if file_extension in ['jpg', 'jpeg', 'png', 'tiff', 'tif', 'bmp']:
        content_type = 'image/' + file_extension
    else:
        content_type = 'application/octet-stream'
    hex_value = uuid.uuid4().hex
    try:
        key = '{}_{}/{}_{}_{}'.format(
            partner_name, partnership_account_id, hex_value, 'mms', file_name)
    finally:
        log.debug('Could not set key for mms attachment url')
    bucket = celery.conf['MMS_BUCKET']
    from buyercall.lib.util_boto3_s3 import upload_file
    mms_url = upload_file(file_path, bucket, key, content_type)
    os.remove(file_path)
    return mms_url


@celery.task
def send_sms_rule_email(email_address, email_name, email_body, partnership_account_id, lead, **kwargs):

    partner_account = PartnershipAccount.query.filter(PartnershipAccount.id == partnership_account_id).first()
    partner = Partnership.query.filter(Partnership.id == partner_account.partnership_id).first()

    partnership_name = partner.name
    partnership_logo = partner.logo

    # Get the lead phone number that sent message to trigger email
    new_lead = lead.to_dict()
    msg_lead_phone = format_phone_number(new_lead.get("From", ""))
    msg_lead_body = new_lead.get("Body", "")

    # Get the necessary contact lead info if exist
    contact_lead = Contact.query.filter(Contact.phonenumber_1 == msg_lead_phone)\
        .filter(Contact.partnership_account_id == partnership_account_id).first()

    if contact_lead:
        log.info('The contact lead id that triggered the email is: {}'.format(contact_lead.id))
        msg_lead_name = '{} {}'.format(contact_lead.firstname, contact_lead.lastname)
        msg_lead_email = contact_lead.email
        msg_lead_number = contact_lead.phonenumber_1
    else:
        log.info('There is no contact yet for phone number {}'.format(msg_lead_phone))
        msg_lead_name = ''
        msg_lead_email = ''
        msg_lead_number = msg_lead_phone

    email_to = list()
    email_to.append(email_address)

    ctx = {
        'name': email_name,
        'body': email_body,
        'company': partnership_name,
        'partner_logo': partnership_logo,
        'lead_name': msg_lead_name,
        'lead_email': msg_lead_email,
        'lead_number': msg_lead_number,
        'lead_message': msg_lead_body
    }

    try:
        # Render html template for email
        sms_rule_email_template = _try_renderer_template('sms/mail/sms_rule_email', ext='html', **ctx)
        send_ses_email(recipients=email_to,
                       p_id=partner.id,
                       subject='An Email triggered by an SMS',
                       html=sms_rule_email_template
                       )
    except Exception as e:
        log.error(traceback.format_exc())