Skip to content

Commit b6f8af0

Browse files
committed
implement timeout and key routing for partition, closes #375
1 parent e9a2545 commit b6f8af0

2 files changed

Lines changed: 115 additions & 16 deletions

File tree

streamz/core.py

Lines changed: 65 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import absolute_import, division, print_function
22

3-
from collections import deque
3+
from collections import deque, defaultdict
44
from datetime import timedelta
55
import functools
66
import logging
@@ -951,6 +951,19 @@ def _check_end(self):
951951
class partition(Stream):
952952
""" Partition stream into tuples of equal size
953953
954+
Parameters
955+
----------
956+
n: int
957+
Maximum partition size
958+
timeout: int or float, optional
959+
Number of seconds after which a partition will be emitted,
960+
even if its size is less than ``n``. If ``None`` (default),
961+
a partition will be emitted only when its size reaches ``n``.
962+
key: hashable or callable, optional
963+
Emit items with the same key together as a separate partition.
964+
If ``key`` is callable, partition will be identified by ``key(x)``,
965+
otherwise by ``x[key]``. Defaults to ``None``.
966+
954967
Examples
955968
--------
956969
>>> source = Stream()
@@ -960,30 +973,66 @@ class partition(Stream):
960973
(0, 1, 2)
961974
(3, 4, 5)
962975
(6, 7, 8)
976+
977+
>>> source = Stream()
978+
>>> source.partition(2, key=lambda x: x % 2).sink(print)
979+
>>> for i in range(4):
980+
... source.emit(i)
981+
(0, 2)
982+
(1, 3)
983+
984+
>>> from time import sleep
985+
>>> source = Stream()
986+
>>> source.partition(5, timeout=1).sink(print)
987+
>>> for i in range(3):
988+
... source.emit(i)
989+
>>> sleep(1)
990+
(0, 1, 2)
963991
"""
964992
_graphviz_shape = 'diamond'
965993

966-
def __init__(self, upstream, n, **kwargs):
994+
def __init__(self, upstream, n, timeout=None, key=None, **kwargs):
967995
self.n = n
968-
self._buffer = []
969-
self.metadata_buffer = []
970-
Stream.__init__(self, upstream, **kwargs)
996+
self._timeout = timeout
997+
self._key = key
998+
self._buffer = defaultdict(lambda: [])
999+
self._metadata_buffer = defaultdict(lambda: [])
1000+
self._callbacks = {}
1001+
Stream.__init__(self, upstream, ensure_io_loop=True, **kwargs)
1002+
1003+
def _get_key(self, x):
1004+
if self._key is None:
1005+
return None
1006+
if callable(self._key):
1007+
return self._key(x)
1008+
return x[self._key]
1009+
1010+
@gen.coroutine
1011+
def _flush(self, key):
1012+
result, self._buffer[key] = self._buffer[key], []
1013+
metadata_result, self._metadata_buffer[key] = self._metadata_buffer[key], []
1014+
yield self._emit(tuple(result), list(metadata_result))
1015+
self._release_refs(metadata_result)
9711016

1017+
@gen.coroutine
9721018
def update(self, x, who=None, metadata=None):
9731019
self._retain_refs(metadata)
974-
self._buffer.append(x)
1020+
key = self._get_key(x)
1021+
buffer = self._buffer[key]
1022+
metadata_buffer = self._metadata_buffer[key]
1023+
buffer.append(x)
9751024
if isinstance(metadata, list):
976-
self.metadata_buffer.extend(metadata)
1025+
metadata_buffer.extend(metadata)
9771026
else:
978-
self.metadata_buffer.append(metadata)
979-
if len(self._buffer) == self.n:
980-
result, self._buffer = self._buffer, []
981-
metadata_result, self.metadata_buffer = self.metadata_buffer, []
982-
ret = self._emit(tuple(result), list(metadata_result))
983-
self._release_refs(metadata_result)
984-
return ret
985-
else:
986-
return []
1027+
metadata_buffer.append(metadata)
1028+
if len(buffer) == 1 and self._timeout is not None:
1029+
self._callbacks[key] = self.loop.call_later(
1030+
self._timeout, self._flush, key
1031+
)
1032+
if len(buffer) == self.n:
1033+
if self._timeout is not None:
1034+
self._callbacks[key].cancel()
1035+
yield self._flush(key)
9871036

9881037

9891038
@Stream.register_api()

streamz/tests/test_core.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,56 @@ def test_partition():
164164
assert L == [(0, 1), (2, 3), (4, 5), (6, 7), (8, 9)]
165165

166166

167+
def test_partition_timeout():
168+
source = Stream()
169+
L = source.partition(10, timeout=0.01).sink_to_list()
170+
171+
for i in range(5):
172+
source.emit(i)
173+
174+
sleep(0.1)
175+
176+
assert L == [(0, 1, 2, 3, 4)]
177+
178+
179+
def test_partition_timeout_cancel():
180+
source = Stream()
181+
L = source.partition(3, timeout=0.1).sink_to_list()
182+
183+
for i in range(3):
184+
source.emit(i)
185+
186+
sleep(0.09)
187+
source.emit(3)
188+
sleep(0.02)
189+
190+
assert L == [(0, 1, 2)]
191+
192+
sleep(0.09)
193+
194+
assert L == [(0, 1, 2), (3,)]
195+
196+
197+
def test_partition_key():
198+
source = Stream()
199+
L = source.partition(2, key=0).sink_to_list()
200+
201+
for i in range(4):
202+
source.emit((i % 2, i))
203+
204+
assert L == [((0, 0), (0, 2)), ((1, 1), (1, 3))]
205+
206+
207+
def test_partition_key_callable():
208+
source = Stream()
209+
L = source.partition(2, key=lambda x: x % 2).sink_to_list()
210+
211+
for i in range(10):
212+
source.emit(i)
213+
214+
assert L == [(0, 2), (1, 3), (4, 6), (5, 7)]
215+
216+
167217
def test_sliding_window():
168218
source = Stream()
169219
L = source.sliding_window(2).sink_to_list()

0 commit comments

Comments
 (0)