File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -39,3 +39,8 @@ max-line-length = 120
3939
4040[bdist_wheel]
4141universal =1
42+
43+ [tool:pytest]
44+ markers:
45+ network: Test requires an internet connection
46+ slow: Skipped unless --runslow passed
Original file line number Diff line number Diff line change 2323
2424try :
2525 from distributed .client import default_client as _dask_default_client
26- except ImportError :
26+ except ImportError : # pragma: no cover
2727 _dask_default_client = None
2828
2929from collections .abc import Iterable
@@ -356,8 +356,6 @@ def __str__(self):
356356 s = str (at )
357357 elif hasattr (at , '__name__' ):
358358 s = getattr (self , m ).__name__
359- elif hasattr (at .__class__ , '__name__' ):
360- s = getattr (self , m ).__class__ .__name__
361359 else :
362360 s = None
363361 if s :
Original file line number Diff line number Diff line change @@ -407,6 +407,7 @@ def test_timed_window_ref_counts():
407407 _ = source .timed_window (0.01 )
408408
409409 ref1 = RefCounter ()
410+ assert str (ref1 ) == "<RefCounter count=0>"
410411 source .emit (1 , metadata = [{'ref' : ref1 }])
411412 assert ref1 .count == 1
412413 yield gen .sleep (0.05 )
@@ -417,6 +418,13 @@ def test_timed_window_ref_counts():
417418 assert ref2 .count == 1
418419
419420
421+ def test_mixed_async ():
422+ s1 = Stream (asynchronous = False )
423+ with pytest .raises (ValueError ):
424+ Stream (asynchronous = True , upstream = s1 )
425+
426+
427+
420428@gen_test ()
421429def test_timed_window_metadata ():
422430 source = Stream ()
Original file line number Diff line number Diff line change @@ -91,6 +91,7 @@ def test_partition_then_scatter_sync(loop):
9191 assert L == [1 , 2 , 3 ]
9292
9393
94+ @gen_cluster (client = True )
9495def test_non_unique_emit (c , s , a , b ):
9596 """Regression for https://github.com/python-streamz/streams/issues/397
9697
@@ -191,7 +192,7 @@ def test_sync_2(loop): # noqa: F811
191192 assert L == list (map (inc , range (10 )))
192193
193194
194- @gen_cluster (client = True , ncores = [('127.0.0.1' , 1 )] * 2 )
195+ @gen_cluster (client = True , nthreads = [('127.0.0.1' , 1 )] * 2 )
195196def test_buffer (c , s , a , b ):
196197 source = Stream (asynchronous = True )
197198 L = source .scatter ().map (slowinc , delay = 0.5 ).buffer (5 ).gather ().sink_to_list ()
You can’t perform that action at this time.
0 commit comments