Skip to content

Commit 9743bc1

Browse files
author
Martin Durant
committed
megre/fix
1 parent 6030a13 commit 9743bc1

1 file changed

Lines changed: 7 additions & 3 deletions

File tree

streamz/sources.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -449,10 +449,13 @@ def start(self):
449449
self.stopped = False
450450
self.consumer = ck.Consumer(self.cpars)
451451
self.consumer.subscribe(self.topics)
452-
weakref.finalize(self, lambda consumer=self.consumer: _close_consumer(consumer))
452+
weakref.finalize(
453+
self, lambda consumer=self.consumer: _close_consumer(consumer)
454+
)
453455
tp = ck.TopicPartition(self.topics[0], 0, 0)
454456

455-
# blocks for consumer thread to come up and invoke poll to establish connection with broker to fetch oauth token for kafka
457+
# blocks for consumer thread to come up and invoke poll to
458+
# establish connection with broker to fetch oauth token for kafka
456459
self.consumer.poll()
457460
self.consumer.get_watermark_offsets(tp)
458461
self.loop.add_callback(self.poll_kafka)
@@ -480,7 +483,8 @@ def __init__(self, topic, consumer_params, poll_interval='1s',
480483
max_batch_size=10000, keys=False,
481484
engine=None, **kwargs):
482485
self.consumer_params = consumer_params
483-
# Override the auto-commit config to enforce custom streamz checkpointing
486+
# Override the auto-commit config to enforce custom streamz
487+
# checkpointing
484488
self.consumer_params['enable.auto.commit'] = 'false'
485489
if 'auto.offset.reset' not in self.consumer_params.keys():
486490
consumer_params['auto.offset.reset'] = 'latest'

0 commit comments

Comments
 (0)