We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent 2a3390a commit 63f7209Copy full SHA for 63f7209
2 files changed
streamz/sources.py
@@ -763,4 +763,7 @@ def start(self):
763
@gen.coroutine
764
def _run(self):
765
for x in self._iterable:
766
+ if self.stopped:
767
+ break
768
yield self._emit(x)
769
+ self.stopped = True
streamz/tests/test_sinks.py
@@ -2,7 +2,7 @@
2
3
import pytest
4
from streamz import Stream
5
-from streamz.sinks import _global_sinks
+from streamz.sinks import _global_sinks, Sink
6
from streamz.utils_test import tmpfile
7
8
@@ -62,7 +62,7 @@ def test_sink_to_textfile_closes():
62
63
def test_sink_destroy():
64
source = Stream()
65
- sink = source.sink(lambda x: None)
+ sink = Sink(source)
66
ref = weakref.ref(sink)
67
sink.destroy()
68
del sink
0 commit comments