Skip to content

Commit be9495f

Browse files
author
Martin Durant
committed
morefix
1 parent d548c0b commit be9495f

4 files changed

Lines changed: 20 additions & 18 deletions

File tree

streamz/core.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ class Stream(object):
177177

178178
def __init__(self, upstream=None, upstreams=None, stream_name=None,
179179
loop=None, asynchronous=None, ensure_io_loop=False):
180+
self.name = stream_name
180181
self.downstreams = OrderedWeakrefSet()
181182
if upstreams is not None:
182183
self.upstreams = list(upstreams)
@@ -196,8 +197,6 @@ def __init__(self, upstream=None, upstreams=None, stream_name=None,
196197
if upstream:
197198
upstream.downstreams.add(self)
198199

199-
self.name = stream_name
200-
201200
def _set_loop(self, loop):
202201
self.loop = None
203202
if loop is not None:
@@ -1008,7 +1007,8 @@ def __init__(self, upstream, n, timeout=None, key=None, **kwargs):
10081007
self._buffer = defaultdict(lambda: [])
10091008
self._metadata_buffer = defaultdict(lambda: [])
10101009
self._callbacks = {}
1011-
Stream.__init__(self, upstream, ensure_io_loop=True, **kwargs)
1010+
kwargs["ensure_io_loop"] = True
1011+
Stream.__init__(self, upstream, **kwargs)
10121012

10131013
def _get_key(self, x):
10141014
if self._key is None:
@@ -1220,7 +1220,8 @@ def __init__(self, upstream, interval, **kwargs):
12201220
self.metadata_buffer = []
12211221
self.last = gen.moment
12221222

1223-
Stream.__init__(self, upstream, ensure_io_loop=True, **kwargs)
1223+
kwargs["ensure_io_loop"] = True
1224+
Stream.__init__(self, upstream, **kwargs)
12241225

12251226
self.loop.add_callback(self.cb)
12261227

@@ -1322,7 +1323,8 @@ def __init__(
13221323
self._buffer = {}
13231324
self._metadata_buffer = {}
13241325
self.last = gen.moment
1325-
Stream.__init__(self, upstream, ensure_io_loop=True, **kwargs)
1326+
kwargs["ensure_io_loop"] = True
1327+
Stream.__init__(self, upstream, **kwargs)
13261328
self.loop.add_callback(self.cb)
13271329

13281330
def _get_key(self, x):
@@ -1369,7 +1371,8 @@ def __init__(self, upstream, interval, **kwargs):
13691371
self.interval = convert_interval(interval)
13701372
self.queue = Queue()
13711373

1372-
Stream.__init__(self, upstream, ensure_io_loop=True, **kwargs)
1374+
kwargs["ensure_io_loop"] = True
1375+
Stream.__init__(self, upstream,**kwargs)
13731376

13741377
self.loop.add_callback(self.cb)
13751378

@@ -1407,7 +1410,8 @@ def __init__(self, upstream, interval, **kwargs):
14071410
self.interval = convert_interval(interval)
14081411
self.next = 0
14091412

1410-
Stream.__init__(self, upstream, ensure_io_loop=True, **kwargs)
1413+
kwargs["ensure_io_loop"] = True
1414+
Stream.__init__(self, upstream, **kwargs)
14111415

14121416
@gen.coroutine
14131417
def update(self, x, who=None, metadata=None):
@@ -1432,7 +1436,8 @@ class buffer(Stream):
14321436
def __init__(self, upstream, n, **kwargs):
14331437
self.queue = Queue(maxsize=n)
14341438

1435-
Stream.__init__(self, upstream, ensure_io_loop=True, **kwargs)
1439+
kwargs["ensure_io_loop"] = True
1440+
Stream.__init__(self, upstream, **kwargs)
14361441

14371442
self.loop.add_callback(self.cb)
14381443

@@ -1876,7 +1881,8 @@ def __init__(self, upstream, **kwargs):
18761881
self.next = []
18771882
self.next_metadata = None
18781883

1879-
Stream.__init__(self, upstream, ensure_io_loop=True, **kwargs)
1884+
kwargs["ensure_io_loop"] = True
1885+
Stream.__init__(self, upstream, **kwargs)
18801886

18811887
self.loop.add_callback(self.cb)
18821888

@@ -1932,7 +1938,8 @@ def __init__(self, upstream, topic, producer_config, **kwargs):
19321938
self.topic = topic
19331939
self.producer = ck.Producer(producer_config)
19341940

1935-
Stream.__init__(self, upstream, ensure_io_loop=True, **kwargs)
1941+
kwargs["ensure_io_loop"] = True
1942+
Stream.__init__(self, upstream, **kwargs)
19361943
self.stopped = False
19371944
self.polltime = 0.2
19381945
self.loop.add_callback(self.poll)

streamz/dask.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ class DaskStream(Stream):
3838
dask.distributed.Client
3939
"""
4040
def __init__(self, *args, **kwargs):
41-
super().__init__(*args, ensure_io_loop=True, **kwargs)
41+
kwargs["ensure_io_loop"] = True
42+
super().__init__(*args, **kwargs)
4243

4344

4445
@DaskStream.register_api()

streamz/tests/test_core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ def test_timed_window():
403403

404404
@gen_test()
405405
def test_timed_window_ref_counts():
406-
source = Stream()
406+
source = Stream(asynchronous=True)
407407
_ = source.timed_window(0.01)
408408

409409
ref1 = RefCounter()

streamz/tests/test_dask.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ def test_partition_then_scatter_async(c, s, a, b):
7171
assert L == [1, 2, 3]
7272

7373

74-
@pytest.mark.slow
7574
def test_partition_then_scatter_sync(loop):
7675
# Ensure partition w/ timeout before scatter works correctly for synchronous
7776
with cluster() as (s, [a, b]):
@@ -163,7 +162,6 @@ def test_accumulate(c, s, a, b):
163162
assert L[-1][1] == 3
164163

165164

166-
@pytest.mark.slow
167165
def test_sync(loop): # noqa: F811
168166
with cluster() as (s, [a, b]):
169167
with Client(s['address'], loop=loop) as client: # noqa: F841
@@ -180,7 +178,6 @@ def f():
180178
assert L == list(map(inc, range(10)))
181179

182180

183-
@pytest.mark.slow
184181
def test_sync_2(loop): # noqa: F811
185182
with cluster() as (s, [a, b]):
186183
with Client(s['address'], loop=loop): # noqa: F841
@@ -194,7 +191,6 @@ def test_sync_2(loop): # noqa: F811
194191
assert L == list(map(inc, range(10)))
195192

196193

197-
@pytest.mark.slow
198194
@gen_cluster(client=True, ncores=[('127.0.0.1', 1)] * 2)
199195
def test_buffer(c, s, a, b):
200196
source = Stream(asynchronous=True)
@@ -221,7 +217,6 @@ def test_buffer(c, s, a, b):
221217
assert source.loop == c.loop
222218

223219

224-
@pytest.mark.slow
225220
def test_buffer_sync(loop): # noqa: F811
226221
with cluster() as (s, [a, b]):
227222
with Client(s['address'], loop=loop) as c: # noqa: F841
@@ -246,7 +241,6 @@ def test_buffer_sync(loop): # noqa: F811
246241

247242

248243
@pytest.mark.xfail(reason='')
249-
@pytest.mark.slow
250244
def test_stream_shares_client_loop(loop): # noqa: F811
251245
with cluster() as (s, [a, b]):
252246
with Client(s['address'], loop=loop) as client: # noqa: F841

0 commit comments

Comments
 (0)