Skip to content

Commit 0e0c225

Browse files
committed
Simplify my fix!
1 parent c58dadf commit 0e0c225

2 files changed

Lines changed: 19 additions & 7 deletions

File tree

streamz/core.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1473,18 +1473,23 @@ class zip(Stream):
14731473

14741474
def __init__(self, *upstreams, **kwargs):
14751475
self.maxsize = kwargs.pop('maxsize', 10)
1476-
self.condition = Condition()
1476+
self._condition = None
14771477
self.literals = [(i, val) for i, val in enumerate(upstreams)
14781478
if not isinstance(val, Stream)]
14791479

14801480
self.buffers = {upstream: deque()
14811481
for upstream in upstreams
14821482
if isinstance(upstream, Stream)}
1483-
14841483
upstreams2 = [upstream for upstream in upstreams if isinstance(upstream, Stream)]
14851484

14861485
Stream.__init__(self, upstreams=upstreams2, **kwargs)
14871486

1487+
@property
1488+
def condition(self):
1489+
if self._condition is None:
1490+
self._condition = Condition()
1491+
return self._condition
1492+
14881493
def _add_upstream(self, upstream):
14891494
# Override method to handle setup of buffer for new stream
14901495
self.buffers[upstream] = deque()
@@ -1881,7 +1886,7 @@ class latest(Stream):
18811886
_graphviz_shape = 'octagon'
18821887

18831888
def __init__(self, upstream, **kwargs):
1884-
self.condition = Condition()
1889+
self._condition = None
18851890
self.next = []
18861891
self.next_metadata = None
18871892

@@ -1890,6 +1895,12 @@ def __init__(self, upstream, **kwargs):
18901895

18911896
self.loop.add_callback(self.cb)
18921897

1898+
@property
1899+
def condition(self):
1900+
if self._condition is None:
1901+
self._condition = Condition()
1902+
return self._condition
1903+
18931904
def update(self, x, who=None, metadata=None):
18941905
if self.next_metadata:
18951906
self._release_refs(self.next_metadata)

streamz/dataframe/tests/test_dataframes.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ def test_binary_stream_operators(stream):
219219

220220
a.emit(df)
221221

222-
assert_eq(b[0], expected)
222+
wait_for(lambda: b and b[0].equals(expected), 1)
223223

224224

225225
def test_index(stream):
@@ -246,7 +246,7 @@ def test_pair_arithmetic(stream):
246246
a.emit(df.iloc[:5])
247247
a.emit(df.iloc[5:])
248248

249-
assert len(L) == 2
249+
wait_for(lambda: len(L) == 2, 1)
250250
assert_eq(pd.concat(L, axis=0), (df.x + df.y) * 2)
251251

252252

@@ -259,7 +259,7 @@ def test_getitem(stream):
259259
a.emit(df.iloc[:5])
260260
a.emit(df.iloc[5:])
261261

262-
assert len(L) == 2
262+
wait_for(lambda: len(L) == 2, 1)
263263
assert_eq(pd.concat(L, axis=0), df[df.x > 4])
264264

265265

@@ -298,6 +298,7 @@ def f(x):
298298
a.emit(df.iloc[7:])
299299

300300
first = df.iloc[:3]
301+
wait_for(lambda: len(L) > 2, 1)
301302
assert assert_eq(L[0], f(first))
302303
assert assert_eq(L[-1], f(df))
303304

@@ -382,7 +383,7 @@ def test_setitem(stream):
382383
df['a'] = 10
383384
df[['c', 'd']] = df[['x', 'y']]
384385

385-
assert_eq(L[-1], df.mean())
386+
wait_for(lambda: L and L[-1].equals(df.mean()), 1)
386387

387388

388389
def test_setitem_overwrites(stream):

0 commit comments

Comments
 (0)