File event_loop.patch of Package python-aiorpcX

From: Jiri Slaby <jslaby@suse.cz>
Date: Mon, 30 Sep 2024 08:42:26 +0200
Subject: don't use event_loop pytest-asyncio fixture
References: build-fix
Patch-mainline: reported, https://github.com/kyuupichan/aiorpcX/issues/55

event_loop fixture was deprecated and removed from pytest-asyncio 1.0.0.
Use the internal _function_event_loop for the time being. Hopefully this
will be fixed upstream eventually...

Signed-off-by: Jiri Slaby <jslaby@suse.cz>
---
 tests/test_session.py    |   24 ++++++++++++------------
 tests/test_socks.py      |    6 +++---
 tests/test_unixsocket.py |   10 +++++-----
 3 files changed, 20 insertions(+), 20 deletions(-)

--- a/tests/test_session.py
+++ b/tests/test_session.py
@@ -86,21 +86,21 @@ def caplog_count(caplog, message):
 
 
 @pytest.fixture
-def server_port(unused_tcp_port, event_loop):
-    coro = serve_rs(MyServerSession, 'localhost', unused_tcp_port, loop=event_loop)
-    server = event_loop.run_until_complete(coro)
+def server_port(unused_tcp_port, _function_event_loop):
+    coro = serve_rs(MyServerSession, 'localhost', unused_tcp_port, loop=_function_event_loop)
+    server = _function_event_loop.run_until_complete(coro)
     yield unused_tcp_port
     if hasattr(asyncio, 'all_tasks'):
-        tasks = asyncio.all_tasks(event_loop)
+        tasks = asyncio.all_tasks(_function_event_loop)
     else:
-        tasks = asyncio.Task.all_tasks(loop=event_loop)
+        tasks = asyncio.Task.all_tasks(loop=_function_event_loop)
 
     async def close_all():
         server.close()
         await server.wait_closed()
         if tasks:
             await asyncio.wait(tasks)
-    event_loop.run_until_complete(close_all())
+    _function_event_loop.run_until_complete(close_all())
 
 
 class TestRPCSession:
@@ -765,21 +765,21 @@ class MessageServer(MessageSession):
 
 
 @pytest.fixture
-def msg_server_port(event_loop, unused_tcp_port):
-    coro = serve_rs(MessageServer, 'localhost', unused_tcp_port, loop=event_loop)
-    server = event_loop.run_until_complete(coro)
+def msg_server_port(_function_event_loop, unused_tcp_port):
+    coro = serve_rs(MessageServer, 'localhost', unused_tcp_port, loop=_function_event_loop)
+    server = _function_event_loop.run_until_complete(coro)
     yield unused_tcp_port
     if hasattr(asyncio, 'all_tasks'):
-        tasks = asyncio.all_tasks(event_loop)
+        tasks = asyncio.all_tasks(_function_event_loop)
     else:
-        tasks = asyncio.Task.all_tasks(loop=event_loop)
+        tasks = asyncio.Task.all_tasks(loop=_function_event_loop)
 
     async def close_all():
         server.close()
         await server.wait_closed()
         if tasks:
             await asyncio.wait(tasks)
-    event_loop.run_until_complete(close_all())
+    _function_event_loop.run_until_complete(close_all())
 
 
 def connect_message_session(host, port, proxy=None, framer=None):
--- a/tests/test_socks.py
+++ b/tests/test_socks.py
@@ -482,10 +482,10 @@ localhosts = ['127.0.0.1', '::1', 'local
 
 
 @pytest.fixture(params=localhosts)
-def proxy_address(request, event_loop, unused_tcp_port):
+def proxy_address(request, _function_event_loop, unused_tcp_port):
     host = request.param
-    coro = event_loop.create_server(FakeServer, host=host, port=unused_tcp_port)
-    server = event_loop.run_until_complete(coro)
+    coro = _function_event_loop.create_server(FakeServer, host=host, port=unused_tcp_port)
+    server = _function_event_loop.run_until_complete(coro)
     yield NetAddress(host, unused_tcp_port)
     server.close()
 
--- a/tests/test_unixsocket.py
+++ b/tests/test_unixsocket.py
@@ -11,20 +11,20 @@ if sys.platform.startswith("win"):
 
 
 @pytest.fixture
-def us_server(event_loop):
+def us_server(_function_event_loop):
     with tempfile.TemporaryDirectory() as tmp_folder:
         socket_path = path.join(tmp_folder, 'test.socket')
-        coro = serve_us(MyServerSession, socket_path, loop=event_loop)
-        server = event_loop.run_until_complete(coro)
+        coro = serve_us(MyServerSession, socket_path, loop=_function_event_loop)
+        server = _function_event_loop.run_until_complete(coro)
         yield socket_path
-        tasks = asyncio.all_tasks(event_loop)
+        tasks = asyncio.all_tasks(_function_event_loop)
 
         async def close_all():
             server.close()
             await server.wait_closed()
             if tasks:
                 await asyncio.wait(tasks)
-        event_loop.run_until_complete(close_all())
+        _function_event_loop.run_until_complete(close_all())
 
 
 class TestUSTransport:
openSUSE Build Service is sponsored by