Skip to content

Commit 05b4c2d

Browse files
committed
Add timed_window_unique node to core
1 parent 059e20a commit 05b4c2d

1 file changed

Lines changed: 120 additions & 0 deletions

File tree

streamz/core.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1267,6 +1267,126 @@ def cb(self):
12671267
yield gen.sleep(self.interval)
12681268

12691269

1270+
@Stream.register_api()
1271+
class timed_window_unique(Stream):
1272+
"""
1273+
Emit a group of elements with unique keys every ``interval`` seconds.
1274+
1275+
Parameters
1276+
----------
1277+
interval: Union[int, str]
1278+
Number of seconds over which to group elements, or a ``pandas``-style
1279+
duration string that can be converted into seconds.
1280+
key: Union[Hashable, Callable[[Any], Hashable]]
1281+
Callable that accepts a stream element and returns a unique, hashable
1282+
representation of the incoming data (``key(x)``), or a hashable that gets
1283+
the corresponding value of a stream element (``x[key]``). For example,
1284+
``key=lambda x: x["a"]`` would allow only elements with unique ``"a"`` values
1285+
to pass through.
1286+
1287+
.. note:: By default, we simply use the element object itself as the key,
1288+
so that object must be hashable. If that's not the case, a non-default
1289+
key must be provided.
1290+
1291+
keep: str
1292+
Which element to keep in the case that a unique key is already found
1293+
in the group. If "first", keep element from the first occurrence of a given
1294+
key; if "last", keep element from the most recent occurrence. Note that
1295+
relative ordering of *elements* is preserved in the data passed through,
1296+
and not ordering of *keys*.
1297+
1298+
Examples
1299+
--------
1300+
>>> source = Stream()
1301+
>>> stream = source.timed_window_unique(interval=2, keep="first").sink(print)
1302+
>>> eles = [1, 2, 1, 3, 1, 3, 3, 2]
1303+
>>> for ele in eles:
1304+
... source.emit(ele)
1305+
... time.sleep(0.6)
1306+
()
1307+
(1, 2, 3)
1308+
(1, 3)
1309+
(2,)
1310+
()
1311+
1312+
>>> source = Stream()
1313+
>>> stream = source.timed_window_unique(interval=2, keep="last").sink(print)
1314+
>>> eles = [1, 2, 1, 3, 1, 3, 3, 2]
1315+
>>> for ele in eles:
1316+
... source.emit(ele)
1317+
... time.sleep(0.6)
1318+
()
1319+
(2, 1, 3)
1320+
(1, 3)
1321+
(2,)
1322+
()
1323+
1324+
>>> source = Stream()
1325+
>>> stream = source.timed_window_unique(interval=2, key=lambda x: len(x), keep="last").sink(print)
1326+
>>> eles = ["f", "fo", "f", "foo", "f", "foo", "foo", "fo"]
1327+
>>> for ele in eles:
1328+
... source.emit(ele)
1329+
... time.sleep(0.6)
1330+
()
1331+
('fo', 'f', 'foo')
1332+
('f', 'foo')
1333+
('fo',)
1334+
()
1335+
"""
1336+
_graphviz_shape = "octagon"
1337+
1338+
def __init__(
1339+
self,
1340+
upstream,
1341+
interval: Union[int, str],
1342+
key: Union[Hashable, Callable[[Any], Hashable]] = identity,
1343+
keep: str = "first", # Literal["first", "last"]
1344+
**kwargs
1345+
):
1346+
self.interval = convert_interval(interval)
1347+
self.key = key
1348+
self.keep = keep
1349+
self._buffer = {}
1350+
self._metadata_buffer = {}
1351+
self.last = gen.moment
1352+
Stream.__init__(self, upstream, ensure_io_loop=True, **kwargs)
1353+
self.loop.add_callback(self.cb)
1354+
1355+
def _get_key(self, x):
1356+
if callable(self.key):
1357+
return self.key(x)
1358+
else:
1359+
return x[self.key]
1360+
1361+
def update(self, x, who=None, metadata=None):
1362+
self._retain_refs(metadata)
1363+
y = self._get_key(x)
1364+
if self.keep == "last":
1365+
# remove key if already present so that emitted value
1366+
# will reflect elements' actual relative ordering
1367+
self._buffer.pop(y, None)
1368+
self._metadata_buffer.pop(y, None)
1369+
self._buffer[y] = x
1370+
self._metadata_buffer[y] = metadata
1371+
else: # self.keep == "first"
1372+
if y not in self._buffer:
1373+
self._buffer[y] = x
1374+
self._metadata_buffer[y] = metadata
1375+
return self.last
1376+
1377+
@gen.coroutine
1378+
def cb(self):
1379+
while True:
1380+
result, self._buffer = tuple(self._buffer.values()), {}
1381+
metadata_result, self._metadata_buffer = list(self._metadata_buffer.values()), {}
1382+
# TODO: figure out why metadata_result is handled differently here...
1383+
m = [m for ml in metadata_result for m in ml]
1384+
self.last = self._emit(result, m)
1385+
self._release_refs(m)
1386+
yield self.last
1387+
yield gen.sleep(self.interval)
1388+
1389+
12701390
@Stream.register_api()
12711391
class delay(Stream):
12721392
""" Add a time delay to results """

0 commit comments

Comments
 (0)