
Commit dffd264

Make npartitions optional & fix Kafka checkpointing with auto.offset.reset=latest, which is now the default (2). No support for adding partitions on the fly.
1 parent b834903 commit dffd264

2 files changed

Lines changed: 10 additions & 13 deletions
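
In practice this means npartitions can now be omitted and the source discovers the partition count from the broker. A minimal usage sketch, assuming a local broker at localhost:9092 and a hypothetical topic and group id:

    from streamz import Stream

    consumer_params = {
        'bootstrap.servers': 'localhost:9092',  # assumed local broker
        'group.id': 'streamz-example',          # hypothetical group id
    }

    # npartitions is now optional: when left as None, streamz polls
    # Kafka for the topic's partition count before consuming.
    stream = Stream.from_kafka_batched('my-topic', consumer_params)
    stream.start()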

File tree

streamz/sources.py
streamz/tests/test_kafka.py

streamz/sources.py

Lines changed: 8 additions & 11 deletions
@@ -459,7 +459,7 @@ def __init__(self, topic, consumer_params, poll_interval='1s',
         # Override the auto-commit config to enforce custom streamz checkpointing
         self.consumer_params['enable.auto.commit'] = 'false'
         if 'auto.offset.reset' not in self.consumer_params.keys():
-            consumer_params['auto.offset.reset'] = 'earliest'
+            consumer_params['auto.offset.reset'] = 'latest'
         self.topic = topic
         self.npartitions = npartitions
         self.poll_interval = convert_interval(poll_interval)
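
For context, auto.offset.reset only matters when the consumer group has no committed offsets yet: 'earliest' replays the topic from the beginning, while 'latest' (the new default injected above) starts at the end of the log. Since __init__ only injects the default when the key is absent, a caller can still opt back into replay explicitly; a sketch with hypothetical values:

    consumer_params = {
        'bootstrap.servers': 'localhost:9092',  # assumed local broker
        'group.id': 'streamz-replay',           # hypothetical group id
        # Explicitly requesting the old replay behaviour; __init__ leaves
        # this key alone because it is already present.
        'auto.offset.reset': 'earliest',
    }
    # Note enable.auto.commit is always forced to 'false' by __init__ so
    # that streamz's RefCounter-based checkpointing controls commits.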
@@ -484,6 +484,11 @@ def checkpoint_emit(_part):
             ref = RefCounter(cb=lambda: commit(_part))
             yield self._emit(_part, metadata=[{'ref': ref}])

+        if self.npartitions is None:
+            kafka_cluster_metadata = self.consumer.list_topics(self.topic)
+            self.npartitions = len(kafka_cluster_metadata.topics[self.topic].partitions)
+            self.positions = [0] * self.npartitions
+
         tps = []
         for partition in range(self.npartitions):
             tps.append(ck.TopicPartition(self.topic, partition))
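
The block added above uses confluent-kafka's list_topics(), which returns a ClusterMetadata object whose topics attribute maps topic names to TopicMetadata, each carrying a partitions dict with one entry per partition. A standalone sketch of the same lookup, assuming a reachable broker and hypothetical names:

    import confluent_kafka as ck

    consumer = ck.Consumer({
        'bootstrap.servers': 'localhost:9092',  # assumed local broker
        'group.id': 'metadata-probe',           # hypothetical group id
    })

    # len() of the partitions dict gives the current partition count.
    metadata = consumer.list_topics('my-topic')
    npartitions = len(metadata.topics['my-topic'].partitions)
    print('my-topic has %d partitions' % npartitions)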
@@ -501,10 +506,6 @@ def checkpoint_emit(_part):
         try:
             while not self.stopped:
                 out = []
-                # kafka_cluster_metadata = self.consumer.list_topics(self.topic)
-                # self.npartitions = len(kafka_cluster_metadata.topics[self.topic].partitions)
-                # if self.npartitions > len(self.positions):
-                #     self.positions.extend([0] * (self.npartitions - len(self.positions)))
                 for partition in range(self.npartitions):
                     tp = ck.TopicPartition(self.topic, partition, 0)
                     try:
@@ -546,12 +547,6 @@ def start(self):
             else:
                 self.consumer = ck.Consumer(self.consumer_params)
             self.stopped = False
-
-            if self.npartitions is None:
-                kafka_cluster_metadata = self.consumer.list_topics(self.topic)
-                self.npartitions = len(kafka_cluster_metadata.topics[self.topic].partitions)
-                self.positions = [0] * self.npartitions
-
             tp = ck.TopicPartition(self.topic, 0, 0)

             # blocks for consumer thread to come up
@@ -597,6 +592,8 @@ def from_kafka_batched(topic, consumer_params, poll_interval='1s',
     npartitions: int (None)
         | Number of partitions in the topic.
        | If None, streamz will poll Kafka to get the number of partitions.
+        | As of now, streamz does not support changing the number of partitions on the fly.
+        | It is recommended to restart the stream after changing the number of partitions.
     start: bool (False)
         Whether to start polling upon instantiation
     max_batch_size: int
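
Because the partition count is read once and, per the docstring above, changing it on the fly is unsupported, the recommended pattern after repartitioning a topic is to stop and rebuild the stream. A sketch of that restart, reusing the stop idiom from streamz's own tests (stream.upstream.stopped = True); the helper name is hypothetical:

    from streamz import Stream

    def rebuild_kafka_stream(topic, consumer_params, old_stream=None):
        # Hypothetical helper: stop the old source the same way the
        # test suite does, then build a fresh one.
        if old_stream is not None:
            old_stream.upstream.stopped = True
        # With npartitions=None (the new default) the fresh source
        # re-polls Kafka and picks up the current partition count.
        new_stream = Stream.from_kafka_batched(topic, consumer_params)
        new_stream.start()
        return new_stream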

streamz/tests/test_kafka.py

Lines changed: 2 additions & 2 deletions
@@ -223,6 +223,7 @@ def test_kafka_batch():
     stream.upstream.stopped = True


+@flaky(max_runs=3, min_passes=1)
 @gen_cluster(client=True, timeout=60)
 def test_kafka_dask_batch(c, s, w1, w2):
     j = random.randint(0, 10000)
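
@flaky comes from the flaky pytest plugin: max_runs=3, min_passes=1 reruns the test up to three times and passes it as soon as one run succeeds, which absorbs transient failures such as a slow-starting broker. A minimal self-contained sketch of the decorator on a hypothetical test:

    import random
    from flaky import flaky

    @flaky(max_runs=3, min_passes=1)
    def test_eventually_passes():
        # Stand-in for a transiently failing check: the test is retried
        # up to 3 runs, and a single passing run is enough.
        assert random.random() > 0.3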
@@ -334,8 +335,7 @@ def test_kafka_batch_checkpointing_async_nodes_1():
     j = random.randint(0, 10000)
     ARGS = {'bootstrap.servers': 'localhost:9092',
             'group.id': 'streamz-test%i' % j,
-            'enable.auto.commit': False,
-            'auto.offset.reset': 'earliest'}
+            'enable.auto.commit': False}
     with kafka_service() as kafka:
         kafka, TOPIC = kafka
         stream1 = Stream.from_kafka_batched(TOPIC, ARGS)
