Skip to content

Commit 3f6135c

Browse files
committed
Source.from_tcp fixes
Running the network word count example showed an attempt to await a list object in the Tornado framework. I tracked it down to the EmitServer.handle_stream assuming the result of source._emit was awaitable. Pivoting on inspect.isawaitable there makes the example as written work. I added a try/except block to trap the keyboard interrupt and suppress the stack spew to the example.
1 parent 8c73290 commit 3f6135c

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

examples/network_wordcount.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,9 @@
1818
)
1919

2020
s.start()
21-
time.sleep(600)
21+
22+
try:
23+
while True:
24+
time.sleep(600)
25+
except KeyboardInterrupt:
26+
pass

streamz/sources.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import queue
44
import os
55
import time
6+
from inspect import isawaitable
7+
68
from tornado import gen
79
import weakref
810

@@ -252,7 +254,9 @@ async def handle_stream(self, stream, address):
252254
while not self.source.stopped:
253255
try:
254256
data = await stream.read_until(self.source.delimiter)
255-
await self.source._emit(data)
257+
result = self.source._emit(data)
258+
if isawaitable(result):
259+
await result
256260
except StreamClosedError:
257261
break
258262

streamz/tests/test_sources.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,37 @@ def test_tcp():
4747
sock2.close()
4848

4949

50+
@flaky(max_runs=3, min_passes=1)
51+
def test_tcp_word_count_example():
52+
port = 9999
53+
s = Source.from_tcp(port)
54+
out = s.map(bytes.split).flatten().frequencies().sink_to_list()
55+
s.start()
56+
wait_for(lambda: s.server is not None, 2, period=0.02)
57+
58+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
59+
sock.connect(("localhost", port))
60+
sock.send(b'data\n')
61+
62+
with (socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock,
63+
socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock2):
64+
sock.connect(("localhost", port))
65+
sock2.connect(("localhost", port))
66+
sock.send(b'data\n')
67+
# regression test a bug in from_tcp where a second packet from
68+
# the same socket is dropped due to the socket handler dying
69+
sock.send(b'data\n')
70+
sock2.send(b'data2\n')
71+
72+
expected = [{b"data": 1}, {b"data": 2}, {b"data": 3}, {b"data": 3, b"data2": 1}]
73+
74+
def fail_func():
75+
assert out == expected
76+
77+
wait_for(lambda: out == expected, 2, fail_func=fail_func, period=0.01)
78+
79+
80+
5081
@flaky(max_runs=3, min_passes=1)
5182
@gen_test(timeout=60)
5283
def test_tcp_async():

0 commit comments

Comments
 (0)