Skip to content

Commit 6a3b1ae

Browse files
Add refresh_cycles parameter to from_kafka_batched
1 parent f9f0c60 commit 6a3b1ae

2 files changed

Lines changed: 82 additions & 6 deletions

File tree

streamz/sources.py

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,8 @@ def _close_consumer(self):
453453
class FromKafkaBatched(Stream):
454454
"""Base class for both local and cluster-based batched kafka processing"""
455455
def __init__(self, topic, consumer_params, poll_interval='1s',
456-
npartitions=None, max_batch_size=10000, keys=False,
456+
npartitions=None, refresh_cycles=None,
457+
max_batch_size=10000, keys=False,
457458
engine=None, **kwargs):
458459
self.consumer_params = consumer_params
459460
# Override the auto-commit config to enforce custom streamz checkpointing
@@ -462,6 +463,7 @@ def __init__(self, topic, consumer_params, poll_interval='1s',
462463
consumer_params['auto.offset.reset'] = 'latest'
463464
self.topic = topic
464465
self.npartitions = npartitions
466+
self.refresh_cycles = refresh_cycles
465467
if self.npartitions is not None and self.npartitions <= 0:
466468
raise ValueError("Number of Kafka topic partitions must be > 0.")
467469
self.poll_interval = convert_interval(poll_interval)
@@ -509,8 +511,21 @@ def checkpoint_emit(_part):
509511
break
510512

511513
try:
514+
if self.refresh_cycles is not None:
515+
cycles = 0
512516
while not self.stopped:
513517
out = []
518+
519+
if self.refresh_cycles is not None and cycles == 0:
520+
kafka_cluster_metadata = self.consumer.list_topics(self.topic)
521+
if self.engine == "cudf": # pragma: no cover
522+
new_partitions = len(kafka_cluster_metadata[self.topic.encode('utf-8')])
523+
else:
524+
new_partitions = len(kafka_cluster_metadata.topics[self.topic].partitions)
525+
if new_partitions > self.npartitions:
526+
self.positions.extend([-1001] * (new_partitions - self.npartitions))
527+
self.npartitions = new_partitions
528+
514529
for partition in range(self.npartitions):
515530
tp = ck.TopicPartition(self.topic, partition, 0)
516531
try:
@@ -533,6 +548,9 @@ def checkpoint_emit(_part):
533548
self.positions[partition] = high
534549
self.consumer_params['auto.offset.reset'] = 'earliest'
535550

551+
if self.refresh_cycles is not None:
552+
cycles = (cycles + 1) % self.refresh_cycles
553+
536554
for part in out:
537555
yield self.loop.add_callback(checkpoint_emit, part)
538556

@@ -562,7 +580,8 @@ def start(self):
562580

563581
@Stream.register_api(staticmethod)
564582
def from_kafka_batched(topic, consumer_params, poll_interval='1s',
565-
npartitions=None, start=False, dask=False,
583+
npartitions=None, refresh_cycles=None,
584+
start=False, dask=False,
566585
max_batch_size=10000, keys=False,
567586
engine=None, **kwargs):
568587
""" Get messages and keys (optional) from Kafka in batches
@@ -598,8 +617,14 @@ def from_kafka_batched(topic, consumer_params, poll_interval='1s',
598617
npartitions: int (None)
599618
| Number of partitions in the topic.
600619
| If None, streamz will poll Kafka to get the number of partitions.
601-
| As of now, streamz does not support changing number of partitions on the fly.
602-
| It is recommended to restart the stream after changing the number of partitions.
620+
refresh_cycles: int (None)
621+
| Useful if the user expects to increase the number of partitions on the fly,
622+
| for example to handle spikes in load. Streamz polls Kafka after every
623+
| `refresh_cycles` batches to determine the current number of topic
624+
| partitions. If partitions have been added, streamz will automatically start
625+
| reading data from the new partitions as well.
626+
| If set to None, streamz will not accommodate changing partitions on the fly.
627+
| It is recommended to restart the stream after decreasing the number of partitions.
603628
start: bool (False)
604629
Whether to start polling upon instantiation
605630
max_batch_size: int
@@ -631,7 +656,6 @@ def from_kafka_batched(topic, consumer_params, poll_interval='1s',
631656
| More information at: https://rapids.ai/start.html
632657
633658
Important Kafka Configurations
634-
----------
635659
By default, a stream will start reading from the latest offsets
636660
available. Please set 'auto.offset.reset': 'earliest' in the
637661
consumer configs, if the stream needs to start processing from
@@ -651,6 +675,7 @@ def from_kafka_batched(topic, consumer_params, poll_interval='1s',
651675
source = FromKafkaBatched(topic, consumer_params,
652676
poll_interval=poll_interval,
653677
npartitions=npartitions,
678+
refresh_cycles=refresh_cycles,
654679
max_batch_size=max_batch_size,
655680
keys=keys,
656681
engine=engine,

streamz/tests/test_kafka.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ def kafka_service():
8787
"Kafka not available. "
8888
"To launch kafka use `export STREAMZ_LAUNCH_KAFKA=true`")
8989

90-
producer = ck.Producer({'bootstrap.servers': 'localhost:9092'})
90+
producer = ck.Producer({'bootstrap.servers': 'localhost:9092',
91+
'topic.metadata.refresh.interval.ms': '5000'})
9192
producer.produce('test-start-kafka', b'test')
9293
out = producer.flush(10)
9394
if out > 0:
@@ -289,6 +290,56 @@ def test_kafka_batch_npartitions():
289290
stream3.upstream.stopped = True
290291

291292

293+
def test_kafka_refresh_cycles():
294+
j1 = random.randint(0, 10000)
295+
ARGS = {'bootstrap.servers': 'localhost:9092',
296+
'group.id': 'streamz-test%i' % j1,
297+
'enable.auto.commit': False,
298+
'auto.offset.reset': 'earliest'}
299+
with kafka_service() as kafka:
300+
kafka, TOPIC = kafka
301+
TOPIC = "test-partitions"
302+
subprocess.call(shlex.split("docker exec streamz-kafka "
303+
"/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh "
304+
"--create --zookeeper localhost:2181 "
305+
"--replication-factor 1 --partitions 2 "
306+
"--topic test-partitions"))
307+
time.sleep(2)
308+
309+
for i in range(10):
310+
if i % 2 == 0:
311+
kafka.produce(TOPIC, b'value-%d' % i, partition=0)
312+
else:
313+
kafka.produce(TOPIC, b'value-%d' % i, partition=1)
314+
kafka.flush()
315+
316+
stream = Stream.from_kafka_batched(TOPIC, ARGS,
317+
asynchronous=True,
318+
refresh_cycles=1,
319+
poll_interval='2s')
320+
out = stream.gather().sink_to_list()
321+
stream.start()
322+
time.sleep(5)
323+
assert (len(out) == 2 and (len(out[0]) + len(out[1])) == 10)
324+
325+
subprocess.call(shlex.split("docker exec streamz-kafka "
326+
"/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh "
327+
"--alter --zookeeper localhost:2181 "
328+
"--topic test-partitions --partitions 4"))
329+
time.sleep(5)
330+
331+
for i in range(10,20):
332+
if i % 2 == 0:
333+
kafka.produce(TOPIC, b'value-%d' % i, partition=2)
334+
else:
335+
kafka.produce(TOPIC, b'value-%d' % i, partition=3)
336+
kafka.flush()
337+
time.sleep(5)
338+
339+
assert (len(out) == 4 and (len(out[2]) + len(out[3])) == 10)
340+
stream.upstream.stopped = True
341+
342+
292343
def test_kafka_batch_checkpointing_sync_nodes():
293344
'''
294345
Streams 1 and 3 have different consumer groups, while Stream 2

0 commit comments

Comments
 (0)