Skip to content

Commit 906cea8

Browse files
authored
Merge pull request #358 from chinmaychandak/master
Make npartitions optional when streaming from Kafka, and set auto.offset.reset to "latest" by default.
2 parents 2c18ef2 + f9f0c60 commit 906cea8

2 files changed

Lines changed: 90 additions & 28 deletions

File tree

streamz/sources.py

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -453,16 +453,17 @@ def _close_consumer(self):
453453
class FromKafkaBatched(Stream):
454454
"""Base class for both local and cluster-based batched kafka processing"""
455455
def __init__(self, topic, consumer_params, poll_interval='1s',
456-
npartitions=1, max_batch_size=10000, keys=False,
456+
npartitions=None, max_batch_size=10000, keys=False,
457457
engine=None, **kwargs):
458458
self.consumer_params = consumer_params
459459
# Override the auto-commit config to enforce custom streamz checkpointing
460460
self.consumer_params['enable.auto.commit'] = 'false'
461461
if 'auto.offset.reset' not in self.consumer_params.keys():
462-
consumer_params['auto.offset.reset'] = 'earliest'
462+
consumer_params['auto.offset.reset'] = 'latest'
463463
self.topic = topic
464464
self.npartitions = npartitions
465-
self.positions = [0] * npartitions
465+
if self.npartitions is not None and self.npartitions <= 0:
466+
raise ValueError("Number of Kafka topic partitions must be > 0.")
466467
self.poll_interval = convert_interval(poll_interval)
467468
self.max_batch_size = max_batch_size
468469
self.keys = keys
@@ -485,6 +486,14 @@ def checkpoint_emit(_part):
485486
ref = RefCounter(cb=lambda: commit(_part))
486487
yield self._emit(_part, metadata=[{'ref': ref}])
487488

489+
if self.npartitions is None:
490+
kafka_cluster_metadata = self.consumer.list_topics(self.topic)
491+
if self.engine == "cudf": # pragma: no cover
492+
self.npartitions = len(kafka_cluster_metadata[self.topic.encode('utf-8')])
493+
else:
494+
self.npartitions = len(kafka_cluster_metadata.topics[self.topic].partitions)
495+
self.positions = [0] * self.npartitions
496+
488497
tps = []
489498
for partition in range(self.npartitions):
490499
tps.append(ck.TopicPartition(self.topic, partition))
@@ -510,7 +519,9 @@ def checkpoint_emit(_part):
510519
except (RuntimeError, ck.KafkaException):
511520
continue
512521
if 'auto.offset.reset' in self.consumer_params.keys():
513-
if self.consumer_params['auto.offset.reset'] == 'latest':
522+
if self.consumer_params['auto.offset.reset'] == 'latest' and \
523+
(self.positions == [-1001] * self.npartitions
524+
or self.positions == [0] * self.npartitions):
514525
self.positions[partition] = high
515526
current_position = self.positions[partition]
516527
lowest = max(current_position, low)
@@ -551,7 +562,7 @@ def start(self):
551562

552563
@Stream.register_api(staticmethod)
553564
def from_kafka_batched(topic, consumer_params, poll_interval='1s',
554-
npartitions=1, start=False, dask=False,
565+
npartitions=None, start=False, dask=False,
555566
max_batch_size=10000, keys=False,
556567
engine=None, **kwargs):
557568
""" Get messages and keys (optional) from Kafka in batches
@@ -584,8 +595,11 @@ def from_kafka_batched(topic, consumer_params, poll_interval='1s',
584595
| group, each message will be passed to only one of them.
585596
poll_interval: number
586597
Seconds that elapse between polling Kafka for new messages
587-
npartitions: int
588-
Number of partitions in the topic
598+
npartitions: int (None)
599+
| Number of partitions in the topic.
600+
| If None, streamz will poll Kafka to get the number of partitions.
601+
| As of now, streamz does not support changing number of partitions on the fly.
602+
| It is recommended to restart the stream after changing the number of partitions.
589603
start: bool (False)
590604
Whether to start polling upon instantiation
591605
max_batch_size: int
@@ -616,20 +630,19 @@ def from_kafka_batched(topic, consumer_params, poll_interval='1s',
616630
617631
| More information at: https://rapids.ai/start.html
618632
619-
620633
Important Kafka Configurations
621634
----------
622-
If 'auto.offset.reset': 'latest' is set in the consumer configs,
623-
the stream starts reading messages from the latest offset. Else,
624-
if it's set to 'earliest', it will read from the start offset.
625-
635+
By default, a stream will start reading from the latest offsets
636+
available. Please set 'auto.offset.reset': 'earliest' in the
637+
consumer configs, if the stream needs to start processing from
638+
the earliest offsets.
626639
627640
Examples
628641
----------
629642
630643
>>> source = Stream.from_kafka_batched('mytopic',
631644
... {'bootstrap.servers': 'localhost:9092',
632-
... 'group.id': 'streamz'}, npartitions=4) # doctest: +SKIP
645+
... 'group.id': 'streamz'}) # doctest: +SKIP
633646
634647
"""
635648
if dask:

streamz/tests/test_kafka.py

Lines changed: 64 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import os
55
import pytest
66
import random
7-
import requests
87
import shlex
98
import subprocess
109
import time
@@ -22,16 +21,6 @@
2221
ck = pytest.importorskip('confluent_kafka')
2322

2423

25-
def download_kafka(target):
26-
r = requests.get('http://apache.mirror.globo.tech/kafka/1.0.0/'
27-
'%s.tgz' % KAFKA_FILE, stream=True)
28-
with open(target, 'wb') as f:
29-
for chunk in r.iter_content(2 ** 20):
30-
f.write(chunk)
31-
subprocess.check_call(['tar', 'xzf', KAFKA_FILE],
32-
cwd=os.path.dirname(target))
33-
34-
3524
def stop_docker(name='streamz-kafka', cid=None, let_fail=False):
3625
"""Stop docker container with given name tag
3726
@@ -61,6 +50,7 @@ def stop_docker(name='streamz-kafka', cid=None, let_fail=False):
6150

6251
def launch_kafka():
6352
stop_docker(let_fail=True)
53+
subprocess.call(shlex.split("docker pull spotify/kafka"))
6454
cmd = ("docker run -d -p 2181:2181 -p 9092:9092 --env "
6555
"ADVERTISED_HOST=127.0.0.1 --env ADVERTISED_PORT=9092 "
6656
"--name streamz-kafka spotify/kafka")
@@ -244,6 +234,61 @@ def test_kafka_dask_batch(c, s, w1, w2):
244234
stream.upstream.stopped = True
245235

246236

237+
def test_kafka_batch_npartitions():
238+
j1 = random.randint(0, 10000)
239+
ARGS1 = {'bootstrap.servers': 'localhost:9092',
240+
'group.id': 'streamz-test%i' % j1,
241+
'enable.auto.commit': False,
242+
'auto.offset.reset': 'earliest'}
243+
j2 = j1 + 1
244+
ARGS2 = {'bootstrap.servers': 'localhost:9092',
245+
'group.id': 'streamz-test%i' % j2,
246+
'enable.auto.commit': False,
247+
'auto.offset.reset': 'earliest'}
248+
with kafka_service() as kafka:
249+
kafka, TOPIC = kafka
250+
251+
TOPIC = "test-partitions"
252+
subprocess.call(shlex.split("docker exec streamz-kafka "
253+
"/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh "
254+
"--create --zookeeper localhost:2181 "
255+
"--replication-factor 1 --partitions 2 "
256+
"--topic test-partitions"))
257+
time.sleep(5)
258+
259+
for i in range(10):
260+
if i % 2 == 0:
261+
kafka.produce(TOPIC, b'value-%d' % i, partition=0)
262+
else:
263+
kafka.produce(TOPIC, b'value-%d' % i, partition=1)
264+
kafka.flush()
265+
266+
with pytest.raises(ValueError):
267+
stream1 = Stream.from_kafka_batched(TOPIC, ARGS1,
268+
asynchronous=True,
269+
npartitions=0)
270+
stream1.gather().sink_to_list()
271+
stream1.start()
272+
273+
stream2 = Stream.from_kafka_batched(TOPIC, ARGS1,
274+
asynchronous=True,
275+
npartitions=1)
276+
out2 = stream2.gather().sink_to_list()
277+
stream2.start()
278+
time.sleep(5)
279+
assert (len(out2) == 1 and len(out2[0]) == 5)
280+
stream2.upstream.stopped = True
281+
282+
stream3 = Stream.from_kafka_batched(TOPIC, ARGS2,
283+
asynchronous=True,
284+
npartitions=4)
285+
out3 = stream3.gather().sink_to_list()
286+
stream3.start()
287+
time.sleep(5)
288+
assert (len(out3) == 2 and (len(out3[0]) + len(out3[1])) == 10)
289+
stream3.upstream.stopped = True
290+
291+
247292
def test_kafka_batch_checkpointing_sync_nodes():
248293
'''
249294
Streams 1 and 3 have different consumer groups, while Stream 2
@@ -254,11 +299,13 @@ def test_kafka_batch_checkpointing_sync_nodes():
254299
j1 = random.randint(0, 10000)
255300
ARGS1 = {'bootstrap.servers': 'localhost:9092',
256301
'group.id': 'streamz-test%i' % j1,
257-
'enable.auto.commit': False}
302+
'enable.auto.commit': False,
303+
'auto.offset.reset': 'earliest'}
258304
j2 = j1 + 1
259305
ARGS2 = {'bootstrap.servers': 'localhost:9092',
260306
'group.id': 'streamz-test%i' % j2,
261-
'enable.auto.commit': False}
307+
'enable.auto.commit': False,
308+
'auto.offset.reset': 'earliest'}
262309
with kafka_service() as kafka:
263310
kafka, TOPIC = kafka
264311
for i in range(10):
@@ -291,11 +338,13 @@ def test_kafka_dask_checkpointing_sync_nodes(c, s, w1, w2):
291338
j1 = random.randint(0, 10000)
292339
ARGS1 = {'bootstrap.servers': 'localhost:9092',
293340
'group.id': 'streamz-test%i' % j1,
294-
'enable.auto.commit': False}
341+
'enable.auto.commit': False,
342+
'auto.offset.reset': 'earliest'}
295343
j2 = j1 + 1
296344
ARGS2 = {'bootstrap.servers': 'localhost:9092',
297345
'group.id': 'streamz-test%i' % j2,
298-
'enable.auto.commit': False}
346+
'enable.auto.commit': False,
347+
'auto.offset.reset': 'earliest'}
299348
with kafka_service() as kafka:
300349
kafka, TOPIC = kafka
301350
for i in range(10):

0 commit comments

Comments (0)