HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //proc/1233/cwd/usr/lib/python3/dist-packages/websocket/tests/__pycache__/test_app.cpython-310.pyc
o

�a�@s�dZddlZddlZddlZddlZddlZej�dd�dkZ	ej�dd�Z
e
dkZdZGd	d
�d
ej
�Zedkr?e��dSdS)ab
test_app.py
websocket - WebSocket client library for Python

Copyright 2021 engn33r

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
�N�TEST_WITH_INTERNET�0�1�LOCAL_WS_SERVER_PORTz-1Tc@s�eZdZGdd�d�Zdd�Zdd�Ze�ed�dd	��Z	e�e
d
�dd��Ze�e
d
�d
d��Ze�e
d
�dd��Z
e�e
d
�dd��Ze�e
d
�dd��Ze�e
d
�dd��Ze�e
d
�dd��Ze�e
d
�dd��ZdS)�WebSocketAppTestc@seZdZdZdS)zWebSocketAppTest.NotSetYetzI A marker class for signalling that a value hasn't been set yet.
        N)�__name__�
__module__�__qualname__�__doc__�rr�:/usr/lib/python3/dist-packages/websocket/tests/test_app.py�	NotSetYet&sr
cCs,t�t�t��t_t��t_t��t_dS�N)�ws�enableTrace�	TRACEABLErr
�keep_running_open�keep_running_close�get_mask_key_id��selfrrr�setUp*s


zWebSocketAppTest.setUpcCs"t��t_t��t_t��t_dSr)rr
rrrrrrr�tearDown1s

zWebSocketAppTest.tearDownz/Tests using local websocket server are disabledcs>dd�}�fdd�}dd�}tjdt|||d�}|��d	S)
z| A WebSocketApp should keep running as long as its self.keep_running
        is not False (in the boolean context).
        c_s|�d�|jt_d|_dS)zn Set the keep_running flag for later inspection and immediately
            close the connection.
            zhello!FN)�send�keep_runningrr�r�args�kwargsrrr�on_open<s

z1WebSocketAppTest.testKeepRunning.<locals>.on_opencst|����dSr��print�close)�wsapp�messagerrr�
on_messageD�z4WebSocketAppTest.testKeepRunning.<locals>.on_messagec_s|jt_dS)z< Set the keep_running flag for the test to use.
            N)rrrrrrr�on_closeHsz2WebSocketAppTest.testKeepRunning.<locals>.on_closezws://127.0.0.1:)rr&r$N)r�WebSocketAppr�run_forever)rrr$r&�apprrr�testKeepRunning6s
z WebSocketAppTest.testKeepRunningz%Internet-requiring tests are disabledcCs0dd�}tjd|d�}|�t|j�t|��dS)zi A WebSocketApp should forward the received mask_key function down
        to the actual socket.
        cSsdS)Nzrrrrr�my_mask_key_funcVsz:WebSocketAppTest.testSockMaskKey.<locals>.my_mask_key_funczwss://stream.meetup.com/2/rsvps)�get_mask_keyN)rr'�assertEqual�idr,)rr+r)rrr�testSockMaskKeyPsz WebSocketAppTest.testSockMaskKeycCsBdd�}dd�}tjd||d�}|jtj|jddd	tjid
�dS)zA Test exception handling if ping_interval < ping_timeout
        cS�td�|��dS�NzGot a ping!r�r)�msgrrr�on_pingdr%zDWebSocketAppTest.testInvalidPingIntervalPingTimeout.<locals>.on_pingcSr0�NzGot a pong! No need to respondrr2rrr�on_ponghr%zDWebSocketAppTest.testInvalidPingIntervalPingTimeout.<locals>.on_pong�wss://api-pub.bitfinex.com/ws/1�r4r6���	cert_reqs��
ping_interval�ping_timeout�ssloptN�rr'�assertRaises�WebSocketExceptionr(�ssl�	CERT_NONE�rr4r6r)rrr�"testInvalidPingIntervalPingTimeout_s"z3WebSocketAppTest.testInvalidPingIntervalPingTimeoutcCs:dd�}dd�}tjd||d�}|jddd	tjid
�dS)z5 Test WebSocketApp proper ping functionality
        cSr0r1rr2rrrr4tr%z2WebSocketAppTest.testPingInterval.<locals>.on_pingcSr0r5rr2rrrr6xr%z2WebSocketAppTest.testPingInterval.<locals>.on_pongr7r8r:r9r;r<N)rr'r(rCrDrErrr�testPingIntervalosz!WebSocketAppTest.testPingIntervalcC�t�d�}|jdddd�dS)z( Test WebSocketApp close opcode
        �'wss://tsock.us1.twilio.com/v3/wsconnectr:r9�Ping payload�r=r>�ping_payloadN�rr'r(�rr)rrr�testOpcodeClose�
z WebSocketAppTest.testOpcodeClosecCrH)z) Test WebSocketApp binary opcode
        z'streaming.vn.teslamotors.com/streaming/r:r9rJrKNrMrNrrr�testOpcodeBinary�rPz!WebSocketAppTest.testOpcodeBinarycC�*t�d�}|jtj|jddtjid�dS)z; A WebSocketApp handling of negative ping_interval
        r7���r;)r=r?Nr@rNrrr�testBadPingInterval��
 z$WebSocketAppTest.testBadPingIntervalcCrR)z: A WebSocketApp handling of negative ping_timeout
        r7���r;)r>r?Nr@rNrrr�testBadPingTimeout�rUz#WebSocketAppTest.testBadPingTimeoutcCs�dd�}tjd|d�}tjtjjdd�}|�ddg|�|��tjtjjd	d�}|�d
d
g|�|��t�d�}tjtjjd	d�}|�d
d
g|�|��|jtj|jdd�d
S)
zU Test extraction of close frame status code and close reason in WebSocketApp
        cSstd�dS)Nzon_close reached)r )r"�close_status_code�	close_msgrrrr&�sz6WebSocketAppTest.testCloseStatusCode.<locals>.on_closerI)r&s�no-init-from-client)�opcode�datai�zno-init-from-client�Nztest if connection is closed)r[)	rr'�ABNF�OPCODE_CLOSEr-�_get_close_argsrA�"WebSocketConnectionClosedExceptionr)rr&r)�
closeframe�app2rrr�testCloseStatusCode�s
z$WebSocketAppTest.testCloseStatusCodeN)rrr	r
rr�unittest�
skipUnless�TEST_WITH_LOCAL_SERVERr*rr/rFrGrOrQrTrWrcrrrrr$s,
















r�__main__)r
�os�os.path�	websocketrrCrd�environ�getrrrfr�TestCaserr�mainrrrr�<module>s�