@@ -449,10 +449,14 @@ def start(self):
449449 self .stopped = False
450450 self .consumer = ck .Consumer (self .cpars )
451451 self .consumer .subscribe (self .topics )
452- weakref .finalize (self , lambda consumer = self .consumer : _close_consumer (consumer ))
452+ weakref .finalize (
453+ self , lambda consumer = self .consumer : _close_consumer (consumer )
454+ )
453455 tp = ck .TopicPartition (self .topics [0 ], 0 , 0 )
454456
455- # blocks for consumer thread to come up
457+ # blocks for consumer thread to come up and invoke poll to
458+ # establish connection with broker to fetch oauth token for kafka
459+ self .consumer .poll (timeout = 1 )
456460 self .consumer .get_watermark_offsets (tp )
457461 self .loop .add_callback (self .poll_kafka )
458462
@@ -479,7 +483,8 @@ def __init__(self, topic, consumer_params, poll_interval='1s',
479483 max_batch_size = 10000 , keys = False ,
480484 engine = None , ** kwargs ):
481485 self .consumer_params = consumer_params
482- # Override the auto-commit config to enforce custom streamz checkpointing
486+ # Override the auto-commit config to enforce custom streamz
487+ # checkpointing
483488 self .consumer_params ['enable.auto.commit' ] = 'false'
484489 if 'auto.offset.reset' not in self .consumer_params .keys ():
485490 consumer_params ['auto.offset.reset' ] = 'latest'
@@ -587,7 +592,9 @@ def start(self):
587592 self .stopped = False
588593 tp = ck .TopicPartition (self .topic , 0 , 0 )
589594
590- # blocks for consumer thread to come up
595+ # blocks for consumer thread to come up and invoke poll to establish
596+ # connection with broker to fetch oauth token for kafka
597+ self .consumer .poll (timeout = 1 )
591598 self .consumer .get_watermark_offsets (tp )
592599 self .loop .add_callback (self .poll_kafka )
593600
0 commit comments