Skip to content

Commit 573566c

Browse files
author
Martin Durant
committed
Merge branch 'master' into river
2 parents 5bd3bc4 + e6515f4 commit 573566c

6 files changed

Lines changed: 258 additions & 7 deletions

File tree

ci/environment-py38.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ channels:
33
- conda-forge
44
- defaults
55
dependencies:
6-
- python=3.7
6+
- python=3.8
77
- pytest
88
- flake8
99
- black
@@ -32,3 +32,5 @@ dependencies:
3232
- flaky
3333
- pytest-cov
3434
- coveralls
35+
- paho-mqtt
36+
- websockets

docs/source/api.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,11 @@ Stream
4343
.. automethod:: Stream.emit
4444
.. automethod:: Stream.frequencies
4545
.. automethod:: Stream.register_api
46+
.. automethod:: Stream.sink
4647
.. automethod:: Stream.sink_to_list
48+
.. automethod:: Stream.sink_to_textfile
49+
.. automethod:: Stream.to_websocket
50+
.. automethod:: Stream.to_mqtt
4751
.. automethod:: Stream.update
4852
.. automethod:: Stream.visualize
4953

@@ -55,7 +59,9 @@ Sources
5559
filenames
5660
from_kafka
5761
from_kafka_batched
62+
from_mqtt
5863
from_process
64+
from_websocket
5965
from_textfile
6066
from_tcp
6167
from_http_server

streamz/core.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from collections import deque, defaultdict
23
from datetime import timedelta
34
import functools
@@ -485,15 +486,14 @@ def emit(self, x, asynchronous=False, metadata=None):
485486
finally:
486487
thread_state.asynchronous = ts_async
487488
else:
488-
@gen.coroutine
489-
def _():
489+
async def _():
490490
thread_state.asynchronous = True
491491
try:
492-
result = yield self._emit(x, metadata=metadata)
492+
result = await asyncio.gather(*self._emit(x, metadata=metadata))
493493
finally:
494494
del thread_state.asynchronous
495+
return result
495496

496-
raise gen.Return(result)
497497
sync(self.loop, _)
498498

499499
def update(self, x, who=None, metadata=None):

streamz/sinks.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from tornado import gen
55

66
from streamz import Stream
7+
from streamz.core import sync
78

89
# sinks add themselves here to avoid being garbage-collected
910
_global_sinks = set()
@@ -192,3 +193,80 @@ def cb(self, err, msg):
192193

193194
def flush(self, timeout=-1):
194195
self.producer.flush(timeout)
196+
197+
198+
class to_websocket(Sink):
199+
"""Write bytes data to websocket
200+
201+
The websocket will be opened on first call, and kept open. Should
202+
it close at some point, future writes will fail.
203+
204+
Requires the ``websockets`` package.
205+
206+
:param uri: str
207+
Something like "ws://host:port". Use "wss:" to allow TLS.
208+
:param ws_kwargs: dict
209+
Further kwargs to pass to ``websockets.connect``, please
210+
read its documentation.
211+
:param kwargs:
212+
Passed to superclass
213+
"""
214+
215+
def __init__(self, upstream, uri, ws_kwargs=None, **kwargs):
216+
self.uri = uri
217+
self.ws_kw = ws_kwargs
218+
self.ws = None
219+
super().__init__(upstream, ensure_io_loop=True, **kwargs)
220+
221+
async def update(self, x, who=None, metadata=None):
222+
import websockets
223+
if self.ws is None:
224+
self.ws = await websockets.connect(self.uri, **(self.ws_kw or {}))
225+
await self.ws.send(x)
226+
227+
def destroy(self):
228+
super().destroy()
229+
if self.ws is not None:
230+
sync(self.loop, self.ws.protocol.close)
231+
self.ws = None
232+
233+
234+
@Stream.register_api()
235+
class to_mqtt(Sink):
236+
"""
237+
Send data to MQTT broker
238+
239+
See also ``sources.from_mqtt``.
240+
241+
Requires ``paho.mqtt``
242+
243+
:param host: str
244+
:param port: int
245+
:param topic: str
246+
:param keepalive: int
247+
See mqtt docs - to keep the channel alive
248+
:param client_kwargs:
249+
Passed to the client's ``connect()`` method
250+
"""
251+
def __init__(self, upstream, host, port, topic, keepalive=60, client_kwargs=None,
252+
**kwargs):
253+
self.host = host
254+
self.port = port
255+
self.c_kw = client_kwargs or {}
256+
self.client = None
257+
self.topic = topic
258+
self.keepalive = keepalive
259+
super().__init__(upstream, ensure_io_loop=True, **kwargs)
260+
261+
def update(self, x, who=None, metadata=None):
262+
import paho.mqtt.client as mqtt
263+
if self.client is None:
264+
self.client = mqtt.Client()
265+
self.client.connect(self.host, self.port, self.keepalive, **self.c_kw)
266+
# TODO: wait on successful delivery
267+
self.client.publish(self.topic, x)
268+
269+
def destroy(self):
270+
self.client.disconnect()
271+
self.client = None
272+
super().destroy()

streamz/sources.py

Lines changed: 134 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import asyncio
22
from glob import glob
3+
import queue
34
import os
45
import time
56
from tornado import gen
67
import weakref
78

8-
from .core import Stream, convert_interval, RefCounter
9+
from .core import Stream, convert_interval, RefCounter, sync
910

1011

1112
def sink_to_file(filename, upstream, mode='w', prefix='', suffix='\n', flush=False):
@@ -779,3 +780,135 @@ async def run(self):
779780
break
780781
await asyncio.gather(*self._emit(x))
781782
self.stopped = True
783+
784+
785+
@Stream.register_api()
786+
class from_websocket(Source):
787+
"""Read binary data from a websocket
788+
789+
This source will accept connections on a given port and handle messages
790+
coming in.
791+
792+
The websockets library must be installed.
793+
794+
:param host: str
795+
Typically "localhost"
796+
:param port: int
797+
Which port to listen on (must be available)
798+
:param serve_kwargs: dict
799+
Passed to ``websockets.serve``
800+
:param kwargs:
801+
Passed to superclass
802+
"""
803+
804+
def __init__(self, host, port, serve_kwargs=None, **kwargs):
805+
self.host = host
806+
self.port = port
807+
self.s_kw = serve_kwargs
808+
self.server = None
809+
super().__init__(**kwargs)
810+
811+
@gen.coroutine
812+
def _read(self, ws, path):
813+
while not self.stopped:
814+
data = yield ws.recv()
815+
yield self._emit(data)
816+
817+
async def run(self):
818+
import websockets
819+
self.server = await websockets.serve(
820+
self._read, self.host, self.port, **(self.s_kw or {})
821+
)
822+
823+
def stop(self):
824+
self.server.close()
825+
sync(self.loop, self.server.wait_closed)
826+
827+
828+
@Stream.register_api()
829+
class from_q(Source):
830+
"""Source events from a threading.Queue, running another event framework
831+
832+
The queue is polled, i.e., there is a latency/overhead tradeoff, since
833+
we cannot use ``await`` directly with a multithreaded queue.
834+
835+
Allows mixing of another event loop, for example pyqt, on another thread.
836+
Note that, by default, a streamz.Source such as this one will start
837+
an event loop in a new thread, unless otherwise specified.
838+
"""
839+
840+
def __init__(self, q, sleep_time=0.01, **kwargs):
841+
"""
842+
:param q: threading.Queue
843+
Any items pushed into here will become streamz events
844+
:param sleep_time: int
845+
Sets how long we wait before checking the input queue when
846+
empty (in s)
847+
:param kwargs:
848+
passed to streamz.Source
849+
"""
850+
self.q = q
851+
self.sleep = sleep_time
852+
super().__init__(**kwargs)
853+
854+
async def _run(self):
855+
"""Poll threading queue for events
856+
This uses check-and-wait, but overhead is low. Could maybe have
857+
a sleep-free version with an threading.Event.
858+
"""
859+
try:
860+
out = self.q.get_nowait()
861+
await self.emit(out, asynchronous=True)
862+
except queue.Empty:
863+
await asyncio.sleep(self.sleep)
864+
865+
866+
@Stream.register_api()
867+
class from_mqtt(from_q):
868+
"""Read from MQTT source
869+
870+
See https://en.wikipedia.org/wiki/MQTT for a description of the protocol
871+
and its uses.
872+
873+
See also ``sinks.to_mqtt``.
874+
875+
Requires ``paho.mqtt``
876+
877+
The outputs are ``paho.mqtt.client.MQTTMessage`` instances, which each have
878+
attributes timestamp, payload, topic, ...
879+
880+
NB: paho.mqtt.python runs on its own thread in this implementation. We may
881+
wish to instead call client.loop() directly
882+
883+
:param host: str
884+
:param port: int
885+
:param topic: str
886+
(May in the future support a list of topics)
887+
:param keepalive: int
888+
See mqtt docs - to keep the channel alive
889+
:param client_kwargs:
890+
Passed to the client's ``connect()`` method
891+
"""
892+
def __init__(self, host, port, topic, keepalive=60 , client_kwargs=None, **kwargs):
893+
self.host = host
894+
self.port = port
895+
self.keepalive = keepalive
896+
self.topic = topic
897+
self.client_kwargs = client_kwargs
898+
super().__init__(q=queue.Queue(), **kwargs)
899+
900+
def _on_connect(self, client, userdata, flags, rc):
901+
client.subscribe(self.topic)
902+
903+
def _on_message(self, client, userdata, msg):
904+
self.q.put(msg)
905+
906+
async def run(self):
907+
import paho.mqtt.client as mqtt
908+
client = mqtt.Client()
909+
client.on_connect = self._on_connect
910+
client.on_message = self._on_message
911+
client.connect(self.host, self.port, self.keepalive, **(self.client_kwargs or {}))
912+
client.loop_start()
913+
await super().run()
914+
client.disconnect()

streamz/tests/test_sinks.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import pytest
44
from streamz import Stream
55
from streamz.sinks import _global_sinks, Sink
6-
from streamz.utils_test import tmpfile
6+
from streamz.utils_test import tmpfile, wait_for
77

88

99
def test_sink_with_args_and_kwargs():
@@ -71,3 +71,35 @@ def test_sink_destroy():
7171
del sink
7272

7373
assert ref() is None
74+
75+
76+
def test_ws_roundtrip():
77+
pytest.importorskip("websockets")
78+
s0 = Stream.from_websocket("localhost", 8989, start=True)
79+
l = s0.sink_to_list()
80+
81+
data = [b'0123'] * 4
82+
s = Stream.from_iterable(data)
83+
s.to_websocket("ws://localhost:8989")
84+
s.start()
85+
86+
wait_for(lambda: data == l, timeout=1)
87+
s.stop()
88+
s0.stop()
89+
90+
91+
def test_mqtt_roundtrip():
92+
pytest.importorskip("paho.mqtt.client")
93+
s0 = Stream.from_mqtt("mqtt.eclipseprojects.io", 1883, "streamz/sensor/temperature")
94+
l = s0.map(lambda msg: msg.payload).sink_to_list()
95+
s0.start()
96+
97+
data = [b'0123'] * 4
98+
s = Stream.from_iterable(data)
99+
s.to_mqtt("mqtt.eclipseprojects.io", 1883, "streamz/sensor/temperature")
100+
s.start()
101+
102+
wait_for(lambda: data == l, timeout=1)
103+
s.stop()
104+
s0.stop()
105+

0 commit comments

Comments
 (0)