@@ -453,7 +453,7 @@ def _close_consumer(self):
453453class FromKafkaBatched (Stream ):
454454 """Base class for both local and cluster-based batched kafka processing"""
455455 def __init__ (self , topic , consumer_params , poll_interval = '1s' ,
456- npartitions = None , refresh_cycles = None ,
456+ npartitions = None , check_npartitions_every = None ,
457457 max_batch_size = 10000 , keys = False ,
458458 engine = None , ** kwargs ):
459459 self .consumer_params = consumer_params
@@ -463,7 +463,7 @@ def __init__(self, topic, consumer_params, poll_interval='1s',
463463 consumer_params ['auto.offset.reset' ] = 'latest'
464464 self .topic = topic
465465 self .npartitions = npartitions
466- self .refresh_cycles = refresh_cycles
466+ self .check_npartitions_every = check_npartitions_every
467467 if self .npartitions is not None and self .npartitions <= 0 :
468468 raise ValueError ("Number of Kafka topic partitions must be > 0." )
469469 self .poll_interval = convert_interval (poll_interval )
@@ -511,12 +511,12 @@ def checkpoint_emit(_part):
511511 break
512512
513513 try :
514- if self .refresh_cycles is not None :
514+ if self .check_npartitions_every is not None :
515515 cycles = 0
516516 while not self .stopped :
517517 out = []
518518
519- if self .refresh_cycles is not None and cycles == 0 :
519+ if self .check_npartitions_every is not None and cycles == 0 :
520520 kafka_cluster_metadata = self .consumer .list_topics (self .topic )
521521 if self .engine == "cudf" : # pragma: no cover
522522 new_partitions = len (kafka_cluster_metadata [self .topic .encode ('utf-8' )])
@@ -548,8 +548,8 @@ def checkpoint_emit(_part):
548548 self .positions [partition ] = high
549549 self .consumer_params ['auto.offset.reset' ] = 'earliest'
550550
551- if self .refresh_cycles is not None :
552- cycles = (cycles + 1 ) % self .refresh_cycles
551+ if self .check_npartitions_every is not None :
552+ cycles = (cycles + 1 ) % self .check_npartitions_every
553553
554554 for part in out :
555555 yield self .loop .add_callback (checkpoint_emit , part )
@@ -580,7 +580,7 @@ def start(self):
580580
581581@Stream .register_api (staticmethod )
582582def from_kafka_batched (topic , consumer_params , poll_interval = '1s' ,
583- npartitions = None , refresh_cycles = None ,
583+ npartitions = None , check_npartitions_every = None ,
584584 start = False , dask = False ,
585585 max_batch_size = 10000 , keys = False ,
586586 engine = None , ** kwargs ):
@@ -617,12 +617,12 @@ def from_kafka_batched(topic, consumer_params, poll_interval='1s',
617617 npartitions: int (None)
618618 | Number of partitions in the topic.
619619 | If None, streamz will poll Kafka to get the number of partitions.
620- refresh_cycles : int (None)
620+ check_npartitions_every : int (None)
621621 | Useful if the user expects to increase the number of partitions on the fly,
622622 | maybe to handle spikes in load, etc. Streamz polls Kafka after every
623- | 'refresh cycles ' number of batches to determine the current number of topic
624- | partitions. If partitions have been added, streamz will automatically start
625- | reading data from the new partitions as well.
623+ | 'check_npartitions_every ' number of batches/cycles to determine the current
624+ | number of topic partitions. If partitions have been added, streamz will
625+ | automatically start reading data from the new partitions as well.
626626 | If set to None, streamz will not accommodate changing partitions on the fly.
627627 | It is recommended to restart the stream after decreasing the number of partitions.
628628 start: bool (False)
@@ -675,7 +675,7 @@ def from_kafka_batched(topic, consumer_params, poll_interval='1s',
675675 source = FromKafkaBatched (topic , consumer_params ,
676676 poll_interval = poll_interval ,
677677 npartitions = npartitions ,
678- refresh_cycles = refresh_cycles ,
678+ check_npartitions_every = check_npartitions_every ,
679679 max_batch_size = max_batch_size ,
680680 keys = keys ,
681681 engine = engine ,
0 commit comments