HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/env/lib/python3.10/site-packages/celery/app/__pycache__/amqp.cpython-310.pyc
o

-we�Z�@s
dZddlZddlmZddlmZddlmZddlm	Z	ddl
mZmZm
Z
mZmZmZddlmZdd	lmZdd
lmZddlmZddlmZdd
lmZddlmZddl m!Z!ddl"m#Z$dZ%dZ&dZ'edd�Z(ddd�Z)Gdd�de*�Z+Gdd�d�Z,dS)z/Sending/Receiving Messages (Kombu integration).�N)�
namedtuple)�Mapping)�	timedelta)�WeakValueDictionary)�
Connection�Consumer�Exchange�Producer�Queue�pools)�	Broadcast)�
maybe_list)�cached_property)�signals)�
anon_nodename)�saferepr)�indent)�maybe_make_aware�)�routes)�AMQP�Queues�task_messagei�zS
.> {0.name:<16} exchange={0.exchange.name}({0.exchange.type}) key={0.routing_key}
r��headers�
properties�body�
sent_event�utf-8cs�fdd�|��D�S)Ncs*i|]\}}t|t�r|���n||�qS�)�
isinstance�bytes�decode)�.0�k�v��encodingr�H/home/arjun/projects/env/lib/python3.10/site-packages/celery/app/amqp.py�
<dictcomp>%s�zutf8dict.<locals>.<dictcomp>)�items)�dr'rr&r(�utf8dict$s
�r,cs�eZdZdZdZ			d!�fdd�	Z�fdd�Z�fdd	�Zd
d�Zdd
�Z	dd�Z
dd�Zdd�Zd"dd�Z
dd�Zdd�Zdd�Zdd�Zedd ��Z�ZS)#ru�Queue name⇒ declaration mapping.

    Arguments:
        queues (Iterable): Initial list/tuple or dict of queues.
        create_missing (bool): By default any unknown queues will be
            added automatically, but if this flag is disabled the occurrence
            of unknown queues in `wanted` will raise :exc:`KeyError`.
        max_priority (int): Default x-max-priority for queues with none set.
    NTc	s�t���t�|_||_||_||_|durtn||_||_	|dur.t
|t�s.dd�|D�}|p1i}|��D]\}}t
|t
�rD|�|�n|j|fi|��q6dS)NcSsi|]}|j|�qSr)�name)r#�qrrr(r)Csz#Queues.__init__.<locals>.<dictcomp>)�super�__init__r�aliases�default_exchange�default_routing_key�create_missingr�autoexchange�max_priorityr rr*r
�add�
add_compat)	�self�queuesr2r4r5r6r3r-r.��	__class__rr(r08s
$��zQueues.__init__cs,z|j|WStyt��|�YSw�N)r1�KeyErrorr/�__getitem__�r9r-r;rr(r?Hs
�zQueues.__getitem__cs<|jr
|js
|j|_t��||�|jr||j|j<dSdSr=)r2�exchanger/�__setitem__�aliasr1)r9r-�queuer;rr(rBNs�zQueues.__setitem__cCs|jr|�|�|��St|��r=)r4r7�new_missingr>r@rrr(�__missing__UszQueues.__missing__cKs&t|t�s|j|fi|��S|�|�S)a�Add new queue.

        The first argument can either be a :class:`kombu.Queue` instance,
        or the name of a queue.  If the former the rest of the keyword
        arguments are ignored, and options are simply taken from the queue
        instance.

        Arguments:
            queue (kombu.Queue, str): Queue to add.
            exchange (kombu.Exchange, str):
                if queue is str, specifies exchange name.
            routing_key (str): if queue is str, specifies binding key.
            exchange_type (str): if queue is str, specifies type of exchange.
            **options (Any): Additional declaration options used when
                queue is a str.
        )r r
r8�_add)r9rD�kwargsrrr(r7Zs

z
Queues.addcKs>|�d|�d��|ddur||d<|�tj|fi|���S)N�routing_key�binding_key)�
setdefault�getrGr
�	from_dict)r9r-�optionsrrr(r8oszQueues.add_compatcCs`|jdus|jjdkr|j|_|js|j|_|jdur)|jdur#i|_|�|j�|||j<|S)N�)rAr-r2rIr3r6�queue_arguments�_set_max_priority)r9rDrrr(rGvs


zQueues._addcCs*d|vr|jdur|�d|ji�SdSdS)Nzx-max-priority)r6�update)r9�argsrrr(rQ�s�zQueues._set_max_priorityrcCs\|j}|sdSdd�t|���D�}|rtd�|�|�S|ddtd�|dd��|�S)z/Format routing table into string for log dumps.rOcSsg|]\}}t���|��qSr)�QUEUE_FORMAT�strip�format)r#�_r.rrr(�
<listcomp>�s�z!Queues.format.<locals>.<listcomp>�
rrN)�consume_from�sortedr*�
textindent�join)r9r�indent_first�active�inforrr(rV�s
�$z
Queues.formatcKs,|j|fi|��}|jdur||j|j<|S)z�Add new task queue that'll be consumed from.

        The queue will be active even when a subset has been selected
        using the :option:`celery worker -Q` option.
        N)r7�
_consume_fromr-)r9rDrHr.rrr(�
select_add�s
zQueues.select_addcs$|r�fdd�t|�D��_dSdS)z�Select a subset of currently defined queues to consume from.

        Arguments:
            include (Sequence[str], str): Names of queues to consume from.
        c�i|]}|�|�qSrr)r#r-�r9rr(r)�s
�z!Queues.select.<locals>.<dictcomp>N)r
ra)r9�includerrdr(�select�s

��z
Queues.selectcsN�r#t���|jdur|��fdd�|D��S�D]}|j�|d�qdSdS)z�Deselect queues so that they won't be consumed from.

        Arguments:
            exclude (Sequence[str], str): Names of queues to avoid
                consuming from.
        Nc3s�|]	}|�vr|VqdSr=r)r#r$��excluderr(�	<genexpr>�s�z"Queues.deselect.<locals>.<genexpr>)r
rarf�pop)r9rhrDrrgr(�deselect�s
�zQueues.deselectcCst||�|�|�Sr=)r
r5r@rrr(rE�szQueues.new_missingcCs|jdur|jS|Sr=)rardrrr(rZ�s
zQueues.consume_from)NNTNNN)rT)�__name__�
__module__�__qualname__�__doc__rar0r?rBrFr7r8rGrQrVrbrfrkrE�propertyrZ�
__classcell__rrr;r(r)s*�
rc@sNeZdZdZeZeZeZeZeZ	dZ
dZdZdZ
dZdd�Zedd��Zedd	��Z		d0d
d�Zd1dd
�Zdd�Zd1dd�Z									d2dd�Z							d3dd�Zdd�Zdd�Zedd��Zedd��Zejd d��Zed!d"��Zed#d$��Zejd%d$��Zed&d'��Z e Z!ed(d)��Z"ed*d+��Z#ed,d-��Z$d.d/�Z%dS)4rzApp AMQP API: app.amqp.NicCs*||_|j|jd�|_|jj�|j�dS)N)r�)�app�
as_task_v1�
as_task_v2�task_protocols�_conf�bind_to�_handle_conf_update)r9rsrrr(r0�s
�z
AMQP.__init__cC�|j|jjjSr=)rvrs�conf�
task_protocolrdrrr(�create_task_message��zAMQP.create_task_messagecC�|��Sr=)�_create_task_senderrdrrr(�send_task_message��zAMQP.send_task_messagecCsp|jj}|j}|dur|j}|dur|j}|s$|jr$t|j|j|d�f}|dur+|jn|}|�	||j||||�S)N)rArI)
rsr{�task_default_routing_key�task_create_missing_queues�task_queue_max_priority�task_default_queuer
r2r5�
queues_cls)r9r:r4r5r6r{r3rrr(r�s$
���zAMQP.QueuescCs&tj|j|p|j|j�d|�|jd�S)zReturn the current task router.r�)rs)�_routes�Routerrr:rs�either)r9r:r4rrr(r�s��zAMQP.RoutercCst�|jjj�|_dSr=)r��preparersr{�task_routes�_rtablerdrrr(�flush_routesszAMQP.flush_routescKs:|dur	|jjj}|j|f||pt|jj���d�|��S)N)�acceptr:)rsr{�accept_contentr�listr:rZ�values)r9�channelr:r��kwrrr(�TaskConsumers
���zAMQP.TaskConsumerrFc!s�|pd}|pi}t|ttf�std��t|t�std��|r<|�|d�|p*|j��}|p0|jj}t	|t
|d�|d�}t|	tj�r`|�|	d�|pN|j��}|pT|jj}t	|t
|	d�|d�}	t|t
�sk|oj|��}t|	t
�sv|	ou|	��}	|dur�t||j�}|dur�t||j�}|s�|}�fdd	�|p�gD�}id
d�d|�d
|�d|�d|�d|	�d|�d|�d|
�d||g�d|�d|�d|�d|�d|p�t��d|�d|�||d��} t| ||p�dd�||||
||d�f|r�|||||||
||	d�	d �Sdd �S)!Nr�!task args must be a list or tuple�(task keyword arguments must be a mapping�	countdown��seconds)�tz�expirescrcrr)r#�header�rNrr(r)Dsz#AMQP.as_task_v2.<locals>.<dictcomp>�lang�py�task�id�shadow�eta�group�group_index�retries�	timelimit�root_id�	parent_id�argsrepr�
kwargsrepr�origin�
ignore_result�replaced_task_nesting)�stamped_headers�stampsrO��correlation_id�reply_to)�	callbacks�errbacks�chain�chord)	�uuidr�r�r-rSrHr�r�r�r)r r��tuple�	TypeErrorr�_verify_secondsrs�now�timezonerr�numbers�Real�str�	isoformatr�argsrepr_maxsize�kwargsrepr_maxsizerr)!r9�task_idr-rSrHr�r��group_idr�r�r�r�r�r�r��
time_limit�soft_time_limit�create_sent_eventr�r�r�r�r�r�r�r�r�r�r�r�rNr�rrr�r(rus�

��

��������	�
���
��
�����������zAMQP.as_task_v2cKs|pd}|pi}|j}t|ttf�std��t|t�std��|r5|�|d�|p-|j��}|t	|d�}t|	t
j�rO|�|	d�|pG|j��}|t	|	d�}	|oT|��}|	oZ|	��}	t
i||padd�|||||||
||	|||
||f||d	�|r�||t|�t|�|
||	d
�d�Sdd�S)Nrr�r�r�r�r�rOr�)r�r�rSrHr�r�r�r�r��utcr�r�r��tasksetr�)r�r-rSrHr�r�r�r)r�r r�r�r�rr�rsr�rr�r�r�rr)r9r�r-rSrHr�r�r�r�r�r�r�r�r�r�r�r�r�r�r�r�r�r��
compat_kwargsr�rrr(rtvsf
������zAMQP.as_task_v1cCs|tkr
t|�d|����|S)Nz is out of range: )�INT_MIN�
ValueError)r9�s�whatrrr(r��szAMQP._verify_secondscs�|jjj�|jjj�|jjj�|j�|j�tjj	�
tjj
�tjj	�tjj
�tjj	�tjj
�|j
�|j�|jjj�	|jjj�
|jjj�						d����������	�
���
��fdd�	}|S)Ncsl|dur�n|}|\}}}}|
r|�|
�|r|�|�|}|dur(|dur(�}|dur<t|t�r9|�|}}n|j}|
durTz|jj}
Wn	tyOYnw|
pS�}
|durjz|jj}Wntyid}Ynw|rn|sx|dkrxd|}}n|dur�|jjp��}|p�|jp��	}|dur�|r�t|t	�s�|g}|dur��n|}|r�t
�fi|��n�}�r��
||||||||d�|j|f|||	pˆ
|pň|||
||d�	|��}�rۈ|||||d���rt|t�r��||d||d|d|d	|d
d�n�||d||d|d
|d	|dd�|�r4|�p�}|}t|t
��r!|j}|�|||d��|jd||||d�|S)N�directrO)�senderrrArI�declarerr�retry_policy)	rArI�
serializer�compression�retryr��
delivery_moder�r)r�rrrArIr�rrr�r�)r�r�r�rSrHr�r�rSrHr�)rDrArIz	task-sent)r�r�)rRr r�r-rAr��AttributeError�typerIr�dict�publishr�r)�producerr-�messagerArIrD�event_dispatcherr�r�r�r�r�r�r�
exchange_typerH�headers2rrr�qname�_rp�ret�evd�exname��after_receivers�before_receivers�default_compressor�default_delivery_mode�default_evdr2�default_policy�
default_queue�
default_retry�default_rkey�default_serializerr:�send_after_publish�send_before_publish�send_task_sent�sent_receiversrr(r��s�


������	��

�
�
��z3AMQP._create_task_sender.<locals>.send_task_message)NNNNNNNNNNNN)rsr{�task_publish_retry�task_publish_retry_policy�task_default_delivery_moder�r:r�before_task_publish�send�	receivers�after_task_publish�	task_sent�_event_dispatcherr2r��task_serializer�task_compression)r9r�rr�r(r��s0





,�bzAMQP._create_task_sendercCrzr=)r:rsr{r�rdrrr(r�0r~zAMQP.default_queuecCs|�|jjj�S)u"Queue name⇒ declaration mapping.)rrsr{�task_queuesrdrrr(r:4szAMQP.queuescCs
|�|�Sr=)r)r9r:rrr(r:9�
cCs|jdur	|��|jSr=)r�r�rdrrr(r=s
zAMQP.routescCrr=)r�rdrrr(�routerCr�zAMQP.routercCs|Sr=r)r9�valuerrr(rGscCs0|jdurtj|j��|_|jjj|j_|jSr=)�_producer_poolr�	producersrs�connection_for_write�pool�limitrdrrr(�
producer_poolKs
�zAMQP.producer_poolcCst|jjj|jjj�Sr=)rrsr{�task_default_exchange�task_default_exchange_typerdrrr(r2Ts
�zAMQP.default_exchangecCs
|jjjSr=)rsr{�
enable_utcrdrrr(r�YrzAMQP.utccCs|jjjdd�S)NF)�enabled)rs�events�
Dispatcherrdrrr(r�]szAMQP._event_dispatchercOs&d|vsd|vr|��|��|_dS)Nr�)r�r�r)r9rSrHrrr(rycs
zAMQP._handle_conf_update)NNN)NN)NNNNNNNrNNNNNNFNNNNNNNFNNNr)NNNNNNNrNNNNNNFNNNNN)&rlrmrnrorrr	�BrokerConnectionrr�r�rr5r�r�r0rr}r�r�r�r�rurtr�r�r�r:�setterrprrr
�publisher_poolr2r�r�ryrrrr(r�s�


�

	
�^
�<y









r)r)-ror��collectionsr�collections.abcr�datetimer�weakrefr�komburrrr	r
r�kombu.commonr�kombu.utils.functionalr
�kombu.utils.objectsr�celeryr�celery.utils.nodenamesr�celery.utils.safereprr�celery.utils.textrr\�celery.utils.timerrOrr��__all__r�rTrr,r�rrrrrr(�<module>s4 �