Skip to content

Commit f3d6afe

Browse files
authored
Merge pull request #1 from chinmaychandak/master
Some more changes to Kafka tests using wait_for
2 parents 51e6579 + eb4aa5e commit f3d6afe

1 file changed

Lines changed: 11 additions & 14 deletions

File tree

streamz/tests/test_kafka.py

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -317,8 +317,8 @@ def test_kafka_refresh_partitions():
317317
poll_interval='2s')
318318
out = stream.gather().sink_to_list()
319319
stream.start()
320-
time.sleep(5)
321-
assert (len(out) == 2 and (len(out[0]) + len(out[1])) == 10)
320+
wait_for(lambda: stream.upstream.started, 10, 0.1)
321+
wait_for(lambda: len(out) == 2 and (len(out[0]) + len(out[1])) == 10, 10, 0.1)
322322

323323
subprocess.call(shlex.split("docker exec streamz-kafka "
324324
"/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh "
@@ -331,10 +331,9 @@ def test_kafka_refresh_partitions():
331331
else:
332332
kafka.produce(TOPIC, b'value-%d' % i, partition=3)
333333
kafka.flush()
334-
time.sleep(5)
335334

336-
assert (len(out) == 4 and (len(out[2]) + len(out[3])) == 10
337-
and out[3][4] == b'value-19')
335+
wait_for(lambda: len(out) == 4 and (len(out[2]) + len(out[3])) == 10
336+
and out[3][4] == b'value-19', 10, 0.1)
338337
stream.upstream.stopped = True
339338

340339

@@ -445,7 +444,7 @@ def test_kafka_batch_checkpointing_async_nodes_1():
445444
stream2 = Stream.from_kafka_batched(TOPIC, ARGS)
446445
out2 = stream2.partition(2).sliding_window(2, return_partial=False).sink_to_list()
447446
stream2.start()
448-
time.sleep(2)
447+
wait_for(lambda: stream2.upstream.started, 10, 0.1)
449448
for i in range(2,6):
450449
kafka.produce(TOPIC, b'value-%d' % i)
451450
kafka.flush()
@@ -458,9 +457,9 @@ def test_kafka_batch_checkpointing_async_nodes_1():
458457
stream3 = Stream.from_kafka_batched(TOPIC, ARGS)
459458
out3 = stream3.sink_to_list()
460459
stream3.start()
461-
time.sleep(2)
460+
wait_for(lambda: stream3.upstream.started, 10, 0.1)
462461
#Stream picks up from where it left before, i.e., from the last committed offset.
463-
assert len(out3) == 1 and out3[0] == [b'value-3', b'value-4', b'value-5']
462+
wait_for(lambda: len(out3) == 1 and out3[0] == [b'value-3', b'value-4', b'value-5'], 10, 0.1)
464463
stream3.upstream.stopped = True
465464
stream3.destroy()
466465

@@ -581,17 +580,16 @@ def test_kafka_checkpointing_auto_offset_reset_latest():
581580
stream1 = Stream.from_kafka_batched(TOPIC, ARGS, asynchronous=True)
582581
out1 = stream1.map(split).gather().sink_to_list()
583582
stream1.start()
584-
time.sleep(5)
583+
wait_for(lambda: stream1.upstream.started, 10, 0.1)
585584

586585
'''
587586
Stream has started, so these are read.
588587
'''
589588
for i in range(30):
590589
kafka.produce(TOPIC, b'value-%d' % i)
591590
kafka.flush()
592-
time.sleep(5)
593591

594-
assert (len(out1) == 3 and (len(out1[0]) + len(out1[1]) + len(out1[2])) == 30)
592+
wait_for(lambda: len(out1) == 3 and (len(out1[0]) + len(out1[1]) + len(out1[2])) == 30, 10, 0.1)
595593
'''
596594
Stream stops but checkpoint has been created.
597595
'''
@@ -612,12 +610,11 @@ def test_kafka_checkpointing_auto_offset_reset_latest():
612610
Stream restarts here.
613611
'''
614612
stream2.start()
615-
time.sleep(5)
613+
wait_for(lambda: stream2.upstream.started, 10, 0.1)
616614

617615
for i in range(30):
618616
kafka.produce(TOPIC, b'value-%d' % i)
619617
kafka.flush()
620-
time.sleep(5)
621618

622-
assert (len(out2) == 6 and (len(out2[3]) + len(out2[4]) + len(out2[5])) == 30)
619+
wait_for(lambda: len(out2) == 6 and (len(out2[3]) + len(out2[4]) + len(out2[5])) == 30, 10, 0.1)
623620
stream2.upstream.stopped = True

0 commit comments

Comments
 (0)