File: //home/arjun/projects/buyercall/buyercall/lib/util_boto3_sns.py
import logging
import boto3
log = logging.getLogger(__name__)
class AmazonSimpleNotificationService:
    """Thin wrapper around the boto3 SNS client with a uniform result envelope.

    Every public operation returns a dict shaped like
    ``{'status': bool, 'message': str, 'data': dict}`` instead of raising:
    ``status`` is False and ``message`` carries the reason when validation or
    the underlying client call fails; on success ``data`` holds the raw
    response returned by the client.
    """

    # Populated by init_app(); stays None until the wrapper is bound to an app.
    sns_client = None

    def init_app(self, app):
        """Create the boto3 SNS client from the application's configuration.

        Reads ``SES_REGION_NAME``, ``AMAZON_ACCESS_KEY`` and
        ``AMAZON_SECRET_KEY`` from ``app.config``.

        Args:
            app: Application object exposing a dict-like ``config`` attribute.
        """
        # boto3 only consults its default credential chain (environment,
        # shared config, instance role) when the explicit keys are None; an
        # empty string is treated as a real, and unusable, credential.
        access_key = app.config.get('AMAZON_ACCESS_KEY') or None
        secret_key = app.config.get('AMAZON_SECRET_KEY') or None
        if access_key is None or secret_key is None:
            if access_key is not None or secret_key is not None:
                log.warning(
                    'Only one of AMAZON_ACCESS_KEY / AMAZON_SECRET_KEY is '
                    'configured; ignoring it and using default credentials'
                )
            # Supplying just one half of the pair is never usable.
            access_key = secret_key = None

        self.sns_client = boto3.client(
            'sns',
            region_name=app.config.get('SES_REGION_NAME', ''),
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )

    @staticmethod
    def _failure(message):
        """Build the failure envelope carrying ``message``."""
        return {'status': False, 'message': message, 'data': {}}

    def _invoke(self, operation, **params):
        """Call ``self.sns_client.<operation>(**params)`` and wrap the outcome.

        Args:
            operation (str): Name of the SNS client method to call.
            **params: Keyword arguments forwarded to that method unchanged.

        Returns:
            dict: Success envelope with the client response under ``data``,
            or a failure envelope whose ``message`` is the error text.
        """
        if self.sns_client is None:
            log.error('SNS %s requested before init_app() was called', operation)
            return self._failure(
                'SNS client is not initialized; call init_app() first'
            )
        try:
            result = getattr(self.sns_client, operation)(**params)
        except Exception as ex:
            # Deliberately broad: callers rely on the envelope rather than on
            # exceptions, so every failure is logged (with traceback) and
            # reported through ``status`` / ``message``.
            log.exception('SNS %s failed: %s', operation, ex)
            return self._failure(str(ex))
        return {'status': True, 'message': 'Success', 'data': result}

    def create_topic(self, name, tags=None, attributes=None):
        """Create a topic to which notifications can be published.

        Args:
            name (str): The name of the topic to create.
            tags (list, optional): Tags to add to the new topic. Defaults to
                an empty list.
            attributes (dict, optional): Map of topic attributes to their
                values. Defaults to an empty dict.

        Returns:
            dict: Result envelope; on success ``data`` is the client's
            ``create_topic`` response. An empty ``name`` yields a failure
            envelope with message ``'Invalid inputs'``.
        """
        if not name:
            return self._failure('Invalid inputs')
        return self._invoke(
            'create_topic',
            Name=name,
            Attributes={} if attributes is None else attributes,
            Tags=[] if tags is None else tags,
        )

    def subscribe(self, topic_arn, endpoint, protocol='https', attributes=None):
        """Subscribe an endpoint to an Amazon SNS topic.

        Args:
            topic_arn (str): The ARN of the topic to subscribe to.
            endpoint (str): The endpoint that should receive notifications.
            protocol (str, optional): The delivery protocol. Defaults to
                'https'.
            attributes (dict, optional): Map of subscription attributes to
                their values. Defaults to an empty dict.

        Returns:
            dict: Result envelope; on success ``data`` is the client's
            ``subscribe`` response. An empty ``topic_arn`` yields a failure
            envelope with message ``'Invalid topic arn'``.
        """
        if not topic_arn:
            return self._failure('Invalid topic arn')
        return self._invoke(
            'subscribe',
            TopicArn=topic_arn,
            Protocol=protocol,
            Endpoint=endpoint,
            Attributes={} if attributes is None else attributes,
            ReturnSubscriptionArn=True,
        )

    def confirm_subscription(self, topic_arn, token):
        """Confirm a subscription using the token sent to the endpoint.

        Verifies an endpoint owner's intent to receive messages by validating
        the token sent to the endpoint by an earlier Subscribe action.

        Args:
            topic_arn (str): The ARN of the topic whose subscription is being
                confirmed.
            token (str): Short-lived token sent to the endpoint during the
                Subscribe action.

        Returns:
            dict: Result envelope; on success ``data`` is the client's
            ``confirm_subscription`` response. An empty ``topic_arn`` or
            ``token`` yields a failure envelope (``'Invalid topic arn'`` /
            ``'Invalid token'``) without contacting the service.
        """
        if not topic_arn:
            return self._failure('Invalid topic arn')
        if not token:
            return self._failure('Invalid token')
        return self._invoke(
            'confirm_subscription',
            TopicArn=topic_arn,
            Token=token,
            # Disallows unauthenticated unsubscribes of the subscription.
            AuthenticateOnUnsubscribe='true',
        )