We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent 3f2c913 commit ccc7d42Copy full SHA for ccc7d42
1 file changed
streamz/sources.py
@@ -486,7 +486,10 @@ def checkpoint_emit(_part):
486
487
if self.npartitions is None:
488
kafka_cluster_metadata = self.consumer.list_topics(self.topic)
489
- self.npartitions = len(kafka_cluster_metadata.topics[self.topic].partitions)
+ if self.engine == "cudf": # pragma: no cover
490
+ self.npartitions = len(kafka_cluster_metadata[self.topic.encode('utf-8')])
491
+ else:
492
+ self.npartitions = len(kafka_cluster_metadata.topics[self.topic].partitions)
493
self.positions = [0] * self.npartitions
494
495
tps = []
0 commit comments