Skip to content

Commit 36a04a7

Browse files
author
Martin Durant
committed
coverage
1 parent 19cf5a8 commit 36a04a7

3 files changed

Lines changed: 40 additions & 8 deletions

File tree

.coveragerc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ omit =
1313
exclude_lines =
1414
if __name__ == '__main__':
1515
pragma: no cover
16+
NotImplementedError

streamz/sources.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ def __init__(self, callback, poll_interval=0.1, **kwargs):
9898
super().__init__(**kwargs)
9999

100100
async def _run(self):
101-
await asyncio.gather(self._emit(self._cb()))
101+
await asyncio.gather(*self._emit(self._cb()))
102102
await asyncio.sleep(self._poll)
103103

104104

@@ -355,26 +355,30 @@ def __init__(self, cmd, open_kwargs=None, with_stderr=False, with_end=True,
355355
super().__init__(**kwargs)
356356

357357
async def run(self):
358+
import shlex
358359
import subprocess
359360
stderr = subprocess.STDOUT if self.with_stderr else None
360361
if isinstance(self.cmd, (list, tuple)):
361362
cmd, *args = self.cmd
362363
else:
363-
cmd, args = self.cmd, ()
364+
cmd, *args = shlex.split(self.cmd)
364365
process = await asyncio.create_subprocess_exec(
365366
cmd, *args, stdout=subprocess.PIPE,
366367
stderr=stderr, **self.open_kwargs)
367368
while not self.stopped:
368369
try:
369370
out = await process.stdout.readuntil(b'\n')
370371
except asyncio.IncompleteReadError as err:
371-
if self.with_end:
372+
if self.with_end and err.partial:
372373
out = err.partial
373374
else:
374375
break
376+
if process.returncode is not None:
377+
self.stopped = True
375378
await asyncio.gather(*self._emit(out))
376-
process.terminate()
377-
await process.wait()
379+
if process.returncode is not None:
380+
process.terminate()
381+
await process.wait()
378382

379383

380384
@Stream.register_api(staticmethod)

streamz/tests/test_sources.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,23 @@
1+
import asyncio
2+
import sys
3+
14
from flaky import flaky
25
import pytest
36
from streamz import Source
47
from streamz.utils_test import wait_for, await_for, gen_test
58
import socket
69

710

11+
def test_periodic():
12+
s = Source.from_periodic(lambda: True)
13+
l = s.sink_to_list()
14+
assert s.stopped
15+
s.start()
16+
wait_for(lambda: l, 0.11, period=0.01)
17+
wait_for(lambda: len(l) > 1, 0.11, period=0.01)
18+
assert all(l)
19+
20+
821
@flaky(max_runs=3, min_passes=1)
922
def test_tcp():
1023
port = 9876
@@ -88,9 +101,23 @@ def test_http():
88101

89102
@gen_test(timeout=60)
90103
def test_process():
91-
import sys
92-
import asyncio
93-
cmd = ["python", "-c", "for i in range(4): print(i)"]
104+
cmd = ["python", "-c", "for i in range(4): print(i, end='')"]
105+
s = Source.from_process(cmd, with_end=True)
106+
if sys.platform != "win32":
107+
# don't know why - something with pytest and new processes
108+
policy = asyncio.get_event_loop_policy()
109+
watcher = asyncio.SafeChildWatcher()
110+
policy.set_child_watcher(watcher)
111+
watcher.attach_loop(s.loop.asyncio_loop)
112+
out = s.sink_to_list()
113+
s.start()
114+
yield await_for(lambda: out == [b'0123'], timeout=5)
115+
s.stop()
116+
117+
118+
@gen_test(timeout=60)
119+
def test_process_str():
120+
cmd = 'python -c "for i in range(4): print(i)"'
94121
s = Source.from_process(cmd)
95122
if sys.platform != "win32":
96123
# don't know why - something with pytest and new processes

0 commit comments

Comments
 (0)