Skip to content

Commit 621b669

Browse files
committed
Adding timeout for consumer.pol() when establishing connection with broker
1 parent e12cdb6 commit 621b669

1 file changed

Lines changed: 2 additions & 2 deletions

File tree

streamz/sources.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ def start(self):
456456

457457
# blocks for consumer thread to come up and invoke poll to
458458
# establish connection with broker to fetch oauth token for kafka
459-
self.consumer.poll()
459+
self.consumer.poll(timeout=1)
460460
self.consumer.get_watermark_offsets(tp)
461461
self.loop.add_callback(self.poll_kafka)
462462

@@ -594,7 +594,7 @@ def start(self):
594594

595595
# blocks for consumer thread to come up and invoke poll to establish
596596
# connection with broker to fetch oauth token for kafka
597-
self.consumer.poll()
597+
self.consumer.poll(timeout=1)
598598
self.consumer.get_watermark_offsets(tp)
599599
self.loop.add_callback(self.poll_kafka)
600600

0 commit comments

Comments
 (0)