Skip to content

Commit 4f45852

Browse files
author
Martin Durant
committed
fix stop()
1 parent 3033a29 commit 4f45852

3 files changed

Lines changed: 9 additions & 13 deletions

File tree

streamz/core.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -560,13 +560,6 @@ def remove(self, predicate):
560560
""" Only pass through elements for which the predicate returns False """
561561
return self.filter(lambda x: not predicate(x))
562562

563-
def stop(self):
564-
"""Call on any stream node to halt all upstream sources"""
565-
prev, s = self.upstream, self
566-
while s:
567-
prev, s = s, s.upstream
568-
prev.stopped = True
569-
570563
@property
571564
def scan(self):
572565
return self.accumulate

streamz/dataframe/core.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -876,7 +876,7 @@ def __init__(self, datafn=random_datablock, interval='500ms', dask=False,
876876
self.loop = source.loop
877877
self.interval = pd.Timedelta(interval).total_seconds()
878878
self.source = source
879-
self.continue_ = [True]
879+
self.continue_ = [False] # like the oppose of self.stopped
880880
self.kwargs = kwargs
881881

882882
stream = self.source.map(
@@ -889,8 +889,10 @@ def __init__(self, datafn=random_datablock, interval='500ms', dask=False,
889889
self.start()
890890

891891
def start(self):
892-
self.loop.add_callback(self._cb, self.interval, self.source,
893-
self.continue_)
892+
if not self.continue_[0]:
893+
self.continue_[0] = True
894+
self.loop.add_callback(self._cb, self.interval, self.source,
895+
self.continue_)
894896

895897
def __del__(self):
896898
self.stop()
@@ -924,8 +926,9 @@ class Random(PeriodicDataFrame):
924926
"""
925927

926928
def __init__(self, freq='100ms', interval='500ms', dask=False,
927-
datafn=random_datablock):
928-
super(Random, self).__init__(datafn, interval, dask, freq=pd.Timedelta(freq))
929+
start=True, datafn=random_datablock):
930+
super(Random, self).__init__(datafn, interval, dask, start,
931+
freq=pd.Timedelta(freq))
929932

930933

931934
_stream_types['streaming'].append((is_dataframe_like, DataFrame))

streamz/dataframe/tests/test_dataframes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,6 @@ def test_gc():
557557
a = DataFrame({'volatility': sdf.x.rolling('100ms').var(),
558558
'sub': sdf.x - sdf.x.rolling('100ms').mean()})
559559
n = len(sdf.stream.downstreams)
560-
yield gen.sleep(0.1)
561560
a = DataFrame({'volatility': sdf.x.rolling('100ms').var(),
562561
'sub': sdf.x - sdf.x.rolling('100ms').mean()})
563562
yield gen.sleep(0.1)
@@ -566,6 +565,7 @@ def test_gc():
566565
yield gen.sleep(0.1)
567566
a = DataFrame({'volatility': sdf.x.rolling('100ms').var(),
568567
'sub': sdf.x - sdf.x.rolling('100ms').mean()})
568+
yield gen.sleep(0.1)
569569

570570
assert len(sdf.stream.downstreams) == n
571571
del a

0 commit comments

Comments
 (0)