Skip to content

Commit 5b394a7

Browse files
committed
handle username and pw for mqtt source
1 parent e077622 commit 5b394a7

1 file changed

Lines changed: 6 additions & 1 deletion

File tree

streamz/sources.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -896,12 +896,15 @@ class from_mqtt(from_q):
896896
:param client_kwargs:
897897
Passed to the client's ``connect()`` method
898898
"""
899-
def __init__(self, host, port, topic, keepalive=60 , client_kwargs=None, **kwargs):
899+
def __init__(self, host, port, topic, keepalive=60 , client_kwargs=None,
900+
user=None, pw=None, **kwargs):
900901
self.host = host
901902
self.port = port
902903
self.keepalive = keepalive
903904
self.topic = topic
904905
self.client_kwargs = client_kwargs
906+
self.user = user
907+
self.pw = pw
905908
super().__init__(q=queue.Queue(), **kwargs)
906909

907910
def _on_connect(self, client, userdata, flags, rc):
@@ -913,6 +916,8 @@ def _on_message(self, client, userdata, msg):
913916
async def run(self):
914917
import paho.mqtt.client as mqtt
915918
client = mqtt.Client()
919+
if self.user:
920+
client.username_pw_set(self.user, self.pw)
916921
client.on_connect = self._on_connect
917922
client.on_message = self._on_message
918923
client.connect(self.host, self.port, self.keepalive, **(self.client_kwargs or {}))

0 commit comments

Comments
 (0)