File: //usr/lib/python3/dist-packages/twisted/internet/test/__pycache__/test_fdset.cpython-310.pyc
o
�b�4 � @ s� d Z ddlZddlZddlZddlmZ ddlmZ ddlm Z ddl
mZmZ ddl
mZmZ ddlmZ dd lmZ dd
lmZ dd� ZG d
d� de�Zee�G dd� d��Ze� �e�� � dS )z0
Tests for implementations of L{IReactorFDSet}.
� N)�skipIf)�implementer)�FileDescriptor)�
IReactorFDSet�IReadDescriptor)�EINPROGRESS�EWOULDBLOCK)�ReactorBuilder)�platform)�SkipTestc
C s� t � � } | �d� | �d� zQt � � }z7|�d� z
|�d| �� d f� W n tyB } z|jd tt fvr8� W Y d }~nd }~ww | �
� \}}W n tyV |�� � w W | �� ||fS | �� w )N)� 127.0.0.1r � Fr r )
�socket�bind�listen�setblocking�connect�getsockname�OSError�argsr r �accept�
BaseException�close)�serverSocket�client�e�server�addr� r �B/usr/lib/python3/dist-packages/twisted/internet/test/test_fdset.py�
socketpair s0
�����
�r c @ s� e Zd ZdZegZdd� Zdd� Zdd� Zdd � Z d
d� Z
dd
� Zdd� Zdd� Z
dd� Zdd� Zdd� Zdd� Zee�� d�dd� �Zdd� ZdS ) �ReactorFDSetTestsBuilderz>
Builder defining tests relating to L{IReactorFDSet}.
c C s* t � \}}| �|j� | �|j� ||fS )zL
Return the two sockets which make up a new TCP connection.
)r �
addCleanupr )�selfr r r r r �_connectedPair8 s
z'ReactorFDSetTestsBuilder._connectedPairc C s. | � � }| �� \}}t|�}|j|_|||fS �N)�buildReactorr$ r �fileno)r# �reactorr r �fdr r r �_simpleSetupA s
z%ReactorFDSetTestsBuilder._simpleSetupc sD | � � \�� }� �fdd�}|� _��� � |�d� | ��� dS )z�
C{reactor.addReader()} accepts an L{IReadDescriptor} provider and calls
its C{doRead} method when there may be data available on its C{fileno}.
c � �� � � ��� d S r% )�removeReader�stopr �r) r( r r �
removeAndStopR �
z>ReactorFDSetTestsBuilder.test_addReader.<locals>.removeAndStop� xN)r* �doRead� addReader�sendall�
runReactor�r# r r/ r r. r �test_addReaderK s
z'ReactorFDSetTestsBuilder.test_addReaderc s` � � � \}}}� fdd�}||_|�|� |�|� |�d� |�d|jd|j� � �|� dS )z�
L{reactor.removeReader()} accepts an L{IReadDescriptor} provider
previously passed to C{reactor.addReader()} and causes it to no longer
be monitored for input events.
c � � � d� d S �NzdoRead should not be called��failr �r# r r r; f � z8ReactorFDSetTestsBuilder.test_removeReader.<locals>.failr1 r N)r* r2 r3 r, r4 � callLaterr- r5 �r# r( r) r r; r r<