Skip to content

Commit b2037d7

Browse files
author
Martin Durant
committed
wait for syntax in flaky test
1 parent 36a04a7 commit b2037d7

1 file changed

Lines changed: 5 additions & 3 deletions

File tree

streamz/tests/test_kafka.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -580,7 +580,7 @@ def test_kafka_checkpointing_auto_offset_reset_latest():
580580
stream1 = Stream.from_kafka_batched(TOPIC, ARGS, asynchronous=True)
581581
out1 = stream1.map(split).gather().sink_to_list()
582582
stream1.start()
583-
wait_for(lambda: stream1.upstream.started, 10, 0.1)
583+
wait_for(lambda: stream1.upstream.started, 10, period=0.1)
584584

585585
'''
586586
Stream has started, so these are read.
@@ -589,7 +589,8 @@ def test_kafka_checkpointing_auto_offset_reset_latest():
589589
kafka.produce(TOPIC, b'value-%d' % i)
590590
kafka.flush()
591591

592-
wait_for(lambda: len(out1) == 3 and (len(out1[0]) + len(out1[1]) + len(out1[2])) == 30, 10, 0.1)
592+
wait_for(lambda: len(out1) == 3 and (len(out1[0]) + len(out1[1]) + len(out1[2])) == 30,
593+
10, period=0.1)
593594
'''
594595
Stream stops but checkpoint has been created.
595596
'''
@@ -616,5 +617,6 @@ def test_kafka_checkpointing_auto_offset_reset_latest():
616617
kafka.produce(TOPIC, b'value-%d' % i)
617618
kafka.flush()
618619

619-
wait_for(lambda: len(out2) == 6 and (len(out2[3]) + len(out2[4]) + len(out2[5])) == 30, 10, 0.1)
620+
wait_for(lambda: len(out2) == 6 and (len(out2[3]) + len(out2[4]) + len(out2[5])) == 30,
621+
10, period=0.1)
620622
stream2.upstream.stopped = True

0 commit comments

Comments
 (0)