File streamz-pr455-ci-fixes.patch of Package python-streamz
From 7cc8ac57cae702c3a7ac3b8aed9043dad367c1a3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Simon=20H=C3=B8xbro?= <simon.hansen@me.com>
Date: Sun, 11 Sep 2022 10:51:56 +0200
Subject: [PATCH 01/12] Using _repr_mimebundle if attribute on Output
---
streamz/core.py | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
Index: streamz-0.6.4/streamz/core.py
===================================================================
--- streamz-0.6.4.orig/streamz/core.py
+++ streamz-0.6.4/streamz/core.py
@@ -379,13 +379,14 @@ class Stream(APIRegisterMixin):
__repr__ = __str__
def _ipython_display_(self, **kwargs): # pragma: no cover
+ # Since this function is only called by jupyter, this import must succeed
+ from IPython.display import HTML, display
+
try:
import ipywidgets
from IPython.core.interactiveshell import InteractiveShell
output = ipywidgets.Output(_view_count=0)
except ImportError:
- # since this function is only called by jupyter, this import must succeed
- from IPython.display import display, HTML
if hasattr(self, '_repr_html_'):
return display(HTML(self._repr_html_()))
else:
@@ -420,7 +421,11 @@ class Stream(APIRegisterMixin):
output.observe(remove_stream, '_view_count')
- return output._ipython_display_(**kwargs)
+ if hasattr(output, "_repr_mimebundle_"):
+ data = output._repr_mimebundle_(**kwargs)
+ return display(data, raw=True)
+ else:
+ return output._ipython_display_(**kwargs)
def _emit(self, x, metadata=None):
"""
@@ -1468,18 +1473,23 @@ class zip(Stream):
def __init__(self, *upstreams, **kwargs):
self.maxsize = kwargs.pop('maxsize', 10)
- self.condition = Condition()
+ self._condition = None
self.literals = [(i, val) for i, val in enumerate(upstreams)
if not isinstance(val, Stream)]
self.buffers = {upstream: deque()
for upstream in upstreams
if isinstance(upstream, Stream)}
-
upstreams2 = [upstream for upstream in upstreams if isinstance(upstream, Stream)]
Stream.__init__(self, upstreams=upstreams2, **kwargs)
+ @property
+ def condition(self):
+ if self._condition is None:
+ self._condition = Condition()
+ return self._condition
+
def _add_upstream(self, upstream):
# Override method to handle setup of buffer for new stream
self.buffers[upstream] = deque()
@@ -1876,7 +1886,7 @@ class latest(Stream):
_graphviz_shape = 'octagon'
def __init__(self, upstream, **kwargs):
- self.condition = Condition()
+ self._condition = None
self.next = []
self.next_metadata = None
@@ -1885,6 +1895,12 @@ class latest(Stream):
self.loop.add_callback(self.cb)
+ @property
+ def condition(self):
+ if self._condition is None:
+ self._condition = Condition()
+ return self._condition
+
def update(self, x, who=None, metadata=None):
if self.next_metadata:
self._release_refs(self.next_metadata)
Index: streamz-0.6.4/streamz/tests/py3_test_core.py
===================================================================
--- streamz-0.6.4.orig/streamz/tests/py3_test_core.py
+++ streamz-0.6.4/streamz/tests/py3_test_core.py
@@ -1,16 +1,16 @@
# flake8: noqa
+import asyncio
from time import time
-from distributed.utils_test import loop, inc # noqa
-from tornado import gen
+from distributed.utils_test import inc # noqa
from streamz import Stream
-def test_await_syntax(loop): # noqa
+def test_await_syntax(): # noqa
L = []
async def write(x):
- await gen.sleep(0.1)
+ await asyncio.sleep(0.1)
L.append(x)
async def f():
@@ -25,4 +25,4 @@ def test_await_syntax(loop): # noqa
assert 0.2 < stop - start < 0.4
assert 2 <= len(L) <= 4
- loop.run_sync(f)
+ asyncio.run(f())
Index: streamz-0.6.4/streamz/tests/test_core.py
===================================================================
--- streamz-0.6.4.orig/streamz/tests/test_core.py
+++ streamz-0.6.4/streamz/tests/test_core.py
@@ -1,3 +1,4 @@
+import asyncio
from datetime import timedelta
from functools import partial
import itertools
@@ -12,6 +13,7 @@ import pytest
from tornado.queues import Queue
from tornado.ioloop import IOLoop
+from tornado import gen
import streamz as sz
@@ -19,7 +21,7 @@ from streamz import RefCounter
from streamz.sources import sink_to_file
from streamz.utils_test import (inc, double, gen_test, tmpfile, captured_logger, # noqa: F401
clean, await_for, metadata, wait_for) # noqa: F401
-from distributed.utils_test import loop # noqa: F401
+from distributed.utils_test import loop, loop_in_thread, cleanup # noqa: F401
def test_basic():
@@ -1485,20 +1487,6 @@ def dont_test_stream_kwargs(clean): # n
sin.emit(1)
-@pytest.fixture
-def thread(loop): # noqa: F811
- from threading import Thread, Event
- thread = Thread(target=loop.start)
- thread.daemon = True
- thread.start()
-
- event = Event()
- loop.add_callback(event.set)
- event.wait()
-
- return thread
-
-
def test_percolate_loop_information(clean): # noqa: F811
source = Stream()
assert not source.loop
@@ -1506,16 +1494,6 @@ def test_percolate_loop_information(clea
assert source.loop is s.loop
-def test_separate_thread_without_time(loop, thread): # noqa: F811
- assert thread.is_alive()
- source = Stream(loop=loop)
- L = source.map(inc).sink_to_list()
-
- for i in range(10):
- source.emit(i)
- assert L[-1] == i + 1
-
-
def test_separate_thread_with_time(clean): # noqa: F811
L = []
Index: streamz-0.6.4/streamz/tests/test_dask.py
===================================================================
--- streamz-0.6.4.orig/streamz/tests/test_dask.py
+++ streamz-0.6.4/streamz/tests/test_dask.py
@@ -72,10 +72,10 @@ async def test_partition_then_scatter_as
assert L == [1, 2, 3]
-def test_partition_then_scatter_sync(loop):
+def test_partition_then_scatter_sync():
# Ensure partition w/ timeout before scatter works correctly for synchronous
with cluster() as (s, [a, b]):
- with Client(s['address'], loop=loop) as client: # noqa: F841
+ with Client(s['address']) as client: # noqa: F841
start = time.monotonic()
source = Stream()
L = source.partition(2, timeout=.1).scatter().map(
@@ -164,9 +164,9 @@ async def test_accumulate(c, s, a, b):
assert L[-1][1] == 3
-def test_sync(loop): # noqa: F811
+def test_sync(): # noqa: F811
with cluster() as (s, [a, b]):
- with Client(s['address'], loop=loop) as client: # noqa: F841
+ with Client(s['address']) as client: # noqa: F841
source = Stream()
L = source.scatter().map(inc).gather().sink_to_list()
@@ -174,14 +174,14 @@ def test_sync(loop): # noqa: F811
for i in range(10):
await source.emit(i, asynchronous=True)
- sync(loop, f)
+ sync(client.loop, f)
assert L == list(map(inc, range(10)))
-def test_sync_2(loop): # noqa: F811
+def test_sync_2(): # noqa: F811
with cluster() as (s, [a, b]):
- with Client(s['address'], loop=loop): # noqa: F841
+ with Client(s['address']): # noqa: F841
source = Stream()
L = source.scatter().map(inc).gather().sink_to_list()
@@ -218,9 +218,9 @@ async def test_buffer(c, s, a, b):
assert source.loop == c.loop
-def test_buffer_sync(loop): # noqa: F811
+def test_buffer_sync(): # noqa: F811
with cluster() as (s, [a, b]):
- with Client(s['address'], loop=loop) as c: # noqa: F841
+ with Client(s['address']) as c: # noqa: F841
source = Stream()
buff = source.scatter().map(slowinc, delay=0.5).buffer(5)
L = buff.gather().sink_to_list()
@@ -241,10 +241,11 @@ def test_buffer_sync(loop): # noqa: F81
assert L == list(map(inc, range(10)))
+@pytest.mark.asyncio
@pytest.mark.xfail(reason='')
-async def test_stream_shares_client_loop(loop): # noqa: F811
+async def test_stream_shares_client_loop(): # noqa: F811
with cluster() as (s, [a, b]):
- with Client(s['address'], loop=loop) as client: # noqa: F841
+ with Client(s['address']) as client: # noqa: F841
source = Stream()
d = source.timed_window('20ms').scatter() # noqa: F841
assert source.loop is client.loop
Index: streamz-0.6.4/ci/environment-py310.yml
===================================================================
--- /dev/null
+++ streamz-0.6.4/ci/environment-py310.yml
@@ -0,0 +1,27 @@
+name: test_env
+channels:
+ - conda-forge
+ - defaults
+dependencies:
+ - python=3.10
+ - pytest
+ - flake8
+ - black
+ - isort
+ - tornado
+ - toolz
+ - librdkafka
+ - dask
+ - distributed
+ - pandas
+ - python-confluent-kafka
+ - codecov
+ - coverage
+ - networkx
+ - graphviz
+ - pytest-asyncio
+ - python-graphviz
+ - bokeh
+ - ipywidgets
+ - flaky
+ - pytest-cov
Index: streamz-0.6.4/ci/environment-py39.yml
===================================================================
--- /dev/null
+++ streamz-0.6.4/ci/environment-py39.yml
@@ -0,0 +1,34 @@
+name: test_env
+channels:
+ - conda-forge
+ - defaults
+dependencies:
+ - python=3.9
+ - pytest
+ - flake8
+ - black
+ - isort
+ - tornado
+ - toolz
+ - zict
+ - six
+ - librdkafka=1.5.3
+ - dask
+ - distributed
+ - pandas
+ - python-confluent-kafka=1.5.0
+ - numpydoc
+ - sphinx
+ - sphinx_rtd_theme
+ - codecov
+ - coverage
+ - networkx
+ - graphviz
+ - pytest-asyncio
+ - python-graphviz
+ - bokeh
+ - ipython
+ - ipykernel
+ - ipywidgets
+ - flaky
+ - pytest-cov
Index: streamz-0.6.4/streamz/dataframe/tests/test_dataframes.py
===================================================================
--- streamz-0.6.4.orig/streamz/dataframe/tests/test_dataframes.py
+++ streamz-0.6.4/streamz/dataframe/tests/test_dataframes.py
@@ -219,7 +219,7 @@ def test_binary_stream_operators(stream)
a.emit(df)
- assert_eq(b[0], expected)
+ wait_for(lambda: b and b[0].equals(expected), 1)
def test_index(stream):
@@ -246,7 +246,7 @@ def test_pair_arithmetic(stream):
a.emit(df.iloc[:5])
a.emit(df.iloc[5:])
- assert len(L) == 2
+ wait_for(lambda: len(L) == 2, 1)
assert_eq(pd.concat(L, axis=0), (df.x + df.y) * 2)
@@ -259,7 +259,7 @@ def test_getitem(stream):
a.emit(df.iloc[:5])
a.emit(df.iloc[5:])
- assert len(L) == 2
+ wait_for(lambda: len(L) == 2, 1)
assert_eq(pd.concat(L, axis=0), df[df.x > 4])
@@ -298,6 +298,7 @@ def test_groupby_aggregate(agg, grouper,
a.emit(df.iloc[7:])
first = df.iloc[:3]
+ wait_for(lambda: len(L) > 2, 1)
assert assert_eq(L[0], f(first))
assert assert_eq(L[-1], f(df))
@@ -382,7 +383,7 @@ def test_setitem(stream):
df['a'] = 10
df[['c', 'd']] = df[['x', 'y']]
- assert_eq(L[-1], df.mean())
+ wait_for(lambda: L and L[-1].equals(df.mean()), 1)
def test_setitem_overwrites(stream):
Index: streamz-0.6.4/setup.py
===================================================================
--- streamz-0.6.4.orig/setup.py
+++ streamz-0.6.4/setup.py
@@ -17,7 +17,7 @@ setup(name='streamz',
license='BSD',
keywords='streams',
packages=packages + tests,
- python_requires='>=3.7',
+ python_requires='>=3.8',
long_description=(open('README.rst').read() if exists('README.rst')
else ''),
install_requires=list(open('requirements.txt').read().strip().split('\n')),
Index: streamz-0.6.4/streamz/tests/test_kafka.py
===================================================================
--- streamz-0.6.4.orig/streamz/tests/test_kafka.py
+++ streamz-0.6.4/streamz/tests/test_kafka.py
@@ -55,8 +55,8 @@ def launch_kafka():
cmd = ("docker run -d -p 2181:2181 -p 9092:9092 --env "
"ADVERTISED_HOST=127.0.0.1 --env ADVERTISED_PORT=9092 "
"--name streamz-kafka spotify/kafka")
- print(cmd)
- cid = subprocess.check_output(shlex.split(cmd)).decode()[:-1]
+ cid = subprocess.check_output(shlex.split(cmd),
+ stderr=subprocess.DEVNULL).decode()[:-1]
def end():
if cid:
@@ -66,11 +66,11 @@ def launch_kafka():
def predicate():
try:
out = subprocess.check_output(['docker', 'logs', cid],
- stderr=subprocess.STDOUT)
- return b'kafka entered RUNNING state' in out
+ stderr=subprocess.STDOUT)
+ return b'RUNNING' in out
except subprocess.CalledProcessError:
pass
- wait_for(predicate, 10, period=0.1)
+ wait_for(predicate, 45, period=0.1)
return cid
@@ -169,7 +169,7 @@ def test_from_kafka_thread():
stream = Stream.from_kafka([TOPIC], ARGS)
out = stream.sink_to_list()
stream.start()
- yield gen.sleep(1.1)
+ yield await_for(lambda: stream.started, 10, period=0.1)
for i in range(10):
yield gen.sleep(0.1)
kafka.produce(TOPIC, b'value-%d' % i)
@@ -182,14 +182,6 @@ def test_from_kafka_thread():
kafka.flush()
yield await_for(lambda: out[-1] == b'final message', 10, period=0.1)
- stream._close_consumer()
- kafka.produce(TOPIC, b'lost message')
- kafka.flush()
- # absolute sleep here, since we expect output list *not* to change
- yield gen.sleep(1)
- assert out[-1] == b'final message'
- stream._close_consumer()
-
def test_kafka_batch():
j = random.randint(0, 10000)
@@ -585,6 +577,8 @@ def test_kafka_checkpointing_auto_offset
stream1 = Stream.from_kafka_batched(TOPIC, ARGS, asynchronous=True)
out1 = stream1.map(split).gather().sink_to_list()
+ time.sleep(1) # messages make ttheir way through kafka
+
stream1.start()
wait_for(lambda: stream1.upstream.started, 10, period=0.1)