Skip to content

Commit 2bebd38

Browse files
author
Martin Durant
committed
Add docs
1 parent 51c19d3 commit 2bebd38

5 files changed

Lines changed: 43 additions & 7 deletions

File tree

docs/source/api.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ Stream
4343
.. automethod:: Stream.emit
4444
.. automethod:: Stream.frequencies
4545
.. automethod:: Stream.register_api
46+
.. automethod:: Stream.sink
4647
.. automethod:: Stream.sink_to_list
48+
.. automethod:: Stream.sink_to_textfile
49+
.. automethod:: Stream.to_websocket
4750
.. automethod:: Stream.update
4851
.. automethod:: Stream.visualize
4952

@@ -56,6 +59,7 @@ Sources
5659
from_kafka
5760
from_kafka_batched
5861
from_process
62+
from_websocket
5963
from_textfile
6064
from_tcp
6165
from_http_server

streamz/core.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from collections import deque, defaultdict
23
from datetime import timedelta
34
import functools
@@ -485,15 +486,14 @@ def emit(self, x, asynchronous=False, metadata=None):
485486
finally:
486487
thread_state.asynchronous = ts_async
487488
else:
488-
@gen.coroutine
489-
def _():
489+
async def _():
490490
thread_state.asynchronous = True
491491
try:
492-
result = yield self._emit(x, metadata=metadata)
492+
result = await asyncio.gather(*self._emit(x, metadata=metadata))
493493
finally:
494494
del thread_state.asynchronous
495+
return result
495496

496-
raise gen.Return(result)
497497
sync(self.loop, _)
498498

499499
def update(self, x, who=None, metadata=None):

streamz/sinks.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,22 @@ def update(self, x, who=None, metadata=None):
113113

114114

115115
@Stream.register_api()
116-
class sink_to_websocket(Sink):
116+
class to_websocket(Sink):
117+
"""Write bytes data to websocket
118+
119+
The websocket will be opened on first call, and kept open. Should
120+
it close at some point, future writes will fail.
121+
122+
Requires the ``websockets`` package.
123+
124+
:param uri: str
125+
Something like "ws://host:port". Use "wss:" to allow TLS.
126+
:param ws_kwargs: dict
127+
Further kwargs to pass to ``websockets.connect``, please
128+
read its documentation.
129+
:param kwargs:
130+
Passed to superclass
131+
"""
117132

118133
def __init__(self, upstream, uri, ws_kwargs=None, **kwargs):
119134
self.uri = uri
@@ -129,7 +144,7 @@ def update(self, x, who=None, metadata=None):
129144
yield self.ws.send(x)
130145

131146
def destroy(self):
132-
super(sink_to_websocket, self).destroy()
147+
super().destroy()
133148
if self.ws is not None:
134149
sync(self.loop, self.ws.protocol.close)
135150
self.ws = None

streamz/sources.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,23 @@ async def run(self):
783783

784784
@Stream.register_api()
785785
class from_websocket(Source):
786+
"""Read binary data from a websocket
787+
788+
This source will accept connections on a given port and handle messages
789+
coming in.
790+
791+
The websockets library must be installed.
792+
793+
:param host: str
794+
Typically "localhost"
795+
:param port: int
796+
Which port to listen on (must be available)
797+
:param serve_kwargs: dict
798+
Passed to ``websockets.serve``
799+
:param kwargs:
800+
Passed to superclass
801+
"""
802+
786803
def __init__(self, host, port, serve_kwargs=None, **kwargs):
787804
self.host = host
788805
self.port = port

streamz/tests/test_sinks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ def test_ws_roundtrip():
8080

8181
data = [b'0123'] * 4
8282
s = Stream.from_iterable(data)
83-
s.sink_to_websocket("ws://localhost:8989")
83+
s.to_websocket("ws://localhost:8989")
8484
s.start()
8585

8686
wait_for(lambda: data == l, timeout=1)

0 commit comments

Comments
 (0)