Skip to content

Commit 7096fdd

Browse files
committed
Fix test for timed_window_unique node
1 parent 1f5ed85 commit 7096fdd

1 file changed

Lines changed: 45 additions & 52 deletions

File tree

streamz/tests/test_core.py

Lines changed: 45 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -333,58 +333,51 @@ def read_from_q():
333333
assert end - start >= 0.2
334334

335335

336-
# TODO: looks like gen_test and pytest.mark.parametrize don't play nicely together:
337-
# TypeError: test_timed_window_unique() missing 5 required positional arguments: 'interval', 'key', 'keep', 'elements', and 'exp_result'
338-
# I think gen_test() needs to be modified for this to work, but unsure how
339-
# TODO: fix tests' exp_result values and the logic inside, once it runs without ^ error
340-
# @gen_test()
341-
# @pytest.mark.parametrize(
342-
# "interval,key,keep,elements,exp_result",
343-
# [
344-
# (0.2, sz.identity, "first", [1, 2, 1, 3, 1, 3, 3, 2], [(1, 2, 3), (1, 3, 2)]),
345-
# (0.2, sz.identity, "last", [1, 2, 1, 3, 1, 3, 3, 2], [(2, 1, 3), (1, 3, 2)]),
346-
# (
347-
# 0.02,
348-
# len,
349-
# "last",
350-
# ["f", "fo", "f", "foo", "f", "foo", "foo", "fo"],
351-
# [("fo", "f", "foo"), ("f", "foo", "fo")],
352-
# ),
353-
# (
354-
# 0.02,
355-
# "id",
356-
# "first",
357-
# [{"id": 0, "foo": "bar"}, {"id": 0, "foo": "baz"}, {"id": 1, "foo": "bat"}],
358-
# [({"id": 0, "foo": "bar"}, {"id": 1, "foo": "bat"})],
359-
# ),
360-
# (
361-
# 0.02,
362-
# "id",
363-
# "last",
364-
# [{"id": 0, "foo": "bar"}, {"id": 0, "foo": "baz"}, {"id": 1, "foo": "bat"}],
365-
# [({"id": 0, "foo": "baz"}, {"id": 1, "foo": "bat"})],
366-
# ),
367-
# ]
368-
# )
369-
# def test_timed_window_unique(interval, key, keep, elements, exp_result):
370-
# import pdb; pdb.set_trace()
371-
# source = Stream(asynchronous=True)
372-
# a = source.timed_window_unique(interval, key, keep)
373-
374-
# assert a.loop is IOLoop.current()
375-
# L = a.sink_to_list()
376-
377-
# for ele in elements:
378-
# yield source.emit(ele)
379-
# yield gen.sleep(0.006)
380-
# yield gen.sleep(a.interval)
381-
382-
# assert L
383-
# assert all(len(x) <= 3 for x in L)
384-
# assert any(len(x) >= 2 for x in L)
385-
386-
# yield gen.sleep(0.1)
387-
# assert not L[-1]
336+
@gen_test()
337+
def test_timed_window_unique():
338+
tests = [
339+
(0.05, sz.identity, "first", [1, 2, 1, 3, 1, 3, 3, 2], [(1, 2, 3)]),
340+
(0.05, sz.identity, "last", [1, 2, 1, 3, 1, 3, 3, 2], [(1, 3, 2)]),
341+
(
342+
0.05,
343+
len,
344+
"last",
345+
["f", "fo", "f", "foo", "f", "foo", "foo", "fo"],
346+
[("f", "foo", "fo")],
347+
),
348+
(
349+
0.05,
350+
"id",
351+
"first",
352+
[{"id": 0, "foo": "bar"}, {"id": 1, "foo": "bat"}, {"id": 0, "foo": "baz"}],
353+
[({"id": 0, "foo": "bar"}, {"id": 1, "foo": "bat"})],
354+
),
355+
(
356+
0.05,
357+
"id",
358+
"last",
359+
[{"id": 0, "foo": "bar"}, {"id": 1, "foo": "bat"}, {"id": 0, "foo": "baz"}],
360+
[({"id": 1, "foo": "bat"}, {"id": 0, "foo": "baz"})],
361+
),
362+
]
363+
for interval, key, keep, elements, exp_result in tests:
364+
source = Stream(asynchronous=True)
365+
a = source.timed_window_unique(interval, key, keep)
366+
367+
assert a.loop is IOLoop.current()
368+
L = a.sink_to_list()
369+
370+
for ele in elements:
371+
yield source.emit(ele)
372+
yield gen.sleep(a.interval)
373+
374+
assert L
375+
assert all(wi in elements for window in L for wi in window)
376+
assert sum(1 for window in L for _ in window) <= len(elements)
377+
assert L == exp_result
378+
379+
yield gen.sleep(a.interval)
380+
assert not L[-1]
388381

389382

390383
@gen_test()

0 commit comments

Comments
 (0)