File: //usr/lib/python3/dist-packages/jeepney/io/tests/__pycache__/test_asyncio.cpython-310.pyc
o
��a&
� @ s� d dl Z d dlZd dlZd dlmZmZ d dlmZmZ d dl m
Z
mZmZ ddl
mZ ejj ejje dd�gZed d
dd�Ze�� d
d� �Zdd� Ze�� dd� �Zdd� Zdd� Zdd� Zdd� ZdS )� N)�DBusAddress�new_method_call)�message_bus� MatchRule)�open_dbus_connection�open_dbus_router�Proxy� )�have_session_buszTests require DBus session bus)�reasonzorg.freedesktop.DBusz/org/freedesktop/DBuszorg.freedesktop.DBus.Peer)�bus_name�object_path� interfacec C sR �t dd�I d H 4 I d H �} | V W d �I d H d S 1 I d H s"w Y d S �N�SESSION��bus)r ��conn� r �?/usr/lib/python3/dist-packages/jeepney/io/tests/test_asyncio.py�
connection s �.�r c � s �| j �d�s J �d S )N�:)�unique_name�
startswith)r r r r �test_connect s �r c C sL �t dd�4 I d H �} | V W d �I d H d S 1 I d H sw Y d S r )r )�routerr r r r # s �.�r c � s8 �t td�}tj| �|�dd�I d H }|jdksJ �d S )N�Ping� ��timeoutr )r �bus_peer�asyncio�wait_for�send_and_get_reply�body)r � ping_call�replyr r r �test_send_and_get_reply( s �
�r( c � sN �t t| �}d}|�|�I d H }|dv sJ �|�|�I d H \}|du s%J �d S )Nz+io.gitlab.takluyver.jeepney.examples.Server> �r �� T)r r �RequestName�NameHasOwner)r �proxy�name�res� has_ownerr r r �
test_proxy/ s �
r2 c � s� �t t| �}d}tdtjtjdtjd�}|�d|� |�|�I d H | �|��/}|� |�I d H \}|dks7J �t
j|�� dd�I d H }|j
|d | jfksNJ �W d � d S 1 sYw Y d S )
Nz5io.gitlab.takluyver.jeepney.tests.asyncio_test_filter�signal�NameOwnerChanged)�type�senderr �member�pathr r g @r � )r r r r r r
�add_arg_condition�AddMatch�filterr, r"