Skip to content

Commit 1e4ed3b

Browse files
author
Martin Durant
committed
squash finalise warnign
1 parent 765924d commit 1e4ed3b

2 files changed

Lines changed: 11 additions & 4 deletions

File tree

streamz/sources.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ def start(self):
447447
self.stopped = False
448448
self.consumer = ck.Consumer(self.cpars)
449449
self.consumer.subscribe(self.topics)
450-
weakref.finalize(self, self.consumer.close)
450+
weakref.finalize(self, lambda consumer=self.consumer: _close_consumer(consumer))
451451
tp = ck.TopicPartition(self.topics[0], 0, 0)
452452

453453
# blocks for consumer thread to come up
@@ -463,6 +463,13 @@ def _close_consumer(self):
463463
self.stopped = True
464464

465465

466+
def _close_consumer(consumer):
467+
try:
468+
consumer.close()
469+
except RuntimeError:
470+
pass
471+
472+
466473
class FromKafkaBatched(Source):
467474
"""Base class for both local and cluster-based batched kafka processing"""
468475
def __init__(self, topic, consumer_params, poll_interval='1s',
@@ -574,7 +581,7 @@ def start(self):
574581
self.consumer = kafka.Consumer(self.consumer_params)
575582
else:
576583
self.consumer = ck.Consumer(self.consumer_params)
577-
weakref.finalize(self, self.consumer.close)
584+
weakref.finalize(self, lambda consumer=self.consumer: _close_consumer(consumer))
578585
self.stopped = False
579586
tp = ck.TopicPartition(self.topic, 0, 0)
580587

streamz/tests/test_sources.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ def test_periodic():
1313
l = s.sink_to_list()
1414
assert s.stopped
1515
s.start()
16-
wait_for(lambda: l, 0.11, period=0.01)
17-
wait_for(lambda: len(l) > 1, 0.11, period=0.01)
16+
wait_for(lambda: l, 0.3, period=0.01)
17+
wait_for(lambda: len(l) > 1, 0.3, period=0.01)
1818
assert all(l)
1919

2020

0 commit comments

Comments
 (0)