Skip to content

Commit 2812f1f

Browse files
committed
entrypoints
1 parent 644b563 commit 2812f1f

4 files changed

Lines changed: 13 additions & 8 deletions

File tree

streamz/core.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def wrapped(*args, **kwargs):
168168
def register_plugin_entry_point(cls, entry_point, modifier=identity):
169169
if hasattr(cls, entry_point.name):
170170
raise ValueError(
171-
f"Can't add {entry_point.name} from {entry_point.module_name} "
171+
f"Can't add {entry_point.name} "
172172
f"to {cls.__name__}: duplicate method name."
173173
)
174174

@@ -178,7 +178,6 @@ def stub(*args, **kwargs):
178178
if not issubclass(node, Stream):
179179
raise TypeError(
180180
f"Error loading {entry_point.name} "
181-
f"from module {entry_point.module_name}: "
182181
f"{node.__class__.__name__} must be a subclass of Stream"
183182
)
184183
if getattr(cls, entry_point.name).__name__ == "stub":

streamz/plugins.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import warnings
22

3-
import pkg_resources
3+
import importlib.metadata
44

55

66
def try_register(cls, entry_point, *modifier):
@@ -14,9 +14,10 @@ def try_register(cls, entry_point, *modifier):
1414

1515

1616
def load_plugins(cls):
17-
for entry_point in pkg_resources.iter_entry_points("streamz.sources"):
17+
eps = importlib.metadata.entry_points()
18+
for entry_point in eps.get("streamz.sources", []):
1819
try_register(cls, entry_point, staticmethod)
19-
for entry_point in pkg_resources.iter_entry_points("streamz.nodes"):
20+
for entry_point in eps.get("streamz.nodes", []):
2021
try_register(cls, entry_point)
21-
for entry_point in pkg_resources.iter_entry_points("streamz.sinks"):
22+
for entry_point in eps.get("streamz.sinks", []):
2223
try_register(cls, entry_point)

streamz/sources.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -896,12 +896,15 @@ class from_mqtt(from_q):
896896
:param client_kwargs:
897897
Passed to the client's ``connect()`` method
898898
"""
899-
def __init__(self, host, port, topic, keepalive=60 , client_kwargs=None, **kwargs):
899+
def __init__(self, host, port, topic, keepalive=60 , client_kwargs=None,
900+
user=None, pw=None, **kwargs):
900901
self.host = host
901902
self.port = port
902903
self.keepalive = keepalive
903904
self.topic = topic
904905
self.client_kwargs = client_kwargs
906+
self.user = user
907+
self.pw = pw
905908
super().__init__(q=queue.Queue(), **kwargs)
906909

907910
def _on_connect(self, client, userdata, flags, rc):
@@ -913,6 +916,8 @@ def _on_message(self, client, userdata, msg):
913916
async def run(self):
914917
import paho.mqtt.client as mqtt
915918
client = mqtt.Client()
919+
if self.user:
920+
client.username_pw_set(self.user, self.pw)
916921
client.on_connect = self._on_connect
917922
client.on_message = self._on_message
918923
client.connect(self.host, self.port, self.keepalive, **(self.client_kwargs or {}))

streamz/tests/test_kafka.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def stop_docker(name='streamz-kafka', cid=None, let_fail=False):
5151

5252
def launch_kafka():
5353
stop_docker(let_fail=True)
54-
subprocess.call(shlex.split("docker pull spotify/kafka"))
54+
subprocess.call(shlex.split("docker pull spotify/kafka"), stderr=subprocess.DEVNULL)
5555
cmd = ("docker run -d -p 2181:2181 -p 9092:9092 --env "
5656
"ADVERTISED_HOST=127.0.0.1 --env ADVERTISED_PORT=9092 "
5757
"--name streamz-kafka spotify/kafka")

0 commit comments

Comments
 (0)