File: //usr/lib/python3/dist-packages/twisted/test/__pycache__/test_threadable.cpython-310.pyc
o

�b

�@s�dZddlZddlZddlmZzddlZWney!dZYnwdZddlm	Z	ddl
mZmZGdd	�d	�Z
e	�e
�Gd
d�de�ZGdd
�d
e�ZdS)z)
Tests for L{twisted.python.threadable}.
�N)�skipIfTF)�
threadable)�FailTest�SynchronousTestCasec@s"eZdZdgZdZdZdd�ZdS)�
TestObject�aMethod����cCsLtd�D]}|j|j|_|_|j|j|_|jdks#Jd|jf��qdS)N�
rzz == %d, not 0 as expected)�range�y�x�z)�self�i�r�>/usr/lib/python3/dist-packages/twisted/test/test_threadable.pyrs
�zTestObject.aMethodN)�__name__�
__module__�__qualname__�synchronizedr
rrrrrrrs
rc@sHeZdZdd�Zdd�Zeed�dd��Zeed�dd	��Zd
d�Z	dS)
�SynchronizationTestscCs |�tjt���t�d�dS)z�
        Reduce the CPython check interval so that thread switches happen much
        more often, hopefully exercising more possible race conditions.  Also,
        delay actual test startup until the reactor has been started.
        gH�����z>N)�
addCleanup�sys�setswitchinterval�getswitchinterval�rrrr�setUp)szSynchronizationTests.setUpcCs|�dtjj�dS)zk
        The name of a synchronized method is unaffected by the synchronization
        decorator.
        rN)�assertEqualrrrrrrr�test_synchronizedName2sz*SynchronizationTests.test_synchronizedName�!Platform does not support threadscsTt��g�tj�fdd�d�}|��|��|��dd�|�t��d�dS)z�
        L{threadable.isInIOThread} returns C{True} if and only if it is called
        in the same thread as L{threadable.registerAsIOThread}.
        cs��t���S�N)�appendr�isInIOThreadr��
foreignResultrr�<lambda>Bsz8SynchronizationTests.test_isInIOThread.<locals>.<lambda>��targetrz#Non-IO thread reported as IO threadz#IO thread reported as not IO threadN)	r�registerAsIOThread�	threading�Thread�start�join�assertFalse�
assertTruer#)r�trr$r�test_isInIOThread9s
��z&SynchronizationTests.test_isInIOThreadcsjt��g���fdd�}g}td�D]}tj|d�}|�|�|��q|D]}|��q&�r3t���dS)Nc
sRztd�D]}���qWdSty(}z��t|��WYd}~dSd}~ww�Ni�)rr�AssertionErrorr"�str)r�e��errors�orr�callMethodLotsQs
���zHSynchronizationTests.testThreadedSynchronization.<locals>.callMethodLots�r')rrr*r+r"r,r-r)rr9�threadsr
r0rr6r�testThreadedSynchronizationKs


�z0SynchronizationTests.testThreadedSynchronizationcCs t�}td�D]}|��qdSr2)rrr)rr8rrrr�testUnthreadedSynchronizationds
�z2SynchronizationTests.testUnthreadedSynchronizationN)
rrrrrr�
threadingSkipr1r<r=rrrrr(s	

rc@s&eZdZeed�dd��Zdd�ZdS)�SerializationTestsr cCs4t��}t|�}t�|�}t�|�}|�||�dSr!)r�XLock�type�pickle�dumps�loads�assertIsInstance)r�lock�lockType�
lockPickle�newLockrrr�testPicklingks


zSerializationTests.testPicklingcCs(d}t�|�}t�|d�}t�|�dS)Ns6ctwisted.python.threadable
unpickle_lock
p0
(tp1
Rp2
.�)rBrDrC)rrHrF�	newPicklerrr�testUnpicklingss
z!SerializationTests.testUnpicklingN)rrrrr>rJrMrrrrr?js
r?)�__doc__rBr�unittestrr*�ImportErrorr>�twisted.pythonr�twisted.trial.unittestrrr�synchronizerr?rrrr�<module>s �

B