Skip to content

Commit b3812a6

Browse files
authored
Merge pull request #455 from martindurant/ci_fix
CI fixes
2 parents b73a8c4 + c8eb271 commit b3812a6

11 files changed

Lines changed: 96 additions & 78 deletions

File tree

.github/workflows/main.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ jobs:
1313
strategy:
1414
fail-fast: false
1515
matrix:
16-
CONDA_ENV: [py37, py38]
17-
env:
18-
STREAMZ_LAUNCH_KAFKA: true
16+
CONDA_ENV: [py38, py39, py310]
17+
# env:
18+
# STREAMZ_LAUNCH_KAFKA: true
1919

2020
steps:
2121
- name: APT

ci/environment-py310.yml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
name: test_env
2+
channels:
3+
- conda-forge
4+
- defaults
5+
dependencies:
6+
- python=3.10
7+
- pytest
8+
- flake8
9+
- black
10+
- isort
11+
- tornado
12+
- toolz
13+
- librdkafka
14+
- dask
15+
- distributed
16+
- pandas
17+
- python-confluent-kafka
18+
- codecov
19+
- coverage
20+
- networkx
21+
- graphviz
22+
- pytest-asyncio
23+
- python-graphviz
24+
- bokeh
25+
- ipywidgets
26+
- flaky
27+
- pytest-cov
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ channels:
33
- conda-forge
44
- defaults
55
dependencies:
6-
- python=3.7
6+
- python=3.9
77
- pytest
88
- flake8
99
- black

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
license='BSD',
1818
keywords='streams',
1919
packages=packages + tests,
20-
python_requires='>=3.7',
20+
python_requires='>=3.8',
2121
long_description=(open('README.rst').read() if exists('README.rst')
2222
else ''),
2323
install_requires=list(open('requirements.txt').read().strip().split('\n')),

streamz/core.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def wrapped(*args, **kwargs):
168168
def register_plugin_entry_point(cls, entry_point, modifier=identity):
169169
if hasattr(cls, entry_point.name):
170170
raise ValueError(
171-
f"Can't add {entry_point.name} from {entry_point.module_name} "
171+
f"Can't add {entry_point.name} "
172172
f"to {cls.__name__}: duplicate method name."
173173
)
174174

@@ -178,7 +178,6 @@ def stub(*args, **kwargs):
178178
if not issubclass(node, Stream):
179179
raise TypeError(
180180
f"Error loading {entry_point.name} "
181-
f"from module {entry_point.module_name}: "
182181
f"{node.__class__.__name__} must be a subclass of Stream"
183182
)
184183
if getattr(cls, entry_point.name).__name__ == "stub":
@@ -379,13 +378,14 @@ def __str__(self):
379378
__repr__ = __str__
380379

381380
def _ipython_display_(self, **kwargs): # pragma: no cover
381+
# Since this function is only called by jupyter, this import must succeed
382+
from IPython.display import HTML, display
383+
382384
try:
383385
import ipywidgets
384386
from IPython.core.interactiveshell import InteractiveShell
385387
output = ipywidgets.Output(_view_count=0)
386388
except ImportError:
387-
# since this function is only called by jupyter, this import must succeed
388-
from IPython.display import display, HTML
389389
if hasattr(self, '_repr_html_'):
390390
return display(HTML(self._repr_html_()))
391391
else:
@@ -420,7 +420,11 @@ def remove_stream(change):
420420

421421
output.observe(remove_stream, '_view_count')
422422

423-
return output._ipython_display_(**kwargs)
423+
if hasattr(output, "_repr_mimebundle_"):
424+
data = output._repr_mimebundle_(**kwargs)
425+
return display(data, raw=True)
426+
else:
427+
return output._ipython_display_(**kwargs)
424428

425429
def _emit(self, x, metadata=None):
426430
"""
@@ -1468,18 +1472,23 @@ class zip(Stream):
14681472

14691473
def __init__(self, *upstreams, **kwargs):
14701474
self.maxsize = kwargs.pop('maxsize', 10)
1471-
self.condition = Condition()
1475+
self._condition = None
14721476
self.literals = [(i, val) for i, val in enumerate(upstreams)
14731477
if not isinstance(val, Stream)]
14741478

14751479
self.buffers = {upstream: deque()
14761480
for upstream in upstreams
14771481
if isinstance(upstream, Stream)}
1478-
14791482
upstreams2 = [upstream for upstream in upstreams if isinstance(upstream, Stream)]
14801483

14811484
Stream.__init__(self, upstreams=upstreams2, **kwargs)
14821485

1486+
@property
1487+
def condition(self):
1488+
if self._condition is None:
1489+
self._condition = Condition()
1490+
return self._condition
1491+
14831492
def _add_upstream(self, upstream):
14841493
# Override method to handle setup of buffer for new stream
14851494
self.buffers[upstream] = deque()
@@ -1876,7 +1885,7 @@ class latest(Stream):
18761885
_graphviz_shape = 'octagon'
18771886

18781887
def __init__(self, upstream, **kwargs):
1879-
self.condition = Condition()
1888+
self._condition = None
18801889
self.next = []
18811890
self.next_metadata = None
18821891

@@ -1885,6 +1894,12 @@ def __init__(self, upstream, **kwargs):
18851894

18861895
self.loop.add_callback(self.cb)
18871896

1897+
@property
1898+
def condition(self):
1899+
if self._condition is None:
1900+
self._condition = Condition()
1901+
return self._condition
1902+
18881903
def update(self, x, who=None, metadata=None):
18891904
if self.next_metadata:
18901905
self._release_refs(self.next_metadata)

streamz/dataframe/tests/test_dataframes.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ def test_binary_stream_operators(stream):
219219

220220
a.emit(df)
221221

222-
assert_eq(b[0], expected)
222+
wait_for(lambda: b and b[0].equals(expected), 1)
223223

224224

225225
def test_index(stream):
@@ -246,7 +246,7 @@ def test_pair_arithmetic(stream):
246246
a.emit(df.iloc[:5])
247247
a.emit(df.iloc[5:])
248248

249-
assert len(L) == 2
249+
wait_for(lambda: len(L) == 2, 1)
250250
assert_eq(pd.concat(L, axis=0), (df.x + df.y) * 2)
251251

252252

@@ -259,7 +259,7 @@ def test_getitem(stream):
259259
a.emit(df.iloc[:5])
260260
a.emit(df.iloc[5:])
261261

262-
assert len(L) == 2
262+
wait_for(lambda: len(L) == 2, 1)
263263
assert_eq(pd.concat(L, axis=0), df[df.x > 4])
264264

265265

@@ -298,6 +298,7 @@ def f(x):
298298
a.emit(df.iloc[7:])
299299

300300
first = df.iloc[:3]
301+
wait_for(lambda: len(L) > 2, 1)
301302
assert assert_eq(L[0], f(first))
302303
assert assert_eq(L[-1], f(df))
303304

@@ -382,7 +383,7 @@ def test_setitem(stream):
382383
df['a'] = 10
383384
df[['c', 'd']] = df[['x', 'y']]
384385

385-
assert_eq(L[-1], df.mean())
386+
wait_for(lambda: L and L[-1].equals(df.mean()), 1)
386387

387388

388389
def test_setitem_overwrites(stream):

streamz/plugins.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import warnings
22

3-
import pkg_resources
3+
import importlib.metadata
44

55

66
def try_register(cls, entry_point, *modifier):
@@ -14,9 +14,10 @@ def try_register(cls, entry_point, *modifier):
1414

1515

1616
def load_plugins(cls):
17-
for entry_point in pkg_resources.iter_entry_points("streamz.sources"):
17+
eps = importlib.metadata.entry_points()
18+
for entry_point in eps.get("streamz.sources", []):
1819
try_register(cls, entry_point, staticmethod)
19-
for entry_point in pkg_resources.iter_entry_points("streamz.nodes"):
20+
for entry_point in eps.get("streamz.nodes", []):
2021
try_register(cls, entry_point)
21-
for entry_point in pkg_resources.iter_entry_points("streamz.sinks"):
22+
for entry_point in eps.get("streamz.sinks", []):
2223
try_register(cls, entry_point)

streamz/tests/py3_test_core.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
# flake8: noqa
2+
import asyncio
23
from time import time
3-
from distributed.utils_test import loop, inc # noqa
4-
from tornado import gen
4+
from distributed.utils_test import inc # noqa
55

66
from streamz import Stream
77

88

9-
def test_await_syntax(loop): # noqa
9+
def test_await_syntax(): # noqa
1010
L = []
1111

1212
async def write(x):
13-
await gen.sleep(0.1)
13+
await asyncio.sleep(0.1)
1414
L.append(x)
1515

1616
async def f():
@@ -25,4 +25,4 @@ async def f():
2525
assert 0.2 < stop - start < 0.4
2626
assert 2 <= len(L) <= 4
2727

28-
loop.run_sync(f)
28+
asyncio.run(f())

streamz/tests/test_core.py

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from datetime import timedelta
23
from functools import partial
34
import itertools
@@ -12,14 +13,15 @@
1213

1314
from tornado.queues import Queue
1415
from tornado.ioloop import IOLoop
16+
from tornado import gen
1517

1618
import streamz as sz
1719

1820
from streamz import RefCounter
1921
from streamz.sources import sink_to_file
2022
from streamz.utils_test import (inc, double, gen_test, tmpfile, captured_logger, # noqa: F401
2123
clean, await_for, metadata, wait_for) # noqa: F401
22-
from distributed.utils_test import loop # noqa: F401
24+
from distributed.utils_test import loop, loop_in_thread, cleanup # noqa: F401
2325

2426

2527
def test_basic():
@@ -1485,37 +1487,13 @@ def dont_test_stream_kwargs(clean): # noqa: F811
14851487
sin.emit(1)
14861488

14871489

1488-
@pytest.fixture
1489-
def thread(loop): # noqa: F811
1490-
from threading import Thread, Event
1491-
thread = Thread(target=loop.start)
1492-
thread.daemon = True
1493-
thread.start()
1494-
1495-
event = Event()
1496-
loop.add_callback(event.set)
1497-
event.wait()
1498-
1499-
return thread
1500-
1501-
15021490
def test_percolate_loop_information(clean): # noqa: F811
15031491
source = Stream()
15041492
assert not source.loop
15051493
s = source.timed_window(0.5)
15061494
assert source.loop is s.loop
15071495

15081496

1509-
def test_separate_thread_without_time(loop, thread): # noqa: F811
1510-
assert thread.is_alive()
1511-
source = Stream(loop=loop)
1512-
L = source.map(inc).sink_to_list()
1513-
1514-
for i in range(10):
1515-
source.emit(i)
1516-
assert L[-1] == i + 1
1517-
1518-
15191497
def test_separate_thread_with_time(clean): # noqa: F811
15201498
L = []
15211499

streamz/tests/test_dask.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,10 @@ async def test_partition_then_scatter_async(c, s, a, b):
7272
assert L == [1, 2, 3]
7373

7474

75-
def test_partition_then_scatter_sync(loop):
75+
def test_partition_then_scatter_sync():
7676
# Ensure partition w/ timeout before scatter works correctly for synchronous
7777
with cluster() as (s, [a, b]):
78-
with Client(s['address'], loop=loop) as client: # noqa: F841
78+
with Client(s['address']) as client: # noqa: F841
7979
start = time.monotonic()
8080
source = Stream()
8181
L = source.partition(2, timeout=.1).scatter().map(
@@ -164,24 +164,24 @@ async def test_accumulate(c, s, a, b):
164164
assert L[-1][1] == 3
165165

166166

167-
def test_sync(loop): # noqa: F811
167+
def test_sync(): # noqa: F811
168168
with cluster() as (s, [a, b]):
169-
with Client(s['address'], loop=loop) as client: # noqa: F841
169+
with Client(s['address']) as client: # noqa: F841
170170
source = Stream()
171171
L = source.scatter().map(inc).gather().sink_to_list()
172172

173173
async def f():
174174
for i in range(10):
175175
await source.emit(i, asynchronous=True)
176176

177-
sync(loop, f)
177+
sync(client.loop, f)
178178

179179
assert L == list(map(inc, range(10)))
180180

181181

182-
def test_sync_2(loop): # noqa: F811
182+
def test_sync_2(): # noqa: F811
183183
with cluster() as (s, [a, b]):
184-
with Client(s['address'], loop=loop): # noqa: F841
184+
with Client(s['address']): # noqa: F841
185185
source = Stream()
186186
L = source.scatter().map(inc).gather().sink_to_list()
187187

@@ -218,9 +218,9 @@ async def test_buffer(c, s, a, b):
218218
assert source.loop == c.loop
219219

220220

221-
def test_buffer_sync(loop): # noqa: F811
221+
def test_buffer_sync(): # noqa: F811
222222
with cluster() as (s, [a, b]):
223-
with Client(s['address'], loop=loop) as c: # noqa: F841
223+
with Client(s['address']) as c: # noqa: F841
224224
source = Stream()
225225
buff = source.scatter().map(slowinc, delay=0.5).buffer(5)
226226
L = buff.gather().sink_to_list()
@@ -241,10 +241,11 @@ def test_buffer_sync(loop): # noqa: F811
241241
assert L == list(map(inc, range(10)))
242242

243243

244+
@pytest.mark.asyncio
244245
@pytest.mark.xfail(reason='')
245-
async def test_stream_shares_client_loop(loop): # noqa: F811
246+
async def test_stream_shares_client_loop(): # noqa: F811
246247
with cluster() as (s, [a, b]):
247-
with Client(s['address'], loop=loop) as client: # noqa: F841
248+
with Client(s['address']) as client: # noqa: F841
248249
source = Stream()
249250
d = source.timed_window('20ms').scatter() # noqa: F841
250251
assert source.loop is client.loop

0 commit comments

Comments
 (0)