Skip to content

Commit e24d0ba

Browse files
authored
Merge pull request #368 from martindurant/multiple_topic_partitions
Multiple topic partitions
2 parents 7f92174 + f3d6afe commit e24d0ba

4 files changed

Lines changed: 85 additions & 20 deletions

File tree

.travis.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ install:
2323
# Install dependencies
2424
- conda env create --name test-streamz --file ./conda/environments/streamz_dev.yml
2525
- source activate test-streamz
26-
- pip install git+https://github.com/dask/distributed.git --upgrade --no-deps
27-
- pip install flaky
2826

2927
- python setup.py install
3028

conda/environments/streamz_dev.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,4 @@ dependencies:
3030
- ipython
3131
- ipykernel
3232
- ipywidgets
33+
- flaky

streamz/sources.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,7 @@ def __init__(self, topic, consumer_params, poll_interval='1s',
471471
self.keys = keys
472472
self.engine = engine
473473
self.stopped = True
474+
self.started = False
474475

475476
super(FromKafkaBatched, self).__init__(ensure_io_loop=True, **kwargs)
476477

@@ -531,10 +532,10 @@ def checkpoint_emit(_part):
531532
tp, timeout=0.1)
532533
except (RuntimeError, ck.KafkaException):
533534
continue
535+
self.started = True
534536
if 'auto.offset.reset' in self.consumer_params.keys():
535537
if self.consumer_params['auto.offset.reset'] == 'latest' and \
536-
(self.positions == [-1001] * self.npartitions
537-
or self.positions == [0] * self.npartitions):
538+
self.positions[partition] == -1001:
538539
self.positions[partition] = high
539540
current_position = self.positions[partition]
540541
lowest = max(current_position, low)

streamz/tests/test_kafka.py

Lines changed: 81 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ def test_kafka_batch():
202202
stream = Stream.from_kafka_batched(TOPIC, ARGS, max_batch_size=4, keys=True)
203203
out = stream.sink_to_list()
204204
stream.start()
205-
time.sleep(5)
205+
wait_for(lambda: stream.upstream.started, 10, 0.1)
206206
for i in range(10):
207207
kafka.produce(TOPIC, b'value-%d' % i, b'%d' % i)
208208
kafka.flush()
@@ -255,7 +255,6 @@ def test_kafka_batch_npartitions():
255255
"--create --zookeeper localhost:2181 "
256256
"--replication-factor 1 --partitions 2 "
257257
"--topic test-partitions"))
258-
time.sleep(5)
259258

260259
for i in range(10):
261260
if i % 2 == 0:
@@ -276,17 +275,17 @@ def test_kafka_batch_npartitions():
276275
npartitions=1)
277276
out2 = stream2.gather().sink_to_list()
278277
stream2.start()
279-
time.sleep(5)
280-
assert (len(out2) == 1 and len(out2[0]) == 5)
278+
wait_for(lambda: stream2.upstream.started, 10, 0.1)
279+
wait_for(lambda: len(out2) == 1 and len(out2[0]) == 5, 10, 0.1)
281280
stream2.upstream.stopped = True
282281

283282
stream3 = Stream.from_kafka_batched(TOPIC, ARGS2,
284283
asynchronous=True,
285284
npartitions=4)
286285
out3 = stream3.gather().sink_to_list()
287286
stream3.start()
288-
time.sleep(5)
289-
assert (len(out3) == 2 and (len(out3[0]) + len(out3[1])) == 10)
287+
wait_for(lambda: stream3.upstream.started, 10, 0.1)
288+
wait_for(lambda: len(out3) == 2 and (len(out3[0]) + len(out3[1])) == 10, 10, 0.1)
290289
stream3.upstream.stopped = True
291290

292291

@@ -304,7 +303,6 @@ def test_kafka_refresh_partitions():
304303
"--create --zookeeper localhost:2181 "
305304
"--replication-factor 1 --partitions 2 "
306305
"--topic test-refresh-partitions"))
307-
time.sleep(2)
308306

309307
for i in range(10):
310308
if i % 2 == 0:
@@ -319,25 +317,23 @@ def test_kafka_refresh_partitions():
319317
poll_interval='2s')
320318
out = stream.gather().sink_to_list()
321319
stream.start()
322-
time.sleep(5)
323-
assert (len(out) == 2 and (len(out[0]) + len(out[1])) == 10)
320+
wait_for(lambda: stream.upstream.started, 10, 0.1)
321+
wait_for(lambda: len(out) == 2 and (len(out[0]) + len(out[1])) == 10, 10, 0.1)
324322

325323
subprocess.call(shlex.split("docker exec streamz-kafka "
326324
"/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh "
327325
"--alter --zookeeper localhost:2181 "
328326
"--topic test-refresh-partitions --partitions 4"))
329327
time.sleep(5)
330-
331328
for i in range(10,20):
332329
if i % 2 == 0:
333330
kafka.produce(TOPIC, b'value-%d' % i, partition=2)
334331
else:
335332
kafka.produce(TOPIC, b'value-%d' % i, partition=3)
336333
kafka.flush()
337-
time.sleep(5)
338334

339-
assert (len(out) == 4 and (len(out[2]) + len(out[3])) == 10
340-
and out[3][4] == b'value-19')
335+
wait_for(lambda: len(out) == 4 and (len(out[2]) + len(out[3])) == 10
336+
and out[3][4] == b'value-19', 10, 0.1)
341337
stream.upstream.stopped = True
342338

343339

@@ -448,7 +444,7 @@ def test_kafka_batch_checkpointing_async_nodes_1():
448444
stream2 = Stream.from_kafka_batched(TOPIC, ARGS)
449445
out2 = stream2.partition(2).sliding_window(2, return_partial=False).sink_to_list()
450446
stream2.start()
451-
time.sleep(2)
447+
wait_for(lambda: stream2.upstream.started, 10, 0.1)
452448
for i in range(2,6):
453449
kafka.produce(TOPIC, b'value-%d' % i)
454450
kafka.flush()
@@ -461,9 +457,9 @@ def test_kafka_batch_checkpointing_async_nodes_1():
461457
stream3 = Stream.from_kafka_batched(TOPIC, ARGS)
462458
out3 = stream3.sink_to_list()
463459
stream3.start()
464-
time.sleep(2)
460+
wait_for(lambda: stream3.upstream.started, 10, 0.1)
465461
# Stream picks up from where it left off, i.e., from the last committed offset.
466-
assert len(out3) == 1 and out3[0] == [b'value-3', b'value-4', b'value-5']
462+
wait_for(lambda: len(out3) == 1 and out3[0] == [b'value-3', b'value-4', b'value-5'], 10, 0.1)
467463
stream3.upstream.stopped = True
468464
stream3.destroy()
469465

@@ -553,3 +549,72 @@ def test_kafka_batch_checkpointing_async_nodes_2():
553549
assert committed2[1].offset == 2
554550
assert committed3[0].offset == 1
555551
assert committed3[1].offset == 1
552+
553+
554+
def test_kafka_checkpointing_auto_offset_reset_latest():
555+
'''
556+
Testing whether checkpointing works as expected with multiple topic partitions and
557+
with auto.offset.reset configuration set to latest (also default).
558+
'''
559+
j = random.randint(0, 10000)
560+
ARGS = {'bootstrap.servers': 'localhost:9092',
561+
'group.id': 'streamz-test%i' % j,
562+
'auto.offset.reset': 'latest'}
563+
with kafka_service() as kafka:
564+
kafka, TOPIC = kafka
565+
TOPIC = "test-checkpointing-offset-reset-latest"
566+
subprocess.call(shlex.split("docker exec streamz-kafka "
567+
"/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh "
568+
"--create --zookeeper localhost:2181 "
569+
"--replication-factor 1 --partitions 3 "
570+
"--topic test-checkpointing-offset-reset-latest"))
571+
572+
'''
573+
Since the stream has not started yet, these messages are not read because
574+
the stream has auto.offset.reset set to latest.
575+
'''
576+
for i in range(30):
577+
kafka.produce(TOPIC, b'value-%d' % i)
578+
kafka.flush()
579+
580+
stream1 = Stream.from_kafka_batched(TOPIC, ARGS, asynchronous=True)
581+
out1 = stream1.map(split).gather().sink_to_list()
582+
stream1.start()
583+
wait_for(lambda: stream1.upstream.started, 10, 0.1)
584+
585+
'''
586+
Stream has started, so these are read.
587+
'''
588+
for i in range(30):
589+
kafka.produce(TOPIC, b'value-%d' % i)
590+
kafka.flush()
591+
592+
wait_for(lambda: len(out1) == 3 and (len(out1[0]) + len(out1[1]) + len(out1[2])) == 30, 10, 0.1)
593+
'''
594+
Stream stops but checkpoint has been created.
595+
'''
596+
stream1.upstream.stopped = True
597+
598+
'''
599+
When the stream is restarted, these messages are read, because the checkpoint
600+
overrides the auto.offset.reset:latest config this time around as expected.
601+
'''
602+
for i in range(30):
603+
kafka.produce(TOPIC, b'value-%d' % i)
604+
kafka.flush()
605+
606+
stream2 = Stream.from_kafka_batched(TOPIC, ARGS, asynchronous=True)
607+
out2 = stream2.map(split).gather().sink_to_list()
608+
609+
'''
610+
Stream restarts here.
611+
'''
612+
stream2.start()
613+
wait_for(lambda: stream2.upstream.started, 10, 0.1)
614+
615+
for i in range(30):
616+
kafka.produce(TOPIC, b'value-%d' % i)
617+
kafka.flush()
618+
619+
wait_for(lambda: len(out2) == 6 and (len(out2[3]) + len(out2[4]) + len(out2[5])) == 30, 10, 0.1)
620+
stream2.upstream.stopped = True

0 commit comments

Comments
 (0)