Skip to content

Commit b82ca42

Browse files
authored
Merge pull request #389 from bdewilde/add-uniqueing-nodes
Add nodes that unique their buffers
2 parents 6e07036 + d4bb082 commit b82ca42

3 files changed

Lines changed: 305 additions & 0 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ log
1313
.cache/
1414
.ipynb_checkpoints/
1515
.vscode
16+
.python-version

streamz/core.py

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import sys
99
import threading
1010
from time import time
11+
from typing import Any, Callable, Hashable, Union
1112
import weakref
1213

1314
import toolz
@@ -1022,6 +1023,107 @@ def update(self, x, who=None, metadata=None):
10221023
)
10231024

10241025

1026+
@Stream.register_api()
class partition_unique(Stream):
    """
    Partition stream elements into groups of equal size with unique keys only.

    Parameters
    ----------
    n: int
        Number of (unique) elements to pass through as a group.
    key: Union[Hashable, Callable[[Any], Hashable]]
        Callable that accepts a stream element and returns a unique, hashable
        representation of the incoming data (``key(x)``), or a hashable that gets
        the corresponding value of a stream element (``x[key]``). For example,
        ``key=lambda x: x["a"]`` would allow only elements with unique ``"a"`` values
        to pass through.

        .. note:: By default, we simply use the element object itself as the key,
            so that object must be hashable. If that's not the case, a non-default
            key must be provided.

    keep: str
        Which element to keep in the case that a unique key is already found
        in the group. If "first", keep element from the first occurrence of a given
        key; if "last", keep element from the most recent occurrence. Note that
        relative ordering of *elements* is preserved in the data passed through,
        and not ordering of *keys*.

    Raises
    ------
    ValueError
        If ``keep`` is neither ``"first"`` nor ``"last"``. (Previously an invalid
        value silently behaved like ``"first"``.)
    **kwargs

    Examples
    --------
    >>> source = Stream()
    >>> stream = source.partition_unique(n=3, keep="first").sink(print)
    >>> eles = [1, 2, 1, 3, 1, 3, 3, 2]
    >>> for ele in eles:
    ...     source.emit(ele)
    (1, 2, 3)
    (1, 3, 2)

    >>> source = Stream()
    >>> stream = source.partition_unique(n=3, keep="last").sink(print)
    >>> eles = [1, 2, 1, 3, 1, 3, 3, 2]
    >>> for ele in eles:
    ...     source.emit(ele)
    (2, 1, 3)
    (1, 3, 2)

    >>> source = Stream()
    >>> stream = source.partition_unique(n=3, key=lambda x: len(x), keep="last").sink(print)
    >>> eles = ["f", "fo", "f", "foo", "f", "foo", "foo", "fo"]
    >>> for ele in eles:
    ...     source.emit(ele)
    ('fo', 'f', 'foo')
    ('f', 'foo', 'fo')
    """
    _graphviz_shape = "diamond"

    def __init__(
        self,
        upstream,
        n: int,
        key: Union[Hashable, Callable[[Any], Hashable]] = identity,
        keep: str = "first",  # Literal["first", "last"]
        **kwargs
    ):
        # Fail fast on an invalid ``keep``: without this check, any typo
        # (e.g. "latest") would silently fall through to the "first" branch
        # in update() and be very hard to notice.
        if keep not in ("first", "last"):
            raise ValueError(
                "keep={!r} is invalid; must be 'first' or 'last'".format(keep)
            )
        self.n = n
        self.key = key
        self.keep = keep
        # key -> element and key -> metadata for the group being assembled;
        # dict insertion order determines the order of the emitted tuple.
        self._buffer = {}
        self._metadata_buffer = {}
        Stream.__init__(self, upstream, **kwargs)

    def _get_key(self, x):
        """Return the unique key for element *x* per the configured ``key``."""
        # A callable key computes key(x); otherwise ``key`` indexes into the
        # element (e.g. a dict field name or sequence position).
        if callable(self.key):
            return self.key(x)
        else:
            return x[self.key]

    def update(self, x, who=None, metadata=None):
        """Buffer *x* under its key; emit the group once ``n`` unique keys collect."""
        self._retain_refs(metadata)
        y = self._get_key(x)
        if self.keep == "last":
            # remove key if already present so that emitted value
            # will reflect elements' actual relative ordering
            self._buffer.pop(y, None)
            self._metadata_buffer.pop(y, None)
            self._buffer[y] = x
            self._metadata_buffer[y] = metadata
        else:  # self.keep == "first"
            if y not in self._buffer:
                self._buffer[y] = x
                self._metadata_buffer[y] = metadata
        if len(self._buffer) == self.n:
            # Swap the buffers out before emitting so a re-entrant update()
            # (triggered downstream) starts from a clean group.
            result, self._buffer = tuple(self._buffer.values()), {}
            metadata_result, self._metadata_buffer = list(self._metadata_buffer.values()), {}
            ret = self._emit(result, metadata_result)
            self._release_refs(metadata_result)
            return ret
        else:
            return []
1125+
1126+
10251127
@Stream.register_api()
10261128
class sliding_window(Stream):
10271129
""" Produce overlapping tuples of size n
@@ -1117,6 +1219,124 @@ def cb(self):
11171219
yield gen.sleep(self.interval)
11181220

11191221

1222+
@Stream.register_api()
class timed_window_unique(Stream):
    """
    Emit a group of elements with unique keys every ``interval`` seconds.

    Parameters
    ----------
    interval: Union[int, str]
        Number of seconds over which to group elements, or a ``pandas``-style
        duration string that can be converted into seconds.
    key: Union[Hashable, Callable[[Any], Hashable]]
        Callable that accepts a stream element and returns a unique, hashable
        representation of the incoming data (``key(x)``), or a hashable that gets
        the corresponding value of a stream element (``x[key]``). For example, both
        ``key=lambda x: x["a"]`` and ``key="a"`` would allow only elements with unique
        ``"a"`` values to pass through.

        .. note:: By default, we simply use the element object itself as the key,
            so that object must be hashable. If that's not the case, a non-default
            key must be provided.

    keep: str
        Which element to keep in the case that a unique key is already found
        in the group. If "first", keep element from the first occurrence of a given
        key; if "last", keep element from the most recent occurrence. Note that
        relative ordering of *elements* is preserved in the data passed through,
        and not ordering of *keys*.

    Examples
    --------
    >>> source = Stream()

    Get unique hashable elements in a window, keeping just the first occurrence:
    >>> stream = source.timed_window_unique(interval=1.0, keep="first").sink(print)
    >>> for ele in [1, 2, 3, 3, 2, 1]:
    ...     source.emit(ele)
    ()
    (1, 2, 3)
    ()

    Get unique hashable elements in a window, keeping just the last occurrence:
    >>> stream = source.timed_window_unique(interval=1.0, keep="last").sink(print)
    >>> for ele in [1, 2, 3, 3, 2, 1]:
    ...     source.emit(ele)
    ()
    (3, 2, 1)
    ()

    Get unique elements in a window by (string) length, keeping just the first occurrence:
    >>> stream = source.timed_window_unique(interval=1.0, key=len, keep="first")
    >>> for ele in ["f", "b", "fo", "ba", "foo", "bar"]:
    ...     source.emit(ele)
    ()
    ('f', 'fo', 'foo')
    ()

    Get unique elements in a window by (string) length, keeping just the last occurrence:
    >>> stream = source.timed_window_unique(interval=1.0, key=len, keep="last")
    >>> for ele in ["f", "b", "fo", "ba", "foo", "bar"]:
    ...     source.emit(ele)
    ()
    ('b', 'ba', 'bar')
    ()
    """
    _graphviz_shape = "octagon"

    def __init__(
        self,
        upstream,
        interval: Union[int, str],
        key: Union[Hashable, Callable[[Any], Hashable]] = identity,
        keep: str = "first",  # Literal["first", "last"]
        **kwargs
    ):
        self.interval = convert_interval(interval)
        self.key = key
        self.keep = keep
        # key -> element and key -> metadata for the current window;
        # dict insertion order determines the order of the emitted tuple.
        self._buffer = {}
        self._metadata_buffer = {}
        self.last = gen.moment
        Stream.__init__(self, upstream, ensure_io_loop=True, **kwargs)
        # cb() loops forever on the event loop, flushing the buffer
        # every ``interval`` seconds.
        self.loop.add_callback(self.cb)

    def _get_key(self, x):
        # A callable key computes key(x); otherwise ``key`` indexes into the
        # element (e.g. a dict field name or sequence position).
        if callable(self.key):
            return self.key(x)
        else:
            return x[self.key]

    def update(self, x, who=None, metadata=None):
        # Only buffers the element under its key -- emission happens
        # asynchronously in cb(). Returns the future of the last emit so
        # upstream back-pressure can await it.
        self._retain_refs(metadata)
        y = self._get_key(x)
        if self.keep == "last":
            # remove key if already present so that emitted value
            # will reflect elements' actual relative ordering
            self._buffer.pop(y, None)
            self._metadata_buffer.pop(y, None)
            self._buffer[y] = x
            self._metadata_buffer[y] = metadata
        else:  # self.keep == "first"
            if y not in self._buffer:
                self._buffer[y] = x
                self._metadata_buffer[y] = metadata
        return self.last

    @gen.coroutine
    def cb(self):
        while True:
            # Swap both buffers out before emitting so updates arriving
            # during the emit go into the next window.
            result, self._buffer = tuple(self._buffer.values()), {}
            metadata_result, self._metadata_buffer = list(self._metadata_buffer.values()), {}
            # TODO: figure out why metadata_result is handled differently here...
            # NOTE(review): each buffered metadata value is treated as a list
            # and flattened before emitting, unlike partition_unique which
            # passes its metadata list through unflattened -- confirm which
            # shape _emit/_release_refs actually expect.
            m = [m for ml in metadata_result for m in ml]
            self.last = self._emit(result, m)
            self._release_refs(m)
            yield self.last
            yield gen.sleep(self.interval)
1338+
1339+
11201340
@Stream.register_api()
11211341
class delay(Stream):
11221342
""" Add a time delay to results """

streamz/tests/test_core.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,43 @@ def test_partition():
164164
assert L == [(0, 1), (2, 3), (4, 5), (6, 7), (8, 9)]
165165

166166

167+
@pytest.mark.parametrize(
    "n,key,keep,elements,exp_result",
    [
        (3, sz.identity, "first", [1, 2, 1, 3, 1, 3, 3, 2], [(1, 2, 3), (1, 3, 2)]),
        (3, sz.identity, "last", [1, 2, 1, 3, 1, 3, 3, 2], [(2, 1, 3), (1, 3, 2)]),
        (3, len, "last",
         ["f", "fo", "f", "foo", "f", "foo", "foo", "fo"],
         [("fo", "f", "foo"), ("f", "foo", "fo")]),
        (2, "id", "first",
         [{"id": 0, "foo": "bar"}, {"id": 0, "foo": "baz"}, {"id": 1, "foo": "bat"}],
         [({"id": 0, "foo": "bar"}, {"id": 1, "foo": "bat"})]),
        (2, "id", "last",
         [{"id": 0, "foo": "bar"}, {"id": 0, "foo": "baz"}, {"id": 1, "foo": "bat"}],
         [({"id": 0, "foo": "baz"}, {"id": 1, "foo": "bat"})]),
    ],
)
def test_partition_unique(n, key, keep, elements, exp_result):
    # Push every element through a fresh stream, then compare the
    # collected groups against the expected output.
    source = Stream()
    collected = source.partition_unique(n, key, keep).sink_to_list()
    for element in elements:
        source.emit(element)
    assert collected == exp_result
202+
203+
167204
def test_partition_timeout():
168205
source = Stream()
169206
L = source.partition(10, timeout=0.01).sink_to_list()
@@ -296,6 +333,53 @@ def read_from_q():
296333
assert end - start >= 0.2
297334

298335

336+
@gen_test()
def test_timed_window_unique():
    cases = [
        (0.05, sz.identity, "first", [1, 2, 1, 3, 1, 3, 3, 2], [(1, 2, 3)]),
        (0.05, sz.identity, "last", [1, 2, 1, 3, 1, 3, 3, 2], [(1, 3, 2)]),
        (0.05, len, "last",
         ["f", "fo", "f", "foo", "f", "foo", "foo", "fo"],
         [("f", "foo", "fo")]),
        (0.05, "id", "first",
         [{"id": 0, "foo": "bar"}, {"id": 1, "foo": "bat"}, {"id": 0, "foo": "baz"}],
         [({"id": 0, "foo": "bar"}, {"id": 1, "foo": "bat"})]),
        (0.05, "id", "last",
         [{"id": 0, "foo": "bar"}, {"id": 1, "foo": "bat"}, {"id": 0, "foo": "baz"}],
         [({"id": 1, "foo": "bat"}, {"id": 0, "foo": "baz"})]),
    ]
    for interval, key, keep, elements, exp_result in cases:
        source = Stream(asynchronous=True)
        node = source.timed_window_unique(interval, key, keep)
        assert node.loop is IOLoop.current()
        results = node.sink_to_list()

        for item in elements:
            yield source.emit(item)
        yield gen.sleep(node.interval)

        # Something was emitted; every emitted item came from the input,
        # and uniqueing never yields more items than were fed in.
        assert results
        assert all(item in elements for window in results for item in window)
        assert sum(len(window) for window in results) <= len(elements)
        assert results == exp_result

        # After another quiet interval the newest window must be empty.
        yield gen.sleep(node.interval)
        assert not results[-1]
381+
382+
299383
@gen_test()
300384
def test_timed_window():
301385
source = Stream(asynchronous=True)

0 commit comments

Comments
 (0)