Skip to content

Commit 365a529

Browse files
Merge martindurant:multiple_topic_partitions and use wait_for in some more tests
1 parent b5b55f5 commit 365a529

4 files changed

Lines changed: 19 additions & 21 deletions

File tree

.travis.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ install:
2323
# Install dependencies
2424
- conda env create --name test-streamz --file ./conda/environments/streamz_dev.yml
2525
- source activate test-streamz
26-
- pip install git+https://github.com/dask/distributed.git --upgrade --no-deps
27-
- pip install flaky
2826

2927
- python setup.py install
3028

conda/environments/streamz_dev.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,4 @@ dependencies:
3030
- ipython
3131
- ipykernel
3232
- ipywidgets
33+
- flaky

streamz/sources.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,7 @@ def __init__(self, topic, consumer_params, poll_interval='1s',
471471
self.keys = keys
472472
self.engine = engine
473473
self.stopped = True
474+
self.started = False
474475

475476
super(FromKafkaBatched, self).__init__(ensure_io_loop=True, **kwargs)
476477

@@ -531,6 +532,7 @@ def checkpoint_emit(_part):
531532
tp, timeout=0.1)
532533
except (RuntimeError, ck.KafkaException):
533534
continue
535+
self.started = True
534536
if 'auto.offset.reset' in self.consumer_params.keys():
535537
if self.consumer_params['auto.offset.reset'] == 'latest' and \
536538
self.positions[partition] == -1001:

streamz/tests/test_kafka.py

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ def test_kafka_batch():
202202
stream = Stream.from_kafka_batched(TOPIC, ARGS, max_batch_size=4, keys=True)
203203
out = stream.sink_to_list()
204204
stream.start()
205-
time.sleep(5)
205+
wait_for(lambda: stream.upstream.started, 10, 0.1)
206206
for i in range(10):
207207
kafka.produce(TOPIC, b'value-%d' % i, b'%d' % i)
208208
kafka.flush()
@@ -275,17 +275,17 @@ def test_kafka_batch_npartitions():
275275
npartitions=1)
276276
out2 = stream2.gather().sink_to_list()
277277
stream2.start()
278-
time.sleep(5)
279-
assert (len(out2) == 1 and len(out2[0]) == 5)
278+
wait_for(lambda: stream2.upstream.started, 10, 0.1)
279+
wait_for(lambda: len(out2) == 1 and len(out2[0]) == 5, 10, 0.1)
280280
stream2.upstream.stopped = True
281281

282282
stream3 = Stream.from_kafka_batched(TOPIC, ARGS2,
283283
asynchronous=True,
284284
npartitions=4)
285285
out3 = stream3.gather().sink_to_list()
286286
stream3.start()
287-
time.sleep(5)
288-
assert (len(out3) == 2 and (len(out3[0]) + len(out3[1])) == 10)
287+
wait_for(lambda: stream3.upstream.started, 10, 0.1)
288+
wait_for(lambda: len(out3) == 2 and (len(out3[0]) + len(out3[1])) == 10, 10, 0.1)
289289
stream3.upstream.stopped = True
290290

291291

@@ -317,8 +317,8 @@ def test_kafka_refresh_partitions():
317317
poll_interval='2s')
318318
out = stream.gather().sink_to_list()
319319
stream.start()
320-
time.sleep(5)
321-
assert (len(out) == 2 and (len(out[0]) + len(out[1])) == 10)
320+
wait_for(lambda: stream.upstream.started, 10, 0.1)
321+
wait_for(lambda: len(out) == 2 and (len(out[0]) + len(out[1])) == 10, 10, 0.1)
322322

323323
subprocess.call(shlex.split("docker exec streamz-kafka "
324324
"/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh "
@@ -331,10 +331,9 @@ def test_kafka_refresh_partitions():
331331
else:
332332
kafka.produce(TOPIC, b'value-%d' % i, partition=3)
333333
kafka.flush()
334-
time.sleep(5)
335334

336-
assert (len(out) == 4 and (len(out[2]) + len(out[3])) == 10
337-
and out[3][4] == b'value-19')
335+
wait_for(lambda: len(out) == 4 and (len(out[2]) + len(out[3])) == 10
336+
and out[3][4] == b'value-19', 10, 0.1)
338337
stream.upstream.stopped = True
339338

340339

@@ -445,7 +444,7 @@ def test_kafka_batch_checkpointing_async_nodes_1():
445444
stream2 = Stream.from_kafka_batched(TOPIC, ARGS)
446445
out2 = stream2.partition(2).sliding_window(2, return_partial=False).sink_to_list()
447446
stream2.start()
448-
time.sleep(2)
447+
wait_for(lambda: stream2.upstream.started, 10, 0.1)
449448
for i in range(2,6):
450449
kafka.produce(TOPIC, b'value-%d' % i)
451450
kafka.flush()
@@ -458,9 +457,9 @@ def test_kafka_batch_checkpointing_async_nodes_1():
458457
stream3 = Stream.from_kafka_batched(TOPIC, ARGS)
459458
out3 = stream3.sink_to_list()
460459
stream3.start()
461-
time.sleep(2)
460+
wait_for(lambda: stream3.upstream.started, 10, 0.1)
462461
#Stream picks up from where it left before, i.e., from the last committed offset.
463-
assert len(out3) == 1 and out3[0] == [b'value-3', b'value-4', b'value-5']
462+
wait_for(lambda: len(out3) == 1 and out3[0] == [b'value-3', b'value-4', b'value-5'], 10, 0.1)
464463
stream3.upstream.stopped = True
465464
stream3.destroy()
466465

@@ -581,17 +580,16 @@ def test_kafka_checkpointing_auto_offset_reset_latest():
581580
stream1 = Stream.from_kafka_batched(TOPIC, ARGS, asynchronous=True)
582581
out1 = stream1.map(split).gather().sink_to_list()
583582
stream1.start()
584-
time.sleep(5)
583+
wait_for(lambda: stream1.upstream.started, 10, 0.1)
585584

586585
'''
587586
Stream has started, so these are read.
588587
'''
589588
for i in range(30):
590589
kafka.produce(TOPIC, b'value-%d' % i)
591590
kafka.flush()
592-
time.sleep(5)
593591

594-
assert (len(out1) == 3 and (len(out1[0]) + len(out1[1]) + len(out1[2])) == 30)
592+
wait_for(lambda: len(out1) == 3 and (len(out1[0]) + len(out1[1]) + len(out1[2])) == 30, 10, 0.1)
595593
'''
596594
Stream stops but checkpoint has been created.
597595
'''
@@ -612,12 +610,11 @@ def test_kafka_checkpointing_auto_offset_reset_latest():
612610
Stream restarts here.
613611
'''
614612
stream2.start()
615-
time.sleep(5)
613+
wait_for(lambda: stream2.upstream.started, 10, 0.1)
616614

617615
for i in range(30):
618616
kafka.produce(TOPIC, b'value-%d' % i)
619617
kafka.flush()
620-
time.sleep(5)
621618

622-
assert (len(out2) == 6 and (len(out2[3]) + len(out2[4]) + len(out2[5])) == 30)
619+
wait_for(lambda: len(out2) == 6 and (len(out2[3]) + len(out2[4]) + len(out2[5])) == 30, 10, 0.1)
623620
stream2.upstream.stopped = True

0 commit comments

Comments (0)