Skip to content

Commit c65dbbc

Browse files
committed
Fixes issue 431 for kafka oauth source
1 parent e6515f4 commit c65dbbc

1 file changed

Lines changed: 4 additions & 2 deletions

File tree

streamz/sources.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,8 @@ def start(self):
452452
weakref.finalize(self, lambda consumer=self.consumer: _close_consumer(consumer))
453453
tp = ck.TopicPartition(self.topics[0], 0, 0)
454454

455-
# blocks for consumer thread to come up
455+
# blocks for consumer thread to come up and invoke poll to establish connection with broker to fetch oauth token for kafka
456+
self.consumer.poll()
456457
self.consumer.get_watermark_offsets(tp)
457458
self.loop.add_callback(self.poll_kafka)
458459

@@ -587,7 +588,8 @@ def start(self):
587588
self.stopped = False
588589
tp = ck.TopicPartition(self.topic, 0, 0)
589590

590-
# blocks for consumer thread to come up
591+
# blocks for consumer thread to come up and invoke poll to establish connection with broker to fetch oauth token for kafka
592+
self.consumer.poll()
591593
self.consumer.get_watermark_offsets(tp)
592594
self.loop.add_callback(self.poll_kafka)
593595

0 commit comments

Comments
 (0)