HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/env/lib/python3.10/site-packages/kombu/__pycache__/pidbox.cpython-310.pyc
o

'we�9�@s"dZddlmZddlZddlZddlmZmZddlm	Z	ddl
m
Z
ddlmZddl
m
Z
d	d
lmZmZmZmZd	dlmZd	dlmZmZd	d
lmZd	dlmZd	dlmZd	dlmZm Z d	dl!m"Z"d	dl#m$Z$dZ%dZ&dZ'ee(�Z)e)j*e)j+Z*Z+Gdd�d�Z,Gdd�d�Z-dS)zGeneric process mailbox.�)�annotationsN)�defaultdict�deque)�contextmanager)�copy)�count)�time�)�Consumer�Exchange�Producer�Queue)�LamportClock)�
maybe_declare�oid_from)�InconsistencyError)�
get_logger)�match)�maybe_evaluate�reprcall)�cached_property)�uuid�
z�A node named {node.hostname} is already using this process mailbox!

Maybe you forgot to shutdown the other node or did not do so properly?
Or if you meant to start multiple nodes on the same host please make sure
you give each node a unique node name!
)�Node�Mailboxc@s�eZdZdZdZdZdZdZdZ		ddd�Z	ddd�Z
dd	�Zd
d�Zddd
�Z
		ddd�Zddd�Zdd�Zdd�Zddd�ZeZdd�ZdS)rz
Mailbox node.NcCs:||_||_||_||_|jjj|_|duri}||_dS�N)�channel�mailbox�hostname�state�clock�adjust�adjust_clock�handlers)�selfrrrr#r�r%�E/home/arjun/projects/env/lib/python3.10/site-packages/kombu/pidbox.py�__init__8s
z
Node.__init__TcsP�j��j�}�fdd�}||_t|p�j|gf||dur!�jjn|d�|��S)Ncs|r
t�tj�d��dSdS)N)�node)�warnings�warn�W_PIDBOX_IN_USE�format)�name�messages�	consumers�r$r%r&�verify_exclusiveFs�z'Node.Consumer.<locals>.verify_exclusive)�no_ack�accept)r�	get_queuer�on_declaredr
rr3)r$rr2r3�options�queuer1r%r0r&r
Cs���z
Node.ConsumercCs||j|j<|Sr)r#�__name__)r$�funr%r%r&�handlerQszNode.handlercCstd|dd�dS)NzCannot decode message: %rr	��exc_info)�error)r$�message�excr%r%r&�on_decode_errorUszNode.on_decode_errorcCs&|j||p|jg|jd�}|��|S)N)r�	callbacksr@)r
�handle_messager@�consume)r$r�callback�consumerr%r%r&�listenXs
�zNode.listenc	
Ks�|pi}tdt|d|d�||�|r|jp|j}z|||�}Wn&ty'�tyE}ztd|dd�dt|�i}WYd}~nd}~ww|rX|j|j	|i|d|d	|d
�|S)Nz1pidbox received method %s [reply_to:%s ticket:%s]r%)�kwargszpidbox command error: %rr	r;r=�exchange�routing_key)rHrI�ticket)
�debugr�handle_call�handle_cast�
SystemExit�	Exceptionr=�repr�replyr)	r$�method�	arguments�reply_torJrG�handlerQr?r%r%r&�dispatch_s*����z
Node.dispatchcCs$|sin|}|j||jfi|��Sr)r#r�r$rRrSr%r%r&rUtszNode.handlecC�|�||�Sr�rUrWr%r%r&rLx�zNode.handle_callcCrXrrYrWr%r%r&rM{rZzNode.handle_castcCs�|�d�}|�d�}|�d�}|r|�|j�d�pd�|j}d}|r*||vr)d}n|r7|r7t|||�r6d}nd}|rC|jdi|��SdS)	N�destination�pattern�matcherr rFTr%)�getr"�headersrrrV)r$�bodyr>r[r\r]r�run_dispatchr%r%r&rB~s&


���zNode.handle_messagecKs"|jj|||||j|jjd�dS)N)r�
serializer)r�_publish_replyrrb)r$�datarHrIrJrGr%r%r&rQ�s
�z
Node.reply�NNNN)NTN�NN)NNNr)r8�
__module__�__qualname__�__doc__rrr#rrr'r
r:r@rFrVrUrLrMrB�dispatch_from_messagerQr%r%r%r&r&s.
�


�

rc@seZdZdZeZdZdZdZdZ	dZ
dZdZdgZ
dZ				d0dd	�Zd
d�Zd1dd
�Z		d1dd�Zd2dd�Zd2dd�Z		d3dd�Zdd�Zedd��Zdd�Zed4dd��Z	d4dd �Z			d5d!d"�Z				d6d$d%�Z		d3d&d'�Zd(d)�Zd*d+�Z e!d,d-��Z"ed.d/��Z#dS)7rzProcess Mailbox.z	%s.pidboxzreply.%s.pidboxN�direct�json�$@cCs�||_||_||_|durt�n||_|�|j|j�|_|�|j�|_t	t
�|_|dur/|jn||_|dur9|j
n||_
||_|	|_|
|_||_||_dSr)�	namespace�
connection�typerr �
_get_exchangerH�_get_reply_exchange�reply_exchangerr�	unclaimedr3rb�	queue_ttl�
queue_expires�reply_queue_ttl�reply_queue_expires�_producer_pool)r$rnrpror r3rb�
producer_poolrurvrwrxr%r%r&r'�s

zMailbox.__init__cCst|�}||_|Sr)rro)r$ro�boundr%r%r&�__call__�szMailbox.__call__cCs |pt��}|j|||||d�S)N)r)�socket�gethostname�node_cls)r$rrrr#r%r%r&r�szMailbox.Nodec	Cs$|sin|}|j|||d|||d�S)NT)rQ�timeoutrDr��
_broadcast)r$r[�commandrGr�rDrr%r%r&�call��
�zMailbox.callcCs|sin|}|j|||dd�S�NF)rQr�)r$r[r�rGr%r%r&�cast�szMailbox.castcCs|sin|}|j||dd�Sr�r�)r$r�rGr%r%r&�abcast�szMailbox.abcastr	c	Cs$|sin|}|j||d||||d�S)NT)rQr��limitrDrr�)r$r�rGr�r�rDrr%r%r&�
multi_call�r�zMailbox.multi_callc	Cs0|j}t|�d|jj��|j|dd|j|jd�S)N�.FT)rHrI�durable�auto_delete�expires�message_ttl)�oidr
rsr-rxrw)r$r�r%r%r&�get_reply_queue�s�zMailbox.get_reply_queuecCs|��Sr)r�r0r%r%r&�reply_queue��zMailbox.reply_queuecCs(t|�d|j�d�|jdd|j|jd�S)Nr�z.pidboxFT)rHr�r�r�r�)r
rnrHrvru)r$rr%r%r&r4�s�zMailbox.get_queueccs^�|r|VdS|jr&|j���}|VWd�dS1swYdSt|dd�VdS)NF)�auto_declare)rz�acquirer)r$�producerrr%r%r&�producer_or_acquires�
"�zMailbox.producer_or_acquirec		Ks�|p|jj}t|dddd�}|�||��3}z|j|f|||g||j��d�dd�|��Wn	ty6Yn	wWd�dSWd�dS1sJwYdS)Nrk�	transientF)�
exchange_type�
delivery_moder�)rJr T)rHrI�declarer_�retry)ro�default_channelrr��publishr �forwardr)	r$rQrHrIrJrr��opts�chanr%r%r&rc
s2��
��
����"�zMailbox._publish_replyc	Cs�||||	|
d�}|p
|jj}|j}
|r't|�|��|j||jj|jd�d�|p+|j	}|�
||��#}|j||
j|
g|j�
�|rEt�|ndd�|dd�Wd�dS1sXwYdS)N)rRrSr[r\r])rHrI)rJrTr)r r�T)rHr�r_rbr�)ror�rHrr��updatersr-r�rbr�r�r r�r)r$rprSr[�reply_ticketrr�rbr�r\r]r>r�rHr%r%r&�_publish s2���

��"�zMailbox._publishFcCs�|durt|ttf�std�t|����|
dur2t|
t�s2|dur2t|t�s2td�t|
�t|����|p5i}|r;t�p<d}|pB|jj	}
|durQ|rQ|rOt
|�pPd}|	pU|j}	|j|||||
||	|
|d�	|rp|j
|||||
d�SdS)Nz'destination must be a list/tuple not {}z.pattern and matcher must be strings not {}, {})r[r�rr�rbr\r])r�r�rDr)�
isinstance�list�tuple�
ValueErrorr,rp�strrror��lenrbr��_collect)r$r�rSr[rQr�r�rDrrbr\r]r�r�r%r%r&r�8sF����

���zMailbox._broadcastc
s|dur|j}|p|jj}|j}t||g|dd�}	g�|j�|jj�z����WSt	y1Ynw�����fdd�}
|	�
|
�zD|	�1|rKt|�pMt�D]}z	|jj
|d�WqNtjydYnw�Wd�W|�|j�S1sxwYW|�|j�dS|�|j�w)NT)r3r2csp|jj}�|d�p
d�|d�}|rt�|krdS|d��}|�kr/�r(�|���|�dS�|�|�dS)Nr rr�rJ)r_r^r�append)r`r>�headerr��this_id�r"rD�	responsesrJrtr%r&�
on_messageps
z$Mailbox._collect.<locals>.on_message)r�)r3ror�r�r
rtr r!�pop�KeyError�register_callback�ranger�drain_eventsr}r��after_reply_message_receivedr-)r$rJr�r�rDrr3r�r7rEr��ir%r�r&r�_s8�
���zMailbox._collectcCst|j||ddd�S)NFr��rpr�r�)r�exchange_fmt)r$rnrpr%r%r&rq��

�zMailbox._get_exchangecCst|j|dddd�S)NrkFr�r�)r�reply_exchange_fmt)r$rnr%r%r&rr�r�zMailbox._get_reply_exchangecCst|�Sr)rr0r%r%r&r��r�zMailbox.oidcCs
t|j�Sr)rryr0r%r%r&rz�s
zMailbox.producer_pool)
rkNNNNNNNNrmrer)Nr	NNNrf)NNNNNNNN)
NNFr	NNNNNN)$r8rgrhrirrr�r�rnrorprHrsr3rbr'r|r�r�r�r�r�rr�r4rr�rcr�r�r�rqrr�propertyr�rzr%r%r%r&r�sj
�

�


�



�
�
�(
�,
r).ri�
__future__rr}r)�collectionsrr�
contextlibrr�	itertoolsrr�r
rrr
�clocksr�commonrr�
exceptionsr�logrr]r�utils.functionalrr�
utils.objectsr�
utils.uuidr�REPLY_QUEUE_EXPIRESr+�__all__r8�loggerrKr=rrr%r%r%r&�<module>s2r