Skip to content

Commit 51c19d3

Browse files
author
Martin Durant
committed
websocket integration
1 parent 6d5bcf4 commit 51c19d3

3 files changed

Lines changed: 67 additions & 2 deletions

File tree

streamz/sinks.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from tornado import gen
55

66
from streamz import Stream
7+
from streamz.core import sync
78

89
# sinks add themselves here to avoid being garbage-collected
910
_global_sinks = set()
@@ -109,3 +110,26 @@ def __init__(self, upstream, file, end="\n", mode="a", **kwargs):
109110

110111
def update(self, x, who=None, metadata=None):
111112
self._fp.write(x + self._end)
113+
114+
115+
@Stream.register_api()
116+
class sink_to_websocket(Sink):
117+
118+
def __init__(self, upstream, uri, ws_kwargs=None, **kwargs):
119+
self.uri = uri
120+
self.ws_kw = ws_kwargs
121+
self.ws = None
122+
super().__init__(upstream, **kwargs)
123+
124+
@gen.coroutine
125+
def update(self, x, who=None, metadata=None):
126+
import websockets
127+
if self.ws is None:
128+
self.ws = yield websockets.connect(self.uri, **(self.ws_kw or {}))
129+
yield self.ws.send(x)
130+
131+
def destroy(self):
132+
super(sink_to_websocket, self).destroy()
133+
if self.ws is not None:
134+
sync(self.loop, self.ws.protocol.close)
135+
self.ws = None

streamz/sources.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from tornado import gen
66
import weakref
77

8-
from .core import Stream, convert_interval, RefCounter
8+
from .core import Stream, convert_interval, RefCounter, sync
99

1010

1111
def sink_to_file(filename, upstream, mode='w', prefix='', suffix='\n', flush=False):
@@ -779,3 +779,29 @@ async def run(self):
779779
break
780780
await asyncio.gather(*self._emit(x))
781781
self.stopped = True
782+
783+
784+
@Stream.register_api()
785+
class from_websocket(Source):
786+
def __init__(self, host, port, serve_kwargs=None, **kwargs):
787+
self.host = host
788+
self.port = port
789+
self.s_kw = serve_kwargs
790+
self.server = None
791+
super().__init__(**kwargs)
792+
793+
@gen.coroutine
794+
def _read(self, ws, path):
795+
while not self.stopped:
796+
data = yield ws.recv()
797+
yield self._emit(data)
798+
799+
async def run(self):
800+
import websockets
801+
self.server = await websockets.serve(
802+
self._read, self.host, self.port, **(self.s_kw or {})
803+
)
804+
805+
def stop(self):
806+
self.server.close()
807+
sync(self.loop, self.server.wait_closed)

streamz/tests/test_sinks.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import pytest
44
from streamz import Stream
55
from streamz.sinks import _global_sinks, Sink
6-
from streamz.utils_test import tmpfile
6+
from streamz.utils_test import tmpfile, wait_for
77

88

99
def test_sink_with_args_and_kwargs():
@@ -71,3 +71,18 @@ def test_sink_destroy():
7171
del sink
7272

7373
assert ref() is None
74+
75+
76+
def test_ws_roundtrip():
77+
pytest.importorskip("websockets")
78+
s0 = Stream.from_websocket("localhost", 8989, start=True)
79+
l = s0.sink_to_list()
80+
81+
data = [b'0123'] * 4
82+
s = Stream.from_iterable(data)
83+
s.sink_to_websocket("ws://localhost:8989")
84+
s.start()
85+
86+
wait_for(lambda: data == l, timeout=1)
87+
s.stop()
88+
s0.stop()

0 commit comments

Comments
 (0)