Skip to content

Commit b51c6ae

Browse files
committed
default downsteams is empty list, override destroy for sinks
1 parent 5c1b3a6 commit b51c6ae

4 files changed

Lines changed: 24 additions & 12 deletions

File tree

streamz/core.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,10 @@ def __init__(self, upstream=None, upstreams=None, stream_name=None,
165165
self.downstreams = OrderedWeakrefSet()
166166
if upstreams is not None:
167167
self.upstreams = list(upstreams)
168-
else:
168+
elif upstream is not None:
169169
self.upstreams = [upstream]
170+
else:
171+
self.upstreams = []
170172

171173
self._set_asynchronous(asynchronous)
172174
self._set_loop(loop)
@@ -236,10 +238,7 @@ def _inform_asynchronous(self, asynchronous):
236238
def _add_upstream(self, upstream):
237239
"""Add upstream to current upstreams, this method is overridden for
238240
classes which handle stream specific buffers/caches"""
239-
if self.upstreams == [None]:
240-
self.upstreams[0] = upstream
241-
else:
242-
self.upstreams.append(upstream)
241+
self.upstreams.append(upstream)
243242

244243
def _add_downstream(self, downstream):
245244
"""Add downstream to current downstreams"""
@@ -252,10 +251,7 @@ def _remove_downstream(self, downstream):
252251
def _remove_upstream(self, upstream):
253252
"""Remove upstream from current upstreams, this method is overridden for
254253
classes which handle stream specific buffers/caches"""
255-
if len(self.upstreams) == 1:
256-
self.upstreams[0] = [None]
257-
else:
258-
self.upstreams.remove(upstream)
254+
self.upstreams.remove(upstream)
259255

260256
@classmethod
261257
def register_api(cls, modifier=identity, attribute_name=None):
@@ -527,8 +523,8 @@ def destroy(self, streams=None):
527523
if streams is None:
528524
streams = self.upstreams
529525
for upstream in list(streams):
530-
upstream.downstreams.remove(self)
531-
self.upstreams.remove(upstream)
526+
upstream._remove_downstream(self)
527+
self._remove_upstream(upstream)
532528

533529
def scatter(self, **kwargs):
534530
from .dask import scatter

streamz/sinks.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ def update(self, x, who=None, metadata=None):
6767
else:
6868
return []
6969

70+
def destroy(self):
71+
super().destroy()
72+
_global_sinks.remove(self)
73+
7074

7175
@Stream.register_api()
7276
class sink_to_textfile(Sink):

streamz/tests/test_core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1080,7 +1080,7 @@ def test_connect():
10801080
# connect assumes this default behaviour
10811081
# of stream initialization
10821082
assert not source_downstream.downstreams
1083-
assert source_downstream.upstreams == [None]
1083+
assert source_downstream.upstreams == []
10841084

10851085
# initialize the second stream to connect to
10861086
source_upstream = Stream()

streamz/tests/test_sinks.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import weakref
2+
13
import pytest
24
from streamz import Stream
35
from streamz.sinks import _global_sinks
@@ -56,3 +58,13 @@ def test_sink_to_textfile_closes():
5658

5759
with pytest.raises(ValueError, match=r"I/O operation on closed file\."):
5860
fp.write(".")
61+
62+
63+
def test_sink_destroy():
64+
source = Stream()
65+
sink = source.sink(lambda x: None)
66+
ref = weakref.ref(sink)
67+
sink.destroy()
68+
del sink
69+
70+
assert ref() is None

0 commit comments

Comments
 (0)