Skip to content

Commit b52b92c

Browse files
committed
plugins lazy loading, specifying attribute_name in register_api, tests
1 parent b7a3692 commit b52b92c

4 files changed

Lines changed: 59 additions & 6 deletions

File tree

streamz/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from .sources import *
66
from .plugins import load_plugins
77

8-
load_plugins()
8+
load_plugins(Stream)
99

1010
try:
1111
from .dask import DaskStream, scatter

streamz/core.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ def _remove_upstream(self, upstream):
260260
self.upstreams.remove(upstream)
261261

262262
@classmethod
263-
def register_api(cls, modifier=identity):
263+
def register_api(cls, modifier=identity, attribute_name=None):
264264
""" Add callable to Stream API
265265
266266
This allows you to register a new method onto this class. You can use
@@ -290,10 +290,21 @@ def _(func):
290290
@functools.wraps(func)
291291
def wrapped(*args, **kwargs):
292292
return func(*args, **kwargs)
293-
setattr(cls, func.__name__, modifier(wrapped))
293+
name = attribute_name if attribute_name else func.__name__
294+
setattr(cls, name, modifier(wrapped))
294295
return func
295296
return _
296297

298+
@classmethod
299+
def register_plugin_entry_point(cls, entry_point, modifier=identity):
300+
def stub(*args, **kwargs):
301+
attribute = entry_point.load()
302+
cls.register_api(
303+
modifier=modifier, attribute_name=entry_point.name
304+
)(attribute)
305+
return attribute(*args, **kwargs)
306+
cls.register_api(modifier=modifier, attribute_name=entry_point.name)(stub)
307+
297308
def start(self):
298309
""" Start any upstream sources """
299310
for upstream in self.upstreams:

streamz/plugins.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import pkg_resources
22

33

4-
def load_plugins():
5-
for entry_point in pkg_resources.iter_entry_points("streamz.plugins"):
6-
entry_point.load()
4+
def load_plugins(cls):
5+
for entry_point in pkg_resources.iter_entry_points("streamz.sources"):
6+
cls.register_plugin_entrypoint(entry_point, staticmethod)
7+
for entry_point in pkg_resources.iter_entry_points("streamz.nodes"):
8+
cls.register_plugin_entrypoint(entry_point)
9+
for entry_point in pkg_resources.iter_entry_points("streamz.sinks"):
10+
cls.register_plugin_entrypoint(entry_point)

streamz/tests/test_plugins.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from streamz.sources import Source
2+
from streamz import Stream
3+
4+
5+
class MockEntryPoint:
6+
7+
def __init__(self, name, cls):
8+
self.name = name
9+
self.cls = cls
10+
11+
def load(self):
12+
return self.cls
13+
14+
15+
def test_register_plugin_entry_point():
16+
class test(Stream):
17+
pass
18+
19+
entry_point = MockEntryPoint("test_node", test)
20+
Stream.register_plugin_entry_point(entry_point)
21+
22+
assert Stream.test_node.__name__ == "stub"
23+
24+
Stream().test_node()
25+
26+
assert Stream.test_node.__name__ == "test"
27+
28+
29+
def test_register_plugin_entry_point_modifier():
30+
class test(Source):
31+
pass
32+
33+
entry_point = MockEntryPoint("from_test", test)
34+
Stream.register_plugin_entry_point(entry_point, staticmethod)
35+
36+
Stream.from_test()
37+
38+
assert Stream.from_test.__self__ is Stream

0 commit comments

Comments
 (0)