File: //lib/python3/dist-packages/twisted/application/test/__pycache__/test_internet.cpython-310.pyc
o
�bO� � @ st d Z ddlZddlmZ ddlmZ ddlmZ ddlm Z m
Z
mZ ddlm
Z
ddlmZmZ dd lmZmZmZmZmZ dd
lmZmZ ddlmZ ddlmZmZ dd
lm Z ddl!m"Z" ddl#m$Z$m%Z% dd� Z&ee�G dd� d��Z'eee'� ee�G dd� d��Z(eee'� G dd� de%�Z)G dd� de%�Z*G dd� d�Z+d$dd�Z,efdd �Z-d!Z.G d"d#� d#e$�Z/dS )%z�
Tests for (new code in) L{twisted.application.internet}.
@var AT_LEAST_ONE_ATTEMPT: At least enough seconds for L{ClientService} to make
one attempt.
� N)�implementer)�verifyClass)�internet)�
ClientService�StreamServerEndpointService�TimerService)�task)�CancelledError�Deferred)�IFileDescriptorReceiver�IHalfCloseableProtocol�IListeningPort�IStreamClientEndpoint�IStreamServerEndpoint)�Factory�Protocol)�Clock)�formatEvent�globalLogPublisher)�Failure)�StringTransport)�SynchronousTestCase�TestCasec C s dS )zM
A fake target function for testing TimerService which does nothing.
N� r r r �H/usr/lib/python3/dist-packages/twisted/application/test/test_internet.py�fakeTargetFunction) s r c @ sF e Zd ZdZdZdZdZe� ZdZ dd� Z
dd� Zdd � Zd
d� Z
dS )�
FakeServeraq
In-memory implementation of L{IStreamServerEndpoint}.
@ivar result: The L{Deferred} resulting from the call to C{listen}, after
C{listen} has been called.
@ivar factory: The factory passed to C{listen}.
@ivar cancelException: The exception to errback C{self.result} when it is
cancelled.
@ivar port: The L{IListeningPort} which C{listen}'s L{Deferred} will fire
with.
@ivar listenAttempts: The number of times C{listen} has been invoked.
@ivar failImmediately: If set, the exception to fail the L{Deferred}
returned from C{listen} before it is returned.
Nr c C s t � | _d S �N)�FakePort�port��selfr r r �__init__L s zFakeServer.__init__c sF � j d7 _ |� _t� fdd�d�� _� jdur � j�� j� � jS )z�
Return a Deferred and store it for future use. (Implementation of
L{IStreamServerEndpoint}).
@param factory: the factory to listen with
@return: a L{Deferred} stored in L{FakeServer.result}
� c s | � � j�S r )�errback�cancelException)�dr r r �<lambda>Z � z#FakeServer.listen.<locals>.<lambda>)� cancellerN)�listenAttempts�factoryr
�result�failImmediatelyr$ )r! r+ r r r �listenO s
zFakeServer.listenc C s | j �| j� dS )z�
Test code should invoke this method after causing C{listen} to be
invoked in order to fire the L{Deferred} previously returned from
C{listen}.
N)r, �callbackr r r r r �startedListening_ � zFakeServer.startedListeningc C s | j j�d� dS )a
Test code should invoke this method after causing C{stopListening} to
be invoked on the port fired from the L{Deferred} returned from
C{listen} in order to cause the L{Deferred} returned from
C{stopListening} to fire.
N)r �deferredr/ r r r r �stoppedListeningg s zFakeServer.stoppedListening)�__name__�
__module__�__qualname__�__doc__r, r+ r- r r% r* r"