@@ -312,8 +312,9 @@ async def post(self):
312312 application = Application ([
313313 (self .path , Handler ),
314314 ])
315- self .server = HTTPServer (application , ** self .server_kwargs )
316- self .server .listen (self .port )
315+ server = HTTPServer (application , ** self .server_kwargs )
316+ server .listen (self .port )
317+ self .server = server
317318
318319 def stop (self ):
319320 """Shutdown HTTP server"""
@@ -447,7 +448,7 @@ def start(self):
447448 self .stopped = False
448449 self .consumer = ck .Consumer (self .cpars )
449450 self .consumer .subscribe (self .topics )
450- weakref .finalize (self , self .consumer . close )
451+ weakref .finalize (self , lambda consumer = self .consumer : _close_consumer ( consumer ) )
451452 tp = ck .TopicPartition (self .topics [0 ], 0 , 0 )
452453
453454 # blocks for consumer thread to come up
@@ -463,6 +464,13 @@ def _close_consumer(self):
463464 self .stopped = True
464465
465466
467+ def _close_consumer (consumer ):
468+ try :
469+ consumer .close ()
470+ except RuntimeError :
471+ pass
472+
473+
466474class FromKafkaBatched (Source ):
467475 """Base class for both local and cluster-based batched kafka processing"""
468476 def __init__ (self , topic , consumer_params , poll_interval = '1s' ,
@@ -574,7 +582,7 @@ def start(self):
574582 self .consumer = kafka .Consumer (self .consumer_params )
575583 else :
576584 self .consumer = ck .Consumer (self .consumer_params )
577- weakref .finalize (self , self .consumer . close )
585+ weakref .finalize (self , lambda consumer = self .consumer : _close_consumer ( consumer ) )
578586 self .stopped = False
579587 tp = ck .TopicPartition (self .topic , 0 , 0 )
580588
0 commit comments