Skip to content

Commit 15cfe9c

Browse files
authored
Merge pull request #376 from roveo/master
Implement timeout and key routing for Stream.partition
2 parents dca7370 + bf2e356 commit 15cfe9c

2 files changed

Lines changed: 125 additions & 16 deletions

File tree

streamz/core.py

Lines changed: 66 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
from __future__ import absolute_import, division, print_function
22

3-
from collections import deque
3+
from collections import deque, defaultdict
44
from datetime import timedelta
55
import functools
66
import logging
@@ -985,6 +985,19 @@ def _check_end(self):
985985
class partition(Stream):
986986
""" Partition stream into tuples of equal size
987987
988+
Parameters
989+
----------
990+
n: int
991+
Maximum partition size
992+
timeout: int or float, optional
993+
Number of seconds after which a partition will be emitted,
994+
even if its size is less than ``n``. If ``None`` (default),
995+
a partition will be emitted only when its size reaches ``n``.
996+
key: hashable or callable, optional
997+
Emit items with the same key together as a separate partition.
998+
If ``key`` is callable, partition will be identified by ``key(x)``,
999+
otherwise by ``x[key]``. Defaults to ``None``.
1000+
9881001
Examples
9891002
--------
9901003
>>> source = Stream()
@@ -994,30 +1007,67 @@ class partition(Stream):
9941007
(0, 1, 2)
9951008
(3, 4, 5)
9961009
(6, 7, 8)
1010+
1011+
>>> source = Stream()
1012+
>>> source.partition(2, key=lambda x: x % 2).sink(print)
1013+
>>> for i in range(4):
1014+
... source.emit(i)
1015+
(0, 2)
1016+
(1, 3)
1017+
1018+
>>> from time import sleep
1019+
>>> source = Stream()
1020+
>>> source.partition(5, timeout=1).sink(print)
1021+
>>> for i in range(3):
1022+
... source.emit(i)
1023+
>>> sleep(1)
1024+
(0, 1, 2)
9971025
"""
9981026
_graphviz_shape = 'diamond'
9991027

1000-
def __init__(self, upstream, n, **kwargs):
1028+
def __init__(self, upstream, n, timeout=None, key=None, **kwargs):
10011029
self.n = n
1002-
self._buffer = []
1003-
self.metadata_buffer = []
1004-
Stream.__init__(self, upstream, **kwargs)
1030+
self._timeout = timeout
1031+
self._key = key
1032+
self._buffer = defaultdict(lambda: [])
1033+
self._metadata_buffer = defaultdict(lambda: [])
1034+
self._callbacks = {}
1035+
Stream.__init__(self, upstream, ensure_io_loop=True, **kwargs)
1036+
1037+
def _get_key(self, x):
1038+
if self._key is None:
1039+
return None
1040+
if callable(self._key):
1041+
return self._key(x)
1042+
return x[self._key]
1043+
1044+
@gen.coroutine
1045+
def _flush(self, key):
1046+
result, self._buffer[key] = self._buffer[key], []
1047+
metadata_result, self._metadata_buffer[key] = self._metadata_buffer[key], []
1048+
yield self._emit(tuple(result), list(metadata_result))
1049+
self._release_refs(metadata_result)
10051050

1051+
@gen.coroutine
10061052
def update(self, x, who=None, metadata=None):
10071053
self._retain_refs(metadata)
1008-
self._buffer.append(x)
1054+
key = self._get_key(x)
1055+
buffer = self._buffer[key]
1056+
metadata_buffer = self._metadata_buffer[key]
1057+
buffer.append(x)
10091058
if isinstance(metadata, list):
1010-
self.metadata_buffer.extend(metadata)
1011-
else:
1012-
self.metadata_buffer.append(metadata)
1013-
if len(self._buffer) == self.n:
1014-
result, self._buffer = self._buffer, []
1015-
metadata_result, self.metadata_buffer = self.metadata_buffer, []
1016-
ret = self._emit(tuple(result), list(metadata_result))
1017-
self._release_refs(metadata_result)
1018-
return ret
1059+
metadata_buffer.extend(metadata)
10191060
else:
1020-
return []
1061+
metadata_buffer.append(metadata)
1062+
if len(buffer) == self.n:
1063+
if self._timeout is not None and self.n > 1:
1064+
self._callbacks[key].cancel()
1065+
yield self._flush(key)
1066+
return
1067+
if len(buffer) == 1 and self._timeout is not None:
1068+
self._callbacks[key] = self.loop.call_later(
1069+
self._timeout, self._flush, key
1070+
)
10211071

10221072

10231073
@Stream.register_api()

streamz/tests/test_core.py

Lines changed: 59 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -164,6 +164,65 @@ def test_partition():
164164
assert L == [(0, 1), (2, 3), (4, 5), (6, 7), (8, 9)]
165165

166166

167+
def test_partition_timeout():
    """A partition smaller than ``n`` is emitted once ``timeout`` elapses."""
    source = Stream()
    partitioned = source.partition(10, timeout=0.01)
    emitted = partitioned.sink_to_list()

    # Send fewer items than the partition size, so only the timer can flush.
    for value in range(5):
        source.emit(value)

    # Wait well beyond the 0.01 s timeout before inspecting the output.
    sleep(0.1)

    expected = [(0, 1, 2, 3, 4)]
    assert emitted == expected
177+
178+
179+
def test_partition_timeout_cancel():
    """Filling a partition cancels its timer; the next item starts a new one."""
    source = Stream()
    emitted = source.partition(3, timeout=0.1).sink_to_list()

    # Three items fill the partition, so it is flushed by size, not by timer.
    for value in range(3):
        source.emit(value)

    sleep(0.09)
    source.emit(3)
    sleep(0.02)

    # Past the first 0.1 s deadline: had the original timer survived, the
    # lone item ``3`` would already have been flushed.
    full_partition = (0, 1, 2)
    assert emitted == [full_partition]

    sleep(0.09)

    # The timer started by item ``3`` has now expired and flushed it alone.
    assert emitted == [full_partition, (3,)]
195+
196+
197+
def test_partition_key():
    """A non-callable ``key`` groups items by ``x[key]``."""
    source = Stream()
    emitted = source.partition(2, key=0).sink_to_list()

    # Element 0 of each tuple (the parity) selects the partition.
    for value in range(4):
        item = (value % 2, value)
        source.emit(item)

    even_partition = ((0, 0), (0, 2))
    odd_partition = ((1, 1), (1, 3))
    assert emitted == [even_partition, odd_partition]
205+
206+
207+
def test_partition_key_callable():
    """A callable ``key`` groups items by ``key(x)``."""

    def parity(x):
        return x % 2

    source = Stream()
    emitted = source.partition(2, key=parity).sink_to_list()

    for value in range(10):
        source.emit(value)

    # 8 and 9 stay buffered: each is alone in its partition and no timeout
    # is configured to flush them.
    expected = [(0, 2), (1, 3), (4, 6), (5, 7)]
    assert emitted == expected
215+
216+
217+
def test_partition_size_one():
    """``n == 1`` combined with a timeout emits every item as its own tuple.

    With a partition size of one, each item fills its partition at once,
    so no timer is ever scheduled for it. The flush must therefore not try
    to cancel a timer that does not exist. Collecting the output, instead
    of sinking into a no-op, also verifies that each item is emitted
    exactly once and in order.
    """
    source = Stream()
    L = source.partition(1, timeout=.01).sink_to_list()

    for i in range(10):
        source.emit(i)

    # Each item is flushed immediately, so no sleep is needed before checking.
    assert L == [(i,) for i in range(10)]
224+
225+
167226
def test_sliding_window():
168227
source = Stream()
169228
L = source.sliding_window(2).sink_to_list()

0 commit comments

Comments
 (0)