Skip to content

Commit c58dadf

Browse files
committed
Remove dask loop argument
1 parent 5a6a6e5 commit c58dadf

1 file changed

Lines changed: 12 additions & 12 deletions

File tree

streamz/tests/test_dask.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
from distributed import Future, Client
1515
from distributed.utils import sync
16-
from distributed.utils_test import gen_cluster, loop, loop_in_thread, cleanup, inc, cluster, loop, slowinc # noqa: F401
16+
from distributed.utils_test import gen_cluster, inc, cluster, loop, slowinc # noqa: F401
1717

1818

1919
@gen_cluster(client=True)
@@ -72,10 +72,10 @@ async def test_partition_then_scatter_async(c, s, a, b):
7272
assert L == [1, 2, 3]
7373

7474

75-
def test_partition_then_scatter_sync(loop):
75+
def test_partition_then_scatter_sync():
7676
# Ensure partition w/ timeout before scatter works correctly for synchronous
7777
with cluster() as (s, [a, b]):
78-
with Client(s['address'], loop=loop) as client: # noqa: F841
78+
with Client(s['address']) as client: # noqa: F841
7979
start = time.monotonic()
8080
source = Stream()
8181
L = source.partition(2, timeout=.1).scatter().map(
@@ -164,24 +164,24 @@ async def test_accumulate(c, s, a, b):
164164
assert L[-1][1] == 3
165165

166166

167-
def test_sync(loop): # noqa: F811
167+
def test_sync(): # noqa: F811
168168
with cluster() as (s, [a, b]):
169-
with Client(s['address'], loop=loop) as client: # noqa: F841
169+
with Client(s['address']) as client: # noqa: F841
170170
source = Stream()
171171
L = source.scatter().map(inc).gather().sink_to_list()
172172

173173
async def f():
174174
for i in range(10):
175175
await source.emit(i, asynchronous=True)
176176

177-
sync(loop, f)
177+
sync(client.loop, f)
178178

179179
assert L == list(map(inc, range(10)))
180180

181181

182-
def test_sync_2(loop): # noqa: F811
182+
def test_sync_2(): # noqa: F811
183183
with cluster() as (s, [a, b]):
184-
with Client(s['address'], loop=loop): # noqa: F841
184+
with Client(s['address']): # noqa: F841
185185
source = Stream()
186186
L = source.scatter().map(inc).gather().sink_to_list()
187187

@@ -218,9 +218,9 @@ async def test_buffer(c, s, a, b):
218218
assert source.loop == c.loop
219219

220220

221-
def test_buffer_sync(loop): # noqa: F811
221+
def test_buffer_sync(): # noqa: F811
222222
with cluster() as (s, [a, b]):
223-
with Client(s['address'], loop=loop) as c: # noqa: F841
223+
with Client(s['address']) as c: # noqa: F841
224224
source = Stream()
225225
buff = source.scatter().map(slowinc, delay=0.5).buffer(5)
226226
L = buff.gather().sink_to_list()
@@ -243,9 +243,9 @@ def test_buffer_sync(loop): # noqa: F811
243243

244244
@pytest.mark.asyncio
245245
@pytest.mark.xfail(reason='')
246-
async def test_stream_shares_client_loop(loop): # noqa: F811
246+
async def test_stream_shares_client_loop(): # noqa: F811
247247
with cluster() as (s, [a, b]):
248-
with Client(s['address'], loop=loop) as client: # noqa: F841
248+
with Client(s['address']) as client: # noqa: F841
249249
source = Stream()
250250
d = source.timed_window('20ms').scatter() # noqa: F841
251251
assert source.loop is client.loop

0 commit comments

Comments
 (0)