@@ -50,7 +50,7 @@ def stop_docker(name='streamz-kafka', cid=None, let_fail=False):
5050
5151def launch_kafka ():
5252 stop_docker (let_fail = True )
53- os . system ( "docker pull spotify/kafka" )
53+ subprocess . call ( shlex . split ( "docker pull spotify/kafka" ) )
5454 cmd = ("docker run -d -p 2181:2181 -p 9092:9092 --env "
5555 "ADVERTISED_HOST=127.0.0.1 --env ADVERTISED_PORT=9092 "
5656 "--name streamz-kafka spotify/kafka" )
@@ -249,10 +249,11 @@ def test_kafka_batch_npartitions():
249249 kafka , TOPIC = kafka
250250
251251 TOPIC = "test-partitions"
252- os .system ("docker exec streamz-kafka ls /opt" )
253- os .system ("docker exec streamz-kafka /opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh "
254- "--create --zookeeper localhost:2181 "
255- "--replication-factor 1 --partitions 2 --topic test-partitions" )
252+ subprocess .call (shlex .split ("docker exec streamz-kafka "
253+ "/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh "
254+ "--create --zookeeper localhost:2181 "
255+ "--replication-factor 1 --partitions 2 "
256+ "--topic test-partitions" ))
256257 time .sleep (5 )
257258
258259 for i in range (10 ):
@@ -262,15 +263,12 @@ def test_kafka_batch_npartitions():
262263 kafka .produce (TOPIC , b'value-%d' % i , partition = 1 )
263264 kafka .flush ()
264265
265- try :
266+ with pytest . raises ( ValueError ) :
266267 stream1 = Stream .from_kafka_batched (TOPIC , ARGS1 ,
267268 asynchronous = True ,
268269 npartitions = 0 )
269270 stream1 .gather ().sink_to_list ()
270271 stream1 .start ()
271- except ValueError as e :
272- assert type (e ) is ValueError
273- pass
274272
275273 stream2 = Stream .from_kafka_batched (TOPIC , ARGS1 ,
276274 asynchronous = True ,
0 commit comments