Skip to content

Commit c029fbd

Browse files
author
Martin Durant
committed
Add mqtt source/sink pair
1 parent 2bebd38 commit c029fbd

5 files changed

Lines changed: 157 additions & 6 deletions

File tree

ci/environment-py38.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ channels:
33
- conda-forge
44
- defaults
55
dependencies:
6-
- python=3.7
6+
- python=3.8
77
- pytest
88
- flake8
99
- black
@@ -32,3 +32,5 @@ dependencies:
3232
- flaky
3333
- pytest-cov
3434
- coveralls
35+
- paho-mqtt
36+
- websockets

docs/source/api.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ Stream
4747
.. automethod:: Stream.sink_to_list
4848
.. automethod:: Stream.sink_to_textfile
4949
.. automethod:: Stream.to_websocket
50+
.. automethod:: Stream.to_mqtt
5051
.. automethod:: Stream.update
5152
.. automethod:: Stream.visualize
5253

@@ -58,6 +59,7 @@ Sources
5859
filenames
5960
from_kafka
6061
from_kafka_batched
62+
from_mqtt
6163
from_process
6264
from_websocket
6365
from_textfile

streamz/sinks.py

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,17 +134,57 @@ def __init__(self, upstream, uri, ws_kwargs=None, **kwargs):
134134
self.uri = uri
135135
self.ws_kw = ws_kwargs
136136
self.ws = None
137-
super().__init__(upstream, **kwargs)
137+
super().__init__(upstream, ensure_io_loop=True, **kwargs)
138138

139-
@gen.coroutine
140-
def update(self, x, who=None, metadata=None):
139+
async def update(self, x, who=None, metadata=None):
141140
import websockets
142141
if self.ws is None:
143-
self.ws = yield websockets.connect(self.uri, **(self.ws_kw or {}))
144-
yield self.ws.send(x)
142+
self.ws = await websockets.connect(self.uri, **(self.ws_kw or {}))
143+
await self.ws.send(x)
145144

146145
def destroy(self):
147146
super().destroy()
148147
if self.ws is not None:
149148
sync(self.loop, self.ws.protocol.close)
150149
self.ws = None
150+
151+
152+
@Stream.register_api()
153+
class to_mqtt(Sink):
154+
"""
155+
Send data to MQTT broker
156+
157+
See also ``sources.from_mqtt``.
158+
159+
Requires ``paho.mqtt``
160+
161+
:param host: str
162+
:param port: int
163+
:param topic: str
164+
:param keepalive: int
165+
See mqtt docs - to keep the channel alive
166+
:param client_kwargs:
167+
Passed to the client's ``connect()`` method
168+
"""
169+
def __init__(self, upstream, host, port, topic, keepalive=60, client_kwargs=None,
170+
**kwargs):
171+
self.host = host
172+
self.port = port
173+
self.c_kw = client_kwargs or {}
174+
self.client = None
175+
self.topic = topic
176+
self.keepalive = keepalive
177+
super().__init__(upstream, ensure_io_loop=True, **kwargs)
178+
179+
def update(self, x, who=None, metadata=None):
180+
import paho.mqtt.client as mqtt
181+
if self.client is None:
182+
self.client = mqtt.Client()
183+
self.client.connect(self.host, self.port, self.keepalive, **self.c_kw)
184+
# TODO: wait on successful delivery
185+
self.client.publish(self.topic, x)
186+
187+
def destroy(self):
188+
self.client.disconnect()
189+
self.client = None
190+
super().destroy()

streamz/sources.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
from glob import glob
3+
import queue
34
import os
45
import time
56
from tornado import gen
@@ -822,3 +823,92 @@ async def run(self):
822823
def stop(self):
823824
self.server.close()
824825
sync(self.loop, self.server.wait_closed)
826+
827+
828+
@Stream.register_api()
829+
class from_q(Source):
830+
"""Source events from a threading.Queue, running another event framework
831+
832+
The queue is polled, i.e., there is a latency/overhead tradeoff, since
833+
we cannot use ``await`` directly with a multithreaded queue.
834+
835+
Allows mixing of another event loop, for example pyqt, on another thread.
836+
Note that, by default, a streamz.Source such as this one will start
837+
an event loop in a new thread, unless otherwise specified.
838+
"""
839+
840+
def __init__(self, q, sleep_time=0.01, **kwargs):
841+
"""
842+
:param q: threading.Queue
843+
Any items pushed into here will become streamz events
844+
:param sleep_time: int
845+
Sets how long we wait before checking the input queue when
846+
empty (in s)
847+
:param kwargs:
848+
passed to streamz.Source
849+
"""
850+
self.q = q
851+
self.sleep = sleep_time
852+
super().__init__(**kwargs)
853+
854+
async def _run(self):
855+
"""Poll threading queue for events
856+
This uses check-and-wait, but overhead is low. Could maybe have
857+
a sleep-free version with an threading.Event.
858+
"""
859+
try:
860+
out = self.q.get_nowait()
861+
await self.emit(out, asynchronous=True)
862+
except queue.Empty:
863+
await asyncio.sleep(self.sleep)
864+
865+
866+
@Stream.register_api()
867+
class from_mqtt(from_q):
868+
"""Read from MQTT source
869+
870+
See https://en.wikipedia.org/wiki/MQTT for a description of the protocol
871+
and its uses.
872+
873+
See also ``sinks.to_mqtt``.
874+
875+
Requires ``paho.mqtt``
876+
877+
The outputs are ``paho.mqtt.client.MQTTMessage`` instances, which each have
878+
attributes timestamp, payload, topic, ...
879+
880+
NB: paho.mqtt.python runs on its own thread in this implementation. We may
881+
wish to instead call client.loop() directly
882+
883+
:param host: str
884+
:param port: int
885+
:param topic: str
886+
(May in the future support a list of topics)
887+
:param keepalive: int
888+
See mqtt docs - to keep the channel alive
889+
:param client_kwargs:
890+
Passed to the client's ``connect()`` method
891+
"""
892+
def __init__(self, host, port, topic, keepalive=60 , client_kwargs=None, **kwargs):
893+
self.host = host
894+
self.port = port
895+
self.keepalive = keepalive
896+
self.topic = topic
897+
self.client_kwargs = client_kwargs
898+
super().__init__(q=queue.Queue(), **kwargs)
899+
900+
def _on_connect(self, client, userdata, flags, rc):
901+
client.subscribe(self.topic)
902+
903+
def _on_message(self, client, userdata, msg):
904+
self.q.put(msg)
905+
906+
async def run(self):
907+
import paho.mqtt.client as mqtt
908+
client = mqtt.Client()
909+
client.on_connect = self._on_connect
910+
client.on_message = self._on_message
911+
client.connect(self.host, self.port, self.keepalive, **(self.client_kwargs or {}))
912+
client.loop_start()
913+
await super().run()
914+
client.disconnect()

streamz/tests/test_sinks.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,20 @@ def test_ws_roundtrip():
8686
wait_for(lambda: data == l, timeout=1)
8787
s.stop()
8888
s0.stop()
89+
90+
91+
def test_mqtt_roundtrip():
92+
pytest.importorskip("paho.mqtt.client")
93+
s0 = Stream.from_mqtt("mqtt.eclipseprojects.io", 1883, "streamz/sensor/temperature")
94+
l = s0.map(lambda msg: msg.payload).sink_to_list()
95+
s0.start()
96+
97+
data = [b'0123'] * 4
98+
s = Stream.from_iterable(data)
99+
s.to_mqtt("mqtt.eclipseprojects.io", 1883, "streamz/sensor/temperature")
100+
s.start()
101+
102+
wait_for(lambda: data == l, timeout=1)
103+
s.stop()
104+
s0.stop()
105+

0 commit comments

Comments
 (0)