Skip to content

Commit 97701e4

Browse files
authored
Merge pull request #404 from martindurant/small_bits
Pieces that occurred while working on intake-streamz
2 parents ffe3840 + 54071ea commit 97701e4

5 files changed

Lines changed: 133 additions & 96 deletions

File tree

streamz/collection.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ def __rxor__(self, other):
152152
return self.map_partitions(operator.xor, other, self)
153153

154154

155-
class Streaming(OperatorMixin):
155+
class Streaming(OperatorMixin, core.APIRegisterMixin):
156156
"""
157157
Superclass for streaming collections
158158
@@ -228,6 +228,16 @@ def _repr_html_(self):
228228

229229
return "<h5>%s - elements like<h5>\n%s" % (type(self).__name__, body)
230230

231+
@property
232+
def current_value(self):
233+
return self.stream.current_value
234+
235+
def start(self):
236+
self.stream.start()
237+
238+
def stop(self):
239+
self.stream.stop()
240+
231241
def _ipython_display_(self, **kwargs):
232242
return self.stream.latest().rate_limit(
233243
0.5).gather()._ipython_display_(**kwargs)

streamz/core.py

Lines changed: 82 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,79 @@ def __str__(self):
122122
__repr__ = __str__
123123

124124

125-
class Stream(object):
125+
class APIRegisterMixin(object):
126+
127+
@classmethod
128+
def register_api(cls, modifier=identity, attribute_name=None):
129+
""" Add callable to Stream API
130+
131+
This allows you to register a new method onto this class. You can use
132+
it as a decorator.::
133+
134+
>>> @Stream.register_api()
135+
... class foo(Stream):
136+
... ...
137+
138+
>>> Stream().foo(...) # this works now
139+
140+
It attaches the callable as a normal attribute to the class object. In
141+
doing so it respects inheritance (all subclasses of Stream will also
142+
get the foo attribute).
143+
144+
By default callables are assumed to be instance methods. If you like
145+
you can include modifiers to apply before attaching to the class as in
146+
the following case where we construct a ``staticmethod``.
147+
148+
>>> @Stream.register_api(staticmethod)
149+
... class foo(Stream):
150+
... ...
151+
152+
>>> Stream.foo(...) # Foo operates as a static method
153+
154+
You can also provide an optional ``attribute_name`` argument to control
155+
the name of the attribute your callable will be attached as.
156+
157+
>>> @Stream.register_api(attribute_name="bar")
158+
... class foo(Stream):
159+
... ...
160+
161+
>> Stream().bar(...) # foo was actually attached as bar
162+
"""
163+
def _(func):
164+
@functools.wraps(func)
165+
def wrapped(*args, **kwargs):
166+
return func(*args, **kwargs)
167+
name = attribute_name if attribute_name else func.__name__
168+
setattr(cls, name, modifier(wrapped))
169+
return func
170+
return _
171+
172+
@classmethod
173+
def register_plugin_entry_point(cls, entry_point, modifier=identity):
174+
if hasattr(cls, entry_point.name):
175+
raise ValueError(
176+
f"Can't add {entry_point.name} from {entry_point.module_name} "
177+
f"to {cls.__name__}: duplicate method name."
178+
)
179+
180+
def stub(*args, **kwargs):
181+
""" Entrypoints-based streamz plugin. Will be loaded on first call. """
182+
node = entry_point.load()
183+
if not issubclass(node, Stream):
184+
raise TypeError(
185+
f"Error loading {entry_point.name} "
186+
f"from module {entry_point.module_name}: "
187+
f"{node.__class__.__name__} must be a subclass of Stream"
188+
)
189+
if getattr(cls, entry_point.name).__name__ == "stub":
190+
cls.register_api(
191+
modifier=modifier, attribute_name=entry_point.name
192+
)(node)
193+
return node(*args, **kwargs)
194+
cls.register_api(modifier=modifier, attribute_name=entry_point.name)(stub)
195+
196+
197+
class Stream(APIRegisterMixin):
126198
""" A Stream is an infinite sequence of data.
127199
128200
Streams subscribe to each other passing and transforming data between them.
@@ -179,6 +251,8 @@ def __init__(self, upstream=None, upstreams=None, stream_name=None,
179251
loop=None, asynchronous=None, ensure_io_loop=False):
180252
self.name = stream_name
181253
self.downstreams = OrderedWeakrefSet()
254+
self.current_value = None
255+
self.current_metadata = None
182256
if upstreams is not None:
183257
self.upstreams = list(upstreams)
184258
elif upstream is not None:
@@ -267,80 +341,16 @@ def _remove_upstream(self, upstream):
267341
classes which handle stream specific buffers/caches"""
268342
self.upstreams.remove(upstream)
269343

270-
@classmethod
271-
def register_api(cls, modifier=identity, attribute_name=None):
272-
""" Add callable to Stream API
273-
274-
This allows you to register a new method onto this class. You can use
275-
it as a decorator.::
276-
277-
>>> @Stream.register_api()
278-
... class foo(Stream):
279-
... ...
280-
281-
>>> Stream().foo(...) # this works now
282-
283-
It attaches the callable as a normal attribute to the class object. In
284-
doing so it respects inheritance (all subclasses of Stream will also
285-
get the foo attribute).
286-
287-
By default callables are assumed to be instance methods. If you like
288-
you can include modifiers to apply before attaching to the class as in
289-
the following case where we construct a ``staticmethod``.
290-
291-
>>> @Stream.register_api(staticmethod)
292-
... class foo(Stream):
293-
... ...
294-
295-
>>> Stream.foo(...) # Foo operates as a static method
296-
297-
You can also provide an optional ``attribute_name`` argument to control
298-
the name of the attribute your callable will be attached as.
299-
300-
>>> @Stream.register_api(attribute_name="bar")
301-
... class foo(Stream):
302-
... ...
303-
304-
>> Stream().bar(...) # foo was actually attached as bar
305-
"""
306-
def _(func):
307-
@functools.wraps(func)
308-
def wrapped(*args, **kwargs):
309-
return func(*args, **kwargs)
310-
name = attribute_name if attribute_name else func.__name__
311-
setattr(cls, name, modifier(wrapped))
312-
return func
313-
return _
314-
315-
@classmethod
316-
def register_plugin_entry_point(cls, entry_point, modifier=identity):
317-
if hasattr(cls, entry_point.name):
318-
raise ValueError(
319-
f"Can't add {entry_point.name} from {entry_point.module_name} "
320-
f"to {cls.__name__}: duplicate method name."
321-
)
322-
323-
def stub(*args, **kwargs):
324-
""" Entrypoints-based streamz plugin. Will be loaded on first call. """
325-
node = entry_point.load()
326-
if not issubclass(node, Stream):
327-
raise TypeError(
328-
f"Error loading {entry_point.name} "
329-
f"from module {entry_point.module_name}: "
330-
f"{node.__class__.__name__} must be a subclass of Stream"
331-
)
332-
if getattr(cls, entry_point.name).__name__ == "stub":
333-
cls.register_api(
334-
modifier=modifier, attribute_name=entry_point.name
335-
)(node)
336-
return node(*args, **kwargs)
337-
cls.register_api(modifier=modifier, attribute_name=entry_point.name)(stub)
338-
339344
def start(self):
340345
""" Start any upstream sources """
341346
for upstream in self.upstreams:
342347
upstream.start()
343348

349+
def stop(self):
350+
""" Stop upstream sources """
351+
for upstream in self.upstreams:
352+
upstream.stop()
353+
344354
def __str__(self):
345355
s_list = []
346356
if self.name:
@@ -430,6 +440,8 @@ def _emit(self, x, metadata=None):
430440
A reference counter used to check when data is done
431441
432442
"""
443+
self.current_value = x
444+
self.current_metadata = metadata
433445
if metadata:
434446
self._retain_refs(metadata, len(self.downstreams))
435447
else:
@@ -548,13 +560,6 @@ def remove(self, predicate):
548560
""" Only pass through elements for which the predicate returns False """
549561
return self.filter(lambda x: not predicate(x))
550562

551-
def stop(self):
552-
"""Call on any stream node to halt all upstream sources"""
553-
prev, s = self.upstream, self
554-
while s:
555-
prev, s = s, s.upstream
556-
prev.stopped = True
557-
558563
@property
559564
def scan(self):
560565
return self.accumulate

streamz/dataframe/core.py

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
from __future__ import division, print_function
1+
import asyncio
22

33
import operator
44
from collections import OrderedDict
55
import numpy as np
66
import pandas as pd
77
import toolz
8-
from tornado.ioloop import IOLoop
8+
99
from tornado import gen
1010

1111
from ..collection import Streaming, _stream_types, OperatorMixin
@@ -326,6 +326,10 @@ def concat(tup, module=None, columns=None):
326326
example = kwargs.get('example')
327327
elif len(args) > 1:
328328
example = args[1]
329+
if callable(example):
330+
example = example()
331+
kwargs["example"] = example
332+
329333
self._subtype = get_base_frame_type(self.__class__.__name__,
330334
is_dataframe_like, example)
331335
super(DataFrame, self).__init__(*args, **kwargs)
@@ -337,6 +341,15 @@ def verify(self, x):
337341
raise IndexError("Input expected to have columns %s, got %s" %
338342
(self.example.columns, x.columns))
339343

344+
@property
345+
def plot(self):
346+
try:
347+
# import has side-effect of attaching .hvplot attribute
348+
import hvplot.streamz # # noqa: F401
349+
except ImportError as err: # pragma: no cover
350+
raise ImportError("Streamz dataframe plotting requires hvplot") from err
351+
return self.hvplot
352+
340353

341354
class _SeriesMixin(object):
342355
@property
@@ -801,8 +814,10 @@ def _accumulate(self, Agg, **kwargs):
801814
return Streaming(outstream, example, stream_type=stream_type)
802815

803816

804-
def random_datapoint(now, **kwargs):
817+
def random_datapoint(now=None, **kwargs):
805818
"""Example of querying a single current value"""
819+
if now is None:
820+
now = pd.Timestamp.now()
806821
return pd.DataFrame(
807822
{'a': np.random.random(1)}, index=[now])
808823

@@ -838,6 +853,7 @@ def random_datablock(last, now, **kwargs):
838853
return df
839854

840855

856+
@DataFrame.register_api(staticmethod, "from_periodic")
841857
class PeriodicDataFrame(DataFrame):
842858
"""A streaming dataframe using the asyncio ioloop to poll a callback fn
843859
@@ -863,26 +879,31 @@ class PeriodicDataFrame(DataFrame):
863879
>>> df = PeriodicDataFrame(interval='1s', datafn=random_datapoint) # doctest: +SKIP
864880
"""
865881

866-
def __init__(self, datafn=random_datablock, interval='500ms', dask=False, **kwargs):
882+
def __init__(self, datafn=random_datablock, interval='500ms', dask=False,
883+
start=True, **kwargs):
867884
if dask:
868885
from streamz.dask import DaskStream
869886
source = DaskStream()
870-
loop = source.loop
871887
else:
872888
source = Source()
873-
loop = IOLoop.current()
889+
self.loop = source.loop
874890
self.interval = pd.Timedelta(interval).total_seconds()
875891
self.source = source
876-
self.continue_ = [True]
892+
self.continue_ = [False] # like the oppose of self.stopped
877893
self.kwargs = kwargs
878894

879895
stream = self.source.map(lambda x: datafn(**x, **kwargs))
880896
example = datafn(last=pd.Timestamp.now(), now=pd.Timestamp.now(), **kwargs)
881897

882898
super(PeriodicDataFrame, self).__init__(stream, example)
899+
if start:
900+
self.start()
883901

884-
loop.add_callback(self._cb, self.interval, self.source,
885-
self.continue_)
902+
def start(self):
903+
if not self.continue_[0]:
904+
self.continue_[0] = True
905+
self.loop.add_callback(self._cb, self.interval, self.source,
906+
self.continue_)
886907

887908
def __del__(self):
888909
self.stop()
@@ -891,16 +912,16 @@ def stop(self):
891912
self.continue_[0] = False
892913

893914
@staticmethod
894-
@gen.coroutine
895-
def _cb(interval, source, continue_):
915+
async def _cb(interval, source, continue_):
896916
last = pd.Timestamp.now()
897917
while continue_[0]:
898-
yield gen.sleep(interval)
918+
await gen.sleep(interval)
899919
now = pd.Timestamp.now()
900-
yield source._emit(dict(last=last, now=now))
920+
await asyncio.gather(*source._emit(dict(last=last, now=now)))
901921
last = now
902922

903923

924+
@DataFrame.register_api(staticmethod, "random")
904925
class Random(PeriodicDataFrame):
905926
"""PeriodicDataFrame providing random values by default
906927
@@ -916,8 +937,9 @@ class Random(PeriodicDataFrame):
916937
"""
917938

918939
def __init__(self, freq='100ms', interval='500ms', dask=False,
919-
datafn=random_datablock):
920-
super(Random, self).__init__(datafn, interval, dask, freq=pd.Timedelta(freq))
940+
start=True, datafn=random_datablock):
941+
super(Random, self).__init__(datafn, interval, dask, start,
942+
freq=pd.Timedelta(freq))
921943

922944

923945
_stream_types['streaming'].append((is_dataframe_like, DataFrame))

streamz/dataframe/tests/test_dataframes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,6 @@ def test_gc():
557557
a = DataFrame({'volatility': sdf.x.rolling('100ms').var(),
558558
'sub': sdf.x - sdf.x.rolling('100ms').mean()})
559559
n = len(sdf.stream.downstreams)
560-
yield gen.sleep(0.1)
561560
a = DataFrame({'volatility': sdf.x.rolling('100ms').var(),
562561
'sub': sdf.x - sdf.x.rolling('100ms').mean()})
563562
yield gen.sleep(0.1)
@@ -566,6 +565,7 @@ def test_gc():
566565
yield gen.sleep(0.1)
567566
a = DataFrame({'volatility': sdf.x.rolling('100ms').var(),
568567
'sub': sdf.x - sdf.x.rolling('100ms').mean()})
568+
yield gen.sleep(0.1)
569569

570570
assert len(sdf.stream.downstreams) == n
571571
del a

streamz/tests/test_sources.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,10 @@ def test_from_iterable():
141141
def test_from_iterable_backpressure():
142142
it = iter(range(5))
143143
source = Source.from_iterable(it)
144-
L = source.rate_limit(0.01).sink_to_list()
144+
L = source.rate_limit(0.1).sink_to_list()
145145
source.start()
146146

147-
wait_for(lambda: L == [0], 1)
147+
wait_for(lambda: L == [0], 1, period=0.01)
148148
assert next(it) == 2 # 1 is in blocked _emit
149149

150150

0 commit comments

Comments
 (0)