Skip to content

Commit 31018a7

Browse files
committed
Merge commit 'dca737083b2f16b2bbf0ebbb2754fbbe6f38fdff' into refactor_sinks
# Conflicts: # streamz/__init__.py
2 parents bceb79a + dca7370 commit 31018a7

8 files changed

Lines changed: 229 additions & 5 deletions

File tree

docs/source/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,4 @@ data streaming systems like `Apache Flink <https://flink.apache.org/>`_,
122122
collections-api.rst
123123
async.rst
124124
plotting.rst
125+
plugins.rst

docs/source/plugins.rst

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
Plugins
2+
=======
3+
4+
In addition to using ``@Stream.register_api()`` decorator, custom stream nodes can
5+
be added to Streamz by installing 3rd-party Python packages.
6+
7+
8+
Known plugins
9+
-------------
10+
11+
Extras
12+
++++++
13+
14+
These plugins are supported by the Streamz community and can be installed as extras,
15+
e.g. ``pip install streamz[kafka]``.
16+
17+
There are no plugins here yet, but hopefully soon there will be.
18+
19+
.. only:: comment
20+
================= ======================================================
21+
Extra name Description
22+
================= ======================================================
23+
``files`` Advanced filesystem operations: listening for new
24+
files in a directory, writing to multiple files etc.
25+
``kafka`` Reading from and writing to Kafka topics.
26+
================= ======================================================
27+
28+
29+
Entry points
30+
------------
31+
32+
Plugins register themselves with Streamz by using ``entry_points`` argument
33+
in ``setup.py``:
34+
35+
.. code-block:: Python
36+
37+
# setup.py
38+
39+
from setuptools import setup
40+
41+
setup(
42+
name="streamz_example_plugin",
43+
version="0.0.1",
44+
entry_points={
45+
"streamz.nodes": [
46+
"repeat = streamz_example_plugin:RepeatNode"
47+
]
48+
}
49+
)
50+
51+
In this example, ``RepeatNode`` class will be imported from
52+
``streamz_example_plugin`` package and will be available as ``Stream.repeat``.
53+
In practice, class name and entry point name (the part before ``=`` in entry point
54+
definition) are usually the same, but they `can` be different.
55+
56+
Different kinds of add-ons go into different entry point groups:
57+
58+
=========== ======================= =====================
59+
Node type Required parent class Entry point group
60+
=========== ======================= =====================
61+
Source ``streamz.Source`` ``streamz.sources``
62+
Node ``streamz.Stream`` ``streamz.nodes``
63+
Sink ``streamz.Stream`` ``streamz.sinks``
64+
=========== ======================= =====================
65+
66+
67+
Lazy loading
68+
++++++++++++
69+
70+
Streamz will attach methods from existing plugins to the ``Stream`` class when it's
71+
imported, but actual classes will be loaded only when the corresponding ``Stream``
72+
method is first called. Streamz will also validate the loaded class before attaching it
73+
and will raise an appropriate exception if validation fails.
74+
75+
76+
Reference implementation
77+
------------------------
78+
79+
Let's look at how stream nodes can be implemented.
80+
81+
.. code-block:: Python
82+
83+
# __init__.py
84+
85+
from tornado import gen
86+
from streamz import Stream
87+
88+
89+
class RepeatNode(Stream):
90+
91+
def __init__(self, upstream, n, **kwargs):
92+
super().__init__(upstream, ensure_io_loop=True, **kwargs)
93+
self._n = n
94+
95+
@gen.coroutine
96+
def update(self, x, who=None, metadata=None):
97+
for _ in range(self._n):
98+
yield self._emit(x, metadata=metadata)
99+
100+
As you can see, implementation is the same as usual, but there's no
101+
``@Stream.register_api()`` — Streamz will take care of that when loading the plugin.
102+
It will still work if you add the decorator, but you don't have to.

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ tornado
22
toolz
33
zict
44
six
5+
setuptools

streamz/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
from .graph import *
55
from .sources import *
66
from .sinks import *
7+
from .plugins import load_plugins
8+
9+
load_plugins(Stream)
10+
711
try:
812
from .dask import DaskStream, scatter
913
except ImportError:

streamz/core.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ def _remove_upstream(self, upstream):
258258
self.upstreams.remove(upstream)
259259

260260
@classmethod
261-
def register_api(cls, modifier=identity):
261+
def register_api(cls, modifier=identity, attribute_name=None):
262262
""" Add callable to Stream API
263263
264264
This allows you to register a new method onto this class. You can use
@@ -271,7 +271,7 @@ def register_api(cls, modifier=identity):
271271
>>> Stream().foo(...) # this works now
272272
273273
It attaches the callable as a normal attribute to the class object. In
274-
doing so it respsects inheritance (all subclasses of Stream will also
274+
doing so it respects inheritance (all subclasses of Stream will also
275275
get the foo attribute).
276276
277277
By default callables are assumed to be instance methods. If you like
@@ -283,15 +283,49 @@ def register_api(cls, modifier=identity):
283283
... ...
284284
285285
>>> Stream.foo(...) # Foo operates as a static method
286+
287+
You can also provide an optional ``attribute_name`` argument to control
288+
the name of the attribute your callable will be attached as.
289+
290+
>>> @Stream.register_api(attribute_name="bar")
291+
... class foo(Stream):
292+
... ...
293+
294+
>> Stream().bar(...) # foo was actually attached as bar
286295
"""
287296
def _(func):
288297
@functools.wraps(func)
289298
def wrapped(*args, **kwargs):
290299
return func(*args, **kwargs)
291-
setattr(cls, func.__name__, modifier(wrapped))
300+
name = attribute_name if attribute_name else func.__name__
301+
setattr(cls, name, modifier(wrapped))
292302
return func
293303
return _
294304

305+
@classmethod
306+
def register_plugin_entry_point(cls, entry_point, modifier=identity):
307+
if hasattr(cls, entry_point.name):
308+
raise ValueError(
309+
f"Can't add {entry_point.name} from {entry_point.module_name} "
310+
f"to {cls.__name__}: duplicate method name."
311+
)
312+
313+
def stub(*args, **kwargs):
314+
""" Entrypoints-based streamz plugin. Will be loaded on first call. """
315+
node = entry_point.load()
316+
if not issubclass(node, Stream):
317+
raise TypeError(
318+
f"Error loading {entry_point.name} "
319+
f"from module {entry_point.module_name}: "
320+
f"{node.__class__.__name__} must be a subclass of Stream"
321+
)
322+
if getattr(cls, entry_point.name).__name__ == "stub":
323+
cls.register_api(
324+
modifier=modifier, attribute_name=entry_point.name
325+
)(node)
326+
return node(*args, **kwargs)
327+
cls.register_api(modifier=modifier, attribute_name=entry_point.name)(stub)
328+
295329
def start(self):
296330
""" Start any upstream sources """
297331
for upstream in self.upstreams:

streamz/dataframe/tests/test_dataframes.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,9 +1008,9 @@ def test_windowed_groupby_aggs_with_start_state(stream):
10081008
out_df1 = pd.DataFrame({'name':['Alice', 'Bob', 'Linda', 'Tom'], 'amount':[50.0, 550.0, 100.0, 150.0]})
10091009
assert_eq(output1[-1][1].reset_index(), out_df1)
10101010

1011-
1011+
10121012
def test_dir(stream):
10131013
example = pd.DataFrame({'name': [], 'amount': []})
10141014
sdf = DataFrame(stream, example=example)
10151015
assert 'name' in dir(sdf)
1016-
assert 'amount' in dir(sdf)
1016+
assert 'amount' in dir(sdf)

streamz/plugins.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import warnings
2+
3+
import pkg_resources
4+
5+
6+
def try_register(cls, entry_point, *modifier):
7+
try:
8+
cls.register_plugin_entry_point(entry_point, *modifier)
9+
except ValueError:
10+
warnings.warn(
11+
f"Can't add {entry_point.name} from {entry_point.module_name}: "
12+
"name collision with existing stream node."
13+
)
14+
15+
16+
def load_plugins(cls):
17+
for entry_point in pkg_resources.iter_entry_points("streamz.sources"):
18+
try_register(cls, entry_point, staticmethod)
19+
for entry_point in pkg_resources.iter_entry_points("streamz.nodes"):
20+
try_register(cls, entry_point)
21+
for entry_point in pkg_resources.iter_entry_points("streamz.sinks"):
22+
try_register(cls, entry_point)

streamz/tests/test_plugins.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import inspect
2+
3+
import pytest
4+
from streamz import Source, Stream
5+
6+
7+
class MockEntryPoint:
8+
9+
def __init__(self, name, cls, module_name=None):
10+
self.name = name
11+
self.cls = cls
12+
self.module_name = module_name
13+
14+
def load(self):
15+
return self.cls
16+
17+
18+
def test_register_plugin_entry_point():
19+
class test_stream(Stream):
20+
pass
21+
22+
entry_point = MockEntryPoint("test_node", test_stream)
23+
Stream.register_plugin_entry_point(entry_point)
24+
25+
assert Stream.test_node.__name__ == "stub"
26+
27+
Stream().test_node()
28+
29+
assert Stream.test_node.__name__ == "test_stream"
30+
31+
32+
def test_register_plugin_entry_point_modifier():
33+
class test_source(Source):
34+
pass
35+
36+
entry_point = MockEntryPoint("from_test", test_source)
37+
Stream.register_plugin_entry_point(entry_point, staticmethod)
38+
39+
Stream.from_test()
40+
41+
assert inspect.isfunction(Stream().from_test)
42+
43+
44+
def test_register_plugin_entry_point_raises_type():
45+
class invalid_node:
46+
pass
47+
48+
entry_point = MockEntryPoint("test", invalid_node, "test_module.test")
49+
50+
Stream.register_plugin_entry_point(entry_point)
51+
52+
with pytest.raises(TypeError):
53+
Stream.test()
54+
55+
56+
def test_register_plugin_entry_point_raises_duplicate_name():
57+
entry_point = MockEntryPoint("map", None)
58+
59+
with pytest.raises(ValueError):
60+
Stream.register_plugin_entry_point(entry_point)

0 commit comments

Comments
 (0)