Skip to content

Commit 6e07036

Browse files
authored
Merge pull request #393 from roveo/from_iterable2
from_iterable source
2 parents f6dd5aa + 6b23b6f commit 6e07036

5 files changed

Lines changed: 79 additions & 6 deletions

File tree

docs/source/api.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ Sources
5151
-------
5252

5353
.. autosummary::
54+
from_iterable
5455
filenames
5556
from_kafka
5657
from_kafka_batched
@@ -95,6 +96,7 @@ Definitions
9596
.. autofunction:: zip
9697
.. autofunction:: zip_latest
9798

99+
.. autofunction:: from_iterable
98100
.. autofunction:: filenames
99101
.. autofunction:: from_kafka
100102
.. autofunction:: from_kafka_batched

streamz/sinks.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ def __init__(self, upstream, **kwargs):
1717
super().__init__(upstream, **kwargs)
1818
_global_sinks.add(self)
1919

20+
def destroy(self):
21+
super().destroy()
22+
_global_sinks.remove(self)
23+
2024

2125
@Stream.register_api()
2226
class sink(Sink):
@@ -67,10 +71,6 @@ def update(self, x, who=None, metadata=None):
6771
else:
6872
return []
6973

70-
def destroy(self):
71-
super().destroy()
72-
_global_sinks.remove(self)
73-
7474

7575
@Stream.register_api()
7676
class sink_to_textfile(Sink):

streamz/sources.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -731,3 +731,39 @@ def get_message_batch_cudf(kafka_params, topic, partition, keys, low, high, time
731731
finally:
732732
consumer.close()
733733
return gdf
734+
735+
736+
@Stream.register_api(staticmethod)
737+
class from_iterable(Source):
738+
""" Emits items from an iterable.
739+
740+
Parameters
741+
----------
742+
iterable: iterable
743+
An iterable to emit messages from.
744+
745+
Examples
746+
--------
747+
748+
>>> source = Stream.from_iterable(range(3))
749+
>>> L = source.sink_to_list()
750+
>>> source.start()
751+
>>> L
752+
[0, 1, 2]
753+
"""
754+
755+
def __init__(self, iterable, **kwargs):
756+
super().__init__(ensure_io_loop=True, **kwargs)
757+
self._iterable = iterable
758+
759+
def start(self):
760+
self.stopped = False
761+
self.loop.add_callback(self._run)
762+
763+
@gen.coroutine
764+
def _run(self):
765+
for x in self._iterable:
766+
if self.stopped:
767+
break
768+
yield self._emit(x)
769+
self.stopped = True

streamz/tests/test_sinks.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import pytest
44
from streamz import Stream
5-
from streamz.sinks import _global_sinks
5+
from streamz.sinks import _global_sinks, Sink
66
from streamz.utils_test import tmpfile
77

88

@@ -62,9 +62,12 @@ def test_sink_to_textfile_closes():
6262

6363
def test_sink_destroy():
6464
source = Stream()
65-
sink = source.sink(lambda x: None)
65+
sink = Sink(source)
6666
ref = weakref.ref(sink)
6767
sink.destroy()
68+
69+
assert sink not in _global_sinks
70+
6871
del sink
6972

7073
assert ref() is None

streamz/tests/test_sources.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,35 @@ def test_process():
9595
s.start()
9696
yield await_for(lambda: out == [b'0\n', b'1\n', b'2\n', b'3\n'], timeout=5)
9797
s.stop()
98+
99+
100+
def test_from_iterable():
101+
source = Source.from_iterable(range(3))
102+
L = source.sink_to_list()
103+
source.start()
104+
wait_for(lambda: L == [0, 1, 2], 0.1)
105+
106+
107+
def test_from_iterable_backpressure():
108+
it = iter(range(5))
109+
source = Source.from_iterable(it)
110+
L = source.rate_limit(0.01).sink_to_list()
111+
source.start()
112+
113+
wait_for(lambda: L == [0], 1)
114+
assert next(it) == 2 # 1 is in blocked _emit
115+
116+
117+
def test_from_iterable_stop():
118+
from _pytest.outcomes import Failed
119+
120+
source = Source.from_iterable(range(5))
121+
L = source.rate_limit(0.01).sink_to_list()
122+
source.start()
123+
124+
wait_for(lambda: L == [0], 1)
125+
source.stop()
126+
127+
assert source.stopped
128+
with pytest.raises(Failed):
129+
wait_for(lambda: L == [0, 1, 2], 0.1)

0 commit comments

Comments
 (0)