Skip to content

Commit bceb79a

Browse files
committed
refactor sinks, sink_to_textfile
1 parent e9a2545 commit bceb79a

6 files changed

Lines changed: 142 additions & 62 deletions

File tree

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ install:
2121
- conda update conda
2222

2323
# Install dependencies
24-
- conda env create --name test-streamz --file ./conda/environments/streamz_dev.yml
24+
- travis_wait 30 conda env create --name test-streamz --file ./conda/environments/streamz_dev.yml
2525
- source activate test-streamz
2626

2727
- python setup.py install

streamz/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from .core import *
44
from .graph import *
55
from .sources import *
6+
from .sinks import *
67
try:
78
from .dask import DaskStream, scatter
89
except ImportError:

streamz/core.py

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727

2828
no_default = '--no-default--'
2929

30-
_global_sinks = set()
31-
3230
_html_update_streams = set()
3331

3432
thread_state = threading.local()
@@ -622,48 +620,6 @@ def _release_refs(self, metadata, n=1):
622620
m['ref'].release(n)
623621

624622

625-
@Stream.register_api()
626-
class sink(Stream):
627-
""" Apply a function on every element
628-
629-
Examples
630-
--------
631-
>>> source = Stream()
632-
>>> L = list()
633-
>>> source.sink(L.append)
634-
>>> source.sink(print)
635-
>>> source.sink(print)
636-
>>> source.emit(123)
637-
123
638-
123
639-
>>> L
640-
[123]
641-
642-
See Also
643-
--------
644-
map
645-
Stream.sink_to_list
646-
"""
647-
_graphviz_shape = 'trapezium'
648-
649-
def __init__(self, upstream, func, *args, **kwargs):
650-
self.func = func
651-
# take the stream specific kwargs out
652-
stream_name = kwargs.pop("stream_name", None)
653-
self.kwargs = kwargs
654-
self.args = args
655-
656-
Stream.__init__(self, upstream, stream_name=stream_name)
657-
_global_sinks.add(self)
658-
659-
def update(self, x, who=None, metadata=None):
660-
result = self.func(x, *self.args, **self.kwargs)
661-
if gen.isawaitable(result):
662-
return result
663-
else:
664-
return []
665-
666-
667623
@Stream.register_api()
668624
class map(Stream):
669625
""" Apply a function to every element in the stream

streamz/sinks.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
from tornado import gen
2+
3+
from streamz import Stream
4+
5+
_global_sinks = set()
6+
7+
8+
class Sink(Stream):
9+
10+
_graphviz_shape = 'trapezium'
11+
12+
def __init__(self, upstream, **kwargs):
13+
super().__init__(upstream, **kwargs)
14+
_global_sinks.add(self)
15+
16+
17+
@Stream.register_api()
18+
class sink(Sink):
19+
""" Apply a function on every element
20+
21+
Examples
22+
--------
23+
>>> source = Stream()
24+
>>> L = list()
25+
>>> source.sink(L.append)
26+
>>> source.sink(print)
27+
>>> source.sink(print)
28+
>>> source.emit(123)
29+
123
30+
123
31+
>>> L
32+
[123]
33+
34+
See Also
35+
--------
36+
map
37+
Stream.sink_to_list
38+
"""
39+
40+
def __init__(self, upstream, func, *args, **kwargs):
41+
self.func = func
42+
# take the stream specific kwargs out
43+
stream_name = kwargs.pop("stream_name", None)
44+
self.kwargs = kwargs
45+
self.args = args
46+
super().__init__(upstream, stream_name=stream_name)
47+
48+
def update(self, x, who=None, metadata=None):
49+
result = self.func(x, *self.args, **self.kwargs)
50+
if gen.isawaitable(result):
51+
return result
52+
else:
53+
return []
54+
55+
56+
@Stream.register_api()
57+
class sink_to_textfile(Sink):
58+
""" Write elements to a plain text file, one element per line. Type of elements
59+
must be ``str``.
60+
61+
Arguments
62+
---------
63+
file: str or file-like
64+
File to write the elements to. ``str`` is treated as a file name to open.
65+
If file-like, descriptor must be open in text mode.
66+
Note that the file descriptor will be closed when sink is destroyed.
67+
end: str, optional
68+
This value will be written to the file after each element.
69+
Defaults to newline character.
70+
mode: str, optional
71+
If file is ``str``, file will be opened in this mode. Defaults to ``"a"``
72+
(append mode).
73+
74+
Examples
75+
--------
76+
>>> source = Stream()
77+
>>> source.map(str).sink_to_file("test.txt")
78+
>>> source.emit(0)
79+
>>> source.emit(1)
80+
>>> print(open("test.txt", "r").read())
81+
0
82+
1
83+
"""
84+
def __init__(self, upstream, file, end="\n", mode="a", **kwargs):
85+
self._fp = open(file, mode=mode, buffering=1) if isinstance(file, str) else file
86+
self._end = end
87+
super().__init__(upstream, ensure_io_loop=True, **kwargs)
88+
89+
def __del__(self):
90+
self._fp.close()
91+
92+
@gen.coroutine
93+
def update(self, x, who=None, metadata=None):
94+
yield self.loop.run_in_executor(None, self._fp.write, x)
95+
yield self.loop.run_in_executor(None, self._fp.write, self._end)

streamz/tests/test_core.py

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -336,23 +336,6 @@ def test_sink_to_file():
336336
assert data == 'a\nb\n'
337337

338338

339-
def test_sink_with_args_and_kwargs():
340-
L = dict()
341-
342-
def mycustomsink(elem, key, prefix=""):
343-
key = prefix + key
344-
if key not in L:
345-
L[key] = list()
346-
L[key].append(elem)
347-
348-
s = Stream()
349-
s.sink(mycustomsink, "cat", "super")
350-
351-
s.emit(1)
352-
s.emit(2)
353-
assert L['supercat'] == [1, 2]
354-
355-
356339
@gen_test()
357340
def test_counter():
358341
counter = itertools.count()

streamz/tests/test_sinks.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from time import sleep
2+
3+
from streamz import Stream
4+
from streamz.utils_test import tmpfile
5+
6+
7+
def test_sink_with_args_and_kwargs():
8+
L = dict()
9+
10+
def mycustomsink(elem, key, prefix=""):
11+
key = prefix + key
12+
if key not in L:
13+
L[key] = list()
14+
L[key].append(elem)
15+
16+
s = Stream()
17+
s.sink(mycustomsink, "cat", "super")
18+
19+
s.emit(1)
20+
s.emit(2)
21+
assert L['supercat'] == [1, 2]
22+
23+
24+
def test_sink_to_textfile_fp():
25+
source = Stream()
26+
with tmpfile() as filename, open(filename, "w", buffering=1) as fp:
27+
source.map(str).sink_to_textfile(fp)
28+
source.emit(0)
29+
source.emit(1)
30+
31+
sleep(0.01)
32+
33+
assert open(filename, "r").read() == "0\n1\n"
34+
35+
36+
def test_sink_to_textfile_named():
37+
source = Stream()
38+
with tmpfile() as filename:
39+
source.map(str).sink_to_textfile(filename)
40+
source.emit(0)
41+
source.emit(1)
42+
43+
sleep(0.01)
44+
45+
assert open(filename, "r").read() == "0\n1\n"

0 commit comments

Comments
 (0)