File streamz-pr455-ci-fixes.patch of Package python-streamz

From 7cc8ac57cae702c3a7ac3b8aed9043dad367c1a3 Mon Sep 17 00:00:00 2001
From: Simon Høxbro <simon.hansen@me.com>
Date: Sun, 11 Sep 2022 10:51:56 +0200
Subject: [PATCH 01/12] Use _repr_mimebundle_ when present on Output

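ipywidgets 8 exposes _repr_mimebundle_ on widgets in place of the removed
_ipython_display_ hook, so displaying the Output widget has to dispatch on
whichever hook the installed ipywidgets provides. In sketch form (mirroring
the hunk below; the helper name is illustrative only):

    from IPython.display import display

    def show_widget(output, **kwargs):
        # Prefer the modern hook; fall back for ipywidgets < 8
        if hasattr(output, "_repr_mimebundle_"):
            return display(output._repr_mimebundle_(**kwargs), raw=True)
        return output._ipython_display_(**kwargs)
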
---
 streamz/core.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

Index: streamz-0.6.4/streamz/core.py
===================================================================
--- streamz-0.6.4.orig/streamz/core.py
+++ streamz-0.6.4/streamz/core.py
@@ -379,13 +379,14 @@ class Stream(APIRegisterMixin):
     __repr__ = __str__
 
     def _ipython_display_(self, **kwargs):  # pragma: no cover
+        # Since this function is only called by Jupyter, this import must succeed
+        from IPython.display import HTML, display
+
         try:
             import ipywidgets
             from IPython.core.interactiveshell import InteractiveShell
             output = ipywidgets.Output(_view_count=0)
         except ImportError:
-            # since this function is only called by jupyter, this import must succeed
-            from IPython.display import display, HTML
             if hasattr(self, '_repr_html_'):
                 return display(HTML(self._repr_html_()))
             else:
@@ -420,7 +421,11 @@ class Stream(APIRegisterMixin):
 
         output.observe(remove_stream, '_view_count')
 
-        return output._ipython_display_(**kwargs)
+        if hasattr(output, "_repr_mimebundle_"):
+            data = output._repr_mimebundle_(**kwargs)
+            return display(data, raw=True)
+        else:
+            return output._ipython_display_(**kwargs)
 
     def _emit(self, x, metadata=None):
         """
@@ -1468,18 +1473,23 @@ class zip(Stream):
 
     def __init__(self, *upstreams, **kwargs):
         self.maxsize = kwargs.pop('maxsize', 10)
-        self.condition = Condition()
+        self._condition = None
         self.literals = [(i, val) for i, val in enumerate(upstreams)
                          if not isinstance(val, Stream)]
 
         self.buffers = {upstream: deque()
                         for upstream in upstreams
                         if isinstance(upstream, Stream)}
-
         upstreams2 = [upstream for upstream in upstreams if isinstance(upstream, Stream)]
 
         Stream.__init__(self, upstreams=upstreams2, **kwargs)
 
+    @property
+    def condition(self):
+        if self._condition is None:
+            self._condition = Condition()
+        return self._condition
+
     def _add_upstream(self, upstream):
         # Override method to handle setup of buffer for new stream
         self.buffers[upstream] = deque()
@@ -1876,7 +1886,7 @@ class latest(Stream):
     _graphviz_shape = 'octagon'
 
     def __init__(self, upstream, **kwargs):
-        self.condition = Condition()
+        self._condition = None
         self.next = []
         self.next_metadata = None
 
@@ -1885,6 +1895,12 @@ class latest(Stream):
 
         self.loop.add_callback(self.cb)
 
+    @property
+    def condition(self):
+        if self._condition is None:
+            self._condition = Condition()
+        return self._condition
+
     def update(self, x, who=None, metadata=None):
         if self.next_metadata:
             self._release_refs(self.next_metadata)
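
Note on the Condition changes in the two classes above: constructing a
tornado Condition binds it to the current event loop, which warns or picks
the wrong loop when a zip/latest node is built before any loop is running
(as happens under recent tornado/Python). Deferring construction to first
use, which always happens inside the running loop, avoids that. A minimal
sketch of the pattern, assuming tornado's locks API (the class name here is
illustrative only):

    from tornado.locks import Condition

    class LazyConditionHolder:
        """Create the Condition on first access, inside a running loop."""
        _condition = None

        @property
        def condition(self):
            if self._condition is None:
                # First touched from update()/cb(), i.e. with a loop running
                self._condition = Condition()
            return self._condition
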
Index: streamz-0.6.4/streamz/tests/py3_test_core.py
===================================================================
--- streamz-0.6.4.orig/streamz/tests/py3_test_core.py
+++ streamz-0.6.4/streamz/tests/py3_test_core.py
@@ -1,16 +1,16 @@
 # flake8: noqa
+import asyncio
 from time import time
-from distributed.utils_test import loop, inc  # noqa
-from tornado import gen
+from distributed.utils_test import inc  # noqa
 
 from streamz import Stream
 
 
-def test_await_syntax(loop):  # noqa
+def test_await_syntax():  # noqa
     L = []
 
     async def write(x):
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
         L.append(x)
 
     async def f():
@@ -25,4 +25,4 @@ def test_await_syntax(loop):  # noqa
         assert 0.2 < stop - start < 0.4
         assert 2 <= len(L) <= 4
 
-    loop.run_sync(f)
+    asyncio.run(f())
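
The rewrite above needs no event-loop fixture at all: asyncio.run() gives
the test a fresh loop and closes it afterwards, and asyncio.sleep replaces
tornado's gen.sleep. The stdlib-only driving pattern, in sketch form:

    import asyncio
    from time import time

    async def f():
        start = time()
        await asyncio.sleep(0.1)
        assert time() - start >= 0.1

    asyncio.run(f())  # creates, runs, and closes a private event loop
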
Index: streamz-0.6.4/streamz/tests/test_core.py
===================================================================
--- streamz-0.6.4.orig/streamz/tests/test_core.py
+++ streamz-0.6.4/streamz/tests/test_core.py
@@ -1,3 +1,4 @@
+import asyncio
 from datetime import timedelta
 from functools import partial
 import itertools
@@ -12,6 +13,7 @@ import pytest
 
 from tornado.queues import Queue
 from tornado.ioloop import IOLoop
+from tornado import gen
 
 import streamz as sz
 
@@ -19,7 +21,7 @@ from streamz import RefCounter
 from streamz.sources import sink_to_file
 from streamz.utils_test import (inc, double, gen_test, tmpfile, captured_logger,   # noqa: F401
         clean, await_for, metadata, wait_for)  # noqa: F401
-from distributed.utils_test import loop   # noqa: F401
+from distributed.utils_test import loop, loop_in_thread, cleanup   # noqa: F401
 
 
 def test_basic():
@@ -1485,20 +1487,6 @@ def dont_test_stream_kwargs(clean):  # n
     sin.emit(1)
 
 
-@pytest.fixture
-def thread(loop):  # noqa: F811
-    from threading import Thread, Event
-    thread = Thread(target=loop.start)
-    thread.daemon = True
-    thread.start()
-
-    event = Event()
-    loop.add_callback(event.set)
-    event.wait()
-
-    return thread
-
-
 def test_percolate_loop_information(clean):  # noqa: F811
     source = Stream()
     assert not source.loop
@@ -1506,16 +1494,6 @@ def test_percolate_loop_information(clea
     assert source.loop is s.loop
 
 
-def test_separate_thread_without_time(loop, thread):  # noqa: F811
-    assert thread.is_alive()
-    source = Stream(loop=loop)
-    L = source.map(inc).sink_to_list()
-
-    for i in range(10):
-        source.emit(i)
-        assert L[-1] == i + 1
-
-
 def test_separate_thread_with_time(clean):  # noqa: F811
     L = []
 
Index: streamz-0.6.4/streamz/tests/test_dask.py
===================================================================
--- streamz-0.6.4.orig/streamz/tests/test_dask.py
+++ streamz-0.6.4/streamz/tests/test_dask.py
@@ -72,10 +72,10 @@ async def test_partition_then_scatter_as
     assert L == [1, 2, 3]
 
 
-def test_partition_then_scatter_sync(loop):
+def test_partition_then_scatter_sync():
     # Ensure partition w/ timeout before scatter works correctly for synchronous
     with cluster() as (s, [a, b]):
-        with Client(s['address'], loop=loop) as client:  # noqa: F841
+        with Client(s['address']) as client:  # noqa: F841
             start = time.monotonic()
             source = Stream()
             L = source.partition(2, timeout=.1).scatter().map(
@@ -164,9 +164,9 @@ async def test_accumulate(c, s, a, b):
     assert L[-1][1] == 3
 
 
-def test_sync(loop):  # noqa: F811
+def test_sync():  # noqa: F811
     with cluster() as (s, [a, b]):
-        with Client(s['address'], loop=loop) as client:  # noqa: F841
+        with Client(s['address']) as client:  # noqa: F841
             source = Stream()
             L = source.scatter().map(inc).gather().sink_to_list()
 
@@ -174,14 +174,14 @@ def test_sync(loop):  # noqa: F811
                 for i in range(10):
                     await source.emit(i, asynchronous=True)
 
-            sync(loop, f)
+            sync(client.loop, f)
 
             assert L == list(map(inc, range(10)))
 
 
-def test_sync_2(loop):  # noqa: F811
+def test_sync_2():  # noqa: F811
     with cluster() as (s, [a, b]):
-        with Client(s['address'], loop=loop):  # noqa: F841
+        with Client(s['address']):  # noqa: F841
             source = Stream()
             L = source.scatter().map(inc).gather().sink_to_list()
 
@@ -218,9 +218,9 @@ async def test_buffer(c, s, a, b):
     assert source.loop == c.loop
 
 
-def test_buffer_sync(loop):  # noqa: F811
+def test_buffer_sync():  # noqa: F811
     with cluster() as (s, [a, b]):
-        with Client(s['address'], loop=loop) as c:  # noqa: F841
+        with Client(s['address']) as c:  # noqa: F841
             source = Stream()
             buff = source.scatter().map(slowinc, delay=0.5).buffer(5)
             L = buff.gather().sink_to_list()
@@ -241,10 +241,11 @@ def test_buffer_sync(loop):  # noqa: F81
             assert L == list(map(inc, range(10)))
 
 
+@pytest.mark.asyncio
 @pytest.mark.xfail(reason='')
-async def test_stream_shares_client_loop(loop):  # noqa: F811
+async def test_stream_shares_client_loop():  # noqa: F811
     with cluster() as (s, [a, b]):
-        with Client(s['address'], loop=loop) as client:  # noqa: F841
+        with Client(s['address']) as client:  # noqa: F841
             source = Stream()
             d = source.timed_window('20ms').scatter()  # noqa: F841
             assert source.loop is client.loop
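
The loop fixture is dropped from the synchronous tests above because a
Client constructed without loop= starts its own IOLoop in a background
thread, and coroutines can be driven on that loop with sync(client.loop, f).
A minimal sketch, assuming distributed's sync helper lives at
distributed.utils.sync as the existing imports in this file suggest:

    from distributed import Client
    from distributed.utils import sync
    from distributed.utils_test import cluster

    async def f():
        pass  # emit into a stream, await results, ...

    with cluster() as (s, [a, b]):
        with Client(s['address']) as client:  # client owns its loop
            sync(client.loop, f)              # run f on the client's loop
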
Index: streamz-0.6.4/ci/environment-py310.yml
===================================================================
--- /dev/null
+++ streamz-0.6.4/ci/environment-py310.yml
@@ -0,0 +1,27 @@
+name: test_env
+channels:
+  - conda-forge
+  - defaults
+dependencies:
+  - python=3.10
+  - pytest
+  - flake8
+  - black
+  - isort
+  - tornado
+  - toolz
+  - librdkafka
+  - dask
+  - distributed
+  - pandas
+  - python-confluent-kafka
+  - codecov
+  - coverage
+  - networkx
+  - graphviz
+  - pytest-asyncio
+  - python-graphviz
+  - bokeh
+  - ipywidgets
+  - flaky
+  - pytest-cov
Index: streamz-0.6.4/ci/environment-py39.yml
===================================================================
--- /dev/null
+++ streamz-0.6.4/ci/environment-py39.yml
@@ -0,0 +1,34 @@
+name: test_env
+channels:
+  - conda-forge
+  - defaults
+dependencies:
+  - python=3.9
+  - pytest
+  - flake8
+  - black
+  - isort
+  - tornado
+  - toolz
+  - zict
+  - six
+  - librdkafka=1.5.3
+  - dask
+  - distributed
+  - pandas
+  - python-confluent-kafka=1.5.0
+  - numpydoc
+  - sphinx
+  - sphinx_rtd_theme
+  - codecov
+  - coverage
+  - networkx
+  - graphviz
+  - pytest-asyncio
+  - python-graphviz
+  - bokeh
+  - ipython
+  - ipykernel
+  - ipywidgets
+  - flaky
+  - pytest-cov
Index: streamz-0.6.4/streamz/dataframe/tests/test_dataframes.py
===================================================================
--- streamz-0.6.4.orig/streamz/dataframe/tests/test_dataframes.py
+++ streamz-0.6.4/streamz/dataframe/tests/test_dataframes.py
@@ -219,7 +219,7 @@ def test_binary_stream_operators(stream)
 
     a.emit(df)
 
-    assert_eq(b[0], expected)
+    wait_for(lambda: b and b[0].equals(expected), 1)
 
 
 def test_index(stream):
@@ -246,7 +246,7 @@ def test_pair_arithmetic(stream):
     a.emit(df.iloc[:5])
     a.emit(df.iloc[5:])
 
-    assert len(L) == 2
+    wait_for(lambda: len(L) == 2, 1)
     assert_eq(pd.concat(L, axis=0), (df.x + df.y) * 2)
 
 
@@ -259,7 +259,7 @@ def test_getitem(stream):
     a.emit(df.iloc[:5])
     a.emit(df.iloc[5:])
 
-    assert len(L) == 2
+    wait_for(lambda: len(L) == 2, 1)
     assert_eq(pd.concat(L, axis=0), df[df.x > 4])
 
 
@@ -298,6 +298,7 @@ def test_groupby_aggregate(agg, grouper,
     a.emit(df.iloc[7:])
 
     first = df.iloc[:3]
+    wait_for(lambda: len(L) > 2, 1)
     assert assert_eq(L[0], f(first))
     assert assert_eq(L[-1], f(df))
 
@@ -382,7 +383,7 @@ def test_setitem(stream):
     df['a'] = 10
     df[['c', 'd']] = df[['x', 'y']]
 
-    assert_eq(L[-1], df.mean())
+    wait_for(lambda: L and L[-1].equals(df.mean()), 1)
 
 
 def test_setitem_overwrites(stream):
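
The bare assertions above raced against asynchronous emission, so they now
poll. The calls imply a predicate/timeout/period contract for wait_for; a
self-contained sketch of such a helper (streamz ships its own in
streamz.utils_test, whose signature may differ in detail):

    import time

    def wait_for(predicate, timeout, period=0.001):
        """Poll predicate until it is truthy or timeout seconds elapse."""
        deadline = time.monotonic() + timeout
        while not predicate():
            if time.monotonic() > deadline:
                raise TimeoutError("predicate never became true")
            time.sleep(period)

    # wait_for(lambda: len(L) == 2, 1) replaces a bare `assert len(L) == 2`
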
Index: streamz-0.6.4/setup.py
===================================================================
--- streamz-0.6.4.orig/setup.py
+++ streamz-0.6.4/setup.py
@@ -17,7 +17,7 @@ setup(name='streamz',
       license='BSD',
       keywords='streams',
       packages=packages + tests,
-      python_requires='>=3.7',
+      python_requires='>=3.8',
       long_description=(open('README.rst').read() if exists('README.rst')
                         else ''),
       install_requires=list(open('requirements.txt').read().strip().split('\n')),
Index: streamz-0.6.4/streamz/tests/test_kafka.py
===================================================================
--- streamz-0.6.4.orig/streamz/tests/test_kafka.py
+++ streamz-0.6.4/streamz/tests/test_kafka.py
@@ -55,8 +55,8 @@ def launch_kafka():
     cmd = ("docker run -d -p 2181:2181 -p 9092:9092 --env "
            "ADVERTISED_HOST=127.0.0.1 --env ADVERTISED_PORT=9092 "
            "--name streamz-kafka spotify/kafka")
-    print(cmd)
-    cid = subprocess.check_output(shlex.split(cmd)).decode()[:-1]
+    cid = subprocess.check_output(shlex.split(cmd),
+                                  stderr=subprocess.DEVNULL).decode()[:-1]
 
     def end():
         if cid:
@@ -66,11 +66,11 @@ def launch_kafka():
     def predicate():
         try:
             out = subprocess.check_output(['docker', 'logs', cid],
-                                      stderr=subprocess.STDOUT)
-            return b'kafka entered RUNNING state' in out
+                                          stderr=subprocess.STDOUT)
+            return b'RUNNING' in out
         except subprocess.CalledProcessError:
             pass
-    wait_for(predicate, 10, period=0.1)
+    wait_for(predicate, 45, period=0.1)
     return cid
 
 
@@ -169,7 +169,7 @@ def test_from_kafka_thread():
         stream = Stream.from_kafka([TOPIC], ARGS)
         out = stream.sink_to_list()
         stream.start()
-        yield gen.sleep(1.1)
+        yield await_for(lambda: stream.started, 10, period=0.1)
         for i in range(10):
             yield gen.sleep(0.1)
             kafka.produce(TOPIC, b'value-%d' % i)
@@ -182,14 +182,6 @@ def test_from_kafka_thread():
         kafka.flush()
         yield await_for(lambda: out[-1] == b'final message', 10, period=0.1)
 
-        stream._close_consumer()
-        kafka.produce(TOPIC, b'lost message')
-        kafka.flush()
-        # absolute sleep here, since we expect output list *not* to change
-        yield gen.sleep(1)
-        assert out[-1] == b'final message'
-        stream._close_consumer()
-
 
 def test_kafka_batch():
     j = random.randint(0, 10000)
@@ -585,6 +577,8 @@ def test_kafka_checkpointing_auto_offset
 
         stream1 = Stream.from_kafka_batched(TOPIC, ARGS, asynchronous=True)
         out1 = stream1.map(split).gather().sink_to_list()
+        time.sleep(1)  # messages make their way through Kafka
+
         stream1.start()
         wait_for(lambda: stream1.upstream.started, 10, period=0.1)
 