HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/env/lib/python3.10/site-packages/kombu/__pycache__/common.cpython-310.pyc
o

'we�4�@s�dZddlmZddlZddlZddlZddlmZddlm	Z	ddl
mZddlm
Z
ddlmZmZmZmZdd	lmZmZd
dlmZmZd
dlmZd
d
lmZd
dlmZdZdZ ee!�Z"da#dd�Z$dd�Z%d;dd�Z&Gdd�de�Z'dd�Z(d<dd�Z)dd �Z*d!d"�Z+d#d$�Z,d=d%d&�Z-		d=d'd(�Z.d>d)d*�Z/	d?d+d,�Z0d-d.�Z1d/d0�Z2e	d1d2��Z3d@d3d4�Z4d@d5d6�Z5dAd7d8�Z6Gd9d:�d:�Z7dS)BzCommon Utilities.�)�annotationsN)�deque)�contextmanager)�partial)�count)�
NAMESPACE_OID�uuid3�uuid4�uuid5)�ChannelError�RecoverableConnectionError�)�Exchange�Queue)�
get_logger)�registry)�uuid)	�	Broadcast�
maybe_declarer�itermessages�
send_reply�collect_replies�insured�drain_consumer�	eventloopi��cCstdurt�jatS�N)�_node_idr	�int�rr�E/home/arjun/projects/env/lib/python3.10/site-packages/kombu/common.py�get_node_id"sr cCsLd�|||t|��}z
ttt|��}W|Sty%ttt|��}Y|Sw)Nz{:x}-{:x}-{:x}-{:x})�format�id�strrr�
ValueErrorr
)�node_id�
process_id�	thread_id�instance�ent�retrrr�generate_oid)s���r+TcCs$tt�t��|rt��|�Sd|�S�Nr)r+r �os�getpid�	threading�	get_ident)r(�threadsrrr�oid_from3s
��r2cs8eZdZdZejdZ						d�fdd�	Z�ZS)	ra�Broadcast queue.

    Convenience class used to define broadcast queues.

    Every queue instance will have a unique name,
    and both the queue and exchange is configured with auto deletion.

    Arguments:
    ---------
        name (str): This is used as the name of the exchange.
        queue (str): By default a unique id is used for the queue
            name for every consumer.  You can specify a custom
            queue name here.
        unique (bool): Always create a unique queue
            even if a queue name is supplied.
        **kwargs (Any): See :class:`~kombu.Queue` for a list
            of additional keyword arguments supported.
    ))�queueNNFTc
sb|rd�|pdt��}n|pdt���}t�jd|p|||||dur$|nt|dd�d�|��dS)Nz{}.{}�bcastzbcast.�fanout)�type)�aliasr3�name�auto_delete�exchanger)r!r�super�__init__r)�selfr8r3�uniquer9r:r7�kwargs��	__class__rrr<Rs
�
�zBroadcast.__init__)NNFTNN)�__name__�
__module__�__qualname__�__doc__r�attrsr<�
__classcell__rrr@rr<s
�rcCs||jjjvSr)�
connection�client�declared_entities)�entity�channelrrr�declaration_cachedisrMFcKs |rt||fi|��St||�S)zDeclare entity (cached).)�_imaybe_declare�_maybe_declare)rKrL�retry�retry_policyrrrrms
rcCs4|j}|s|std|�d|����|�|�}|SdS)z�Make sure the channel is bound to the entity.

    :param entity: generic kombu nomenclature, generally an exchange or queue
    :param channel: channel to bind to the entity
    :return: the updated entity
    zCannot bind channel z to entity N)�is_boundr�bind)rKrLrRrrr�_ensure_channel_is_boundts�
�rTcCs�|}t||�|dur|jstd|�d���|j}d}}|jr2|jr2|jjj}t|�}||vr2dS|js9t	d��|j
|d�|durJ|rJ|�|�|durR|j|_dS)Nzchannel is None and entity z not bound.F�channel disconnected)rLT)
rTrRrrLrH�can_cache_declarationrIrJ�hashr�declare�addr8)rKrL�orig�declared�identrrrrO�s,

�

rOcKs:t||�|jjs
td��|jjjj|tfi|��||�S)NrU)rTrLrHrrI�ensurerO)rKrLrQrrrrN�s

���rNc
#s��t���fdd�}|g|pg|_|�'t|jjj||dd�D]}z���VWq ty2Yq wWd�dS1s>wYdS)z&Drain messages from consumer instance.cs��||f�dSr)�append)�body�message��accrr�
on_message�sz"drain_consumer.<locals>.on_messageT)�limit�timeout�ignore_timeoutsN)r�	callbacksrrLrHrI�popleft�
IndexError)�consumerrdrergrc�_rrarr�s�

���"�rcKs$t|jd|g|d�|��|||d�S)zIterator over messages.)�queuesrL)rdrergNr)r�Consumer)�connrLr3rdrergr?rrrr�s�rc	csN�|rt|�p	t�D]}z	|j|d�VWq
tjy$|r"|s"�Yq
wdS)aBest practice generator wrapper around ``Connection.drain_events``.

    Able to drain events forever, with a limit, and optionally ignoring
    timeout errors (a timeout of 1 is often used in environments where
    the socket can get "stuck", and is a best practice for Kombu consumers).

    ``eventloop`` is a generator.

    Examples
    --------
        >>> from kombu.common import eventloop

        >>> def run(conn):
        ...     it = eventloop(conn, timeout=1, ignore_timeouts=True)
        ...     next(it)   # one event consumed, or timed out.
        ...
        ...     for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
        ...         pass  # loop forever.

    It also takes an optional limit parameter, and timeout errors
    are propagated by default::

        for _ in eventloop(connection, limit=1, timeout=1):
            pass

    See Also
    --------
        :func:`itermessages`, which is an event loop bound to one or more
        consumers, that yields any messages received.
    )reN)�ranger�drain_events�socketre)rnrdrerf�irrrr�s����rc	KsH|j|f|||d�t|jd|j�d�tj|j|jd�fi|����S)a�Send reply for request.

    Arguments:
    ---------
        exchange (kombu.Exchange, str): Reply exchange
        req (~kombu.Message): Original request, a message with
            a ``reply_to`` property.
        producer (kombu.Producer): Producer instance
        retry (bool): If true must retry according to
            the ``reply_policy`` argument.
        retry_policy (Dict): Retry settings.
        **props (Any): Extra properties.
    )r:rPrQ�reply_to�correlation_id)�routing_keyrt�
serializer�content_encoding)�publish�dict�
properties�get�serializers�type_to_name�content_typerw)r:�req�msg�producerrPrQ�propsrrrr�s��


���rc		os|�|�dd�}d}z*t|||g|�Ri|��D]\}}|s!|��d}|VqW|r2|�|j�dSdS|r=|�|j�ww)z,Generator collecting replies from ``queue``.�no_ackTFN)�
setdefaultr�ack�after_reply_message_receivedr8)	rnrLr3�argsr?r��receivedr_r`rrrrs&�
�����rcCstjd||dd�dS)Nz#Connection error: %r. Retry in %ss
T)�exc_info)�logger�error)�exc�intervalrrr�_ensure_errbacks
�r�c	cs,�zdVWdS|j|jyYdSwr)�connection_errors�channel_errors)rnrrr�_ignore_errorss��r�cOsB|rt|��||i|��Wd�S1swYt|�S)a�Ignore connection and channel errors.

    The first argument must be a connection object, or any other object
    with ``connection_error`` and ``channel_error`` attributes.

    Can be used as a function:

    .. code-block:: python

        def example(connection):
            ignore_errors(connection, consumer.channel.close)

    or as a context manager:

    .. code-block:: python

        def example(connection):
            with ignore_errors(connection):
                consumer.channel.close()


    Note:
    ----
        Connection and channel errors should be properly handled,
        and not ignored.  Using this function is only acceptable in a cleanup
        phase, like when a connection is lost or at shutdown.
    N)r�)rn�funr�r?rrr�
ignore_errors%s

 �r�cCs|r||�dSdSrr)rHrL�	on_reviverrr�revive_connectionGs�r�c
Ks�|pt}|jdd��4}|j|d�|j}tt||d�}	|j||f||	d�|��}
|
|it||d���\}}|Wd�S1sAwYdS)z�Function wrapper to handle connection errors.

    Ensures function performing broker commands completes
    despite intermittent connection failures.
    T)�block)�errback)r�)r�r�)rHN)r��acquire�ensure_connection�default_channelrr��	autoretryry)
�poolr�r�r?r�r��optsrnrL�reviver�retvalrkrrrrLs��$�rc@s@eZdZdZdZdd�Zddd�Zddd	�Zd
d�Zdd
�Z	dS)�QoSa�Thread safe increment/decrement of a channels prefetch_count.

    Arguments:
    ---------
        callback (Callable): Function used to set new prefetch count,
            e.g. ``consumer.qos`` or ``channel.basic_qos``.  Will be called
            with a single ``prefetch_count`` keyword argument.
        initial_value (int): Initial prefetch count value..

    Example:
    -------
        >>> from kombu import Consumer, Connection
        >>> connection = Connection('amqp://')
        >>> consumer = Consumer(connection)
        >>> qos = QoS(consumer.qos, initial_prefetch_count=2)
        >>> qos.update()  # set initial

        >>> qos.value
        2

        >>> def in_some_thread():
        ...     qos.increment_eventually()

        >>> def in_some_other_thread():
        ...     qos.decrement_eventually()

        >>> while 1:
        ...    if qos.prev != qos.value:
        ...        qos.update()  # prefetch changed so update.

    It can be used with any function supporting a ``prefetch_count`` keyword
    argument::

        >>> channel = connection.channel()
        >>> QoS(channel.basic_qos, 10)


        >>> def set_qos(prefetch_count):
        ...     print('prefetch count now: %r' % (prefetch_count,))
        >>> QoS(set_qos, 10)
    NcCs||_t��|_|pd|_dSr,)�callbackr/�RLock�_mutex�value)r=r��
initial_valuerrrr<�s
zQoS.__init__r
cCsZ|j�|jr|jt|d�|_Wd�|jSWd�|jS1s%wY|jS)z�Increment the value, but do not update the channels QoS.

        Note:
        ----
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        rN)r�r��max�r=�nrrr�increment_eventually�s
��
��zQoS.increment_eventuallycCsx|j�.|jr|j|8_|jdkr(d|_Wd�|jSWd�|jSWd�|jS1s4wY|jS)z�Decrement the value, but do not update the channels QoS.

        Note:
        ----
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        r
N)r�r�r�rrr�decrement_eventually�s

��
��
��zQoS.decrement_eventuallycCsH||jkr"|}|tkrt�dt�d}t�d|�|j|d�||_|S)z#Set channel prefetch_count setting.z(QoS: Disabled: prefetch_count exceeds %rrzbasic.qos: prefetch_count->%s)�prefetch_count)�prev�PREFETCH_COUNT_MAXr��warning�debugr�)r=�pcount�	new_valuerrr�set�s
�zQoS.setcCs6|j�|�|j�Wd�S1swYdS)z)Update prefetch count with current value.N)r�r�r�)r=rrr�update�s
$�z
QoS.update)r
)
rBrCrDrEr�r<r�r�r�r�rrrrr�`s*



r�)T)NF)r
NN)NNF)NFNr)NN)8rE�
__future__rr-rqr/�collectionsr�
contextlibr�	functoolsr�	itertoolsrrrrr	r
�amqprrrKrr�logr�
serializationrr|�
utils.uuid�__all__r�rBr�rr r+r2rrMrrTrOrNrrrrrr�r�r�r�rr�rrrr�<module>sV

	-



�
	(
�


"