Skip to content

Commit 464f478

Browse files
committed
fix callback cancelling for partition, add test
1 parent b6f8af0 commit 464f478

2 files changed

Lines changed: 14 additions & 4 deletions

File tree

streamz/core.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,14 +1025,15 @@ def update(self, x, who=None, metadata=None):
10251025
metadata_buffer.extend(metadata)
10261026
else:
10271027
metadata_buffer.append(metadata)
1028+
if len(buffer) == self.n:
1029+
if self._timeout is not None and self.n > 1:
1030+
self._callbacks[key].cancel()
1031+
yield self._flush(key)
1032+
return
10281033
if len(buffer) == 1 and self._timeout is not None:
10291034
self._callbacks[key] = self.loop.call_later(
10301035
self._timeout, self._flush, key
10311036
)
1032-
if len(buffer) == self.n:
1033-
if self._timeout is not None:
1034-
self._callbacks[key].cancel()
1035-
yield self._flush(key)
10361037

10371038

10381039
@Stream.register_api()

streamz/tests/test_core.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,15 @@ def test_partition_key_callable():
214214
assert L == [(0, 2), (1, 3), (4, 6), (5, 7)]
215215

216216

217+
def test_partition_size_one():
218+
source = Stream()
219+
220+
source.partition(1, timeout=.01).sink(lambda x: None)
221+
222+
for i in range(10):
223+
source.emit(i)
224+
225+
217226
def test_sliding_window():
218227
source = Stream()
219228
L = source.sliding_window(2).sink_to_list()

0 commit comments

Comments
 (0)