Skip to content

Commit f6dd5aa

Browse files
authored
Merge pull request #386 from roveo/refactor_sinks
Refactor sinks
2 parents c8954d8 + 269d3ab commit f6dd5aa

8 files changed

Lines changed: 197 additions & 78 deletions

File tree

docs/source/api.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Stream
2626
rate_limit
2727
scatter
2828
sink
29+
sink_to_textfile
2930
slice
3031
sliding_window
3132
starmap
@@ -84,6 +85,7 @@ Definitions
8485
.. autofunction:: partition
8586
.. autofunction:: rate_limit
8687
.. autofunction:: sink
88+
.. autofunction:: sink_to_textfile
8789
.. autofunction:: sliding_window
8890
.. autofunction:: Stream
8991
.. autofunction:: timed_window

docs/source/plugins.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ Different kinds of add-ons go into different entry point groups:
6060
=========== ======================= =====================
6161
Source ``streamz.Source`` ``streamz.sources``
6262
Node ``streamz.Stream`` ``streamz.nodes``
63-
Sink ``streamz.Stream`` ``streamz.sinks``
63+
Sink ``streamz.Sink`` ``streamz.sinks``
6464
=========== ======================= =====================
6565

6666

streamz/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from .core import *
44
from .graph import *
55
from .sources import *
6+
from .sinks import *
67
from .plugins import load_plugins
78

89
load_plugins(Stream)

streamz/core.py

Lines changed: 7 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727

2828
no_default = '--no-default--'
2929

30-
_global_sinks = set()
31-
3230
_html_update_streams = set()
3331

3432
thread_state = threading.local()
@@ -167,8 +165,10 @@ def __init__(self, upstream=None, upstreams=None, stream_name=None,
167165
self.downstreams = OrderedWeakrefSet()
168166
if upstreams is not None:
169167
self.upstreams = list(upstreams)
170-
else:
168+
elif upstream is not None:
171169
self.upstreams = [upstream]
170+
else:
171+
self.upstreams = []
172172

173173
self._set_asynchronous(asynchronous)
174174
self._set_loop(loop)
@@ -238,10 +238,7 @@ def _inform_asynchronous(self, asynchronous):
238238
def _add_upstream(self, upstream):
239239
"""Add upstream to current upstreams, this method is overridden for
240240
classes which handle stream specific buffers/caches"""
241-
if self.upstreams == [None]:
242-
self.upstreams[0] = upstream
243-
else:
244-
self.upstreams.append(upstream)
241+
self.upstreams.append(upstream)
245242

246243
def _add_downstream(self, downstream):
247244
"""Add downstream to current downstreams"""
@@ -254,10 +251,7 @@ def _remove_downstream(self, downstream):
254251
def _remove_upstream(self, upstream):
255252
"""Remove upstream from current upstreams, this method is overridden for
256253
classes which handle stream specific buffers/caches"""
257-
if len(self.upstreams) == 1:
258-
self.upstreams[0] = [None]
259-
else:
260-
self.upstreams.remove(upstream)
254+
self.upstreams.remove(upstream)
261255

262256
@classmethod
263257
def register_api(cls, modifier=identity, attribute_name=None):
@@ -529,8 +523,8 @@ def destroy(self, streams=None):
529523
if streams is None:
530524
streams = self.upstreams
531525
for upstream in list(streams):
532-
upstream.downstreams.remove(self)
533-
self.upstreams.remove(upstream)
526+
upstream._remove_downstream(self)
527+
self._remove_upstream(upstream)
534528

535529
def scatter(self, **kwargs):
536530
from .dask import scatter
@@ -656,48 +650,6 @@ def _release_refs(self, metadata, n=1):
656650
m['ref'].release(n)
657651

658652

659-
@Stream.register_api()
660-
class sink(Stream):
661-
""" Apply a function on every element
662-
663-
Examples
664-
--------
665-
>>> source = Stream()
666-
>>> L = list()
667-
>>> source.sink(L.append)
668-
>>> source.sink(print)
669-
>>> source.sink(print)
670-
>>> source.emit(123)
671-
123
672-
123
673-
>>> L
674-
[123]
675-
676-
See Also
677-
--------
678-
map
679-
Stream.sink_to_list
680-
"""
681-
_graphviz_shape = 'trapezium'
682-
683-
def __init__(self, upstream, func, *args, **kwargs):
684-
self.func = func
685-
# take the stream specific kwargs out
686-
stream_name = kwargs.pop("stream_name", None)
687-
self.kwargs = kwargs
688-
self.args = args
689-
690-
Stream.__init__(self, upstream, stream_name=stream_name)
691-
_global_sinks.add(self)
692-
693-
def update(self, x, who=None, metadata=None):
694-
result = self.func(x, *self.args, **self.kwargs)
695-
if gen.isawaitable(result):
696-
return result
697-
else:
698-
return []
699-
700-
701653
@Stream.register_api()
702654
class map(Stream):
703655
""" Apply a function to every element in the stream

streamz/graph.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,6 @@ def create_graph(node, graph):
6969
"""
7070
# Step 1 build a set of all the nodes
7171
node_set = build_node_set(node)
72-
if None in node_set:
73-
node_set.remove(None)
7472

7573
# Step 2 for each node in the set add to the graph
7674
for n in node_set:
@@ -87,8 +85,7 @@ def create_graph(node, graph):
8785
# Step 3 for each node establish its edges
8886
for n in node_set:
8987
t = hash(n)
90-
upstreams = [_ for _ in n.upstreams if _ is not None]
91-
for nn in upstreams:
88+
for nn in n.upstreams:
9289
tt = hash(nn)
9390
graph.add_edge(tt, t)
9491

streamz/sinks.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
import inspect
2+
import weakref
3+
4+
from tornado import gen
5+
6+
from streamz import Stream
7+
8+
# sinks add themselves here to avoid being garbage-collected
9+
_global_sinks = set()
10+
11+
12+
class Sink(Stream):
    """ Base class for sink nodes

    Every instance adds itself to the module-level ``_global_sinks`` set when
    it is constructed, so the set holds a strong reference to it.

    Parameters
    ----------
    upstream:
        Forwarded to ``Stream.__init__`` as the upstream node.
    kwargs:
        Forwarded unchanged to ``Stream.__init__``.
    """

    # NOTE(review): presumably the node shape used when the graph is drawn
    # with graphviz -- confirm against the graph rendering code.
    _graphviz_shape = 'trapezium'

    def __init__(self, upstream, **kwargs):
        # Let Stream wire the node up first, then register it globally.
        super().__init__(upstream, **kwargs)
        _global_sinks.add(self)
19+
20+
21+
@Stream.register_api()
class sink(Sink):
    """ Apply a function on every element

    Parameters
    ----------
    func: callable
        A function that will be applied on every element.
    args:
        Positional arguments that will be passed to ``func`` after the incoming element.
    kwargs:
        Stream-specific arguments will be passed to ``Stream.__init__``, the rest of
        them will be passed to ``func``. A keyword whose name matches a parameter of
        ``Stream`` is therefore never forwarded to ``func``.

    Examples
    --------
    >>> source = Stream()
    >>> L = list()
    >>> source.sink(L.append)
    >>> source.sink(print)
    >>> source.sink(print)
    >>> source.emit(123)
    123
    123
    >>> L
    [123]

    See Also
    --------
    map
    Stream.sink_to_list
    """

    def __init__(self, upstream, func, *args, **kwargs):
        self.func = func
        self.args = args

        # Route each keyword by name: the ones the Stream constructor accepts
        # go to the node itself, everything else is kept for ``func``.
        stream_params = inspect.signature(Stream).parameters
        stream_kwargs = {}
        self.kwargs = {}
        for key, value in kwargs.items():
            target = stream_kwargs if key in stream_params else self.kwargs
            target[key] = value

        super().__init__(upstream, **stream_kwargs)

    def update(self, x, who=None, metadata=None):
        """ Call ``func`` on ``x``; return its result only if it is awaitable

        Any non-awaitable result of ``func`` is dropped and an empty list is
        returned instead.
        """
        result = self.func(x, *self.args, **self.kwargs)
        if gen.isawaitable(result):
            return result
        return []

    def destroy(self, streams=None):
        """ Disconnect from upstreams and stop keeping this sink alive

        Parameters
        ----------
        streams: iterable of Stream, optional
            Passed through to ``Stream.destroy``, keeping this override
            call-compatible with the base class signature.

        The sink is removed from ``_global_sinks`` only once it has no
        upstreams left. ``discard`` is used so that calling ``destroy`` more
        than once, or after the sink was already removed from the set, does
        not raise ``KeyError``.
        """
        super().destroy(streams)
        if not self.upstreams:
            _global_sinks.discard(self)
73+
74+
75+
@Stream.register_api()
class sink_to_textfile(Sink):
    """ Write elements to a plain text file, one element per line.

    Type of elements must be ``str``.

    Parameters
    ----------
    file: str or file-like
        File to write the elements to. ``str`` is treated as a file name to open.
        If file-like, descriptor must be open in text mode. Note that the file
        descriptor will be closed when this sink is destroyed: either when
        ``destroy()`` leaves it without upstreams, or when the sink object is
        garbage-collected, whichever happens first.
    end: str, optional
        This value will be written to the file after each element.
        Defaults to newline character.
    mode: str, optional
        If file is ``str``, file will be opened in this mode. Defaults to ``"a"``
        (append mode).

    Examples
    --------
    >>> source = Stream()
    >>> source.map(str).sink_to_textfile("test.txt")
    >>> source.emit(0)
    >>> source.emit(1)
    >>> print(open("test.txt", "r").read())
    0
    1
    """
    def __init__(self, upstream, file, end="\n", mode="a", **kwargs):
        self._end = end
        self._fp = open(file, mode=mode) if isinstance(file, str) else file
        # The finalizer is the single place that closes the file. It is only
        # registered once ``_fp`` exists, so a failed ``open()`` leaves nothing
        # to clean up (a ``__del__`` would raise AttributeError in that case),
        # and calling it explicitly later is safe because it runs at most once.
        self._finalizer = weakref.finalize(self, self._fp.close)
        super().__init__(upstream, **kwargs)

    def update(self, x, who=None, metadata=None):
        """ Write ``x`` followed by the ``end`` string to the file """
        self._fp.write(x + self._end)

    def destroy(self, streams=None):
        """ Disconnect from upstreams, unregister the sink and close the file

        Parameters
        ----------
        streams: iterable of Stream, optional
            Passed through to the parent ``destroy``.

        Once no upstream remains, the sink is dropped from ``_global_sinks``
        and the file is closed through the finalizer.
        """
        super().destroy(streams)
        if not self.upstreams:
            _global_sinks.discard(self)
            self._finalizer()

streamz/tests/test_core.py

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -395,23 +395,6 @@ def test_sink_to_file():
395395
assert data == 'a\nb\n'
396396

397397

398-
def test_sink_with_args_and_kwargs():
399-
L = dict()
400-
401-
def mycustomsink(elem, key, prefix=""):
402-
key = prefix + key
403-
if key not in L:
404-
L[key] = list()
405-
L[key].append(elem)
406-
407-
s = Stream()
408-
s.sink(mycustomsink, "cat", "super")
409-
410-
s.emit(1)
411-
s.emit(2)
412-
assert L['supercat'] == [1, 2]
413-
414-
415398
@gen_test()
416399
def test_counter():
417400
counter = itertools.count()
@@ -1097,7 +1080,7 @@ def test_connect():
10971080
# connect assumes this default behaviour
10981081
# of stream initialization
10991082
assert not source_downstream.downstreams
1100-
assert source_downstream.upstreams == [None]
1083+
assert source_downstream.upstreams == []
11011084

11021085
# initialize the second stream to connect to
11031086
source_upstream = Stream()

streamz/tests/test_sinks.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import weakref
2+
3+
import pytest
4+
from streamz import Stream
5+
from streamz.sinks import _global_sinks
6+
from streamz.utils_test import tmpfile
7+
8+
9+
def test_sink_with_args_and_kwargs():
    """Extra arguments reach the function; ``stream_name`` reaches the node."""
    collected = {}

    def mycustomsink(elem, key, prefix=""):
        # Group every element under the prefixed key.
        collected.setdefault(prefix + key, []).append(elem)

    s = Stream()
    sink = s.sink(mycustomsink, "cat", "super", stream_name="test")
    for value in (1, 2):
        s.emit(value)

    assert collected['supercat'] == [1, 2]
    assert sink.name == "test"
25+
26+
27+
def test_sink_to_textfile_fp():
    """Elements are written to an already-open text file object."""
    source = Stream()
    with tmpfile() as filename, open(filename, "w") as fp:
        source.map(str).sink_to_textfile(fp)
        source.emit(0)
        source.emit(1)

        # Push buffered writes to disk before reading the file back.
        fp.flush()

        # Read through a context manager so the second handle is closed
        # instead of being left for the garbage collector.
        with open(filename, "r") as written:
            assert written.read() == "0\n1\n"
37+
38+
39+
def test_sink_to_textfile_named():
    """Elements are written to a file that the sink opens by name."""
    source = Stream()
    with tmpfile() as filename:
        _sink = source.map(str).sink_to_textfile(filename)
        source.emit(0)
        source.emit(1)

        # Push buffered writes to disk before reading the file back.
        _sink._fp.flush()

        # Read through a context manager so the second handle is closed
        # instead of being left for the garbage collector.
        with open(filename, "r") as written:
            assert written.read() == "0\n1\n"
49+
50+
51+
def test_sink_to_textfile_closes():
    """The file is closed once the sink object itself goes away."""
    source = Stream()
    with tmpfile() as filename:
        sink = source.sink_to_textfile(filename)
        # Keep our own handle on the file object so it can be probed later.
        fp = sink._fp
        # Drop the registry's strong reference, then the local one.
        _global_sinks.remove(sink)
        del sink

        # Writing to the file must now fail because it has been closed.
        with pytest.raises(ValueError, match=r"I/O operation on closed file\."):
            fp.write(".")
61+
62+
63+
def test_sink_destroy():
    """After ``destroy()`` nothing keeps the sink alive."""
    source = Stream()
    sink = source.sink(lambda x: None)
    # A weak reference lets us observe collection without preventing it.
    ref = weakref.ref(sink)

    sink.destroy()
    del sink

    assert ref() is None

0 commit comments

Comments
 (0)