We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent 91c0ba7 commit 3f2c913Copy full SHA for 3f2c913
1 file changed
streamz/sources.py
@@ -515,7 +515,8 @@ def checkpoint_emit(_part):
515
continue
516
if 'auto.offset.reset' in self.consumer_params.keys():
517
if self.consumer_params['auto.offset.reset'] == 'latest' and \
518
- self.positions == [-1001] * self.npartitions:
+ (self.positions == [-1001] * self.npartitions
519
+ or self.positions == [0] * self.npartitions):
520
self.positions[partition] = high
521
current_position = self.positions[partition]
522
lowest = max(current_position, low)
0 commit comments