Skip to content

Commit 2a3390a

Browse files
committed
add from_iterable, move .destroy in sink to the base class
1 parent f6dd5aa commit 2a3390a

4 files changed

Lines changed: 46 additions & 4 deletions

File tree

docs/source/api.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ Sources
5151
-------
5252

5353
.. autosummary::
54+
from_iterable
5455
filenames
5556
from_kafka
5657
from_kafka_batched
@@ -95,6 +96,7 @@ Definitions
9596
.. autofunction:: zip
9697
.. autofunction:: zip_latest
9798

99+
.. autofunction:: from_iterable
98100
.. autofunction:: filenames
99101
.. autofunction:: from_kafka
100102
.. autofunction:: from_kafka_batched

streamz/sinks.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ def __init__(self, upstream, **kwargs):
1717
super().__init__(upstream, **kwargs)
1818
_global_sinks.add(self)
1919

20+
def destroy(self):
21+
super().destroy()
22+
_global_sinks.remove(self)
23+
2024

2125
@Stream.register_api()
2226
class sink(Sink):
@@ -67,10 +71,6 @@ def update(self, x, who=None, metadata=None):
6771
else:
6872
return []
6973

70-
def destroy(self):
71-
super().destroy()
72-
_global_sinks.remove(self)
73-
7474

7575
@Stream.register_api()
7676
class sink_to_textfile(Sink):

streamz/sources.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -731,3 +731,36 @@ def get_message_batch_cudf(kafka_params, topic, partition, keys, low, high, time
731731
finally:
732732
consumer.close()
733733
return gdf
734+
735+
736+
@Stream.register_api(staticmethod)
737+
class from_iterable(Source):
738+
""" Emits items from an iterable.
739+
740+
Parameters
741+
----------
742+
iterable: iterable
743+
An iterable to emit messages from.
744+
745+
Examples
746+
--------
747+
748+
>>> source = Stream.from_iterable(range(3))
749+
>>> L = source.sink_to_list()
750+
>>> source.start()
751+
>>> L
752+
[0, 1, 2]
753+
"""
754+
755+
def __init__(self, iterable, **kwargs):
756+
super().__init__(ensure_io_loop=True, **kwargs)
757+
self._iterable = iterable
758+
759+
def start(self):
760+
self.stopped = False
761+
self.loop.add_callback(self._run)
762+
763+
@gen.coroutine
764+
def _run(self):
765+
for x in self._iterable:
766+
yield self._emit(x)

streamz/tests/test_sources.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,10 @@ def test_process():
9595
s.start()
9696
yield await_for(lambda: out == [b'0\n', b'1\n', b'2\n', b'3\n'], timeout=5)
9797
s.stop()
98+
99+
100+
def test_from_iterable():
101+
source = Source.from_iterable(range(3))
102+
L = source.sink_to_list()
103+
source.start()
104+
wait_for(lambda: L == [0, 1, 2], 0.1)

0 commit comments

Comments
 (0)