Skip to content

Commit 6b23b6f

Browse files
committed
update from_iterable tests
1 parent 63f7209 commit 6b23b6f

2 files changed

Lines changed: 28 additions & 0 deletions

File tree

streamz/tests/test_sinks.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ def test_sink_destroy():
6565
sink = Sink(source)
6666
ref = weakref.ref(sink)
6767
sink.destroy()
68+
69+
assert sink not in _global_sinks
70+
6871
del sink
6972

7073
assert ref() is None

streamz/tests/test_sources.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,28 @@ def test_from_iterable():
102102
L = source.sink_to_list()
103103
source.start()
104104
wait_for(lambda: L == [0, 1, 2], 0.1)
105+
106+
107+
def test_from_iterable_backpressure():
108+
it = iter(range(5))
109+
source = Source.from_iterable(it)
110+
L = source.rate_limit(0.01).sink_to_list()
111+
source.start()
112+
113+
wait_for(lambda: L == [0], 1)
114+
assert next(it) == 2 # 1 is in blocked _emit
115+
116+
117+
def test_from_iterable_stop():
118+
from _pytest.outcomes import Failed
119+
120+
source = Source.from_iterable(range(5))
121+
L = source.rate_limit(0.01).sink_to_list()
122+
source.start()
123+
124+
wait_for(lambda: L == [0], 1)
125+
source.stop()
126+
127+
assert source.stopped
128+
with pytest.raises(Failed):
129+
wait_for(lambda: L == [0, 1, 2], 0.1)

0 commit comments

Comments
 (0)