Skip to content

Commit dca7370

Browse files
authored
Merge pull request #380 from roveo/plugins
A simple plugin system
2 parents e9a2545 + fe0f9d9 commit dca7370

9 files changed

Lines changed: 230 additions & 6 deletions

File tree

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ install:
2121
- conda update conda
2222

2323
# Install dependencies
24-
- conda env create --name test-streamz --file ./conda/environments/streamz_dev.yml
24+
- travis_wait 30 conda env create --name test-streamz --file ./conda/environments/streamz_dev.yml
2525
- source activate test-streamz
2626

2727
- python setup.py install

docs/source/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,4 @@ data streaming systems like `Apache Flink <https://flink.apache.org/>`_,
122122
collections-api.rst
123123
async.rst
124124
plotting.rst
125+
plugins.rst

docs/source/plugins.rst

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
Plugins
2+
=======
3+
4+
In addition to using the ``@Stream.register_api()`` decorator, custom stream nodes can
5+
be added to Streamz by installing 3rd-party Python packages.
6+
7+
8+
Known plugins
9+
-------------
10+
11+
Extras
12+
++++++
13+
14+
These plugins are supported by the Streamz community and can be installed as extras,
15+
e.g. ``pip install streamz[kafka]``.
16+
17+
There are no plugins here yet, but hopefully soon there will be.
18+
19+
.. only:: comment
20+
================= ======================================================
21+
Extra name Description
22+
================= ======================================================
23+
``files`` Advanced filesystem operations: listening for new
24+
files in a directory, writing to multiple files etc.
25+
``kafka`` Reading from and writing to Kafka topics.
26+
================= ======================================================
27+
28+
29+
Entry points
30+
------------
31+
32+
Plugins register themselves with Streamz by using the ``entry_points`` argument
33+
in ``setup.py``:
34+
35+
.. code-block:: Python
36+
37+
# setup.py
38+
39+
from setuptools import setup
40+
41+
setup(
42+
name="streamz_example_plugin",
43+
version="0.0.1",
44+
entry_points={
45+
"streamz.nodes": [
46+
"repeat = streamz_example_plugin:RepeatNode"
47+
]
48+
}
49+
)
50+
51+
In this example, the ``RepeatNode`` class will be imported from the
52+
``streamz_example_plugin`` package and will be available as ``Stream.repeat``.
53+
In practice, the class name and the entry point name (the part before ``=`` in the entry point
54+
definition) are usually the same, but they *can* be different.
55+
56+
Different kinds of add-ons go into different entry point groups:
57+
58+
=========== ======================= =====================
59+
Node type Required parent class Entry point group
60+
=========== ======================= =====================
61+
Source ``streamz.Source`` ``streamz.sources``
62+
Node ``streamz.Stream`` ``streamz.nodes``
63+
Sink ``streamz.Stream`` ``streamz.sinks``
64+
=========== ======================= =====================
65+
66+
67+
Lazy loading
68+
++++++++++++
69+
70+
Streamz will attach methods from existing plugins to the ``Stream`` class when it's
71+
imported, but actual classes will be loaded only when the corresponding ``Stream``
72+
method is first called. Streamz will also validate the loaded class before attaching it
73+
and will raise an appropriate exception if validation fails.
74+
75+
76+
Reference implementation
77+
------------------------
78+
79+
Let's look at how stream nodes can be implemented.
80+
81+
.. code-block:: Python
82+
83+
# __init__.py
84+
85+
from tornado import gen
86+
from streamz import Stream
87+
88+
89+
class RepeatNode(Stream):
90+
91+
def __init__(self, upstream, n, **kwargs):
92+
super().__init__(upstream, ensure_io_loop=True, **kwargs)
93+
self._n = n
94+
95+
@gen.coroutine
96+
def update(self, x, who=None, metadata=None):
97+
for _ in range(self._n):
98+
yield self._emit(x, metadata=metadata)
99+
100+
As you can see, the implementation is the same as usual, but there's no
101+
``@Stream.register_api()`` — Streamz will take care of that when loading the plugin.
102+
It will still work if you add the decorator, but you don't have to.

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ tornado
22
toolz
33
zict
44
six
5+
setuptools

streamz/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
from .core import *
44
from .graph import *
55
from .sources import *
6+
from .plugins import load_plugins
7+
8+
load_plugins(Stream)
9+
610
try:
711
from .dask import DaskStream, scatter
812
except ImportError:

streamz/core.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ def _remove_upstream(self, upstream):
260260
self.upstreams.remove(upstream)
261261

262262
@classmethod
263-
def register_api(cls, modifier=identity):
263+
def register_api(cls, modifier=identity, attribute_name=None):
264264
""" Add callable to Stream API
265265
266266
This allows you to register a new method onto this class. You can use
@@ -273,7 +273,7 @@ def register_api(cls, modifier=identity):
273273
>>> Stream().foo(...) # this works now
274274
275275
It attaches the callable as a normal attribute to the class object. In
276-
doing so it respsects inheritance (all subclasses of Stream will also
276+
doing so it respects inheritance (all subclasses of Stream will also
277277
get the foo attribute).
278278
279279
By default callables are assumed to be instance methods. If you like
@@ -285,15 +285,49 @@ def register_api(cls, modifier=identity):
285285
... ...
286286
287287
>>> Stream.foo(...) # Foo operates as a static method
288+
289+
You can also provide an optional ``attribute_name`` argument to control
290+
the name of the attribute your callable will be attached as.
291+
292+
>>> @Stream.register_api(attribute_name="bar")
293+
... class foo(Stream):
294+
... ...
295+
296+
>>> Stream().bar(...) # foo was actually attached as bar
288297
"""
289298
def _(func):
290299
@functools.wraps(func)
291300
def wrapped(*args, **kwargs):
292301
return func(*args, **kwargs)
293-
setattr(cls, func.__name__, modifier(wrapped))
302+
name = attribute_name if attribute_name else func.__name__
303+
setattr(cls, name, modifier(wrapped))
294304
return func
295305
return _
296306

307+
@classmethod
def register_plugin_entry_point(cls, entry_point, modifier=identity):
    """ Attach a lazily loaded plugin node to this class

    A placeholder function is registered under ``entry_point.name``. The
    first time the placeholder is called it loads the entry point, checks
    that the loaded object is a subclass of ``Stream``, registers the loaded
    class under the same attribute name (replacing the placeholder) and
    finally instantiates it with the arguments of that call.

    Parameters
    ----------
    entry_point:
        Object exposing ``name``, ``module_name`` and a ``load()`` method.
    modifier: callable
        Forwarded to ``register_api`` for both the placeholder and the
        loaded class.

    Raises
    ------
    ValueError
        If ``cls`` already has an attribute called ``entry_point.name``.
    TypeError
        Raised by the placeholder, on first call, if the loaded object is
        not a subclass of ``Stream``.
    """
    if hasattr(cls, entry_point.name):
        raise ValueError(
            f"Can't add {entry_point.name} from {entry_point.module_name} "
            f"to {cls.__name__}: duplicate method name."
        )

    def stub(*args, **kwargs):
        """ Entrypoints-based streamz plugin. Will be loaded on first call. """
        node = entry_point.load()
        # issubclass() raises its own, unrelated TypeError when handed a
        # non-class, so rule that case out first to keep the message useful.
        if not (isinstance(node, type) and issubclass(node, Stream)):
            # Report the loaded object's own name; ``node.__class__`` would
            # be the metaclass (``type``) for any ordinary class.
            node_name = getattr(node, "__name__", repr(node))
            raise TypeError(
                f"Error loading {entry_point.name} "
                f"from module {entry_point.module_name}: "
                f"{node_name} must be a subclass of Stream"
            )
        # Only swap in the real class while the placeholder is still the
        # registered attribute.
        if getattr(cls, entry_point.name).__name__ == "stub":
            cls.register_api(
                modifier=modifier, attribute_name=entry_point.name
            )(node)
        return node(*args, **kwargs)

    cls.register_api(modifier=modifier, attribute_name=entry_point.name)(stub)
330+
297331
def start(self):
298332
""" Start any upstream sources """
299333
for upstream in self.upstreams:

streamz/dataframe/tests/test_dataframes.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,9 +1008,9 @@ def test_windowed_groupby_aggs_with_start_state(stream):
10081008
out_df1 = pd.DataFrame({'name':['Alice', 'Bob', 'Linda', 'Tom'], 'amount':[50.0, 550.0, 100.0, 150.0]})
10091009
assert_eq(output1[-1][1].reset_index(), out_df1)
10101010

1011-
1011+
10121012
def test_dir(stream):
10131013
example = pd.DataFrame({'name': [], 'amount': []})
10141014
sdf = DataFrame(stream, example=example)
10151015
assert 'name' in dir(sdf)
1016-
assert 'amount' in dir(sdf)
1016+
assert 'amount' in dir(sdf)

streamz/plugins.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import warnings
2+
3+
import pkg_resources
4+
5+
6+
def try_register(cls, entry_point, *modifier):
    """ Register ``entry_point`` on ``cls``, warning on a name collision

    Extra positional arguments are forwarded unchanged to
    ``cls.register_plugin_entry_point``. A ``ValueError`` raised by that call
    is turned into a warning instead of propagating.
    """
    try:
        cls.register_plugin_entry_point(entry_point, *modifier)
    except ValueError:
        message = (
            f"Can't add {entry_point.name} from {entry_point.module_name}: "
            "name collision with existing stream node."
        )
        warnings.warn(message)
14+
15+
16+
def load_plugins(cls):
    """ Register every installed streamz plugin entry point on ``cls``

    Entry point groups are processed in the order ``streamz.sources``,
    ``streamz.nodes``, ``streamz.sinks``. Sources are registered with
    ``staticmethod`` as the modifier; the other two groups pass no modifier.
    """
    groups = (
        ("streamz.sources", (staticmethod,)),
        ("streamz.nodes", ()),
        ("streamz.sinks", ()),
    )
    for group_name, extra_args in groups:
        for entry_point in pkg_resources.iter_entry_points(group_name):
            try_register(cls, entry_point, *extra_args)

streamz/tests/test_plugins.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import inspect
2+
3+
import pytest
4+
from streamz import Source, Stream
5+
6+
7+
class MockEntryPoint:
    """ Minimal stand-in for an entry point object, used by the plugin tests """

    def __init__(self, name, cls, module_name=None):
        # Mirror the attributes the registration code reads.
        self.name, self.cls, self.module_name = name, cls, module_name

    def load(self):
        """ Return the object supplied at construction time """
        return self.cls
16+
17+
18+
def test_register_plugin_entry_point():
    """ The placeholder is replaced by the loaded class after the first call """
    class test_stream(Stream):
        pass

    Stream.register_plugin_entry_point(MockEntryPoint("test_node", test_stream))

    # Before any call the attribute is still the lazy placeholder.
    assert Stream.test_node.__name__ == "stub"

    Stream().test_node()

    # Calling it once swaps in the loaded class.
    assert Stream.test_node.__name__ == "test_stream"
30+
31+
32+
def test_register_plugin_entry_point_modifier():
    """ A ``staticmethod`` modifier makes the plugin callable without an instance """
    class test_source(Source):
        pass

    source_entry_point = MockEntryPoint("from_test", test_source)
    Stream.register_plugin_entry_point(source_entry_point, staticmethod)

    # Callable straight from the class.
    Stream.from_test()

    # Through an instance it is a plain function rather than a bound method.
    assert inspect.isfunction(Stream().from_test)
42+
43+
44+
def test_register_plugin_entry_point_raises_type():
    """ Calling a plugin whose class is not a ``Stream`` raises ``TypeError`` """
    class invalid_node:
        pass

    Stream.register_plugin_entry_point(
        MockEntryPoint("test", invalid_node, "test_module.test")
    )

    # Registration succeeds; the check only happens on the first call.
    with pytest.raises(TypeError):
        Stream.test()
54+
55+
56+
def test_register_plugin_entry_point_raises_duplicate_name():
    """ Registering under a name ``Stream`` already has raises ``ValueError`` """
    clashing = MockEntryPoint("map", None)

    with pytest.raises(ValueError):
        Stream.register_plugin_entry_point(clashing)

0 commit comments

Comments
 (0)