File: //usr/lib/python3/dist-packages/twisted/conch/test/__pycache__/test_endpoints.cpython-310.pyc
o
�bP� � @ s� d Z ddlZddlmZ ddlmZ ddlmZ ddl m
Z
mZ ddlZddl
mZmZmZ ddlmZ dd lmZ dd
lmZ ddlmZ ddlmZmZmZmZ dd
lmZm Z m!Z!m"Z" ddl#m$Z$m%Z% ddl&m'Z'm(Z( ddl)m*Z*m+Z+ ddl,m-Z- ddl.m/Z/ ddl0m1Z1 ddl2m3Z3 ddl4m5Z5 ddl6m7Z7m8Z8 ddl9m:Z: e5d��re5d��rddl;m<Z< ddl=m>Z>m?Z? ddl@mAZAmBZB ddlCmDZDmEZEmFZFmGZGmHZHmIZImJZJ ddlKmLZL ddlMmNZN dd lOmPZP dd!lQmRZR dd"lSmTZT dd#lUmVZV dd$lWmXZX dd%lYmZZZ dd&l[m\Z\m]Z]m^Z^m_Z_ nd'Z`eaZTeaZZeaZReaZVeaZPeaZNeaZBeaZ?eaZ<dd(lbmcZcmdZd dd)l6meZe G d*d+� d+ec�ZfG d,d-� d-eP�ZgG d.d/� d/eP�ZhG d0d1� d1eP�ZiG d2d3� d3�ZjG d4d5� d5e'�ZkG d6d7� d7�ZlG d8d9� d9eZ�ZmG d:d;� d;eT�Znee$�G d<d=� d=��Zoee%�G d>d?� d?��ZpG d@dA� dA�ZqG dBdC� dCe:eq�ZrG dDdE� dEe:eq�ZsG dFdG� dGe:�ZtG dHdI� dI�ZuG dJdK� dKe:�ZvdS )Lz'
Tests for L{twisted.conch.endpoints}.
� N)�ENOSYS)�pack)�implementer)�verifyClass�verifyObject)�
ConchError�HostKeyChanged�UserRejectedKey)�
IConchUser)�'InMemoryUsernamePasswordDatabaseDontUse)�Portal)�IPv4Address)�CancelledError�Deferred�fail�succeed)�ConnectingCancelledError�ConnectionDone�ConnectionRefusedError�ProcessTerminated)�IAddress�IStreamClientEndpoint)�Factory�Protocol)�LogLevel�globalLogPublisher)�
networkString)�Failure)�FilePath)�msg)�
requireModule)�EventLoggingObserver�MemoryReactorClock)�TestCase�cryptographyzpyasn1.type)� ConchUser)�InMemorySSHKeyDB�SSHPublicKeyChecker)� ConsoleUI�KnownHostsFile)�AuthenticationFailed�SSHCommandAddress�SSHCommandClientEndpoint�_ExistingConnectionHelper�_ISSHConnectionCreator�_NewConnectionHelper� _ReadFile)�common)�SSHAgentServer)�
SSHChannel)�
SSHConnection)�
SSHFactory)�Key)�SSHClientTransport)�SSHUserAuthServer)�privateDSA_openssh�privateRSA_openssh� privateRSA_openssh_encrypted_aes�publicRSA_opensshz%can't run w/o cryptography and pyasn1)�
FakeTransport�connect)�StringTransportc @ s e Zd ZdZdZdd� ZdS )�AbortableFakeTransportzC
A L{FakeTransport} with added C{abortConnection} support.
Fc C �
d| _ dS )z}
Abort the connection in a fake manner.
This should really be implemented in the underlying module.
TN��aborted��self� rF �C/usr/lib/python3/dist-packages/twisted/conch/test/test_endpoints.py�abortConnectionZ �
z&AbortableFakeTransport.abortConnectionN)�__name__�
__module__�__qualname__�__doc__rC rH rF rF rF rG r@ S s r@ c @ � e Zd ZdZdd� ZdS )�BrokenExecSessionzO
L{BrokenExecSession} is a session on which exec requests always fail.
c C � dS )z�
Fail all exec requests.
@param data: Information about what is being executed.
@type data: L{bytes}
@return: C{0} to indicate failure
@rtype: L{int}
r rF �rE �datarF rF rG �request_exech �
zBrokenExecSession.request_execN�rJ rK rL rM rS rF rF rF rG rO c � rO c @ rN )�WorkingExecSessionzS
L{WorkingExecSession} is a session on which exec requests always succeed.
c C rP )z�
Succeed all exec requests.
@param data: Information about what is being executed.
@type data: L{bytes}
@return: C{1} to indicate success
@rtype: L{int}
� rF rQ rF rF rG rS z rT zWorkingExecSession.request_execNrU rF rF rF rG rW u rV rW c @ rN )�UnsatisfiedExecSessionz�
L{UnsatisfiedExecSession} is a session on which exec requests are always
delayed indefinitely, never succeeding or failing.
c C s t � S )z�
Delay all exec requests indefinitely.
@param data: Information about what is being executed.
@type data: L{bytes}
@return: A L{Deferred} which will never fire.
@rtype: L{Deferred}
)r rQ rF rF rG rS � s
z#UnsatisfiedExecSession.request_execNrU rF rF rF rG rY � s rY c @ s e Zd Zdd� Zdd� ZdS )�TrivialRealmc C s
i | _ d S �N)�
channelLookuprD rF rF rG �__init__� �
zTrivialRealm.__init__c G s t � }| j|_t|dd� fS )Nc S � d S r[ rF rF rF rF rG �<lambda>� � z,TrivialRealm.requestAvatar.<locals>.<lambda>)r% r\ r
)rE �avatarId�mind�
interfaces�avatarrF rF rG �
requestAvatar� s zTrivialRealm.requestAvatarN)rJ rK rL r] rf rF rF rF rG rZ � s rZ c @ s e Zd ZdZdd� ZdS )�AddressSpyFactoryNc C s || _ t�| |�S r[ )�addressr �
buildProtocol)rE rh rF rF rG ri � s zAddressSpyFactory.buildProtocol)rJ rK rL rh ri rF rF rF rG rg � � rg c @ s$ e Zd Zdd� Zdd� Zdd� ZdS )�FixedResponseUIc C s
|| _ d S r[ ��result)rE rm rF rF rG r] � r^ zFixedResponseUI.__init__c C s
t | j�S r[ )r rm �rE �textrF rF rG �prompt� r^ zFixedResponseUI.promptc C r_ r[ rF rn rF rF rG �warn� s zFixedResponseUI.warnN)rJ rK rL r] rp rq rF rF rF rG rk � s rk c @ s$ e Zd Zedd� �Zedd� �ZdS )�FakeClockSSHUserAuthServerc C �
| j jjS )zy
Use the C{attemptsBeforeDisconnect} value defined by the factory to make
it easier to override.
)� transport�factory�attemptsBeforeDisconnectrD rF rF rG rv � rI z3FakeClockSSHUserAuthServer.attemptsBeforeDisconnectc C rs )z�
Use the reactor defined by the factory, rather than the default global
reactor, to simplify testing (by allowing an alternate implementation
to be supplied by tests).
)rt ru �reactorrD rF rF rG �clock� s
z FakeClockSSHUserAuthServer.clockN)rJ rK rL �propertyrv rx rF rF rF rG rr � s
rr c @ s2 e Zd Zedd� �Zedd� �Zeed�ZdZ dS )�CommandFactoryc C � dt jtd�iS �N� ssh-rsa)rR )r6 �
fromStringr<