Skip to content

Commit cca4261

Browse files
author
Martin Durant
committed
Use wait_fpr
1 parent b5b55f5 commit cca4261

2 files changed

Lines changed: 7 additions & 5 deletions

File tree

streamz/sources.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,7 @@ def __init__(self, topic, consumer_params, poll_interval='1s',
471471
self.keys = keys
472472
self.engine = engine
473473
self.stopped = True
474+
self.started = False
474475

475476
super(FromKafkaBatched, self).__init__(ensure_io_loop=True, **kwargs)
476477

@@ -531,6 +532,7 @@ def checkpoint_emit(_part):
531532
tp, timeout=0.1)
532533
except (RuntimeError, ck.KafkaException):
533534
continue
535+
self.started = True
534536
if 'auto.offset.reset' in self.consumer_params.keys():
535537
if self.consumer_params['auto.offset.reset'] == 'latest' and \
536538
self.positions[partition] == -1001:

streamz/tests/test_kafka.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ def test_kafka_batch():
202202
stream = Stream.from_kafka_batched(TOPIC, ARGS, max_batch_size=4, keys=True)
203203
out = stream.sink_to_list()
204204
stream.start()
205-
time.sleep(5)
205+
wait_for(lambda: stream.upstream.started, 10, 0.1)
206206
for i in range(10):
207207
kafka.produce(TOPIC, b'value-%d' % i, b'%d' % i)
208208
kafka.flush()
@@ -275,17 +275,17 @@ def test_kafka_batch_npartitions():
275275
npartitions=1)
276276
out2 = stream2.gather().sink_to_list()
277277
stream2.start()
278-
time.sleep(5)
279-
assert (len(out2) == 1 and len(out2[0]) == 5)
278+
wait_for(lambda: stream2.upstream.started, 10, 0.1)
279+
wait_for(lambda: len(out2) == 1 and len(out2[0]) == 5, 10, 0.1)
280280
stream2.upstream.stopped = True
281281

282282
stream3 = Stream.from_kafka_batched(TOPIC, ARGS2,
283283
asynchronous=True,
284284
npartitions=4)
285285
out3 = stream3.gather().sink_to_list()
286286
stream3.start()
287-
time.sleep(5)
288-
assert (len(out3) == 2 and (len(out3[0]) + len(out3[1])) == 10)
287+
wait_for(lambda: stream3.upstream.started, 10, 0.1)
288+
wait_for(lambda: len(out3) == 2 and (len(out3[0]) + len(out3[1])) == 10, 10, 0.1)
289289
stream3.upstream.stopped = True
290290

291291

0 commit comments

Comments
 (0)