Commit caad325 (2 parents: 55e5fd7 + d410316)

Merge pull request #363 from jbednar/periodicdataframe
Added PeriodicDataFrame

5 files changed: 169 additions & 31 deletions

docs/source/collections-api.rst

Lines changed: 3 additions & 0 deletions
@@ -86,6 +86,9 @@ Dataframes
    Rolling.sum
    Rolling.var
 
+.. autosummary::
+   PeriodicDataFrame
+
 .. autosummary::
    Random
 

docs/source/dataframes.rst

Lines changed: 76 additions & 2 deletions
@@ -4,7 +4,7 @@ DataFrames
 When handling large volumes of streaming tabular data it is often more
 efficient to pass around larger Pandas dataframes with many rows each rather
 than pass around individual Python tuples or dicts. Handling and computing on
-data with Pandas can be much faster than operating on Python objects.
+data with Pandas can be much faster than operating on individual Python objects.
 
 So one could imagine building streaming dataframe pipelines using the ``.map``
 and ``.accumulate`` streaming operators with functions that consume and produce
@@ -178,5 +178,79 @@ and ``DaskStream`` objects.
 Not Yet Supported
 -----------------
 
-Streaming dataframes algorithms do not currently pay special attention to data
+Streaming dataframe algorithms do not currently pay special attention to data
 arriving out-of-order.
+
+
+PeriodicDataFrame
+-----------------
+
+As you have seen above, Streamz can handle arbitrarily complex pipelines,
+events, and topologies, but what if you simply want to run some Python
+function periodically and collect or plot the results?
+
+streamz provides a high-level convenience class for this purpose, called
+a PeriodicDataFrame. A PeriodicDataFrame uses Python's asyncio event loop
+(used as part of Tornado in Jupyter and other interactive frameworks) to
+call a user-provided function at a regular interval, collecting the results
+and making them available for later processing.
+
+In the simplest case, you can use a PeriodicDataFrame by first writing
+a callback function like:
+
+.. code-block:: python
+
+    import numpy as np
+
+    def random_datapoint(**kwargs):
+        return pd.DataFrame({'a': np.random.random(1)}, index=[pd.Timestamp.now()])
+
+You can then make a streaming dataframe to poll this function
+e.g. every 300 milliseconds:
+
+.. code-block:: python
+
+    df = PeriodicDataFrame(random_datapoint, interval='300ms')
+
+``df`` will now be a steady stream of whatever values are returned by
+the ``datafn``, which can of course be any Python code as long as it
+returns a DataFrame.
+
+Here we returned only a single point, appropriate for streaming the
+results of system calls or other isolated actions, but any number of
+entries can be returned by the dataframe in a single batch. To
+facilitate collecting such batches, the callback is invoked with
+keyword arguments ``last`` (the time of the previous invocation) and
+``now`` (the time of the current invocation) as Pandas Timestamp
+objects. The callback can then generate or query for just the values
+in that time range.
+
+Arbitrary keyword arguments can be provided to the PeriodicDataFrame
+constructor, which will be passed into the callback so that its behavior
+can be parameterized.
+
+For instance, you can write a callback to return a suitable number of
+datapoints to keep a regularly updating stream, generated randomly
+as a batch since the last call:
+
+.. code-block:: python
+
+    def datablock(last, now, **kwargs):
+        freq = kwargs.get("freq", pd.Timedelta("50ms"))
+        index = pd.date_range(start=last + freq, end=now, freq=freq)
+        return pd.DataFrame({'x': np.random.random(len(index))}, index=index)
+
+    df = PeriodicDataFrame(datablock, interval='300ms')
+
+The callback will now be invoked every 300ms, each time generating
+datapoints at a rate of 1 every 50ms, returned as a batch. If you
+wished, you could override the 50ms value by passing
+``freq=pd.Timedelta("100ms")`` to the PeriodicDataFrame constructor.
+
+Similar code could e.g. query an external database for the time range
+since the last update, returning all datapoints since then.
+
+Once you have a PeriodicDataFrame defined using such callbacks, you
+can then use all the rest of the functionality supported by streamz,
+including aggregations, rolling windows, etc., and streaming
+`visualization <plotting>`_.
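The batch-sizing behavior documented above can be sanity-checked without any streaming machinery. A minimal sketch (pandas and numpy only, with hypothetical timestamps) showing how the rows per batch follow from the polling interval and ``freq``:

```python
import numpy as np
import pandas as pd

def datablock(last, now, **kwargs):
    # Batch of rows covering (last, now], one row per `freq`.
    freq = kwargs.get("freq", pd.Timedelta("50ms"))
    index = pd.date_range(start=last + freq, end=now, freq=freq)
    return pd.DataFrame({'x': np.random.random(len(index))}, index=index)

# Pretend the poller last fired 300ms ago (matching interval='300ms').
last = pd.Timestamp("2024-01-01 00:00:00")
now = last + pd.Timedelta("300ms")

print(len(datablock(last, now)))                              # 6 rows at 50ms spacing
print(len(datablock(last, now, freq=pd.Timedelta("100ms"))))  # 3 rows at 100ms spacing
```

A 300ms window at 50ms spacing yields six timestamps (``last + 50ms`` through ``now`` inclusive), so each 300ms tick emits a six-row batch; passing a larger ``freq`` through the constructor's kwargs shrinks the batch accordingly.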

streamz/dataframe/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -1,3 +1,3 @@
 from .core import (DataFrame, DataFrames, Frame, Frames, Series, Seriess, Index,
-                   Rolling, Window, Random, GroupBy)
+                   Rolling, Window, PeriodicDataFrame, Random, GroupBy)
 from .aggregations import Aggregation

streamz/dataframe/core.py

Lines changed: 74 additions & 28 deletions
@@ -2,7 +2,6 @@
 
 import operator
 from collections import OrderedDict
-from time import time
 import numpy as np
 import pandas as pd
 import toolz
@@ -802,10 +801,35 @@ def _accumulate(self, Agg, **kwargs):
         return Streaming(outstream, example, stream_type=stream_type)
 
 
-def _random_df(tup):
-    last, now, freq = tup
-    index = pd.date_range(start=(last + freq.total_seconds()) * 1e9,
-                          end=now * 1e9, freq=freq)
+def random_datapoint(now, **kwargs):
+    """Example of querying a single current value"""
+    return pd.DataFrame(
+        {'a': np.random.random(1)}, index=[now])
+
+
+def random_datablock(last, now, **kwargs):
+    """
+    Example of querying over a time range since last update
+
+    Parameters
+    ----------
+    last: pd.Timestamp
+        Time of previous call to this function.
+    now: pd.Timestamp
+        Current time.
+    freq: pd.Timedelta, optional
+        The time interval between individual records to be returned.
+        For good throughput, should be much smaller than the
+        interval at which this function is called.
+
+    Returns a pd.DataFrame with random values where:
+
+    The x column is uniformly distributed.
+    The y column is Poisson distributed.
+    The z column is normally distributed.
+    """
+    freq = kwargs.get("freq", pd.Timedelta("100ms"))
+    index = pd.date_range(start=last + freq, end=now, freq=freq)
 
     df = pd.DataFrame({'x': np.random.random(len(index)),
                        'y': np.random.poisson(size=len(index)),
@@ -814,47 +838,50 @@ def _random_df(tup):
     return df
 
 
-class Random(DataFrame):
-    """ A streaming dataframe of random data
-
-    The x column is uniformly distributed.
-    The y column is poisson distributed.
-    The z column is normally distributed.
-
-    This class is experimental and will likely be removed in the future
+class PeriodicDataFrame(DataFrame):
+    """A streaming dataframe using the asyncio ioloop to poll a callback fn
 
     Parameters
     ----------
-    freq: timedelta
-        The time interval between records
+    datafn: callable
+        Callback function accepting **kwargs and returning a
+        pd.DataFrame. kwargs will include at least
+        'last' (time.time() datafn was last invoked), and
+        'now' (current time.time()).
     interval: timedelta
-        The time interval between new dataframes, should be significantly
-        larger than freq
+        The time interval between new dataframes.
+    dask: boolean
+        If true, uses a DaskStream instead of a regular Source.
+    **kwargs:
+        Optional keyword arguments to be passed into the callback function.
+
+    By default, returns a three-column random pd.DataFrame generated
+    by the 'random_datablock' function.
 
     Example
     -------
-    >>> source = Random(freq='100ms', interval='1s')  # doctest: +SKIP
+    >>> df = PeriodicDataFrame(interval='1s', datafn=random_datapoint)  # doctest: +SKIP
     """
 
-    def __init__(self, freq='100ms', interval='500ms', dask=False):
+    def __init__(self, datafn=random_datablock, interval='500ms', dask=False, **kwargs):
         if dask:
             from streamz.dask import DaskStream
             source = DaskStream()
             loop = source.loop
         else:
             source = Source()
             loop = IOLoop.current()
-        self.freq = pd.Timedelta(freq)
         self.interval = pd.Timedelta(interval).total_seconds()
         self.source = source
         self.continue_ = [True]
+        self.kwargs = kwargs
 
-        stream = self.source.map(_random_df)
-        example = _random_df((time(), time(), self.freq))
+        stream = self.source.map(lambda x: datafn(**x, **kwargs))
+        example = datafn(last=pd.Timestamp.now(), now=pd.Timestamp.now(), **kwargs)
 
-        super(Random, self).__init__(stream, example)
+        super(PeriodicDataFrame, self).__init__(stream, example)
 
-        loop.add_callback(self._cb, self.interval, self.freq, self.source,
+        loop.add_callback(self._cb, self.interval, self.source,
                           self.continue_)
 
     def __del__(self):
@@ -865,15 +892,34 @@ def stop(self):
 
     @staticmethod
    @gen.coroutine
-    def _cb(interval, freq, source, continue_):
-        last = time()
+    def _cb(interval, source, continue_):
+        last = pd.Timestamp.now()
         while continue_[0]:
             yield gen.sleep(interval)
-            now = time()
-            yield source._emit((last, now, freq))
+            now = pd.Timestamp.now()
+            yield source._emit(dict(last=last, now=now))
             last = now
 
 
+class Random(PeriodicDataFrame):
+    """PeriodicDataFrame providing random values by default
+
+    Accepts same parameters as PeriodicDataFrame, plus
+    `freq`, a string that will be converted to a pd.Timedelta
+    and passed to the 'datafn'.
+
+    Useful mainly for examples and docs.
+
+    Example
+    -------
+    >>> source = Random(freq='100ms', interval='1s')  # doctest: +SKIP
+    """
+
+    def __init__(self, freq='100ms', interval='500ms', dask=False,
+                 datafn=random_datablock):
+        super(Random, self).__init__(datafn, interval, dask, freq=pd.Timedelta(freq))
+
+
 _stream_types['streaming'].append((is_dataframe_like, DataFrame))
 _stream_types['streaming'].append((is_index_like, Index))
 _stream_types['streaming'].append((is_series_like, Series))
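To make the new control flow in ``core.py`` concrete: each tick of ``_cb`` packages ``last`` and ``now`` into a dict, and ``source.map(lambda x: datafn(**x, **kwargs))`` splats that dict into the callback. A synchronous simulation of two ticks (no Tornado loop, pandas/numpy only, with hypothetical fixed timestamps standing in for ``gen.sleep``):

```python
import numpy as np
import pandas as pd

def random_datapoint(now, **kwargs):
    """One-row frame stamped with the current tick time (as in the diff)."""
    return pd.DataFrame({'a': np.random.random(1)}, index=[now])

emitted = []
last = pd.Timestamp("2024-01-01 00:00:00")
for tick in range(2):
    now = last + pd.Timedelta("500ms")         # stand-in for gen.sleep(interval)
    event = dict(last=last, now=now)           # what _cb passes to source._emit
    emitted.append(random_datapoint(**event))  # what the mapped datafn receives
    last = now

combined = pd.concat(emitted)
print(len(combined))  # 2 (one row per tick)
```

Note that ``random_datapoint`` absorbs the ``last`` key via ``**kwargs``, which is why the docstring in the diff requires callbacks to accept ``**kwargs``: the emitted dict always carries both timestamps.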

streamz/tests/test_batch.py

Lines changed: 15 additions & 0 deletions
@@ -37,6 +37,21 @@ def test_dataframes():
     assert result.z.tolist() == [3 * i for i in range(10)]
 
 
+def test_periodic_dataframes():
+    pd = pytest.importorskip('pandas')
+    from streamz.dataframe import PeriodicDataFrame
+    from streamz.dataframe.core import random_datapoint
+    df = random_datapoint(now=pd.Timestamp.now())
+    assert len(df) == 1
+
+    def callback(now, **kwargs):
+        return pd.DataFrame(dict(x=50, index=[now]))
+
+    df = PeriodicDataFrame(callback, interval='20ms')
+    assert df.tail(0).x == 50
+    df.stop()
+
+
 def test_filter():
     a = Batch()
     f = a.filter(lambda x: x % 2 == 0)
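One pandas detail worth noting in the new test's ``callback``: ``pd.DataFrame(dict(x=50, index=[now]))`` broadcasts the scalar 50 against the one-element list, producing a one-row frame with two ordinary columns, ``x`` and ``index`` (the timestamp lands in a column, not in the frame's index). A standalone check of that construction:

```python
import pandas as pd

now = pd.Timestamp("2024-01-01")
# Scalar 50 is broadcast against the one-element 'index' list,
# yielding columns ['x', 'index'] over a default RangeIndex.
df = pd.DataFrame(dict(x=50, index=[now]))

print(df.shape)         # (1, 2)
print(df['x'].iloc[0])  # 50
```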
