HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall/venv/lib/python2.7/site-packages/twilio/task_router/__init__.py
import time
from .. import jwt
from .taskrouter_config import TaskRouterConfig
from .workflow_config import WorkflowConfig
from .workflow_ruletarget import WorkflowRuleTarget
from .workflow_rule import WorkflowRule

import warnings
# NOTE: import-time side effect -- this installs a process-wide warnings
# filter so that DeprecationWarning is reported on every occurrence.
warnings.simplefilter('always', DeprecationWarning)

# Root of the TaskRouter REST API; TaskRouterCapability combines it with
# TASK_ROUTER_VERSION and the workspace sid to build its base URL.
TASK_ROUTER_BASE_URL = 'https://taskrouter.twilio.com'
# Prefix of the event-bridge channel URL that allow_web_sockets() grants
# GET and POST access to.
TASK_ROUTER_BASE_EVENTS_URL = 'https://event-bridge.twilio.com/v1/wschannels'
# Used both as a URL path segment and as the 'version' entry of the token
# payload.
TASK_ROUTER_VERSION = "v1"

# Value used for a post_filter entry ('ActivitySid') to mark the parameter
# as required.
REQUIRED = {'required': True}
# Counterpart of REQUIRED; not referenced by the code visible in this module
# (presumably intended for callers building their own filters).
OPTIONAL = {'required': False}


def deprecated(func):
    """Decorator that emits a ``DeprecationWarning`` whenever *func* is called.

    The decorated callable is invoked with the original arguments and its
    return value is passed through unchanged.

    :param func: the callable to mark as deprecated
    :returns: a wrapper that warns and then delegates to ``func``
    """
    # Imported locally so this helper does not rely on a module-level import.
    import functools

    # functools.wraps copies __name__/__doc__ (and friends) onto the wrapper,
    # so introspection and help() keep describing the deprecated function
    # instead of the generic ``log_warning`` wrapper.
    @functools.wraps(func)
    def log_warning(*args, **kwargs):
        # stacklevel = 2 makes the warning refer to the caller of the
        # deprecation rather than the source of deprecation itself
        warnings.warn("Call to deprecated function {0}.".
                      format(func.__name__),
                      stacklevel=2,
                      category=DeprecationWarning)
        return func(*args, **kwargs)
    return log_warning


class TaskRouterCapability(object):
    """Collects access policies for a TaskRouter channel and encodes them,
    together with identifying attributes, into a token via ``jwt.encode``.

    The channel is identified by ``channel_id``; its two-character prefix
    ("WS", "WK" or "WQ") decides which resource URL the policies refer to.
    """

    def __init__(self, account_sid, auth_token, workspace_sid, channel_id):
        """Validate the sids and register the default policies.

        :param str account_sid: account sid; must start with "AC"
        :param str auth_token: key handed to ``jwt.encode`` when the token
            is generated
        :param str workspace_sid: workspace sid; must start with "WS"
        :param str channel_id: sid whose prefix is "WS", "WK" or "WQ"
        :raises ValueError: if any of the sids fails ``validate_jwt``
        """
        self.account_sid = account_sid
        self.auth_token = auth_token
        self.policies = []

        self.workspace_sid = workspace_sid
        self.channel_id = channel_id
        self.base_url = "{0}/{1}/Workspaces/{2}".format(TASK_ROUTER_BASE_URL,
                                                        TASK_ROUTER_VERSION,
                                                        workspace_sid)

        # validate the JWT
        self.validate_jwt()

        # add permissions to GET and POST to the event-bridge channel
        self.allow_web_sockets(channel_id)

        # set up resources (defines self.resource_url, needed just below)
        self.setup_resource()

        # add permissions to fetch the instance resource
        self.add_policy(self.resource_url, "GET", True)

    @property
    def channel_prefix(self):
        """First two characters of ``channel_id``."""
        return self.channel_id[0:2]

    def setup_resource(self):
        """Set ``resource_url`` according to the channel prefix.

        For a "WK" channel this additionally allows GET on the workspace's
        ``/Activities`` and ``/Tasks/**`` URLs and on the worker's
        ``/Reservations/**`` URL. Subclasses override this method.
        """
        if self.channel_prefix == "WS":
            self.resource_url = self.base_url
        elif self.channel_prefix == "WK":
            self.resource_url = self.base_url + "/Workers/" + self.channel_id

            activity_url = self.base_url + "/Activities"
            self.allow(activity_url, "GET")

            tasks_url = self.base_url + "/Tasks/**"
            self.allow(tasks_url, "GET")

            worker_reservations_url = self.resource_url + "/Reservations/**"
            self.allow(worker_reservations_url, "GET")

        elif self.channel_prefix == "WQ":
            self.resource_url = "{0}/TaskQueues/{1}".format(
                self.base_url, self.channel_id)

    def allow_web_sockets(self, channel_id):
        """Allow GET and POST on this channel's event-bridge URL.

        NOTE(review): the ``channel_id`` argument is not used; the URL is
        built from ``self.channel_id``. Kept as-is to preserve behaviour.
        """
        web_socket_url = "{0}/{1}/{2}".format(TASK_ROUTER_BASE_EVENTS_URL,
                                              self.account_sid,
                                              self.channel_id)

        self.policies.append(self.make_policy(web_socket_url, "GET", True))
        self.policies.append(self.make_policy(web_socket_url, "POST", True))

    def validate_jwt(self):
        """Check the sids given to the constructor.

        :raises ValueError: if ``account_sid`` is None or does not start
            with "AC", if ``workspace_sid`` is None or does not start with
            "WS", or if ``channel_id`` is None or does not start with "WS",
            "WK" or "WQ"
        """
        # The messages are built with %-formatting instead of ``+`` so that
        # a None sid produces the intended ValueError rather than a
        # TypeError raised by concatenating str and None.
        if self.account_sid is None or self.account_sid[0:2] != "AC":
            raise ValueError('Invalid AccountSid provided: %s' %
                             (self.account_sid,))
        if self.workspace_sid is None or self.workspace_sid[0:2] != "WS":
            raise ValueError('Invalid WorkspaceSid provided: %s' %
                             (self.workspace_sid,))
        if self.channel_id is None:
            raise ValueError('ChannelId not provided')

        if self.channel_prefix not in ("WS", "WK", "WQ"):
            raise ValueError('Invalid ChannelId provided: %s' %
                             (self.channel_id,))

    def allow_fetch_subresources(self):
        """Allow GET on everything below the resource URL."""
        self.allow(self.resource_url + "/**", "GET")

    def allow_updates(self):
        """Allow POST on the resource URL."""
        self.allow(self.resource_url, "POST")

    def allow_updates_subresources(self):
        """Allow POST on everything below the resource URL."""
        self.allow(self.resource_url + "/**", "POST")

    def allow_delete(self):
        """Allow DELETE on the resource URL."""
        self.allow(self.resource_url, "DELETE")

    def allow_delete_subresources(self):
        """Allow DELETE on everything below the resource URL."""
        self.allow(self.resource_url + "/**", "DELETE")

    @deprecated
    def allow_worker_fetch_attributes(self):
        """Deprecated: allow GET on the worker resource.

        :raises ValueError: if the channel is not a worker ("WK") channel
        """
        if self.channel_prefix != "WK":
            raise ValueError("Deprecated func not applicable to non Worker")
        else:
            self.policies.append(self.make_policy(
                self.resource_url,
                'GET'))

    @deprecated
    def allow_worker_activity_updates(self):
        """Deprecated: allow POST on the worker resource with a post filter
        marking ``ActivitySid`` as required.

        :raises ValueError: if the channel is not a worker ("WK") channel
        """
        if self.channel_prefix == "WK":
            self.policies.append(self.make_policy(
                self.resource_url,
                'POST',
                True,
                post_filter={'ActivitySid': REQUIRED}))
        else:
            raise ValueError("Deprecated func not applicable to non Worker")

    @deprecated
    def allow_task_reservation_updates(self):
        """Deprecated: allow POST on the workspace's ``/Tasks/**`` URL.

        :raises ValueError: if the channel is not a worker ("WK") channel
        """
        if self.channel_prefix == "WK":
            tasks_url = self.base_url + "/Tasks/**"
            self.policies.append(self.make_policy(
                tasks_url,
                'POST',
                True))
        else:
            raise ValueError("Deprecated func not applicable to non Worker")

    def add_policy(self, url, method,
                   allowed, query_filter=None, post_filter=None):
        """Build a policy with ``make_policy`` and append it to
        ``self.policies``.

        :param str url: the resource URL the policy applies to
        :param str method: the HTTP method the policy applies to
        :param bool allowed: whether matching requests are allowed
        :param dict query_filter: optional filter on GET parameters
        :param dict post_filter: optional filter on POST parameters
        """
        policy = self.make_policy(url, method,
                                  allowed, query_filter, post_filter)
        self.policies.append(policy)

    def allow(self, url, method, query_filter=None, post_filter=None):
        """Append a policy that allows ``method`` on ``url``."""
        self.add_policy(url, method, True, query_filter, post_filter)

    def deny(self, url, method, query_filter=None, post_filter=None):
        """Append a policy that denies ``method`` on ``url``."""
        self.add_policy(url, method, False, query_filter, post_filter)

    def make_policy(self, url, method,
                    allowed=True, query_filter=None, post_filter=None):

        """Create a policy dictionary for the given resource and method.
        :param str url: the resource URL to grant or deny access to
        :param str method: the HTTP method to allow or deny
        :param bool allowed: whether this request is allowed
        :param dict query_filter: specific GET parameter names
            to require or allow
        :param dict post_filter: POST parameter names
            to require or allow
        :returns: dict with the keys ``url``, ``method``, ``allow``,
            ``query_filter`` and ``post_filter``; missing (falsy) filters
            are replaced by an empty dict
        """

        return {
            'url': url,
            'method': method,
            'allow': allowed,
            'query_filter': query_filter or {},
            'post_filter': post_filter or {}
        }

    def get_resource_url(self):
        """Return the resource URL chosen by ``setup_resource``."""
        return self.resource_url

    def generate_token(self, ttl=3600):
        """Encode the collected policies into a token.

        :param int ttl: number of seconds added to the current time to form
            the ``exp`` entry of the payload
        :returns: whatever ``jwt.encode`` returns for the payload
        """
        task_router_attributes = {
            'account_sid': self.account_sid,
            'workspace_sid': self.workspace_sid,
            'channel': self.channel_id
        }

        # worker and task queue channels also expose their sid under a
        # dedicated key
        if self.channel_prefix == "WK":
            task_router_attributes["worker_sid"] = self.channel_id
        elif self.channel_prefix == "WQ":
            task_router_attributes["taskqueue_sid"] = self.channel_id

        return self._generate_token(ttl, task_router_attributes)

    def _generate_token(self, ttl, attributes=None):
        """Assemble the payload and encode it with ``auth_token`` (HS256).

        :param int ttl: seconds until expiry, counted from ``time.time()``
        :param dict attributes: extra entries merged into the payload;
            they overwrite default entries with the same key
        """
        payload = {
            'iss': self.account_sid,
            'exp': int(time.time()) + ttl,
            'version': TASK_ROUTER_VERSION,
            'friendly_name': self.channel_id,
            'policies': self.policies,
        }

        if attributes is not None:
            payload.update(attributes)

        return jwt.encode(payload, self.auth_token, 'HS256')


class TaskRouterWorkerCapability(TaskRouterCapability):
    """Capability scoped to a single worker.

    After the base-class setup, GET is additionally allowed on the
    workspace's ``/Activities`` and ``/Tasks/**`` URLs and on the worker's
    ``/Reservations/**`` URL.
    """

    def __init__(self, account_sid, auth_token, workspace_sid, worker_sid):
        """Run the base initialisation with ``worker_sid`` as the channel id,
        then register the read-only worker policies."""
        base_init = super(TaskRouterWorkerCapability, self).__init__
        base_init(account_sid, auth_token, workspace_sid, worker_sid)

        workspace_url = self.base_url
        self.activity_url = workspace_url + "/Activities"
        self.reservations_url = workspace_url + "/Tasks/**"
        self.worker_reservations_url = self.resource_url + "/Reservations/**"

        # grant read access to the activity list, the tasks and the
        # worker's own reservations, in that order
        readable_urls = (self.activity_url,
                         self.reservations_url,
                         self.worker_reservations_url)
        for readable_url in readable_urls:
            self.allow(readable_url, "GET")

    def setup_resource(self):
        """Point ``resource_url`` at the worker below the workspace URL."""
        worker_path = "/Workers/" + self.channel_id
        self.resource_url = self.base_url + worker_path

    def allow_activity_updates(self):
        """Allow POST on the worker resource, with a post filter that marks
        ``ActivitySid`` as required."""
        activity_filter = {'ActivitySid': REQUIRED}
        activity_policy = self.make_policy(self.resource_url,
                                           'POST',
                                           True,
                                           post_filter=activity_filter)
        self.policies.append(activity_policy)

    def allow_reservation_updates(self):
        """Allow POST on the tasks URL and on the worker's reservations
        URL."""
        for writable_url in (self.reservations_url,
                             self.worker_reservations_url):
            update_policy = self.make_policy(writable_url, 'POST', True)
            self.policies.append(update_policy)


class TaskRouterTaskQueueCapability(TaskRouterCapability):
    """Capability whose resource is a task queue of the workspace."""

    def setup_resource(self):
        """Point ``resource_url`` at the task queue below the workspace
        URL."""
        queue_path = "/TaskQueues/" + self.channel_id
        self.resource_url = self.base_url + queue_path


class TaskRouterWorkspaceCapability(TaskRouterCapability):
    """Capability whose resource is the workspace itself."""

    def __init__(self, account_sid, auth_token, workspace_sid):
        """Run the base initialisation, using the workspace sid as the
        channel id as well."""
        channel_id = workspace_sid
        base_init = super(TaskRouterWorkspaceCapability, self).__init__
        base_init(account_sid, auth_token, workspace_sid, channel_id)

    def setup_resource(self):
        """Use the workspace base URL as the resource URL."""
        workspace_url = self.base_url
        self.resource_url = workspace_url