Skip to content

Commit 38212a4

Browse files
author
Martin Durant
committed
fix
1 parent a336b0c commit 38212a4

2 files changed

Lines changed: 8 additions & 7 deletions

File tree

streamz/sources.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import asyncio
22
from glob import glob
33
import os
4-
import inspect
54
import time
65
import tornado.ioloop
76
from tornado import gen
@@ -18,6 +17,7 @@ def _():
1817

1918
pc = tornado.ioloop.PeriodicCallback(_, callback_time, **kwargs)
2019
pc.start()
20+
pc.start()
2121
return source
2222

2323

@@ -39,6 +39,7 @@ class Source(Stream):
3939
def __init__(self, start=False, **kwargs):
4040
self.stopped = True
4141
super().__init__(ensure_io_loop=True, **kwargs)
42+
self.started = False
4243
if start:
4344
self.start()
4445

@@ -50,6 +51,7 @@ def stop(self):
5051
def start(self):
5152
"""start polling"""
5253
self.stopped = False
54+
self.started = True
5355
self.loop.add_callback(self.run)
5456

5557
async def run(self):
@@ -91,11 +93,11 @@ def __init__(self, f, poll_interval=0.100, delimiter='\n',
9193
if isinstance(f, str):
9294
f = open(f)
9395
self.buffer = ''
96+
self.file = f
97+
self.from_end = from_end
9498
if self.from_end:
9599
# this only happens when we are ready to read
96100
self.file.seek(0, 2)
97-
self.file = f
98-
self.from_end = from_end
99101
self.delimiter = delimiter
100102

101103
self.poll_interval = poll_interval
@@ -109,7 +111,7 @@ async def _run(self):
109111
parts = self.buffer.split(self.delimiter)
110112
self.buffer = parts.pop(-1)
111113
for part in parts:
112-
await self._emit(part + self.delimiter)
114+
await asyncio.gather(*self._emit(part + self.delimiter))
113115
else:
114116
await asyncio.sleep(self.poll_interval)
115117

@@ -149,7 +151,7 @@ async def _run(self):
149151
new = filenames - self.seen
150152
for fn in sorted(new):
151153
self.seen.add(fn)
152-
await self._emit(fn)
154+
await asyncio.gather(*self._emit(fn))
153155
await asyncio.sleep(self.poll_interval) # TODO: remove poll if delayed
154156

155157

@@ -405,7 +407,7 @@ def _close_consumer(self):
405407
self.stopped = True
406408

407409

408-
class FromKafkaBatched(Stream):
410+
class FromKafkaBatched(Source):
409411
"""Base class for both local and cluster-based batched kafka processing"""
410412
def __init__(self, topic, consumer_params, poll_interval='1s',
411413
npartitions=None, refresh_partitions=False,

streamz/tests/test_sources.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ def test_http():
8686
requests.post('http://localhost:%i/other' % port, data=b'data2')
8787

8888

89-
#@flaky(max_runs=3, min_passes=1)
9089
@gen_test(timeout=60)
9190
def test_process():
9291
import sys

0 commit comments

Comments
 (0)