HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //lib/python3/dist-packages/samba/tests/__pycache__/auth_log_base.cpython-310.pyc
o

�/a��@sxdZddlZddlmZddlmZmZddlm	Z	ddl
Z
ddlZddlZddl
Z
ddlmZGdd�dejj�ZdS)	z&Tests for the Auth and AuthZ logging.
�N)�	Messaging)�MSG_AUTH_LOG�AUTH_EVENT_NAME)�LoadParm)�paramcs^eZdZedd��Zedd��Z�fdd�Zddd	�Zed
d��Zdd
�Z	dZ
dd�Z�ZS)�AuthLogTestBasecCs�t�d�}|r
t|d�}ntj��td|d�|_|j�t	�tj��}|�
d�d}|�d�d}||_dd	�}d
gi|_
||j
f|_|jj|jtd�d|_tjd|_d|_dS)
N�SERVERCONFFILE)�filename_for_non_global_lp)�)�lp_ctx�
interfacesr�/cSs$t|�t�|�}|d�|�dS)N�messages)�print�json�loads�append)�context�msgType�src�message�jsonMsg�r�;/usr/lib/python3/dist-packages/samba/tests/auth_log_base.py�messageHandler:s
z2AuthLogTestBase.setUpClass.<locals>.messageHandlerr��msg_type�SERVER)�os�getenvr�samba�tests�env_loadparmr�msg_ctx�
irpc_add_namer�get�split�
remoteAddressr�msg_handler_and_context�registerr�environ�server�
connection)�self�server_confr�client_ip_and_mask�	client_iprrrr�
setUpClass"s&




�
zAuthLogTestBase.setUpClasscCs,|jr|jj|jtd�|j�t�dSdS)Nr)r(r#�
deregisterr�irpc_remove_namer�r-rrr�
tearDownClassJs
��zAuthLogTestBase.tearDownClasscstt|���|��dS�N)�superr�setUp�discardMessagesr4��	__class__rrr8QszAuthLogTestBase.setUpNcs���fdd�}�fdd��|�_t��}|�jd�s5�j�d�t��|dkr.d�_gS|�jd�rd�_tt��jd��S)	z�Wait for all the expected messages to arrive
        The connection is passed through to keep the connection alive
        until all the logging messages have been received.
        cs$|D]
}�|�r�|�rdSqdS)NTFr)rr)�isLastExpectedMessage�isRemoterr�	completed[s
�z2AuthLogTestBase.waitForMessages.<locals>.completedcsz�jdurdSd}|ddkr|dd}n|ddkr#|dd}ndSz
|�d�}|d�jkWSty<YdSw)	NT�type�
Authorizationr'�AuthenticationF�:r
)r'r&�
IndexError)r�remote�addrr4rrr=as

�z1AuthLogTestBase.waitForMessages.<locals>.isRemoterg�������?r
N)r,�timerr#�	loop_once�list�filter)r-r<r,r>�
start_timer)r<r=r-r�waitForMessagesUs�zAuthLogTestBase.waitForMessagescCsF|j�d�t|jd�r!g|jd<|j�d�t|jd�s
dSdS)Ng����MbP?r)r#rG�lenrr4rrrr9�s

�zAuthLogTestBase.discardMessagescCsdd�}tt||��S)NcSs d|vrdS|dd}|dkS)NrAT�serviceDescription�NETLOGONr)�msg�sdrrr�is_not_netlogon�szAAuthLogTestBase.remove_netlogon_messages.<locals>.is_not_netlogon)rHrI)r-rrQrrr�remove_netlogon_messages�sz(AuthLogTestBase.remove_netlogon_messagesz<[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}cCst�|j|�Sr6)�re�match�GUID_RE)r-�guidrrr�is_guid�szAuthLogTestBase.is_guidr6)
�__name__�
__module__�__qualname__�classmethodr1r5r8rKr9rRrUrW�
__classcell__rrr:rr s
'

+

	r)�__doc__�samba.testsr �samba.messagingr�samba.dcerpc.messagingrr�samba.paramrrFrrrSrr!�TestCaserrrrr�<module>s