Skip to content

Commit ba1c5c3

Browse files
committed
Add tests for partition_unique node
1 parent 05b4c2d commit ba1c5c3

1 file changed

Lines changed: 37 additions & 0 deletions

File tree

streamz/tests/test_core.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,43 @@ def test_partition():
164164
assert L == [(0, 1), (2, 3), (4, 5), (6, 7), (8, 9)]
165165

166166

167+
@pytest.mark.parametrize(
168+
"n,key,keep,elements,exp_result",
169+
[
170+
(3, sz.identity, "first", [1, 2, 1, 3, 1, 3, 3, 2], [(1, 2, 3), (1, 3, 2)]),
171+
(3, sz.identity, "last", [1, 2, 1, 3, 1, 3, 3, 2], [(2, 1, 3), (1, 3, 2)]),
172+
(
173+
3,
174+
len,
175+
"last",
176+
["f", "fo", "f", "foo", "f", "foo", "foo", "fo"],
177+
[("fo", "f", "foo"), ("f", "foo", "fo")],
178+
),
179+
(
180+
2,
181+
"id",
182+
"first",
183+
[{"id": 0, "foo": "bar"}, {"id": 0, "foo": "baz"}, {"id": 1, "foo": "bat"}],
184+
[({"id": 0, "foo": "bar"}, {"id": 1, "foo": "bat"})],
185+
),
186+
(
187+
2,
188+
"id",
189+
"last",
190+
[{"id": 0, "foo": "bar"}, {"id": 0, "foo": "baz"}, {"id": 1, "foo": "bat"}],
191+
[({"id": 0, "foo": "baz"}, {"id": 1, "foo": "bat"})],
192+
),
193+
]
194+
)
195+
def test_partition_unique(n, key, keep, elements, exp_result):
196+
source = Stream()
197+
L = source.partition_unique(n, key, keep).sink_to_list()
198+
for ele in elements:
199+
source.emit(ele)
200+
201+
assert L == exp_result
202+
203+
167204
def test_partition_timeout():
168205
source = Stream()
169206
L = source.partition(10, timeout=0.01).sink_to_list()

0 commit comments

Comments
 (0)