
Commit 91c0ba7

Merge remote-tracking branch 'upstream/master'
2 parents 09ea1e2 + 2c18ef2 commit 91c0ba7

13 files changed: 296 additions & 64 deletions


docs/source/collections-api.rst

Lines changed: 3 additions & 0 deletions
@@ -86,6 +86,9 @@ Dataframes
    Rolling.sum
    Rolling.var
 
+.. autosummary::
+   PeriodicDataFrame
+
 .. autosummary::
    Random

docs/source/conf.py

Lines changed: 2 additions & 2 deletions
@@ -48,7 +48,7 @@
 
 # General information about the project.
 project = 'Streamz'
-copyright = '2017, Matthew Rocklin'
+copyright = '2017-2020, Matthew Rocklin'
 author = 'Matthew Rocklin'
 
 # The version info for the project you're documenting, acts as replacement for
@@ -160,7 +160,7 @@
 # dir menu entry, description, category)
 texinfo_documents = [
     (master_doc, 'Streamz', 'Streamz Documentation',
-     author, 'Streamz', 'One line description of project.',
+     author, 'Streamz', 'Support for pipelines managing continuous streams of data.',
      'Miscellaneous'),
 ]

docs/source/core.rst

Lines changed: 32 additions & 8 deletions
@@ -15,7 +15,9 @@ Map, emit, and sink
    map
    sink
 
-You can create a basic pipeline by instantiating the ``Streamz`` object and then using methods like ``map``, ``accumulate``, and ``sink``.
+You can create a basic pipeline by instantiating the ``Streamz``
+object and then using methods like ``map``, ``accumulate``, and
+``sink``.
 
 .. code-block:: python
 
@@ -27,7 +29,10 @@ You can create a basic pipeline by instantiating the ``Streamz`` object and then
     source = Stream()
     source.map(increment).sink(print)
 
-The ``map`` and ``sink`` methods both take a function and apply that function to every element in the stream. The ``map`` method returns a new stream with the modified elements while ``sink`` is typically used at the end of a stream for final actions.
+The ``map`` and ``sink`` methods both take a function and apply that
+function to every element in the stream. The ``map`` method returns a
+new stream with the modified elements while ``sink`` is typically used
+at the end of a stream for final actions.
 
 To push data through our pipeline we call ``emit``
 
@@ -383,14 +388,33 @@ want to read further about :doc:`collections <collections>`
 Metadata
 --------
 
-Metadata can be emitted into the pipeline to accompany the data as a list of dictionaries. Most functions will pass the metadata to the downstream function without making any changes. However, functions that make the pipeline asynchronous require logic that dictates how and when the metadata will be passed downstream. Synchronous functions and asynchronous functions that have a 1:1 ratio of the number of values on the input to the number of values on the output will emit the metadata collection without any modification. However, functions that have multiple input streams or emit collections of data will emit the metadata associated with the emitted data as a collection.
+Metadata can be emitted into the pipeline to accompany the data as a
+list of dictionaries. Most functions will pass the metadata to the
+downstream function without making any changes. However, functions
+that make the pipeline asynchronous require logic that dictates how
+and when the metadata will be passed downstream. Synchronous functions
+and asynchronous functions that have a 1:1 ratio of the number of
+values on the input to the number of values on the output will emit
+the metadata collection without any modification. However, functions
+that have multiple input streams or emit collections of data will emit
+the metadata associated with the emitted data as a collection.
 
 
 Reference Counting and Checkpointing
 ------------------------------------
 
-Checkpointing is achieved in Streamz through the use of reference counting. With this method, a checkpoint can be saved when and only when data has progressed through all of the the pipeline without any issues. This prevents data loss and guarantees at-least-once semantics.
-
-Any node that caches or holds data after it returns increments the reference counter associated with the given data by one. When a node is no longer holding the data, it will release it by decrementing the counter by one. When the counter changes to zero, a callback associated with the data is triggered.
-
-References are passed in the metadata as a value of the `ref` keyword. Each metadata object contains only one reference counter object.
+Checkpointing is achieved in Streamz through the use of reference
+counting. With this method, a checkpoint can be saved when and only
+when data has progressed through all of the pipeline without any
+issues. This prevents data loss and guarantees at-least-once
+semantics.
+
+Any node that caches or holds data after it returns increments the
+reference counter associated with the given data by one. When a node
+is no longer holding the data, it will release it by decrementing the
+counter by one. When the counter changes to zero, a callback
+associated with the data is triggered.
+
+References are passed in the metadata as a value of the `ref`
+keyword. Each metadata object contains only one reference counter
+object.
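
To make the metadata and reference-counting behavior described in this hunk concrete, here is a minimal sketch. It assumes streamz 0.6's ``metadata=`` keyword on ``emit`` and a ``RefCounter`` class in ``streamz.core`` that accepts a completion callback; check the release for the exact names.

.. code-block:: python

    from streamz import Stream
    from streamz.core import RefCounter  # assumed import location

    source = Stream()
    source.map(lambda x: x + 1).sink(print)

    # The callback fires when the count returns to zero, i.e. once every
    # node holding the element has released it, which is a safe checkpoint.
    ref = RefCounter(cb=lambda: print("element fully processed"))

    # Metadata accompanies the data as a list of dictionaries; the
    # counter travels under the `ref` keyword described above.
    source.emit(1, metadata=[{'ref': ref}])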

docs/source/dask.rst

Lines changed: 12 additions & 9 deletions
@@ -36,7 +36,7 @@ Then start a local Dask cluster
    from dask.distributed import Client
    client = Client()
 
-This operates on a local processes or threads. If you have Bokeh installed
+This operates on local processes or threads. If you have Bokeh installed
 then this will also start a diagnostics web server at
 http://localhost:8787/status which you may want to open to get a real-time view
 of execution.
@@ -49,7 +49,7 @@ Sequential Execution
    map
    sink
 
-Before we build a parallel stream, lets build a sequential stream that maps a
+Before we build a parallel stream, let's build a sequential stream that maps a
 simple function across data, and then prints those results. We use the core
 ``Stream`` object.
@@ -69,7 +69,7 @@ simple function across data, and then prints those results. We use the core
    for i in range(10):
        source.emit(i)
 
-This should take ten seconds we call the ``inc`` function ten times
+This should take ten seconds because we call the ``inc`` function ten times
 sequentially.
 
 Parallel Execution
@@ -101,7 +101,7 @@ You may want to look at http://localhost:8787/status during execution to get a
 sense of the parallel execution.
 
 This should have run much more quickly depending on how many cores you have on
-your machine. We added a few extra nodes to our stream, lets look at what they
+your machine. We added a few extra nodes to our stream; let's look at what they
 did.
 
 - ``scatter``: Converted our Stream into a DaskStream. The elements that we
@@ -123,17 +123,20 @@ Gotchas
 +++++++
 
 
-An important gotcha with ``DaskStream`` is that it is a subclass ``Stream``, and so can be used as an input
-to any function expecting a ``Stream``. If there is no intervening ``.gather()``, then the downstream node will
-receive Dask futures instead of the data they represent::
+An important gotcha with ``DaskStream`` is that it is a subclass of
+``Stream``, and so can be used as an input to any function expecting a
+``Stream``. If there is no intervening ``.gather()``, then the
+downstream node will receive Dask futures instead of the data they
+represent::
 
     source = Stream()
     source2 = Stream()
     a = source.scatter().map(inc)
     b = source2.combine_latest(a)
 
-In this case, the combine operation will get real values from ``source2``, and Dask futures.
-Downstream nodes would be free to operate on the futures, but more likely, the line should be::
+In this case, the combine operation will get real values from
+``source2``, and Dask futures. Downstream nodes would be free to
+operate on the futures, but more likely, the line should be::
 
     b = source2.combine_latest(a.gather())
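
As a recap of the scatter/gather pattern this file describes, a minimal end-to-end sketch (only streamz operations named in this documentation are used; the ``buffer(8)`` width is an arbitrary choice):

.. code-block:: python

    from dask.distributed import Client
    from streamz import Stream

    client = Client()  # local Dask cluster

    def inc(x):
        return x + 1

    source = Stream()
    (source.scatter()   # ship elements to the cluster as futures
           .map(inc)    # runs remotely, in parallel
           .buffer(8)   # allow up to eight elements in flight
           .gather()    # pull concrete results back locally
           .sink(print))

    for i in range(10):
        source.emit(i)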

docs/source/dataframes.rst

Lines changed: 76 additions & 2 deletions
@@ -4,7 +4,7 @@ DataFrames
 When handling large volumes of streaming tabular data it is often more
 efficient to pass around larger Pandas dataframes with many rows each rather
 than pass around individual Python tuples or dicts. Handling and computing on
-data with Pandas can be much faster than operating on Python objects.
+data with Pandas can be much faster than operating on individual Python objects.
 
 So one could imagine building streaming dataframe pipelines using the ``.map``
 and ``.accumulate`` streaming operators with functions that consume and produce
@@ -178,5 +178,79 @@ and ``DaskStream`` objects.
 Not Yet Supported
 -----------------
 
-Streaming dataframes algorithms do not currently pay special attention to data
+Streaming dataframe algorithms do not currently pay special attention to data
 arriving out-of-order.
+
+
+PeriodicDataFrame
+-----------------
+
+As you have seen above, Streamz can handle arbitrarily complex pipelines,
+events, and topologies, but what if you simply want to run some Python
+function periodically and collect or plot the results?
+
+streamz provides a high-level convenience class for this purpose, called
+a PeriodicDataFrame. A PeriodicDataFrame uses Python's asyncio event loop
+(used as part of Tornado in Jupyter and other interactive frameworks) to
+call a user-provided function at a regular interval, collecting the results
+and making them available for later processing.
+
+In the simplest case, you can use a PeriodicDataFrame by first writing
+a callback function like:
+
+.. code-block:: python
+
+    import numpy as np
+
+    def random_datapoint(**kwargs):
+        return pd.DataFrame({'a': np.random.random(1)}, index=[pd.Timestamp.now()])
+
+You can then make a streaming dataframe to poll this function
+e.g. every 300 milliseconds:
+
+.. code-block:: python
+
+    df = PeriodicDataFrame(random_datapoint, interval='300ms')
+
+``df`` will now be a steady stream of whatever values are returned by
+the `datafn`, which can of course be any Python code as long as it
+returns a DataFrame.
+
+Here we returned only a single point, appropriate for streaming the
+results of system calls or other isolated actions, but any number of
+entries can be returned in a single batch. To
+facilitate collecting such batches, the callback is invoked with
+keyword arguments ``last`` (the time of the previous invocation) and
+``now`` (the time of the current invocation) as Pandas Timestamp
+objects. The callback can then generate or query for just the values
+in that time range.
+
+Arbitrary keyword arguments can be provided to the PeriodicDataFrame
+constructor, which will be passed into the callback so that its behavior
+can be parameterized.
+
+For instance, you can write a callback to return a suitable number of
+datapoints to keep a regularly updating stream, generated randomly
+as a batch since the last call:
+
+.. code-block:: python
+
+    def datablock(last, now, **kwargs):
+        freq = kwargs.get("freq", pd.Timedelta("50ms"))
+        index = pd.date_range(start=last + freq, end=now, freq=freq)
+        return pd.DataFrame({'x': np.random.random(len(index))}, index=index)
+
+    df = PeriodicDataFrame(datablock, interval='300ms')
+
+The callback will now be invoked every 300ms, each time generating
+datapoints at a rate of 1 every 50ms, returned as a batch. If you
+wished, you could override the 50ms value by passing
+`freq=pd.Timedelta("100ms")` to the PeriodicDataFrame constructor.
+
+Similar code could e.g. query an external database for the time range
+since the last update, returning all datapoints since then.
+
+Once you have a PeriodicDataFrame defined using such callbacks, you
+can then use all the rest of the functionality supported by streamz,
+including aggregations, rolling windows, etc., and streaming
+`visualization <plotting>`_.
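
A short sketch tying the pieces above together: forwarding a constructor kwarg to the callback and applying a windowed aggregation downstream (the ``.stream.sink`` hookup and the two-second window are assumptions based on the streamz collections API):

.. code-block:: python

    import numpy as np
    import pandas as pd
    from streamz.dataframe import PeriodicDataFrame

    def datablock(last, now, **kwargs):
        freq = kwargs.get("freq", pd.Timedelta("50ms"))
        index = pd.date_range(start=last + freq, end=now, freq=freq)
        return pd.DataFrame({'x': np.random.random(len(index))}, index=index)

    # `freq` is forwarded to the callback, halving the datapoint rate
    # to one every 100ms.
    df = PeriodicDataFrame(datablock, interval='300ms',
                           freq=pd.Timedelta("100ms"))

    # The result is an ordinary streaming dataframe, so windowed
    # aggregations apply; print a two-second windowed sum as it updates.
    df.window(value='2s').x.sum().stream.sink(print)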

docs/source/gpu-dataframes.rst

Lines changed: 16 additions & 12 deletions
@@ -1,13 +1,15 @@
-Streaming GPU DataFrames(cudf)
-------------------------------
+Streaming GPU DataFrames (cudf)
+-------------------------------
 
-The ``streamz.dataframe`` module provides DataFrame-like interface on streaming
-data as described in ``dataframes`` documentation. It provides support for dataframe
-like libraries such as pandas and cudf. This documentation is specific to streaming GPU
-dataframes(cudf).
+The ``streamz.dataframe`` module provides a DataFrame-like interface
+on streaming data as described in the ``dataframes`` documentation. It
+provides support for dataframe-like libraries such as pandas and
+cudf. This documentation is specific to streaming GPU dataframes using
+cudf.
 
-The example in the ``dataframes`` documentation is rewritten below using cudf dataframes
-just by replacing ``pandas`` module with ``cudf``:
+The example in the ``dataframes`` documentation is rewritten below
+using cudf dataframes just by replacing the ``pandas`` module with
+``cudf``:
 
 .. code-block:: python
 
@@ -23,19 +25,21 @@ just by replacing ``pandas`` module with ``cudf``:
 Supported Operations
 --------------------
 
-Streaming cudf dataframes support the following classes of operations
+Streaming cudf dataframes support the following classes of operations:
 
 - Elementwise operations like ``df.x + 1``
 - Filtering like ``df[df.name == 'Alice']``
 - Column addition like ``df['z'] = df.x + df.y``
 - Reductions like ``df.amount.mean()``
 - Windowed aggregations (fixed length) like ``df.window(n=100).amount.sum()``
 
-The following operations are not supported with cudf(as of version 0.8) yet
+The following operations are not yet supported with cudf (as of version 0.8):
+
 - Groupby-aggregations like ``df.groupby(df.name).amount.mean()``
 - Windowed aggregations (index valued) like ``df.window(value='2h').amount.sum()``
 - Windowed groupby aggregations like ``df.window(value='2h').groupby('name').amount.sum()``
 
 
-Window based Aggregations with cudf are supported just as explained in ``dataframes`` documentation.
-The support for groupby operations will be added in future.
+Window-based Aggregations with cudf are supported just as explained in
+the ``dataframes`` documentation. Support for groupby operations is
+expected to be added in the future.
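
For reference, a hedged sketch of that pandas-to-cudf substitution, mirroring the streaming-dataframe example from the ``dataframes`` page (``DataFrame(stream, example=...)`` is the documented streamz constructor; the ``.stream.sink(print)`` hookup and exact cudf semantics are assumptions):

.. code-block:: python

    import cudf
    from streamz import Stream
    from streamz.dataframe import DataFrame

    source = Stream()
    example = cudf.DataFrame({'name': [], 'amount': []})
    sdf = DataFrame(source, example=example)

    # Filtering and reductions work as with pandas-backed frames:
    sdf[sdf.name == 'Alice'].amount.mean().stream.sink(print)

    # Push a cudf batch through the pipeline:
    source.emit(cudf.DataFrame({'name': ['Alice', 'Bob'],
                                'amount': [100, 200]}))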

docs/source/index.rst

Lines changed: 2 additions & 0 deletions
@@ -111,8 +111,10 @@ data streaming systems like `Apache Flink <https://flink.apache.org/>`_,
 
    core.rst
    dataframes.rst
+   gpu-dataframes.rst
    dask.rst
    collections.rst
    api.rst
    collections-api.rst
    async.rst
+   plotting.rst

docs/source/plotting.rst

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+Visualizing streamz
+===================
+
+A variety of tools are available to help you understand, debug, and
+visualize your streaming objects:
+
+- Most Streamz objects automatically display themselves in Jupyter
+  notebooks, periodically updating their visual representation as text
+  or tables by registering events with the Tornado IOLoop used by Jupyter
+- The network graph underlying a stream can be visualized using `dot` to
+  render a PNG using `Stream.visualize(filename)`
+- Streaming data can be visualized using the optional separate packages
+  hvPlot, HoloViews, and Panel (see below)
+
+
+hvplot.streamz
+--------------
+
+hvPlot is a separate plotting library providing Bokeh-based plots for
+Pandas dataframes and a variety of other object types, including
+streamz DataFrame and Series objects.
+
+See `hvplot.holoviz.org <https://hvplot.holoviz.org>`_ for
+instructions on how to install hvplot. Once it is installed, you can
+use the Pandas .plot() API to get a dynamically updating plot in
+Jupyter or in Bokeh/Panel Server:
+
+.. code-block:: python
+
+    import hvplot.streamz
+    from streamz.dataframe import Random
+
+    df = Random()
+    df.hvplot(backlog=100)
+
+See the `streaming section
+<https://hvplot.holoviz.org/user_guide/Streaming.html>`_ of the hvPlot
+user guide for more details, and the `dataframes.ipynb` example that
+comes with streamz for a simple runnable example.
+
+
+HoloViews
+---------
+
+hvPlot is built on HoloViews, and you can also use HoloViews directly
+if you want more control over events and how they are processed. See
+the `HoloViews user guide
+<http://holoviews.org/user_guide/Streaming_Data.html>`_ for more
+details.
+
+
+Panel
+-----
+
+Panel is a general purpose dashboard and app framework, supporting a
+wide variety of displayable objects as "Panes". Panel provides a
+`streamz Pane
+<https://panel.holoviz.org/reference/panes/Streamz.html>`_ for
+rendering arbitrary streamz objects, and streamz DataFrames are
+handled by the Panel `DataFrame Pane
+<https://panel.holoviz.org/reference/panes/DataFrame.html>`_.
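
A minimal sketch of the Panel route, assuming only that ``pn.pane.DataFrame`` accepts a streamz DataFrame as the linked reference describes (the ``height`` value is an arbitrary choice):

.. code-block:: python

    import panel as pn
    from streamz.dataframe import Random

    pn.extension()

    df = Random()
    # The pane re-renders in place as new batches arrive; display it
    # directly in Jupyter, or serve it with `panel serve`.
    pn.pane.DataFrame(df, height=300).servable()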

setup.py

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@
 
 
 setup(name='streamz',
-      version='0.5.6',
+      version='0.6.0',
       description='Streams',
       url='http://github.com/python-streamz/streamz/',
       maintainer='Matthew Rocklin',

streamz/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -8,4 +8,4 @@
 except ImportError:
     pass
 
-__version__ = '0.5.6'
+__version__ = '0.6.0'
