Skip to content

Commit 13ff9df

Browse files
author
Martin Durant
committed
kafka fixes
1 parent b2037d7 commit 13ff9df

3 files changed

Lines changed: 63 additions & 50 deletions

File tree

streamz/core.py

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -512,8 +512,10 @@ def disconnect(self, downstream):
512512

513513
@property
514514
def upstream(self):
515-
if len(self.upstreams) != 1:
515+
if len(self.upstreams) > 1:
516516
raise ValueError("Stream has multiple upstreams")
517+
elif len(self.upstreams) == 0:
518+
return None
517519
else:
518520
return self.upstreams[0]
519521

@@ -535,6 +537,13 @@ def remove(self, predicate):
535537
""" Only pass through elements for which the predicate returns False """
536538
return self.filter(lambda x: not predicate(x))
537539

540+
def stop(self):
541+
"""Call on any stream node to halt all upstream sources"""
542+
prev, s = self.upstream, self
543+
while s:
544+
prev, s = s, s.upstream
545+
prev.stopped = True
546+
538547
@property
539548
def scan(self):
540549
return self.accumulate

streamz/sources.py

Lines changed: 42 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33
import os
44
import time
55
from tornado import gen
6+
import weakref
67

78
from .core import Stream, convert_interval, RefCounter
89

@@ -446,6 +447,7 @@ def start(self):
446447
self.stopped = False
447448
self.consumer = ck.Consumer(self.cpars)
448449
self.consumer.subscribe(self.topics)
450+
weakref.finalize(self, self.consumer.close)
449451
tp = ck.TopicPartition(self.topics[0], 0, 0)
450452

451453
# blocks for consumer thread to come up
@@ -496,7 +498,7 @@ def commit(_part):
496498

497499
@gen.coroutine
498500
def checkpoint_emit(_part):
499-
ref = RefCounter(cb=lambda: commit(_part))
501+
ref = RefCounter(cb=lambda: commit(_part), loop=self.loop)
500502
yield self._emit(_part, metadata=[{'ref': ref}])
501503

502504
if self.npartitions is None:
@@ -521,61 +523,58 @@ def checkpoint_emit(_part):
521523
self.positions[tp.partition] = tp.offset
522524
break
523525

524-
try:
525-
while not self.stopped:
526-
out = []
527-
528-
if self.refresh_partitions:
529-
kafka_cluster_metadata = self.consumer.list_topics(self.topic)
530-
if self.engine == "cudf": # pragma: no cover
531-
new_partitions = len(kafka_cluster_metadata[self.topic.encode('utf-8')])
532-
else:
533-
new_partitions = len(kafka_cluster_metadata.topics[self.topic].partitions)
534-
if new_partitions > self.npartitions:
535-
self.positions.extend([-1001] * (new_partitions - self.npartitions))
536-
self.npartitions = new_partitions
526+
while not self.stopped:
527+
out = []
537528

538-
for partition in range(self.npartitions):
539-
tp = ck.TopicPartition(self.topic, partition, 0)
540-
try:
541-
low, high = self.consumer.get_watermark_offsets(
542-
tp, timeout=0.1)
543-
except (RuntimeError, ck.KafkaException):
544-
continue
545-
self.started = True
546-
if 'auto.offset.reset' in self.consumer_params.keys():
547-
if self.consumer_params['auto.offset.reset'] == 'latest' and \
548-
self.positions[partition] == -1001:
549-
self.positions[partition] = high
550-
current_position = self.positions[partition]
551-
lowest = max(current_position, low)
552-
if high > lowest + self.max_batch_size:
553-
high = lowest + self.max_batch_size
554-
if high > lowest:
555-
out.append((self.consumer_params, self.topic, partition,
556-
self.keys, lowest, high - 1))
529+
if self.refresh_partitions:
530+
kafka_cluster_metadata = self.consumer.list_topics(self.topic)
531+
if self.engine == "cudf": # pragma: no cover
532+
new_partitions = len(kafka_cluster_metadata[self.topic.encode('utf-8')])
533+
else:
534+
new_partitions = len(kafka_cluster_metadata.topics[self.topic].partitions)
535+
if new_partitions > self.npartitions:
536+
self.positions.extend([-1001] * (new_partitions - self.npartitions))
537+
self.npartitions = new_partitions
538+
539+
for partition in range(self.npartitions):
540+
tp = ck.TopicPartition(self.topic, partition, 0)
541+
try:
542+
low, high = self.consumer.get_watermark_offsets(
543+
tp, timeout=0.1)
544+
except (RuntimeError, ck.KafkaException):
545+
continue
546+
self.started = True
547+
if 'auto.offset.reset' in self.consumer_params.keys():
548+
if self.consumer_params['auto.offset.reset'] == 'latest' and \
549+
self.positions[partition] == -1001:
557550
self.positions[partition] = high
558-
self.consumer_params['auto.offset.reset'] = 'earliest'
551+
current_position = self.positions[partition]
552+
lowest = max(current_position, low)
553+
if high > lowest + self.max_batch_size:
554+
high = lowest + self.max_batch_size
555+
if high > lowest:
556+
out.append((self.consumer_params, self.topic, partition,
557+
self.keys, lowest, high - 1))
558+
self.positions[partition] = high
559+
self.consumer_params['auto.offset.reset'] = 'earliest'
560+
561+
for part in out:
562+
yield self.loop.add_callback(checkpoint_emit, part)
559563

560-
for part in out:
561-
yield self.loop.add_callback(checkpoint_emit, part)
562-
563-
else:
564-
yield gen.sleep(self.poll_interval)
565-
finally:
566-
self.consumer.unsubscribe()
567-
self.consumer.close()
564+
else:
565+
yield gen.sleep(self.poll_interval)
568566

569567
def start(self):
570568
import confluent_kafka as ck
571569
if self.engine == "cudf": # pragma: no cover
572570
from custreamz import kafka
573571

574572
if self.stopped:
575-
if self.engine == "cudf": # pragma: no cover
573+
if self.engine == "cudf": # pragma: no cover
576574
self.consumer = kafka.Consumer(self.consumer_params)
577575
else:
578576
self.consumer = ck.Consumer(self.consumer_params)
577+
weakref.finalize(self, self.consumer.close)
579578
self.stopped = False
580579
tp = ck.TopicPartition(self.topic, 0, 0)
581580

streamz/tests/test_kafka.py

Lines changed: 11 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -116,9 +116,9 @@ def test_from_kafka():
116116
stream = Stream.from_kafka([TOPIC], ARGS, asynchronous=True)
117117
out = stream.sink_to_list()
118118
stream.start()
119-
yield gen.sleep(0.1) # for loop to run
119+
yield gen.sleep(1.1) # for loop to run
120120
for i in range(10):
121-
yield gen.sleep(0.2)
121+
yield gen.sleep(0.1) # small pause ensures correct ordering
122122
kafka.produce(TOPIC, b'value-%d' % i)
123123
kafka.flush()
124124
# it takes some time for messages to come back out of kafka
@@ -168,7 +168,9 @@ def test_from_kafka_thread():
168168
stream = Stream.from_kafka([TOPIC], ARGS)
169169
out = stream.sink_to_list()
170170
stream.start()
171+
yield gen.sleep(1.1)
171172
for i in range(10):
173+
yield gen.sleep(0.1)
172174
kafka.produce(TOPIC, b'value-%d' % i)
173175
kafka.flush()
174176
# it takes some time for messages to come back out of kafka
@@ -231,8 +233,10 @@ def test_kafka_dask_batch(c, s, w1, w2):
231233
kafka.produce(TOPIC, b'value-%d' % i)
232234
kafka.flush()
233235
yield await_for(lambda: any(out), 10, period=0.2)
234-
assert {'key':None, 'value':b'value-1'} in out[0]
235-
stream.upstream.stopped = True
236+
assert {'key': None, 'value': b'value-1'} in out[0]
237+
stream.stop()
238+
yield gen.sleep(0)
239+
stream.upstream.upstream.consumer.close()
236240

237241

238242
def test_kafka_batch_npartitions():
@@ -551,11 +555,12 @@ def test_kafka_batch_checkpointing_async_nodes_2():
551555
assert committed3[1].offset == 1
552556

553557

558+
@flaky(max_runs=3, min_passes=1)
554559
def test_kafka_checkpointing_auto_offset_reset_latest():
555-
'''
560+
"""
556561
Testing whether checkpointing works as expected with multiple topic partitions and
557562
with auto.offset.reset configuration set to latest (also default).
558-
'''
563+
"""
559564
j = random.randint(0, 10000)
560565
ARGS = {'bootstrap.servers': 'localhost:9092',
561566
'group.id': 'streamz-test%i' % j,

0 commit comments

Comments
 (0)