Skip to content

Commit 3033a29

Browse files
author
Martin Durant
committed
current_value plus DataFrame methods
1 parent ffe3840 commit 3033a29

3 files changed

Lines changed: 114 additions & 84 deletions

File tree

streamz/collection.py

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -152,7 +152,7 @@ def __rxor__(self, other):
152152
return self.map_partitions(operator.xor, other, self)
153153

154154

155-
class Streaming(OperatorMixin):
155+
class Streaming(OperatorMixin, core.APIRegisterMixin):
156156
"""
157157
Superclass for streaming collections
158158
@@ -228,12 +228,22 @@ def _repr_html_(self):
228228

229229
return "<h5>%s - elements like<h5>\n%s" % (type(self).__name__, body)
230230

231+
@property
232+
def current_value(self):
233+
return self.stream.current_value
234+
235+
@current_value.setter
236+
def current_value(self, *args, **kwargs):
237+
# this actually happens in self.stream - Streaming.emit is ignored
238+
pass
239+
231240
def _ipython_display_(self, **kwargs):
232241
return self.stream.latest().rate_limit(
233242
0.5).gather()._ipython_display_(**kwargs)
234243

235244
def emit(self, x):
236245
self.verify(x)
246+
self.current_value = x
237247
self.stream.emit(x)
238248

239249
def verify(self, x):

streamz/core.py

Lines changed: 82 additions & 70 deletions
Original file line number | Diff line number | Diff line change
@@ -122,7 +122,79 @@ def __str__(self):
122122
__repr__ = __str__
123123

124124

125-
class Stream(object):
125+
class APIRegisterMixin(object):
126+
127+
@classmethod
128+
def register_api(cls, modifier=identity, attribute_name=None):
129+
""" Add callable to Stream API
130+
131+
This allows you to register a new method onto this class. You can use
132+
it as a decorator.::
133+
134+
>>> @Stream.register_api()
135+
... class foo(Stream):
136+
... ...
137+
138+
>>> Stream().foo(...) # this works now
139+
140+
It attaches the callable as a normal attribute to the class object. In
141+
doing so it respects inheritance (all subclasses of Stream will also
142+
get the foo attribute).
143+
144+
By default callables are assumed to be instance methods. If you like
145+
you can include modifiers to apply before attaching to the class as in
146+
the following case where we construct a ``staticmethod``.
147+
148+
>>> @Stream.register_api(staticmethod)
149+
... class foo(Stream):
150+
... ...
151+
152+
>>> Stream.foo(...) # Foo operates as a static method
153+
154+
You can also provide an optional ``attribute_name`` argument to control
155+
the name of the attribute your callable will be attached as.
156+
157+
>>> @Stream.register_api(attribute_name="bar")
158+
... class foo(Stream):
159+
... ...
160+
161+
>> Stream().bar(...) # foo was actually attached as bar
162+
"""
163+
def _(func):
164+
@functools.wraps(func)
165+
def wrapped(*args, **kwargs):
166+
return func(*args, **kwargs)
167+
name = attribute_name if attribute_name else func.__name__
168+
setattr(cls, name, modifier(wrapped))
169+
return func
170+
return _
171+
172+
@classmethod
173+
def register_plugin_entry_point(cls, entry_point, modifier=identity):
174+
if hasattr(cls, entry_point.name):
175+
raise ValueError(
176+
f"Can't add {entry_point.name} from {entry_point.module_name} "
177+
f"to {cls.__name__}: duplicate method name."
178+
)
179+
180+
def stub(*args, **kwargs):
181+
""" Entrypoints-based streamz plugin. Will be loaded on first call. """
182+
node = entry_point.load()
183+
if not issubclass(node, Stream):
184+
raise TypeError(
185+
f"Error loading {entry_point.name} "
186+
f"from module {entry_point.module_name}: "
187+
f"{node.__class__.__name__} must be a subclass of Stream"
188+
)
189+
if getattr(cls, entry_point.name).__name__ == "stub":
190+
cls.register_api(
191+
modifier=modifier, attribute_name=entry_point.name
192+
)(node)
193+
return node(*args, **kwargs)
194+
cls.register_api(modifier=modifier, attribute_name=entry_point.name)(stub)
195+
196+
197+
class Stream(APIRegisterMixin):
126198
""" A Stream is an infinite sequence of data.
127199
128200
Streams subscribe to each other passing and transforming data between them.
@@ -179,6 +251,8 @@ def __init__(self, upstream=None, upstreams=None, stream_name=None,
179251
loop=None, asynchronous=None, ensure_io_loop=False):
180252
self.name = stream_name
181253
self.downstreams = OrderedWeakrefSet()
254+
self.current_value = None
255+
self.current_metadata = None
182256
if upstreams is not None:
183257
self.upstreams = list(upstreams)
184258
elif upstream is not None:
@@ -267,80 +341,16 @@ def _remove_upstream(self, upstream):
267341
classes which handle stream specific buffers/caches"""
268342
self.upstreams.remove(upstream)
269343

270-
@classmethod
271-
def register_api(cls, modifier=identity, attribute_name=None):
272-
""" Add callable to Stream API
273-
274-
This allows you to register a new method onto this class. You can use
275-
it as a decorator.::
276-
277-
>>> @Stream.register_api()
278-
... class foo(Stream):
279-
... ...
280-
281-
>>> Stream().foo(...) # this works now
282-
283-
It attaches the callable as a normal attribute to the class object. In
284-
doing so it respects inheritance (all subclasses of Stream will also
285-
get the foo attribute).
286-
287-
By default callables are assumed to be instance methods. If you like
288-
you can include modifiers to apply before attaching to the class as in
289-
the following case where we construct a ``staticmethod``.
290-
291-
>>> @Stream.register_api(staticmethod)
292-
... class foo(Stream):
293-
... ...
294-
295-
>>> Stream.foo(...) # Foo operates as a static method
296-
297-
You can also provide an optional ``attribute_name`` argument to control
298-
the name of the attribute your callable will be attached as.
299-
300-
>>> @Stream.register_api(attribute_name="bar")
301-
... class foo(Stream):
302-
... ...
303-
304-
>> Stream().bar(...) # foo was actually attached as bar
305-
"""
306-
def _(func):
307-
@functools.wraps(func)
308-
def wrapped(*args, **kwargs):
309-
return func(*args, **kwargs)
310-
name = attribute_name if attribute_name else func.__name__
311-
setattr(cls, name, modifier(wrapped))
312-
return func
313-
return _
314-
315-
@classmethod
316-
def register_plugin_entry_point(cls, entry_point, modifier=identity):
317-
if hasattr(cls, entry_point.name):
318-
raise ValueError(
319-
f"Can't add {entry_point.name} from {entry_point.module_name} "
320-
f"to {cls.__name__}: duplicate method name."
321-
)
322-
323-
def stub(*args, **kwargs):
324-
""" Entrypoints-based streamz plugin. Will be loaded on first call. """
325-
node = entry_point.load()
326-
if not issubclass(node, Stream):
327-
raise TypeError(
328-
f"Error loading {entry_point.name} "
329-
f"from module {entry_point.module_name}: "
330-
f"{node.__class__.__name__} must be a subclass of Stream"
331-
)
332-
if getattr(cls, entry_point.name).__name__ == "stub":
333-
cls.register_api(
334-
modifier=modifier, attribute_name=entry_point.name
335-
)(node)
336-
return node(*args, **kwargs)
337-
cls.register_api(modifier=modifier, attribute_name=entry_point.name)(stub)
338-
339344
def start(self):
340345
""" Start any upstream sources """
341346
for upstream in self.upstreams:
342347
upstream.start()
343348

349+
def stop(self):
350+
""" Stop upstream sources """
351+
for upstream in self.upstreams:
352+
upstream.stop()
353+
344354
def __str__(self):
345355
s_list = []
346356
if self.name:
@@ -430,6 +440,8 @@ def _emit(self, x, metadata=None):
430440
A reference counter used to check when data is done
431441
432442
"""
443+
self.current_value = x
444+
self.current_metadata = metadata
433445
if metadata:
434446
self._retain_refs(metadata, len(self.downstreams))
435447
else:

streamz/dataframe/core.py

Lines changed: 21 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,11 @@
1-
from __future__ import division, print_function
1+
import asyncio
22

33
import operator
44
from collections import OrderedDict
55
import numpy as np
66
import pandas as pd
77
import toolz
8-
from tornado.ioloop import IOLoop
8+
99
from tornado import gen
1010

1111
from ..collection import Streaming, _stream_types, OperatorMixin
@@ -801,8 +801,10 @@ def _accumulate(self, Agg, **kwargs):
801801
return Streaming(outstream, example, stream_type=stream_type)
802802

803803

804-
def random_datapoint(now, **kwargs):
804+
def random_datapoint(now=None, **kwargs):
805805
"""Example of querying a single current value"""
806+
if now is None:
807+
now = pd.Timestamp.now()
806808
return pd.DataFrame(
807809
{'a': np.random.random(1)}, index=[now])
808810

@@ -838,6 +840,7 @@ def random_datablock(last, now, **kwargs):
838840
return df
839841

840842

843+
@DataFrame.register_api(staticmethod, "from_periodic")
841844
class PeriodicDataFrame(DataFrame):
842845
"""A streaming dataframe using the asyncio ioloop to poll a callback fn
843846
@@ -863,26 +866,31 @@ class PeriodicDataFrame(DataFrame):
863866
>>> df = PeriodicDataFrame(interval='1s', datafn=random_datapoint) # doctest: +SKIP
864867
"""
865868

866-
def __init__(self, datafn=random_datablock, interval='500ms', dask=False, **kwargs):
869+
def __init__(self, datafn=random_datablock, interval='500ms', dask=False,
870+
start=True, **kwargs):
867871
if dask:
868872
from streamz.dask import DaskStream
869873
source = DaskStream()
870-
loop = source.loop
871874
else:
872875
source = Source()
873-
loop = IOLoop.current()
876+
self.loop = source.loop
874877
self.interval = pd.Timedelta(interval).total_seconds()
875878
self.source = source
876879
self.continue_ = [True]
877880
self.kwargs = kwargs
878881

879-
stream = self.source.map(lambda x: datafn(**x, **kwargs))
882+
stream = self.source.map(
883+
lambda x, continue_=self.continue_: datafn(continue_=continue_, **x, **kwargs)
884+
)
880885
example = datafn(last=pd.Timestamp.now(), now=pd.Timestamp.now(), **kwargs)
881886

882887
super(PeriodicDataFrame, self).__init__(stream, example)
888+
if start:
889+
self.start()
883890

884-
loop.add_callback(self._cb, self.interval, self.source,
885-
self.continue_)
891+
def start(self):
892+
self.loop.add_callback(self._cb, self.interval, self.source,
893+
self.continue_)
886894

887895
def __del__(self):
888896
self.stop()
@@ -891,16 +899,16 @@ def stop(self):
891899
self.continue_[0] = False
892900

893901
@staticmethod
894-
@gen.coroutine
895-
def _cb(interval, source, continue_):
902+
async def _cb(interval, source, continue_):
896903
last = pd.Timestamp.now()
897904
while continue_[0]:
898-
yield gen.sleep(interval)
905+
await gen.sleep(interval)
899906
now = pd.Timestamp.now()
900-
yield source._emit(dict(last=last, now=now))
907+
await asyncio.gather(*source._emit(dict(last=last, now=now)))
901908
last = now
902909

903910

911+
@DataFrame.register_api(staticmethod, "random")
904912
class Random(PeriodicDataFrame):
905913
"""PeriodicDataFrame providing random values by default
906914

0 commit comments

Comments
 (0)