Skip to content

Commit a202a02

Browse files
npartitions test
1 parent ccc7d42 commit a202a02

2 files changed

Lines changed: 60 additions & 11 deletions

File tree

streamz/sources.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,8 @@ def __init__(self, topic, consumer_params, poll_interval='1s',
462462
consumer_params['auto.offset.reset'] = 'latest'
463463
self.topic = topic
464464
self.npartitions = npartitions
465+
if self.npartitions is not None and self.npartitions <= 0:
466+
raise ValueError("Number of Kafka topic partitions must be > 0.")
465467
self.poll_interval = convert_interval(poll_interval)
466468
self.max_batch_size = max_batch_size
467469
self.keys = keys

streamz/tests/test_kafka.py

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,6 @@
2222
ck = pytest.importorskip('confluent_kafka')
2323

2424

25-
def download_kafka(target):
26-
r = requests.get('http://apache.mirror.globo.tech/kafka/1.0.0/'
27-
'%s.tgz' % KAFKA_FILE, stream=True)
28-
with open(target, 'wb') as f:
29-
for chunk in r.iter_content(2 ** 20):
30-
f.write(chunk)
31-
subprocess.check_call(['tar', 'xzf', KAFKA_FILE],
32-
cwd=os.path.dirname(target))
33-
34-
3525
def stop_docker(name='streamz-kafka', cid=None, let_fail=False):
3626
"""Stop docker container with given name tag
3727
@@ -61,6 +51,7 @@ def stop_docker(name='streamz-kafka', cid=None, let_fail=False):
6151

6252
def launch_kafka():
6353
stop_docker(let_fail=True)
54+
os.system("docker pull spotify/kafka")
6455
cmd = ("docker run -d -p 2181:2181 -p 9092:9092 --env "
6556
"ADVERTISED_HOST=127.0.0.1 --env ADVERTISED_PORT=9092 "
6657
"--name streamz-kafka spotify/kafka")
@@ -223,7 +214,6 @@ def test_kafka_batch():
223214
stream.upstream.stopped = True
224215

225216

226-
@flaky(max_runs=3, min_passes=1)
227217
@gen_cluster(client=True, timeout=60)
228218
def test_kafka_dask_batch(c, s, w1, w2):
229219
j = random.randint(0, 10000)
@@ -245,6 +235,63 @@ def test_kafka_dask_batch(c, s, w1, w2):
245235
stream.upstream.stopped = True
246236

247237

238+
def test_kafka_batch_npartitions():
239+
j1 = random.randint(0, 10000)
240+
ARGS1 = {'bootstrap.servers': 'localhost:9092',
241+
'group.id': 'streamz-test%i' % j1,
242+
'enable.auto.commit': False,
243+
'auto.offset.reset': 'earliest'}
244+
j2 = j1 + 1
245+
ARGS2 = {'bootstrap.servers': 'localhost:9092',
246+
'group.id': 'streamz-test%i' % j2,
247+
'enable.auto.commit': False,
248+
'auto.offset.reset': 'earliest'}
249+
with kafka_service() as kafka:
250+
kafka, TOPIC = kafka
251+
252+
TOPIC = "test-partitions"
253+
os.system("docker exec streamz-kafka ls /opt")
254+
os.system("docker exec streamz-kafka /opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh "
255+
"--create --zookeeper localhost:2181 "
256+
"--replication-factor 1 --partitions 2 --topic test-partitions")
257+
time.sleep(5)
258+
259+
for i in range(10):
260+
if i%2 == 0:
261+
kafka.produce(TOPIC, b'value-%d' % i, partition=0)
262+
else:
263+
kafka.produce(TOPIC, b'value-%d' % i, partition=1)
264+
kafka.flush()
265+
266+
try:
267+
stream1 = Stream.from_kafka_batched(TOPIC, ARGS1,
268+
asynchronous=True,
269+
npartitions=0)
270+
stream1.gather().sink_to_list()
271+
stream1.start()
272+
except ValueError as e:
273+
assert type(e) is ValueError
274+
pass
275+
276+
stream2 = Stream.from_kafka_batched(TOPIC, ARGS1,
277+
asynchronous=True,
278+
npartitions=1)
279+
out2 = stream2.gather().sink_to_list()
280+
stream2.start()
281+
time.sleep(5)
282+
assert (len(out2) == 1 and len(out2[0]) == 5)
283+
stream2.upstream.stopped = True
284+
285+
stream3 = Stream.from_kafka_batched(TOPIC, ARGS2,
286+
asynchronous=True,
287+
npartitions=4)
288+
out3 = stream3.gather().sink_to_list()
289+
stream3.start()
290+
time.sleep(5)
291+
assert (len(out3) == 2 and (len(out3[0]) + len(out3[1])) == 10)
292+
stream3.upstream.stopped = True
293+
294+
248295
def test_kafka_batch_checkpointing_sync_nodes():
249296
'''
250297
Streams 1 and 3 have different consumer groups, while Stream 2

0 commit comments

Comments (0)